Coverage for oarepo_c4gh/crypt4gh/stream/header_packet.py: 100%
44 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""Implementation of single Crypt4GH header packet stream parser.
3"""
5from ..common.header_packet import HeaderPacket
6from ...key import KeyCollection
7import io
8from ..util import (
9 read_crypt4gh_stream_le_uint32,
10 read_crypt4gh_bytes_le_uint32,
11)
12from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt
13from nacl.exceptions import CryptoError
14from ...exceptions import Crypt4GHHeaderPacketException
17class StreamHeaderPacket(HeaderPacket):
18 """Loads the header packet from stream."""
20 def __init__(
21 self, reader_keys: KeyCollection, istream: io.RawIOBase
22 ) -> None:
23 """Tries parsing a single packet from given input stream and
24 stores it for future processing. If it is possible to decrypt
25 the packet with given reader key, the contents are parsed and
26 interpreted as well.
28 Parameters:
29 reader_keys: the key collection used for decryption attempts
30 istream: the container input stream
32 Raises:
33 Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.
35 """
36 _packet_length = read_crypt4gh_stream_le_uint32(
37 istream, "packet length"
38 )
39 _packet_data = _packet_length.to_bytes(4, "little") + istream.read(
40 _packet_length - 4
41 )
42 if len(_packet_data) != _packet_length:
43 raise Crypt4GHHeaderPacketException(
44 f"Header packet: read only {len(_packet_data)} "
45 f"instead of {_packet_length}"
46 )
47 encryption_method = read_crypt4gh_bytes_le_uint32(
48 _packet_data, 4, "encryption method"
49 )
50 if encryption_method != 0:
51 raise Crypt4GHHeaderPacketException(
52 f"Unsupported encryption method {encryption_method}"
53 )
54 writer_public_key = _packet_data[8:40]
55 nonce = _packet_data[40:52]
56 payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
57 payload = _packet_data[52:]
58 for maybe_reader_key in reader_keys.keys:
59 symmetric_key = maybe_reader_key.compute_read_key(
60 writer_public_key
61 )
62 _content = None
63 _reader_key = None
64 try:
65 _content = crypto_aead_chacha20poly1305_ietf_decrypt(
66 payload, None, nonce, symmetric_key
67 )
68 _reader_key = maybe_reader_key.public_key
69 break
70 except CryptoError as cerr:
71 pass
72 _data_encryption_method = None
73 _packet_type = None
74 _data_encryption_key = None
75 if _content is not None:
76 _packet_type = read_crypt4gh_bytes_le_uint32(
77 _content, 0, "packet type"
78 )
79 if _packet_type == 0:
80 _data_encryption_method = read_crypt4gh_bytes_le_uint32(
81 _content, 4, "encryption method"
82 )
83 if _data_encryption_method != 0:
84 raise Crypt4GHHeaderPacketException(
85 f"Unknown data encryption method "
86 f"{_data_encryption_method}."
87 )
88 _data_encryption_key = _content[8:40]
89 elif _packet_type == 1:
90 # Edit List
91 pass
92 else:
93 # Report error? Warning?
94 pass
95 super().__init__(
96 _packet_length,
97 _packet_data,
98 _content,
99 _reader_key,
100 _packet_type,
101 _data_encryption_method,
102 _data_encryption_key,
103 )