Coverage for oarepo_c4gh/crypt4gh/stream/header.py: 100%
61 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
"""This module implements the class responsible for loading the Crypt4GH
header from a given input stream.

"""
6from .header_packet import StreamHeaderPacket
7from ...key import Key, KeyCollection
8import io
9from ..util import read_crypt4gh_stream_le_uint32
10from ...exceptions import Crypt4GHHeaderException
11from ..dek_collection import DEKCollection
12from ..dek import DEK
13from ..analyzer import Analyzer
14from typing import Union
15from ..common.header import Header
# Signature bytes expected at the very beginning of a Crypt4GH container.
CRYPT4GH_MAGIC = b"crypt4gh"


def check_crypt4gh_magic(magic_bytes: bytes) -> None:
    """Validates the leading signature of a Crypt4GH container.

    The length is verified first so that a short read is reported
    separately from a signature whose content is wrong.

    Parameters:
        magic_bytes: the bytes read from the start of the container

    Raises:
        Crypt4GHHeaderException: if the number of bytes differs from the
            signature length or the bytes do not match the signature

    """
    expected_len = len(CRYPT4GH_MAGIC)
    if len(magic_bytes) == expected_len:
        if magic_bytes == CRYPT4GH_MAGIC:
            # Correct length and correct content - nothing to report.
            return
        raise Crypt4GHHeaderException(
            f"Incorrect Crypt4GH magic: {magic_bytes}"
        )
    raise Crypt4GHHeaderException(
        f"Cannot read enough magic bytes {expected_len}"
    )
class StreamHeader(Header):
    """Crypt4GH header loaded from an input stream.

    The constructor reads and validates the fixed preamble (magic
    bytes, version and header packet count). The header packets
    themselves are read lazily, on the first access to `packets`,
    `deks` or `reader_keys_used`, or by calling `load_packets`.

    """

    def __init__(
        self,
        reader_key_or_collection: Union[Key, KeyCollection],
        istream: io.RawIOBase,
        analyzer: Union[Analyzer, None] = None,
    ) -> None:
        """Checks the Crypt4GH container signature, version and header
        packet count. The header packets are loaded lazily when needed.

        Parameters:
            reader_key_or_collection: the key used for trying to decrypt
                header packets (must include the private part) or
                collection of keys
            istream: the container input stream
            analyzer: optional analyzer for storing packet readability
                information; None disables the analysis

        Raises:
            Crypt4GHHeaderException: if the magic bytes are missing or
                incorrect, or if the version is not 1

        """
        # Read exactly as many bytes as the signature is long instead of
        # repeating its length as a literal.
        self._magic_bytes = istream.read(len(CRYPT4GH_MAGIC))
        check_crypt4gh_magic(self._magic_bytes)
        self._version = read_crypt4gh_stream_le_uint32(istream, "version")
        if self._version != 1:
            raise Crypt4GHHeaderException(
                f"Invalid Crypt4GH version {self._version}"
            )
        self._packet_count = read_crypt4gh_stream_le_uint32(
            istream, "packet count"
        )
        # Always keep a collection internally, wrapping a single key.
        if isinstance(reader_key_or_collection, KeyCollection):
            self._reader_keys = reader_key_or_collection
        else:
            self._reader_keys = KeyCollection(reader_key_or_collection)
        self._istream = istream
        # None means the header packets were not read from the stream yet.
        self._packets = None
        self._deks = DEKCollection()
        self._analyzer = analyzer

    def load_packets(self) -> None:
        """Loads the packets from the input stream and discards the
        key. It populates the internal Data Encryption Key collection
        for later use during this process.

        Calling this method again after the packets were loaded does
        nothing: the stream was already consumed and the reader keys
        were discarded, so a second pass could not produce valid
        packets.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform
                symmetric key derivation

        """
        if self._packets is not None:
            return
        # NOTE(review): if reading a packet raises, the packets read so
        # far stay in place and later accesses return this partial list.
        self._packets = []
        for _ in range(self._packet_count):
            packet = StreamHeaderPacket(self._reader_keys, self._istream)
            if packet.is_data_encryption_parameters:
                self._deks.add_dek(
                    DEK(packet.data_encryption_key, packet.reader_key)
                )
            self._packets.append(packet)
            if self._analyzer is not None:
                self._analyzer.analyze_packet(packet)
        # The reader keys are not needed once all packets were processed.
        self._reader_keys = None

    @property
    def packets(self) -> list:
        """The accessor to the direct list of header packets. Loads
        the packets from the stream on first use.

        Returns:
            List of header packets.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform
                symmetric key derivation

        """
        if self._packets is None:
            self.load_packets()
        return self._packets

    @property
    def deks(self) -> DEKCollection:
        """Returns the collection of Data Encryption Keys obtained by
        processing all header packets. Ensures the header packets were
        actually processed before returning the reference.

        Returns:
            The DEK Collection.

        Raises:
            Crypt4GHHeaderException: if packets needed to be loaded and
                something went wrong

        """
        if self._packets is None:
            self.load_packets()
        return self._deks

    @property
    def magic_bytes(self) -> bytes:
        """Returns the original magic bytes from the beginning of the
        container.

        """
        return self._magic_bytes

    @property
    def version(self) -> int:
        """Returns the version of this container format (must always
        return 1).

        """
        return self._version

    @property
    def reader_keys_used(self) -> list[bytes]:
        """Returns all reader public keys successfully used in any
        packets decryption. Each key is listed once; the order of the
        result is not defined because the keys are deduplicated
        through a set.

        """
        return list(
            {
                packet.reader_key
                for packet in self.packets
                if packet.reader_key is not None
            }
        )