Coverage for oarepo_c4gh/key/gpg_agent.py: 100%
188 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""This module provides "HSM" implementation of private key usable
2with Crypt4GH. It uses off-the-shelf YubiKey with its OpenPGP Card
3application through `gpg-agent`'s protocol.
5This is not a "real" HSM and it is provided only for testing purposes
6in a non-production environment without actual HSM.
8There are many assumptions:
10- compatible YubiKey must be present in the system
11- gpg-agent must be configured and running
12- there must not be any other key configured in gpg
13- no other application should be accessing gpg-agent
14- works only with gpg 2.4.x
16It is possible to use a gpg-agent socket from a different machine. See
17the `gpg-agent-forward.sh` script.
19"""
21from .external import ExternalKey
22from ..exceptions import Crypt4GHKeyException
23import os
24from typing import IO, List
25import socket
26import time
27from hashlib import sha1
28from base64 import b32encode
29import string
class GPGAgentKey(ExternalKey):
    """An instance of this class uses `gpg-agent` to finalize the ECDH
    computation. The actual key derivation is then performed by
    ExternalKey's methods.

    """

    def __init__(
        self,
        socket_path: str = None,
        home_dir: str = None,
        keygrip: str = None,
    ) -> None:
        """Initializes the instance by storing the path to
        `gpg-agent`'s socket. It verifies the socket's existence but
        performs no connection yet.

        Parameters:
            socket_path: path to `gpg-agent`'s socket - usually `/run/user/$UID/gnupg/S.gpg-agent`
            home_dir: path to gpg homedir, used for computing socket path
                when `socket_path` is not given
            keygrip: hexadecimal representation of the keygrip (`str` or
                `bytes`); when given, only the key with this keygrip is used

        Raises:
            Crypt4GHKeyException: if the socket path does not exist.

        """
        self._socket_path = socket_path
        if self._socket_path is None:
            socket_dir = compute_socket_dir(home_dir)
            self._socket_path = f"{socket_dir}/S.gpg-agent"
        if not os.path.exists(self._socket_path):
            raise Crypt4GHKeyException(
                "Cannot initialize GPGAgentKey with non-existent gpg-agent path."
            )
        self._req_keygrip = keygrip
        self._public_key = None
        self._keygrip = None

    def _requested_keygrip(self):
        """Returns the requested keygrip in the form used for comparison
        with the keygrips reported by the agent (upper-case hexadecimal
        `bytes`), or None if no particular keygrip was requested.

        Raises:
            Crypt4GHKeyException: if a textual keygrip is not plain ASCII.

        """
        wanted = self._req_keygrip
        if wanted is None:
            return None
        if isinstance(wanted, str):
            try:
                wanted = wanted.encode("ascii")
            except UnicodeEncodeError as ex:
                raise Crypt4GHKeyException("Invalid keygrip") from ex
        if isinstance(wanted, (bytes, bytearray)):
            # keygrip_to_hex() produces upper-case hexadecimal digits
            return bytes(wanted).upper()
        return wanted

    @staticmethod
    def _find_field(items, name: bytes):
        """Returns the first non-empty list in `items` whose first
        element equals `name`, or None if there is no such list.

        """
        return next(
            (
                item
                for item in items
                if isinstance(item, list) and item and item[0] == name
            ),
            None,
        )

    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Computes the result of finishing the ECDH key exchange.

        Parameters:
            public_point: the other party public point (compressed coordinates, 32 bytes)

        Returns:
            The resulting shared secret point (compressed coordinates, 32 bytes).

        Raises:
            Crypt4GHKeyException: if the agent cannot be reached, reports
                an error, closes the connection prematurely or sends a
                result that cannot be understood.
        """
        self.ensure_public_key()
        client = self.connect_agent()
        try:
            # Greeting - must be "OK Message ..."
            expect_assuan_OK(client)

            # SETKEY keygrip
            client.send(b"SETKEY " + self._keygrip + b"\n")
            expect_assuan_OK(client)

            # PKDECRYPT
            client.send(b"PKDECRYPT\n")
            # The reply is not used, it might contain S configuration
            # messages or INQUIRE for CIPHERTEXT
            client.recv(4096)

            # D send static encoded data: a 33-byte value consisting of
            # the "@" prefix followed by the public point
            client.send(
                b"D (7:enc-val(4:ecdh(1:e33:@"
                + encode_assuan_buffer(public_point)
                + b")))\n"
            )

            # END
            client.send(b"END\n")

            # retrieve result - drop all messages without data
            pending = b""
            while True:
                if pending == b"":
                    pending = client.recv(4096)
                    if pending == b"":
                        # recv() returning no data means the peer closed
                        # the connection - looping further would never end
                        raise Crypt4GHKeyException(
                            "Connection to gpg-agent closed before the "
                            "ECDH result was received"
                        )
                raw_line, pending = line_from_dgram(pending)
                line = decode_assuan_buffer(raw_line)
                if line[:4] == b"ERR ":
                    raise Crypt4GHKeyException(
                        "Assuan error: " + line.decode("ascii")
                    )
                if line[:2] == b"D ":
                    struct = parse_binary_sexp(line[2:])
                    if not isinstance(struct, list) or len(struct) < 2:
                        raise Crypt4GHKeyException(
                            "Unexpected ECDH result received from gpg-agent"
                        )
                    # The first byte of the value is dropped, only the
                    # remaining bytes form the result
                    return struct[1][1:]
        finally:
            # Close the connection on success and on every error path
            client.close()

    def ensure_public_key(self):
        """Loads the public key and stores its keygrip from the
        OpenPGP Card. This method is a no-op if the key was loaded
        before.

        Raises:
            Crypt4GHKeyException: if the agent cannot be reached, its
                replies are malformed or no suitable Curve25519 key
                (optionally matching the requested keygrip) is found.

        """
        if self._public_key is not None:
            return
        wanted_keygrip = self._requested_keygrip()
        client = self.connect_agent()
        try:
            # Must be "OK Message ..."
            expect_assuan_OK(client)

            # Now send request for all keys
            client.send(b"HAVEKEY --list=1000\n")
            havekey_dgram = client.recv(4096)
            havekey_data, _ = line_from_dgram(havekey_dgram)
            # Skip the two-byte line prefix, the rest are 20-byte keygrips
            keygrips_data = decode_assuan_buffer(havekey_data[2:])
            if len(keygrips_data) % 20 != 0:
                raise Crypt4GHKeyException(
                    f"invalid keygrips data length: {len(keygrips_data)}"
                )
            keygrips = [
                keygrip_to_hex(keygrips_data[start : start + 20])
                for start in range(0, len(keygrips_data), 20)
            ]

            # Get detailed information for all keygrips, find Curve25519 one
            for keygrip in keygrips:
                # Send READKEY
                client.send(b"READKEY " + keygrip + b"\n")

                # Read D S-Exp
                key_dgram = client.recv(4096)
                key_line0, _ = line_from_dgram(key_dgram)
                key_line = decode_assuan_buffer(key_line0)
                key_struct = parse_binary_sexp(key_line[2:])
                if (
                    not isinstance(key_struct, list)
                    or len(key_struct) < 2
                    or key_struct[0] != b"public-key"
                    or len(key_struct[1]) < 1
                    or key_struct[1][0] != b"ecc"
                ):
                    continue
                ecc_fields = key_struct[1][1:]
                curve_struct = self._find_field(ecc_fields, b"curve")
                q_struct = self._find_field(ecc_fields, b"q")
                if (
                    curve_struct is None
                    or len(curve_struct) < 2
                    or curve_struct[1] != b"Curve25519"
                    or q_struct is None
                    or len(q_struct) < 2
                ):
                    continue
                if wanted_keygrip is not None and wanted_keygrip != keygrip:
                    continue
                # The first byte of the q value is dropped, only the
                # remaining bytes are stored as the public key
                self._public_key = q_struct[1][1:]
                self._keygrip = keygrip
                break
        finally:
            # Close the connection on success and on every error path
            client.close()

        # Error handling
        if self._public_key is None:
            raise Crypt4GHKeyException("Cannot determine public key")

    @property
    def public_key(self) -> bytes:
        """Returns the underlying public key."""
        self.ensure_public_key()
        return self._public_key

    def connect_agent(self) -> socket.socket:
        """Establishes connection to gpg-agent.

        Returns:
            Connected UNIX stream socket.

        Raises:
            Crypt4GHKeyException: if the connection cannot be established.

        """
        client = None
        try:
            client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            client.connect(self._socket_path)
        except Exception as ex:
            # Any failure is reported uniformly; do not leak the socket
            # if it was created but could not be connected
            if client is not None:
                client.close()
            raise Crypt4GHKeyException(
                "Cannot establish connection to gpg-agent."
            ) from ex
        return client
