Coverage for oarepo_c4gh / crypt4gh / stream / header_packet.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-06 16:58 +0000

1"""Implementation of single Crypt4GH header packet stream parser. 

2 

3""" 

4 

5from ..common.header_packet import HeaderPacket 

6from ...key import KeyCollection 

7import io 

8from ..util import ( 

9 read_crypt4gh_stream_le_uint32, 

10 read_crypt4gh_bytes_le_uint32, 

11 read_crypt4gh_bytes_le_uint64, 

12) 

13from nacl.bindings import crypto_aead_chacha20poly1305_ietf_decrypt 

14from nacl.exceptions import CryptoError 

15from ...exceptions import Crypt4GHHeaderPacketException 

16 

17 

18class StreamHeaderPacket(HeaderPacket): 

19 """Loads the header packet from stream.""" 

20 

21 def __init__( 

22 self, reader_keys: KeyCollection, istream: io.RawIOBase 

23 ) -> None: 

24 """Tries parsing a single packet from given input stream and 

25 stores it for future processing. If it is possible to decrypt 

26 the packet with given reader key, the contents are parsed and 

27 interpreted as well. 

28 

29 Parameters: 

30 reader_keys: the key collection used for decryption attempts 

31 istream: the container input stream 

32 

33 Raises: 

34 Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs. 

35 

36 """ 

37 _packet_length = read_crypt4gh_stream_le_uint32( 

38 istream, "packet length" 

39 ) 

40 _packet_data = _packet_length.to_bytes(4, "little") + istream.read( 

41 _packet_length - 4 

42 ) 

43 if len(_packet_data) != _packet_length: 

44 raise Crypt4GHHeaderPacketException( 

45 f"Header packet: read only {len(_packet_data)} " 

46 f"instead of {_packet_length}" 

47 ) 

48 encryption_method = read_crypt4gh_bytes_le_uint32( 

49 _packet_data, 4, "encryption method" 

50 ) 

51 if encryption_method != 0: 

52 raise Crypt4GHHeaderPacketException( 

53 f"Unsupported encryption method {encryption_method}" 

54 ) 

55 writer_public_key = _packet_data[8:40] 

56 nonce = _packet_data[40:52] 

57 payload_length = _packet_length - 4 - 4 - 32 - 12 - 16 

58 payload = _packet_data[52:] 

59 for maybe_reader_key in reader_keys.keys: 

60 symmetric_key = maybe_reader_key.compute_read_key( 

61 writer_public_key 

62 ) 

63 _content = None 

64 _reader_key = None 

65 try: 

66 _content = crypto_aead_chacha20poly1305_ietf_decrypt( 

67 payload, None, nonce, symmetric_key 

68 ) 

69 _reader_key = maybe_reader_key.public_key 

70 break 

71 except CryptoError as cerr: 

72 pass 

73 _data_encryption_method = None 

74 _packet_type = None 

75 _data_encryption_key = None 

76 _lengths = None 

77 if _content is not None: 

78 _packet_type = read_crypt4gh_bytes_le_uint32( 

79 _content, 0, "packet type" 

80 ) 

81 if _packet_type == 0: 

82 _data_encryption_method = read_crypt4gh_bytes_le_uint32( 

83 _content, 4, "encryption method" 

84 ) 

85 if _data_encryption_method != 0: 

86 raise Crypt4GHHeaderPacketException( 

87 f"Unknown data encryption method " 

88 f"{_data_encryption_method}." 

89 ) 

90 _data_encryption_key = _content[8:40] 

91 elif _packet_type == 1: 

92 # Edit List 

93 _number_lengths = read_crypt4gh_bytes_le_uint32( 

94 _content, 4, "Number lengths" 

95 ) 

96 _lengths = [ 

97 read_crypt4gh_bytes_le_uint64( 

98 _content, n * 8 + 8, "Edit list length" 

99 ) 

100 for n in range(_number_lengths) 

101 ] 

102 pass 

103 else: 

104 # Report error? Warning? 

105 pass 

106 super().__init__( 

107 _packet_length, 

108 _packet_data, 

109 _content, 

110 _reader_key, 

111 _packet_type, 

112 _data_encryption_method, 

113 _data_encryption_key, 

114 _lengths, 

115 )