Coverage for oarepo_c4gh/key/c4gh.py: 100%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""Class for loading the Crypt4GH reference key format. 

2 

3""" 

4 

5from .software import SoftwareKey 

6from io import RawIOBase, BytesIO 

7from typing import Self 

8from base64 import b64decode 

9from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 

10from ..exceptions import Crypt4GHKeyException 

11 

# 7 bytes magic word that is at the very beginning of any private key
C4GH_MAGIC_WORD = b"c4gh-v1"

# Supported KDFs of Crypt4GH. This has to be a collection of separate
# names: adjacent bytes literals without commas are concatenated into a
# single bytes string, which would turn the membership test in
# check_c4gh_kdf into a substring search (accepting e.g. b"crypt" or b"").
C4GH_KDFS = (b"scrypt", b"bcrypt", b"pbkdf2_hmac_sha256")


def check_c4gh_kdf(kdf_name: bytes) -> bool:
    """Returns true if given KDF is supported.

    Parameters:
        kdf_name: KDF name string as bytes

    Returns:
        True if the name is exactly equal to one of the entries of
        C4GH_KDFS, False otherwise.
    """
    return kdf_name in C4GH_KDFS

29 

30 

def default_passphrase_callback() -> None:
    """Placeholder passphrase callback used when the caller gives none.

    It has no way of obtaining a passphrase, so every call fails.

    Raises:
        Crypt4GHKeyException: unconditionally, on every call.

    """
    message = "No password callback provided!"
    raise Crypt4GHKeyException(message)

38 

39 

def decode_b64_envelope(istream: RawIOBase) -> tuple[bytes, bytes]:
    """Reads PEM-like format and returns its label and decoded bytes.

    Surrounding whitespace is stripped from every line and empty lines
    are dropped. The first remaining line must be the BEGIN line, the
    last one the END line, and all lines in between are joined and
    Base64-decoded.

    Parameters:
        istream: input stream with the data.

    Returns:
        Label of the envelope and decoded content bytes.

    Raises:
        AssertionError: if there are fewer than 3 non-empty lines, the
            BEGIN or END line is missing, or their labels differ. The
            checks are explicit raises (not ``assert`` statements) so
            they are still performed under ``python -O``; the exception
            type is kept for backward compatibility.

    Errors raised by ``b64decode`` for malformed payload propagate
    unchanged.

    """
    begin_prefix = b"-----BEGIN "
    end_prefix = b"-----END "
    stripped = (raw_line.strip() for raw_line in istream.readlines())
    lines = [line for line in stripped if line]
    if len(lines) < 3:
        raise AssertionError(
            "At least 3 lines are needed - 2 for envelope and 1 with data."
        )
    first, last = lines[0], lines[-1]
    if not first.startswith(begin_prefix):
        raise AssertionError(f"Must start with BEGIN line {first}.")
    if not last.startswith(end_prefix):
        raise AssertionError(f"Must end with END line {last}.")
    # Decode before comparing labels so that a corrupted payload is
    # reported in the same order as before.
    data = b64decode(b"".join(lines[1:-1]))
    # Drop the prefix and the trailing dashes; what remains is the label.
    begin_label = first[len(begin_prefix):-1].strip(b"-")
    end_label = last[len(end_prefix):-1].strip(b"-")
    if begin_label != end_label:
        raise AssertionError(f"BEGIN {begin_label} not END {end_label}!")
    return begin_label, data

72 

73 

def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
    """Reads one length-prefixed binary string from the stream.

    The string is stored as a two-byte big-endian length followed by
    exactly that many bytes of payload.

    Parameters:
        istream: input stream positioned at the length field.

    Returns:
        The payload bytes, without the length prefix.

    Raises:
        Crypt4GHKeyException: if the stream ends before the whole
            length field or the whole payload could be read.

    """
    prefix = istream.read(2)
    prefix_size = len(prefix)
    if prefix_size != 2:
        raise Crypt4GHKeyException(
            "Binary string read - not enought data to read the length: "
            f"{prefix_size} != 2"
        )
    expected = int.from_bytes(prefix, byteorder="big")
    payload = istream.read(expected)
    actual = len(payload)
    if actual == expected:
        return payload
    raise Crypt4GHKeyException(
        f"Binary string read - not enough data: {actual} != {expected}"
    )

103 

104 

def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
    """Consumes the leading signature bytes of the stream and verifies
    that they are equal to C4GH_MAGIC_WORD.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary key stream.

    Raises:
        Crypt4GHKeyException: if the bytes read differ from the magic
            word (which includes the stream being too short).

    """
    expected = C4GH_MAGIC_WORD
    if istreamb.read(len(expected)) == expected:
        return
    raise Crypt4GHKeyException("Not a Crypt4GH private key!")

120 

121 

def parse_c4gh_kdf_options(
    istreamb: RawIOBase,
) -> tuple[bytes, int | None, bytes | None]:
    """Parses KDF name and options (if applicable) from given input
    stream.

    For the KDF name ``none`` no options are read from the stream.
    For a supported KDF the options string is read next; its first
    4 bytes are the big-endian number of rounds and the rest is the
    salt.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary stream.

    Returns:
        kdf_name: the name of the KDF as binary string
        kdf_rounds: number of hashing rounds for KDF (None for ``none``)
        kdf_salt: salt for initializing the hashing (None for ``none``)

    Raises:
        Crypt4GHKeyException: if parsed KDF name is not supported, if
            the options are too short to contain the rounds field, or
            if the stream ends prematurely.

    """
    kdf_name = decode_c4gh_bytes(istreamb)
    if kdf_name == b"none":
        return (kdf_name, None, None)
    if not check_c4gh_kdf(kdf_name):
        raise Crypt4GHKeyException(f"Unsupported KDF {kdf_name}")
    kdf_options = decode_c4gh_bytes(istreamb)
    # Without this check a truncated options string would silently yield
    # a wrong number of rounds and an empty salt.
    rounds_size = 4
    if len(kdf_options) < rounds_size:
        raise Crypt4GHKeyException(
            f"KDF options too short: {len(kdf_options)} < {rounds_size}"
        )
    kdf_rounds = int.from_bytes(kdf_options[:rounds_size], byteorder="big")
    kdf_salt = kdf_options[rounds_size:]
    return (kdf_name, kdf_rounds, kdf_salt)

148 

149 

150def derive_c4gh_key( 

151 algo: bytes, passphrase: bytes, salt: bytes, rounds: int 

152) -> bytes: 

153 """Derives the symmetric key for decrypting the private key. 

