Coverage for oarepo_c4gh/key/c4gh.py: 100%
91 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""Class for loading the Crypt4GH reference key format.
3"""
5from .software import SoftwareKey
6from io import RawIOBase, BytesIO
7from typing import Self
8from base64 import b64decode
9from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
10from ..exceptions import Crypt4GHKeyException
12# 7 bytes magic word that is at the very beginning of any private key
13C4GH_MAGIC_WORD = b"c4gh-v1"
15# Supported KDFs of Crypt4GH
16C4GH_KDFS = b"scrypt" b"bcrypt" b"pbkdf2_hmac_sha256"
19def check_c4gh_kdf(kdf_name: bytes) -> bool:
20 """Returns true if given KDF is supported.
22 Parameters:
23 kdf_name: KDF name string as bytes
25 Returns:
26 True if the KDF is supported.
27 """
28 return kdf_name in C4GH_KDFS
31def default_passphrase_callback() -> None:
32 """By default the constructor has no means of obtaining the
33 passphrase and therefore this function unconditionally raises an
34 exception when called.
36 """
37 raise Crypt4GHKeyException("No password callback provided!")
40def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
41 """Reads PEM-like format and returns its label and decoded bytes.
43 Parameters:
44 istream: input stream with the data.
46 Returns:
47 Label of the envelope and decoded content bytes.
49 """
50 lines = list(
51 filter(
52 lambda line: line,
53 map(lambda raw_line: raw_line.strip(), istream.readlines()),
54 )
55 )
56 assert (
57 len(lines) >= 3
58 ), "At least 3 lines are needed - 2 for envelope and 1 with data."
59 assert lines[0].startswith(
60 b"-----BEGIN "
61 ), f"Must start with BEGIN line {lines[0]}."
62 assert lines[-1].startswith(
63 b"-----END "
64 ), f"Must end with END line {lines[-1]}."
65 data = b64decode(b"".join(lines[1:-1]))
66 begin_label = lines[0][11:-1].strip(b"-")
67 end_label = lines[-1][9:-1].strip(b"-")
68 assert (
69 begin_label == end_label
70 ), f"BEGIN {begin_label} not END {end_label}!"
71 return begin_label, data
74def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
75 """Decodes binary string encoded as two-byte big-endian integer
76 length and the actual data that follows this length field.
78 Parameters:
79 istream: input stream from which to decode the bytes string.
81 Returns:
82 The decoded bytes string.
84 Raises:
85 Crypt4GHKeyException: if there is not enough data in the stream
87 """
88 lengthb = istream.read(2)
89 lengthb_length = len(lengthb)
90 if len(lengthb) != 2:
91 raise Crypt4GHKeyException(
92 f"Binary string read - not enought data to read the length: "
93 f"{lengthb_length} != 2"
94 )
95 length = int.from_bytes(lengthb, byteorder="big")
96 string = istream.read(length)
97 read_length = len(string)
98 if read_length != length:
99 raise Crypt4GHKeyException(
100 f"Binary string read - not enough data: {read_length} != {length}"
101 )
102 return string
105def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
106 """Reads enough bytes from given input stream and checks whether
107 they contain the correct Crypt4GH signature. Raises error if it
108 doesn't.
110 Parameters:
111 istreamb: input stream with the raw Crypt4GH binary key stream.
113 Raises:
114 Crypt4GHKeyException: if the signature does not match.
116 """
117 magic_to_check = istreamb.read(len(C4GH_MAGIC_WORD))
118 if magic_to_check != C4GH_MAGIC_WORD:
119 raise Crypt4GHKeyException("Not a Crypt4GH private key!")
122def parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes):
123 """Parses KDF name and options (if applicable) from given input
124 stream.
126 Parameters:
127 istreamb: input stream with the raw Crypt4GH binary stream.
129 Returns:
130 kdf_name: the name of the KDF as binary string
131 kdf_rounds: number of hashing rounds for KDF
132 kdf_salt: salt for initializing the hashing
134 Raises:
135 Crypt4GHKeyException: if parsed KDF name is not supported
137 """
138 kdf_name = decode_c4gh_bytes(istreamb)
139 if kdf_name == b"none":
140 return (kdf_name, None, None)
141 elif check_c4gh_kdf(kdf_name):
142 kdf_options = decode_c4gh_bytes(istreamb)
143 kdf_rounds = int.from_bytes(kdf_options[:4], byteorder="big")
144 kdf_salt = kdf_options[4:]
145 return (kdf_name, kdf_rounds, kdf_salt)
146 else:
147 raise Crypt4GHKeyException(f"Unsupported KDF {kdf_name}")
150def derive_c4gh_key(
151 algo: bytes, passphrase: bytes, salt: bytes, rounds: int
152) -> bytes:
153 """Derives the symmetric key for decrypting the private key.
155 Parameters:
156 algo: the algorithm for key derivation
157 passphrase: the passphrase from which to derive the key
158 rounds: number of hashing rounds
160 Returns:
161 The derived symmetric key.
163 Raises:
164 Crypt4GHKeyException: if given KDF algorithm is not supported (should not happen
165 as this is expected to be called after parse_c4gh_kdf_options).
166 """
167 if algo == b"scrypt":
168 from hashlib import scrypt
170 return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32)
171 if algo == b"bcrypt":
172 import bcrypt
174 return bcrypt.kdf(
175 passphrase,
176 salt=salt,
177 desired_key_bytes=32,
178 rounds=rounds,
179 ignore_few_rounds=True,
180 )
181 if algo == b"pbkdf2_hmac_sha256":
182 from hashlib import pbkdf2_hmac
184 return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
185 raise Crypt4GHKeyException(f"Unsupported KDF: {algo}")
188class C4GHKey(SoftwareKey):
189 """This class implements the loader for Crypt4GH key file format."""
191 @classmethod
192 def from_file(
193 self, file_name: str, callback: callable = default_passphrase_callback
194 ) -> Self:
195 """Opens file stream and loads the Crypt4GH key from it.
197 Parameters:
198 file_name: path to the file with the key.
199 callback: must return passphrase for decryption if called.
201 Returns:
202 Initialized C4GHKey instance.
204 """
205 return C4GHKey.from_stream(open(file_name, "rb"), callback)
207 @classmethod
208 def from_string(
209 self, contents: str, callback: callable = default_passphrase_callback
210 ) -> Self:
211 """Converts string to bytes which is opened as binary stream
212 and loads the Crypt4GH key from it.
214 Parameters:
215 contents: complete contents of the file with Crypt4GH key.
216 callback: must return passphrase for decryption if called.
218 Returns:
219 Initialized C4GHKey instance.
221 """
222 return C4GHKey.from_bytes(bytes(contents, "ASCII"), callback)
224 @classmethod
225 def from_bytes(
226 self, contents: bytes, callback: callable = default_passphrase_callback
227 ) -> Self:
228 """Opens the contents bytes as binary stream and loads the
229 Crypt4GH key from it.
231 Parameters:
232 contents: complete contents of the file with Crypt4GH key.
233 callback: must return passphrase for decryption if called.
235 Returns:
236 Initialized C4GHKey instance.
238 """
239 return C4GHKey.from_stream(BytesIO(contents), callback)
241 @classmethod
242 def from_stream(
243 self,
244 istream: RawIOBase,
245 callback: callable = default_passphrase_callback,
246 ) -> Self:
247 """Parses the stream with stored key.
249 Parameters:
250 istream: input stream with the key file contents.
251 callback: must return passphrase for decryption if called
253 Returns:
254 The newly constructed key instance.
255 """
256 slabel, sdata = decode_b64_envelope(istream)
257 istream.close()
258 if slabel == b"CRYPT4GH PUBLIC KEY":
259 return C4GHKey(sdata, True)
260 else:
261 istreamb = BytesIO(sdata)
262 check_c4gh_stream_magic(istreamb)
263 kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
264 cipher_name = decode_c4gh_bytes(istreamb)
265 if cipher_name == b"none":
266 secret_data = decode_c4gh_bytes(istreamb)
267 return C4GHKey(secret_data, False)
268 if cipher_name != b"chacha20_poly1305":
269 raise Crypt4GHKeyException(
270 f"Unsupported cipher: {cipher_name}"
271 )
272 assert callable(
273 callback
274 ), "Invalid passphrase callback (non-callable)"
275 passphrase = callback().encode()
276 symmetric_key = derive_c4gh_key(
277 kdf_name, passphrase, kdf_salt, kdf_rounds
278 )
279 nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
280 nonce = nonce_and_encrypted_data[:12]
281 encrypted_data = nonce_and_encrypted_data[12:]
282 decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
283 nonce, encrypted_data, None
284 )
285 return C4GHKey(decrypted_data, False)