Coverage for oarepo_c4gh / crypt4gh / stream / header.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-06 16:58 +0000

1"""This module implements the class responsible for loading Crypt4GH 

2from given input stream. 

3 

4""" 

5 

6from .header_packet import StreamHeaderPacket 

7from ...key import Key, KeyCollection 

8import io 

9from ..util import read_crypt4gh_stream_le_uint32 

10from ...exceptions import Crypt4GHHeaderException 

11from ..dek_collection import DEKCollection 

12from ..dek import DEK 

13from ..analyzer import Analyzer 

14from typing import Union 

15from ..common.header import Header 

16 

17 

18CRYPT4GH_MAGIC = b"crypt4gh" 

19 

20 

21def check_crypt4gh_magic(magic_bytes: bytes) -> None: 

22 """Checks given bytes whether they match the required Crypt4GH 

23 magic bytes. 

24 

25 Parameters: 

26 magic_bytes: the bytes to check 

27 

28 Raises: 

29 Crypt4GHHeaderException: if not enough or incorrect bytes 

30 

31 """ 

32 magic_bytes_len = len(CRYPT4GH_MAGIC) 

33 if len(magic_bytes) != magic_bytes_len: 

34 raise Crypt4GHHeaderException( 

35 f"Cannot read enough magic bytes {magic_bytes_len}" 

36 ) 

37 if magic_bytes != CRYPT4GH_MAGIC: 

38 raise Crypt4GHHeaderException( 

39 f"Incorrect Crypt4GH magic: {magic_bytes}" 

40 ) 

41 

42 

43class StreamHeader(Header): 

44 """The constructor of this class loads the Crypt4GH header from 

45 given stream. 

46 

47 """ 

48 

49 def __init__( 

50 self, 

51 reader_key_or_collection: Union[Key, KeyCollection], 

52 istream: io.RawIOBase, 

53 analyzer: Analyzer = None, 

54 ) -> None: 

55 """Checks the Crypt4GH container signature, version and header 

56 packet count. The header packets are loaded lazily when needed. 

57 

58 Parameters: 

59 reader_key_or_collection: the key used for trying to decrypt header 

60 packets (must include the private part) or collection of keys 

61 istream: the container input stream 

62 analyzer: analyzer for storing packet readability information 

63 

64 """ 

65 self._magic_bytes = istream.read(8) 

66 check_crypt4gh_magic(self._magic_bytes) 

67 self._version = read_crypt4gh_stream_le_uint32(istream, "version") 

68 if self._version != 1: 

69 raise Crypt4GHHeaderException( 

70 f"Invalid Crypt4GH version {self._version}" 

71 ) 

72 self._packet_count = read_crypt4gh_stream_le_uint32( 

73 istream, "packet count" 

74 ) 

75 if isinstance(reader_key_or_collection, KeyCollection): 

76 self._reader_keys = reader_key_or_collection 

77 else: 

78 self._reader_keys = KeyCollection(reader_key_or_collection) 

79 self._istream = istream 

80 self._packets = None 

81 self._deks = DEKCollection() 

82 self._analyzer = analyzer 

83 self._edit_list = [] 

84 

85 def load_packets(self) -> None: 

86 """Loads the packets from the input stream and discards the 

87 key. It populates the internal Data Encryption Key collection 

88 for later use during this process. 

89 

90 Performs edit list validation as follows: 

91 

92 If there are no edit lists, no checking is performed and an 

93 empty edit list is assumed. 

94 

95 If there is only one readable edit list, no checking is 

96 performed and this edit list is used. 

97 

98 If there are two edit lists readable by the same reader key, 

99 an assertion violation is signalled. 

100 

101 If there is more than one edit list readable by distinct 

102 reader keys and these edit lists are the same, no problem is 

103 reported and the edit list is used. 

104 

105 If there is more than one edit list readable by distinct 

106 reader keys but these are not identical, an assertion 

107 violation is signalled. 

108 

109 Raises: 

110 Crypt4GHHeaderException: if the reader key cannot perform symmetric key 

111 derivation 

112 AssertionError: if any problem with edit lists is found 

113 

114 """ 

115 self._packets = [] 

116 _edit_list_reader_keys = [] 

117 _edit_list = None 

118 for idx in range(self._packet_count): 

119 packet = StreamHeaderPacket(self._reader_keys, self._istream) 

120 if packet.is_data_encryption_parameters: 

121 self._deks.add_dek( 

122 DEK(packet.data_encryption_key, packet.reader_key) 

123 ) 

124 self._packets.append(packet) 

125 if packet.is_edit_list: 

126 assert ( 

127 packet.reader_key not in _edit_list_reader_keys 

128 ), "more than one edit list with the same reader key" 

129 _edit_list_reader_keys.append(packet.reader_key) 

130 if _edit_list is None: 

131 _edit_list = packet.lengths 

132 else: 

133 assert ( 

134 packet.lengths == _edit_list 

135 ), "multiple different edit lists" 

136 if self._analyzer is not None: 

137 self._analyzer.analyze_packet(packet) 

138 self._reader_keys = None 

139 if _edit_list is not None: 

140 self._edit_list = _edit_list 

141 

142 @property 

143 def packets(self) -> list: 

144 """The accessor to the direct list of header packets. 

145 

146 Returns: 

147 List of header packets. 

148 

149 Raises: 

150 Crypt4GHHeaderException: if the reader key cannot perform symmetric key 

151 derivation 

152 

153 """ 

154 if self._packets is None: 

155 self.load_packets() 

156 return self._packets 

157 

158 @property 

159 def deks(self) -> DEKCollection: 

160 """Returns the collection of Data Encryption Keys obtained by 

161 processing all header packets. Ensures the header packets were 

162 actually processed before returning the reference. 

163 

164 Returns: 

165 The DEK Collection. 

166 

167 Raises: 

168 Crypt4GHHeaderException: if packets needed to be loaded and 

169 something went wrong 

170 

171 """ 

172 if self._packets is None: 

173 self.load_packets() 

174 return self._deks 

175 

176 @property 

177 def magic_bytes(self) -> bytes: 

178 """Returns the original magic bytes from the beginning of the 

179 container. 

180 

181 """ 

182 return self._magic_bytes 

183 

184 @property 

185 def version(self) -> int: 

186 """Returns the version of this container format (must always 

187 return 1). 

188 

189 """ 

190 return self._version 

191 

192 @property 

193 def reader_keys_used(self) -> list[bytes]: 

194 """Returns all reader public keys successfully used in any 

195 packets decryption. 

196 

197 """ 

198 return list( 

199 set( 

200 packet.reader_key 

201 for packet in self.packets 

202 if packet.reader_key is not None 

203 ) 

204 ) 

205 

206 @property 

207 def edit_list(self) -> list[int]: 

208 """Returns the skip and keep lengths list of the edit list.""" 

209 self.packets 

210 return self._edit_list