154 

155 Parameters: 

156 algo: the algorithm for key derivation 

157 passphrase: the passphrase from which to derive the key 

158 rounds: number of hashing rounds 

159 

160 Returns: 

161 The derived symmetric key. 

162 

163 Raises: 

164 Crypt4GHKeyException: if given KDF algorithm is not supported (should not happen 

165 as this is expected to be called after parse_c4gh_kdf_options). 

166 """ 

167 if algo == b"scrypt": 

168 from hashlib import scrypt 

169 

170 return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32) 

171 if algo == b"bcrypt": 

172 import bcrypt 

173 

174 return bcrypt.kdf( 

175 passphrase, 

176 salt=salt, 

177 desired_key_bytes=32, 

178 rounds=rounds, 

179 ignore_few_rounds=True, 

180 ) 

181 if algo == b"pbkdf2_hmac_sha256": 

182 from hashlib import pbkdf2_hmac 

183 

184 return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32) 

185 raise Crypt4GHKeyException(f"Unsupported KDF: {algo}") 

186 

187 

class C4GHKey(SoftwareKey):
    """This class implements the loader for Crypt4GH key file format.

    All loaders are alternate constructors: they build the instance by
    calling the class they were invoked on with the raw key bytes and a
    flag that is True when the envelope contained a public key.
    """

    @classmethod
    def from_file(
        cls, file_name: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens file stream and loads the Crypt4GH key from it.

        Parameters:
            file_name: path to the file with the key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        # The context manager guarantees the file is closed even when
        # parsing of its contents fails.
        with open(file_name, "rb") as istream:
            return cls.from_stream(istream, callback)

    @classmethod
    def from_string(
        cls, contents: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Converts string to bytes which is opened as binary stream
        and loads the Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key;
                must be encodable as ASCII.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return cls.from_bytes(bytes(contents, "ASCII"), callback)

    @classmethod
    def from_bytes(
        cls, contents: bytes, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens the contents bytes as binary stream and loads the
        Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return cls.from_stream(BytesIO(contents), callback)

    @classmethod
    def from_stream(
        cls,
        istream: RawIOBase,
        callback: callable = default_passphrase_callback,
    ) -> Self:
        """Parses the stream with stored key.

        The stream is read completely and always closed, whether or
        not its contents could be decoded. The passphrase callback is
        invoked only for keys encrypted with chacha20_poly1305.

        Parameters:
            istream: input stream with the key file contents.
            callback: must return passphrase for decryption if called

        Returns:
            The newly constructed key instance.

        Raises:
            Crypt4GHKeyException: if the binary key data is malformed
                or uses an unsupported KDF or cipher.
            AssertionError: if the envelope is malformed or the
                callback is not callable.
        """
        try:
            slabel, sdata = decode_b64_envelope(istream)
        finally:
            # Release the stream even if the envelope is malformed.
            istream.close()
        if slabel == b"CRYPT4GH PUBLIC KEY":
            return cls(sdata, True)
        # Any other label is treated as a private key.
        istreamb = BytesIO(sdata)
        check_c4gh_stream_magic(istreamb)
        kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
        cipher_name = decode_c4gh_bytes(istreamb)
        if cipher_name == b"none":
            # Unencrypted private key - the payload is the key itself.
            secret_data = decode_c4gh_bytes(istreamb)
            return cls(secret_data, False)
        if cipher_name != b"chacha20_poly1305":
            raise Crypt4GHKeyException(f"Unsupported cipher: {cipher_name}")
        # Explicit raise instead of an assert statement so the check is
        # not stripped under ``python -O``; the type is kept for
        # backward compatibility.
        if not callable(callback):
            raise AssertionError("Invalid passphrase callback (non-callable)")
        passphrase = callback().encode()
        symmetric_key = derive_c4gh_key(
            kdf_name, passphrase, kdf_salt, kdf_rounds
        )
        # The encrypted payload is a 12-byte nonce followed by the
        # ciphertext; no associated data is used.
        nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
        nonce = nonce_and_encrypted_data[:12]
        encrypted_data = nonce_and_encrypted_data[12:]
        decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
            nonce, encrypted_data, None
        )
        return cls(decrypted_data, False)