Coverage for oarepo_c4gh/crypt4gh/dek_collection.py: 100%
42 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""This module provides a persistent storage for multiple Data
2Encryption Keys and automates the mechanisms used for decrypting
3individual Data Blocks. It ensures the last working DEK is always
4tried first and properly reports decryption failure if no key managed
5to decrypt the data.
7"""
9from functools import reduce
10from ..exceptions import Crypt4GHDEKException
11import io
12from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt
13from nacl.exceptions import CryptoError
14from .dek import DEK
17class DEKCollection:
18 """This class contains a list of Data Encryption Keys and provides
19 functionality for the Header4GH reader to add new DEKs. When
20 fully populated it can be then used for decrypting a stream of
21 Data Blocks.
23 """
25 def __init__(self) -> None:
26 """Initializes an empty collection."""
27 self._deks = []
28 self._current = 0
30 @property
31 def count(self) -> int:
32 """The current number of DEKs in the collection."""
33 return len(self._deks)
35 @property
36 def empty(self) -> bool:
37 """True if there are no DEKs available."""
38 return self.count == 0
40 def contains_dek(self, dek: DEK) -> bool:
41 """Check for duplicate DEKS.
43 Parameters:
44 dek: a Data Encryption Key to check
46 Returns:
47 True if given DEK is already contained.
48 """
49 return next((True for v in self._deks if dek.dek == v.dek), False)
51 def add_dek(self, dek: DEK) -> None:
52 """Adds a new dek to the collection if it is not already
53 there.
55 Parameters:
56 dek: a Data Encryption Key to add
58 """
59 if not self.contains_dek(dek):
60 self._deks.append(dek)
62 def decrypt_packet(self, istream: io.RawIOBase) -> (bytes, bytes, int):
63 """Internal procedure for decrypting single data block from
64 the stream. If there is not enough data (for example at EOF),
65 two None values are returned. If the block cannot be decrypted
66 using known DEKs, the encrypted version is returned as-is and
67 None is returned as the cleartext version. If the block can be
68 decrypted, both the ciphertext and cleartext versions are
69 returned.
71 Updates current key upon successfull decryption so that
72 subsequent attempts will try this key first.
74 Tries all DEKs in the collection in circular order until all
75 have been tried or one succeeded.
77 Parameters:
78 istream: input stream with data blocks
80 Returns:
81 Two values, the first representing the encrypted
82 version of the data block and second one containing
83 decrypted contents if possible. Both are none when no
84 packet has been read.
86 """
87 nonce = istream.read(12)
88 if len(nonce) != 12:
89 return (None, None, None)
90 datamac = istream.read(65536 + 16)
91 if len(datamac) < 16:
92 return (None, None, None)
93 current = self._current
94 while True:
95 dek = self._deks[current]
96 try:
97 cleartext = crypto_aead_chacha20poly1305_ietf_decrypt(
98 datamac, None, nonce, dek.dek
99 )
100 self._current = current
101 return (nonce + datamac, cleartext, current)
102 except CryptoError as cerr:
103 pass
104 current = (current + 1) % self.count
105 if current == self._current:
106 return (nonce + datamac, None, None)
108 def __getitem__(self, idx: int) -> DEK:
109 """Returns DEK at given index.
111 Parameters:
112 idx: 0-based index (must be obtained elsewhere)
114 """
115 return self._deks[idx]