def line_from_dgram(dgram: bytes) -> tuple[bytes, bytes]:
    """Reads single line from given raw data and returns two values:
    the line read and the remaining data.

    The line feed terminating the line is never part of either returned
    value. If the data contains no line feed, all of it is returned as
    the line and the remaining data is empty.

    Parameters:
        dgram: raw bytes with input

    Returns:
        Tuple of the first line (without its terminator) and the rest.
    """
    line, _, rest = dgram.partition(b"\n")
    return line, rest
def decode_assuan_buffer(buf: bytes) -> bytes:
    """Decodes assuan binary buffer with "%xx" replacements for
    certain characters.

    Parameters:
        buf: the buffer received (and encoded by `_assuan_cookie_write_data` originally)

    Returns:
        The buffer with resolved escaped bytes.

    Raises:
        ValueError: if a "%" is not followed by a valid hexadecimal value.
    """
    decoded = bytearray()
    idx = 0
    size = len(buf)
    while idx < size:
        if buf[idx : idx + 1] == b"%":
            hh = buf[idx + 1 : idx + 3].decode("ascii")
            # bytearray.append() avoids int.to_bytes(), whose byteorder
            # argument is optional only on recent Python versions
            decoded.append(int(hh, 16))
            idx += 3
        else:
            decoded.append(buf[idx])
            idx += 1
    return bytes(decoded)
