Coverage for oarepo_c4gh/crypt4gh/stream/header.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""This module implements the class responsible for loading Crypt4GH 

2from given input stream. 

3 

4""" 

5 

6from .header_packet import StreamHeaderPacket 

7from ...key import Key, KeyCollection 

8import io 

9from ..util import read_crypt4gh_stream_le_uint32 

10from ...exceptions import Crypt4GHHeaderException 

11from ..dek_collection import DEKCollection 

12from ..dek import DEK 

13from ..analyzer import Analyzer 

14from typing import Union 

15from ..common.header import Header 

16 

17 

18CRYPT4GH_MAGIC = b"crypt4gh" 

19 

20 

21def check_crypt4gh_magic(magic_bytes: bytes) -> None: 

22 """Checks given bytes whether they match the required Crypt4GH 

23 magic bytes. 

24 

25 Parameters: 

26 magic_bytes: the bytes to check 

27 

28 Raises: 

29 Crypt4GHHeaderException: if not enough or incorrect bytes 

30 

31 """ 

32 magic_bytes_len = len(CRYPT4GH_MAGIC) 

33 if len(magic_bytes) != magic_bytes_len: 

34 raise Crypt4GHHeaderException( 

35 f"Cannot read enough magic bytes {magic_bytes_len}" 

36 ) 

37 if magic_bytes != CRYPT4GH_MAGIC: 

38 raise Crypt4GHHeaderException( 

39 f"Incorrect Crypt4GH magic: {magic_bytes}" 

40 ) 

41 

42 

43class StreamHeader(Header): 

44 """The constructor of this class loads the Crypt4GH header from 

45 given stream. 

46 

47 """ 

48 

49 def __init__( 

50 self, 

51 reader_key_or_collection: Union[Key, KeyCollection], 

52 istream: io.RawIOBase, 

53 analyzer: Analyzer = None, 

54 ) -> None: 

55 """Checks the Crypt4GH container signature, version and header 

56 packet count. The header packets are loaded lazily when needed. 

57 

58 Parameters: 

59 reader_key_or_collection: the key used for trying to decrypt header 

60 packets (must include the private part) or collection of keys 

61 istream: the container input stream 

62 analyzer: analyzer for storing packet readability information 

63 

64 """ 

65 self._magic_bytes = istream.read(8) 

66 check_crypt4gh_magic(self._magic_bytes) 

67 self._version = read_crypt4gh_stream_le_uint32(istream, "version") 

68 if self._version != 1: 

69 raise Crypt4GHHeaderException( 

70 f"Invalid Crypt4GH version {self._version}" 

71 ) 

72 self._packet_count = read_crypt4gh_stream_le_uint32( 

73 istream, "packet count" 

74 ) 

75 if isinstance(reader_key_or_collection, KeyCollection): 

76 self._reader_keys = reader_key_or_collection 

77 else: 

78 self._reader_keys = KeyCollection(reader_key_or_collection) 

79 self._istream = istream 

80 self._packets = None 

81 self._deks = DEKCollection() 

82 self._analyzer = analyzer 

83 

84 def load_packets(self) -> None: 

85 """Loads the packets from the input stream and discards the 

86 key. It populates the internal Data Encryption Key collection 

87 for later use during this process. 

88 

89 Raises: 

90 Crypt4GHHeaderException: if the reader key cannot perform symmetric key 

91 derivation 

92 

93 """ 

94 self._packets = [] 

95 for idx in range(self._packet_count): 

96 packet = StreamHeaderPacket(self._reader_keys, self._istream) 

97 if packet.is_data_encryption_parameters: 

98 self._deks.add_dek( 

99 DEK(packet.data_encryption_key, packet.reader_key) 

100 ) 

101 self._packets.append(packet) 

102 if self._analyzer is not None: 

103 self._analyzer.analyze_packet(packet) 

104 self._reader_keys = None 

105 

106 @property 

107 def packets(self) -> list: 

108 """The accessor to the direct list of header packets. 

109 

110 Returns: 

111 List of header packets. 

112 

113 Raises: 

114 Crypt4GHHeaderException: if the reader key cannot perform symmetric key 

115 derivation 

116 

117 """ 

118 if self._packets is None: 

119 self.load_packets() 

120 return self._packets 

121 

122 @property 

123 def deks(self) -> DEKCollection: 

124 """Returns the collection of Data Encryption Keys obtained by 

125 processing all header packets. Ensures the header packets were 

126 actually processed before returning the reference. 

127 

128 Returns: 

129 The DEK Collection. 

130 

131 Raises: 

132 Crypt4GHHeaderException: if packets needed to be loaded and 

133 something went wrong 

134 

135 """ 

136 if self._packets is None: 

137 self.load_packets() 

138 return self._deks 

139 

140 @property 

141 def magic_bytes(self) -> bytes: 

142 """Returns the original magic bytes from the beginning of the 

143 container. 

144 

145 """ 

146 return self._magic_bytes 

147 

148 @property 

149 def version(self) -> int: 

150 """Returns the version of this container format (must always 

151 return 1). 

152 

153 """ 

154 return self._version 

155 

156 @property 

157 def reader_keys_used(self) -> list[bytes]: 

158 """Returns all reader public keys successfully used in any 

159 packets decryption. 

160 

161 """ 

162 return list( 

163 set( 

164 packet.reader_key 

165 for packet in self.packets 

166 if packet.reader_key is not None 

167 ) 

168 )