Coverage for oarepo_c4gh/key/key_collection.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""This module implements a key collection that is to be used when 

2reading the container header packets, instead of a single key, to 

3support multiple available reader keys. 

4 

5""" 

6 

7from .key import Key 

8from ..exceptions import Crypt4GHKeyException 

9from typing import List, Generator 

10 

11 

class KeyCollection:
    """Simple storage for a collection of reader keys.

    The collection exposes a reusable iterator (the `keys` property)
    which yields each key at most once per round of iteration. When
    the consumer stops a round early - typically because the key just
    yielded worked - the next round starts with that same key. This
    ensures that if a reader key successfully reads a packet, it will
    always be the first one tried for the very next packet. A round
    that runs to completion leaves the starting position unchanged.

    """

    def __init__(self, *keys: Key) -> None:
        """Initializes the collection with the given keys.

        Parameters:
            keys: instances of classes implementing the Key Protocol,
                  passed as individual positional arguments

        Raises:
            Crypt4GHKeyException: if some key(s) do not have access to
                                  private part or no keys were given

        """
        if not keys:
            raise Crypt4GHKeyException("Collection needs at least one key")
        for key in keys:
            if not key.can_compute_symmetric_keys:
                raise Crypt4GHKeyException(
                    "KeyCollection is only for keys with access to private key"
                )
        # Variadic arguments arrive as a tuple, which is stored as-is.
        self._keys = keys
        # Index of the key the next round of iteration starts with.
        self._current = 0

    @property
    def count(self) -> int:
        """Returns the number of keys in this collection."""
        return len(self._keys)

    @property
    def keys(self) -> Generator[Key, None, None]:
        """Multiple-use iterator that yields each key at most
        once. When re-used, the iteration always starts with the most
        recently yielded key if the previous round was stopped early,
        or with the same key as the previous round if that round ran
        through all the keys.

        """
        first_current = self._current
        while True:
            yield self._keys[self._current]
            # The position is advanced only when the consumer asks for
            # another key; abandoning the iterator right after a yield
            # therefore leaves the position at the key just handed out.
            self._current = (self._current + 1) % self.count
            if self._current == first_current:
                # Wrapped around to the starting key: every key has
                # been yielded exactly once in this round.
                break