def encode_assuan_buffer(buf: bytes) -> bytes:
    r"""Encodes assuan binary buffer by replacing occurrences of \r, \n
    and % with %0D, %0A and %25 respectively.

    Parameters:
        buf: the buffer to encode (for sending typically)

    Returns:
        The encoded binary data that can be directly sent to assuan server.

    """
    # "%" must be escaped first, otherwise the "%" introduced by the
    # other two replacements would be escaped again.
    return (
        bytes(buf)
        .replace(b"%", b"%25")
        .replace(b"\r", b"%0D")
        .replace(b"\n", b"%0A")
    )
def keygrip_to_hex(kg: bytes) -> bytes:
    """Converts to hexadecimal representation suitable for KEYINFO and
    READKEY commands.

    Parameters:
        kg: keygrip in binary form (20 bytes)

    Returns:
        Upper-case hexadecimal string as bytes, two digits per input
        byte (40 bytes for a 20-byte keygrip).
    """
    return bytes(kg).hex().upper().encode("ascii")
def parse_binary_sexp(data: bytes) -> "list | None":
    """Reads libassuan binary S-Expression data into a nested lists
    structure.

    Tokens are encoded as `<decimal length>:<bytes>`, lists are
    delimited by parentheses. Parsing stops silently at the first
    malformed position (missing or non-numeric length, unbalanced
    closing parenthesis) and whatever was parsed up to that point is
    returned.

    Parameters:
        data: binary encoding of S-Expressions

    Returns:
        The first top-level element parsed (list of bytes and lists), or
        None if nothing could be parsed.

    """
    root = []
    stack = [root]
    idx = 0
    size = len(data)
    while idx < size:
        current = data[idx : idx + 1]
        if current == b"(":
            child = []
            stack[-1].append(child)
            stack.append(child)
            idx += 1
        elif current == b")":
            if len(stack) == 1:
                # Closing parenthesis without a matching opening one
                break
            stack.pop()
            idx += 1
        else:
            sep_idx = data.find(b":", idx)
            if sep_idx < 0:
                break
            len_field = data[idx:sep_idx]
            if not len_field.isdigit():
                # Empty or non-numeric length prefix
                break
            token_start = sep_idx + 1
            token_end = token_start + int(len_field)
            stack[-1].append(data[token_start:token_end])
            idx = token_end
    if not root:
        return None
    return root[0]
