Coverage for oarepo_c4gh/crypt4gh/dek_collection.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""This module provides a persistent storage for multiple Data 

2Encryption Keys and automates the mechanisms used for decrypting 

3individual Data Blocks. It ensures the last working DEK is always 

4tried first and properly reports decryption failure if no key managed 

5to decrypt the data. 

6 

7""" 

8 

9from functools import reduce 

10from ..exceptions import Crypt4GHDEKException 

11import io 

12from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt 

13from nacl.exceptions import CryptoError 

14from .dek import DEK 

15 

16 

17class DEKCollection: 

18 """This class contains a list of Data Encryption Keys and provides 

19 functionality for the Header4GH reader to add new DEKs. When 

20 fully populated it can be then used for decrypting a stream of 

21 Data Blocks. 

22 

23 """ 

24 

25 def __init__(self) -> None: 

26 """Initializes an empty collection.""" 

27 self._deks = [] 

28 self._current = 0 

29 

30 @property 

31 def count(self) -> int: 

32 """The current number of DEKs in the collection.""" 

33 return len(self._deks) 

34 

35 @property 

36 def empty(self) -> bool: 

37 """True if there are no DEKs available.""" 

38 return self.count == 0 

39 

40 def contains_dek(self, dek: DEK) -> bool: 

41 """Check for duplicate DEKS. 

42 

43 Parameters: 

44 dek: a Data Encryption Key to check 

45 

46 Returns: 

47 True if given DEK is already contained. 

48 """ 

49 return next((True for v in self._deks if dek.dek == v.dek), False) 

50 

51 def add_dek(self, dek: DEK) -> None: 

52 """Adds a new dek to the collection if it is not already 

53 there. 

54 

55 Parameters: 

56 dek: a Data Encryption Key to add 

57 

58 """ 

59 if not self.contains_dek(dek): 

60 self._deks.append(dek) 

61 

62 def decrypt_packet(self, istream: io.RawIOBase) -> (bytes, bytes, int): 

63 """Internal procedure for decrypting single data block from 

64 the stream. If there is not enough data (for example at EOF), 

65 two None values are returned. If the block cannot be decrypted 

66 using known DEKs, the encrypted version is returned as-is and 

67 None is returned as the cleartext version. If the block can be 

68 decrypted, both the ciphertext and cleartext versions are 

69 returned. 

70 

71 Updates current key upon successfull decryption so that 

72 subsequent attempts will try this key first. 

73 

74 Tries all DEKs in the collection in circular order until all 

75 have been tried or one succeeded. 

76 

77 Parameters: 

78 istream: input stream with data blocks 

79 

80 Returns: 

81 Two values, the first representing the encrypted 

82 version of the data block and second one containing 

83 decrypted contents if possible. Both are none when no 

84 packet has been read. 

85 

86 """ 

87 nonce = istream.read(12) 

88 if len(nonce) != 12: 

89 return (None, None, None) 

90 datamac = istream.read(65536 + 16) 

91 if len(datamac) < 16: 

92 return (None, None, None) 

93 current = self._current 

94 while True: 

95 dek = self._deks[current] 

96 try: 

97 cleartext = crypto_aead_chacha20poly1305_ietf_decrypt( 

98 datamac, None, nonce, dek.dek 

99 ) 

100 self._current = current 

101 return (nonce + datamac, cleartext, current) 

102 except CryptoError as cerr: 

103 pass 

104 current = (current + 1) % self.count 

105 if current == self._current: 

106 return (nonce + datamac, None, None) 

107 

108 def __getitem__(self, idx: int) -> DEK: 

109 """Returns DEK at given index. 

110 

111 Parameters: 

112 idx: 0-based index (must be obtained elsewhere) 

113 

114 """ 

115 return self._deks[idx]