Coverage for oarepo_c4gh / crypt4gh / stream / header.py: 100%
76 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-06 16:58 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-06 16:58 +0000
1"""This module implements the class responsible for loading Crypt4GH
2from given input stream.
4"""
6from .header_packet import StreamHeaderPacket
7from ...key import Key, KeyCollection
8import io
9from ..util import read_crypt4gh_stream_le_uint32
10from ...exceptions import Crypt4GHHeaderException
11from ..dek_collection import DEKCollection
12from ..dek import DEK
13from ..analyzer import Analyzer
14from typing import Union
15from ..common.header import Header
18CRYPT4GH_MAGIC = b"crypt4gh"
21def check_crypt4gh_magic(magic_bytes: bytes) -> None:
22 """Checks given bytes whether they match the required Crypt4GH
23 magic bytes.
25 Parameters:
26 magic_bytes: the bytes to check
28 Raises:
29 Crypt4GHHeaderException: if not enough or incorrect bytes
31 """
32 magic_bytes_len = len(CRYPT4GH_MAGIC)
33 if len(magic_bytes) != magic_bytes_len:
34 raise Crypt4GHHeaderException(
35 f"Cannot read enough magic bytes {magic_bytes_len}"
36 )
37 if magic_bytes != CRYPT4GH_MAGIC:
38 raise Crypt4GHHeaderException(
39 f"Incorrect Crypt4GH magic: {magic_bytes}"
40 )
43class StreamHeader(Header):
44 """The constructor of this class loads the Crypt4GH header from
45 given stream.
47 """
49 def __init__(
50 self,
51 reader_key_or_collection: Union[Key, KeyCollection],
52 istream: io.RawIOBase,
53 analyzer: Analyzer = None,
54 ) -> None:
55 """Checks the Crypt4GH container signature, version and header
56 packet count. The header packets are loaded lazily when needed.
58 Parameters:
59 reader_key_or_collection: the key used for trying to decrypt header
60 packets (must include the private part) or collection of keys
61 istream: the container input stream
62 analyzer: analyzer for storing packet readability information
64 """
65 self._magic_bytes = istream.read(8)
66 check_crypt4gh_magic(self._magic_bytes)
67 self._version = read_crypt4gh_stream_le_uint32(istream, "version")
68 if self._version != 1:
69 raise Crypt4GHHeaderException(
70 f"Invalid Crypt4GH version {self._version}"
71 )
72 self._packet_count = read_crypt4gh_stream_le_uint32(
73 istream, "packet count"
74 )
75 if isinstance(reader_key_or_collection, KeyCollection):
76 self._reader_keys = reader_key_or_collection
77 else:
78 self._reader_keys = KeyCollection(reader_key_or_collection)
79 self._istream = istream
80 self._packets = None
81 self._deks = DEKCollection()
82 self._analyzer = analyzer
83 self._edit_list = []
85 def load_packets(self) -> None:
86 """Loads the packets from the input stream and discards the
87 key. It populates the internal Data Encryption Key collection
88 for later use during this process.
90 Performs edit list validation as follows:
92 If there are no edit lists, no checking is performed and an
93 empty edit list is assumed.
95 If there is only one readable edit list, no checking is
96 performed and this edit list is used.
98 If there are two edit lists readable by the same reader key,
99 an assertion violation is signalled.
101 If there is more than one edit list readable by distinct
102 reader keys and these edit lists are the same, no problem is
103 reported and the edit list is used.
105 If there is more than one edit list readable by distinct
106 reader keys but these are not identical, an assertion
107 violation is signalled.
109 Raises:
110 Crypt4GHHeaderException: if the reader key cannot perform symmetric key
111 derivation
112 AssertionError: if any problem with edit lists is found
114 """
115 self._packets = []
116 _edit_list_reader_keys = []
117 _edit_list = None
118 for idx in range(self._packet_count):
119 packet = StreamHeaderPacket(self._reader_keys, self._istream)
120 if packet.is_data_encryption_parameters:
121 self._deks.add_dek(
122 DEK(packet.data_encryption_key, packet.reader_key)
123 )
124 self._packets.append(packet)
125 if packet.is_edit_list:
126 assert (
127 packet.reader_key not in _edit_list_reader_keys
128 ), "more than one edit list with the same reader key"
129 _edit_list_reader_keys.append(packet.reader_key)
130 if _edit_list is None:
131 _edit_list = packet.lengths
132 else:
133 assert (
134 packet.lengths == _edit_list
135 ), "multiple different edit lists"
136 if self._analyzer is not None:
137 self._analyzer.analyze_packet(packet)
138 self._reader_keys = None
139 if _edit_list is not None:
140 self._edit_list = _edit_list
142 @property
143 def packets(self) -> list:
144 """The accessor to the direct list of header packets.
146 Returns:
147 List of header packets.
149 Raises:
150 Crypt4GHHeaderException: if the reader key cannot perform symmetric key
151 derivation
153 """
154 if self._packets is None:
155 self.load_packets()
156 return self._packets
158 @property
159 def deks(self) -> DEKCollection:
160 """Returns the collection of Data Encryption Keys obtained by
161 processing all header packets. Ensures the header packets were
162 actually processed before returning the reference.
164 Returns:
165 The DEK Collection.
167 Raises:
168 Crypt4GHHeaderException: if packets needed to be loaded and
169 something went wrong
171 """
172 if self._packets is None:
173 self.load_packets()
174 return self._deks
176 @property
177 def magic_bytes(self) -> bytes:
178 """Returns the original magic bytes from the beginning of the
179 container.
181 """
182 return self._magic_bytes
184 @property
185 def version(self) -> int:
186 """Returns the version of this container format (must always
187 return 1).
189 """
190 return self._version
192 @property
193 def reader_keys_used(self) -> list[bytes]:
194 """Returns all reader public keys successfully used in any
195 packets decryption.
197 """
198 return list(
199 set(
200 packet.reader_key
201 for packet in self.packets
202 if packet.reader_key is not None
203 )
204 )
206 @property
207 def edit_list(self) -> list[int]:
208 """Returns the skip and keep lengths list of the edit list."""
209 self.packets
210 return self._edit_list