def expect_assuan_OK(client: IO) -> None:
    """Receives the next message from the connection and signals an
    error unless it is a lone line starting with b"OK". The connection
    is closed before the error is signalled.

    Parameters:
        client: active assuan socket connection

    Raises:
        Crypt4GHKeyException: if the message does not start with b"OK"
            or if more data follows its first line.

    """
    first_line, trailing = line_from_dgram(client.recv(4096))
    problem = None
    if not first_line.startswith(b"OK"):
        problem = "Expected Assuan OK message"
    elif trailing:
        problem = "Line noise after Assuan OK"
    if problem is not None:
        client.close()
        raise Crypt4GHKeyException(problem)
# Generic base32 alphabet: upper-case letters followed by digits 2-7.
gen_b32 = string.ascii_uppercase + "234567"
# Alphabet used by gnupg; the character at each position replaces the
# character at the same position of the generic alphabet.
gpg_b32 = "ybndrfg8ejkmcpqxot1uwisza345h769"

# Translation table (for str.translate) from the generic base32 alphabet
# to the gnupg one.
gpg_trans = str.maketrans(gen_b32, gpg_b32)
def compute_socket_dir_hash(path: str) -> str:
    """Computes partial message digest of given path to be used as
    shortened path component in the base socket directory path. The
    implementation is compatible with gnupg's homedir.c and zb32.c as
    well as with libgcrypt's SHA1 message digest.

    Parameters:
        path: canonical (as understood by gnupg) path to the original directory

    Returns:
        The first 15 bytes of the SHA1 digest of the path, base32-encoded
        using gnupg's alphabet.

    """
    digest = sha1(path.encode()).digest()
    # Only the first 15 bytes of the digest are used
    generic_b32 = b32encode(digest[:15]).decode("ascii")
    return generic_b32.translate(gpg_trans)
def compute_run_gnupg_base(bases: List[str] = None) -> str:
    """Computes possible gnupg's run directories and returns the one
    belonging to the first existing per-user runtime directory.

    Parameters:
        bases: candidate base directories, tried in order; defaults to
            `/run/gnupg`, `/run`, `/var/run/gnupg` and `/var/run`

    Returns:
        The actual gnupg's run directory of current user. Only the
        existence of the per-user directory `<base>/user/<uid>` is
        verified, not of the `gnupg` directory inside it.

    Raises:
        Crypt4GHKeyException: if none of the per-user directories exists.

    """
    if bases is None:
        bases = ["/run/gnupg", "/run", "/var/run/gnupg", "/var/run"]
    uid = os.getuid()
    for base in bases:
        user_dir = f"{base}/user/{uid}"
        if os.path.isdir(user_dir):
            return f"{user_dir}/gnupg"
    raise Crypt4GHKeyException("Cannot find GnuPG run base directory")
def compute_socket_dir(homedir: str = None) -> str:
    """Computes the actual socket dir used by gpg-agent based on given
    homedir (the private key storage directory).

    Without a homedir the root run directory for gnupg is returned;
    otherwise the result is its `d.<hash>` subdirectory, where the hash
    is derived from the homedir path.

    Parameters:
        homedir: canonical path to the directory

    Returns:
        Socket base directory.

    """
    run_base = compute_run_gnupg_base()
    if homedir is None:
        return run_base
    return f"{run_base}/d.{compute_socket_dir_hash(homedir)}"