Coverage for oarepo_c4gh/crypt4gh/stream/header_packet.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""Implementation of single Crypt4GH header packet stream parser. 

2 

3""" 

4 

5from ..common.header_packet import HeaderPacket 

6from ...key import KeyCollection 

7import io 

8from ..util import ( 

9 read_crypt4gh_stream_le_uint32, 

10 read_crypt4gh_bytes_le_uint32, 

11) 

12from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt 

13from nacl.exceptions import CryptoError 

14from ...exceptions import Crypt4GHHeaderPacketException 

15 

16 

class StreamHeaderPacket(HeaderPacket):
    """Loads the header packet from stream."""

    def __init__(
        self, reader_keys: KeyCollection, istream: io.RawIOBase
    ) -> None:
        """Tries parsing a single packet from given input stream and
        stores it for future processing. If it is possible to decrypt
        the packet with any of the given reader keys, the contents are
        parsed and interpreted as well. A packet that none of the keys
        can decrypt is still stored, only without its decrypted
        content.

        Parameters:
            reader_keys: the key collection used for decryption attempts
            istream: the container input stream

        Raises:
            Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

        """
        _packet_length = read_crypt4gh_stream_le_uint32(
            istream, "packet length"
        )
        # The declared length includes the 4-byte length field itself.
        # Anything smaller would turn the read below into a
        # negative-size read, which consumes the whole remaining stream
        # before the error is reported.
        if _packet_length < 4:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: invalid packet length {_packet_length}"
            )
        # A raw stream may return None when no data is available; treat
        # that as zero bytes so the length check below reports it.
        _packet_rest = istream.read(_packet_length - 4) or b""
        # Keep the complete serialized packet, length prefix included.
        _packet_data = _packet_length.to_bytes(4, "little") + _packet_rest
        if len(_packet_data) != _packet_length:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: read only {len(_packet_data)} "
                f"instead of {_packet_length}"
            )
        encryption_method = read_crypt4gh_bytes_le_uint32(
            _packet_data, 4, "encryption method"
        )
        if encryption_method != 0:
            raise Crypt4GHHeaderPacketException(
                f"Unsupported encryption method {encryption_method}"
            )
        # Fixed layout after the two uint32 fields: 32 bytes of writer
        # public key, 12 bytes of nonce, the rest is the encrypted
        # payload handed to the AEAD decryption as-is.
        # NOTE(review): packets shorter than this fixed layout are not
        # rejected explicitly here - confirm whether they should be.
        writer_public_key = _packet_data[8:40]
        nonce = _packet_data[40:52]
        payload = _packet_data[52:]
        # Defined before the loop so that they exist even if the key
        # collection yields no keys at all.
        _content = None
        _reader_key = None
        for maybe_reader_key in reader_keys.keys:
            symmetric_key = maybe_reader_key.compute_read_key(
                writer_public_key
            )
            try:
                _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                    payload, None, nonce, symmetric_key
                )
            except CryptoError:
                # This key cannot decrypt the packet, try the next one.
                continue
            # Remember which reader key succeeded and stop searching.
            _reader_key = maybe_reader_key.public_key
            break
        _data_encryption_method = None
        _packet_type = None
        _data_encryption_key = None
        if _content is not None:
            _packet_type = read_crypt4gh_bytes_le_uint32(
                _content, 0, "packet type"
            )
            if _packet_type == 0:
                # Packet carrying the data encryption method and key.
                _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                    _content, 4, "encryption method"
                )
                if _data_encryption_method != 0:
                    raise Crypt4GHHeaderPacketException(
                        f"Unknown data encryption method "
                        f"{_data_encryption_method}."
                    )
                _data_encryption_key = _content[8:40]
            # Packet type 1 (edit list) and any other packet type are
            # stored without further interpretation.
        super().__init__(
            _packet_length,
            _packet_data,
            _content,
            _reader_key,
            _packet_type,
            _data_encryption_method,
            _data_encryption_key,
        )