Coverage for oarepo_c4gh / crypt4gh / stream / header_packet.py: 100%
47 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-06 16:58 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-06 16:58 +0000
1"""Implementation of single Crypt4GH header packet stream parser.
3"""
5from ..common.header_packet import HeaderPacket
6from ...key import KeyCollection
7import io
8from ..util import (
9 read_crypt4gh_stream_le_uint32,
10 read_crypt4gh_bytes_le_uint32,
11 read_crypt4gh_bytes_le_uint64,
12)
13from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt
14from nacl.exceptions import CryptoError
15from ...exceptions import Crypt4GHHeaderPacketException
class StreamHeaderPacket(HeaderPacket):
    """Loads the header packet from stream."""

    def __init__(
        self, reader_keys: KeyCollection, istream: io.RawIOBase
    ) -> None:
        """Tries parsing a single packet from given input stream and
        stores it for future processing. If it is possible to decrypt
        the packet with given reader key, the contents are parsed and
        interpreted as well.

        The packet is always stored in its raw (encrypted) form. When
        none of the reader keys decrypts it - including the case of an
        empty key collection - the decrypted content, reader key,
        packet type and all type-specific fields are passed to the
        parent class as None.

        Parameters:
            reader_keys: the key collection used for decryption attempts
            istream: the container input stream

        Raises:
            Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

        """
        _packet_length = read_crypt4gh_stream_le_uint32(
            istream, "packet length"
        )
        # The length field counts its own 4 bytes and the packet must
        # also hold the 4-byte encryption method. Rejecting smaller
        # values here keeps a negative size from reaching read(), which
        # would otherwise consume the rest of the stream.
        if _packet_length < 8:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: invalid packet length {_packet_length}"
            )
        # A raw stream may return None when no data is available; treat
        # that as a short read so the length check below reports it.
        _packet_rest = istream.read(_packet_length - 4) or b""
        _packet_data = _packet_length.to_bytes(4, "little") + _packet_rest
        if len(_packet_data) != _packet_length:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: read only {len(_packet_data)} "
                f"instead of {_packet_length}"
            )
        encryption_method = read_crypt4gh_bytes_le_uint32(
            _packet_data, 4, "encryption method"
        )
        if encryption_method != 0:
            raise Crypt4GHHeaderPacketException(
                f"Unsupported encryption method {encryption_method}"
            )
        # Fixed layout for method 0: length (4), method (4), writer
        # public key (32), nonce (12) and a trailing 16 bytes
        # (presumably the Poly1305 authentication tag - confirm against
        # the Crypt4GH specification). Anything shorter would yield
        # truncated key/nonce slices below.
        payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
        if payload_length < 0:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: packet length {_packet_length} is too "
                f"short for encryption method {encryption_method}"
            )
        writer_public_key = _packet_data[8:40]
        nonce = _packet_data[40:52]
        payload = _packet_data[52:]

        # Initialised before the loop so that an empty key collection
        # results in an undecrypted packet instead of unbound locals.
        _content = None
        _reader_key = None
        for maybe_reader_key in reader_keys.keys:
            symmetric_key = maybe_reader_key.compute_read_key(
                writer_public_key
            )
            try:
                _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                    payload, None, nonce, symmetric_key
                )
            except CryptoError:
                # This key cannot decrypt the packet - try the next one.
                continue
            _reader_key = maybe_reader_key.public_key
            break

        _data_encryption_method = None
        _packet_type = None
        _data_encryption_key = None
        _lengths = None
        if _content is not None:
            _packet_type = read_crypt4gh_bytes_le_uint32(
                _content, 0, "packet type"
            )
            if _packet_type == 0:
                # Data encryption parameters: method followed by key.
                _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                    _content, 4, "encryption method"
                )
                if _data_encryption_method != 0:
                    raise Crypt4GHHeaderPacketException(
                        f"Unknown data encryption method "
                        f"{_data_encryption_method}."
                    )
                _data_encryption_key = _content[8:40]
            elif _packet_type == 1:
                # Edit List: a count followed by that many 64-bit
                # lengths starting at offset 8.
                _number_lengths = read_crypt4gh_bytes_le_uint32(
                    _content, 4, "Number lengths"
                )
                _lengths = [
                    read_crypt4gh_bytes_le_uint64(
                        _content, n * 8 + 8, "Edit list length"
                    )
                    for n in range(_number_lengths)
                ]
            else:
                # Report error? Warning?
                # Unknown packet types are kept with their decrypted
                # content but without any interpreted fields.
                pass
        super().__init__(
            _packet_length,
            _packet_data,
            _content,
            _reader_key,
            _packet_type,
            _data_encryption_method,
            _data_encryption_key,
            _lengths,
        )