Coverage for oarepo_c4gh/key/gpg_agent.py: 100%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""This module provides "HSM" implementation of private key usable 

2with Crypt4GH. It uses off-the-shelf YubiKey with its OpenPGP Card 

3application through `gpg-agent`'s protocol. 

4 

5This is not a "real" HSM and it is provided only for testing purposes 

6in a non-production environment without actual HSM. 

7 

8There are many assumptions: 

9 

10- compatible YubiKey must be present in the system 

11- gpg-agent must be configured and running 

12- there must not be any other key configured in gpg 

13- no other application should be accessing gpg-agent 

14- works only with gpg 2.4.x 

15 

16It is possible to use a gpg-agent socket from a different machine. See 

17the `gpg-agent-forward.sh` script. 

18 

19""" 

20 

21from .external import ExternalKey 

22from ..exceptions import Crypt4GHKeyException 

23import os 

24from typing import IO, List 

25import socket 

26import time 

27from hashlib import sha1 

28from base64 import b32encode 

29import string 

30 

31 

32class GPGAgentKey(ExternalKey): 

33 """An instance of this class uses `gpg-agent` to finalize the ECDH 

34 computation. The actual key derivation is then performed by 

35 ExternalKey's methods. 

36 

37 """ 

38 

39 def __init__( 

40 self, 

41 socket_path: str = None, 

42 home_dir: str = None, 

43 keygrip: string = None, 

44 ) -> None: 

45 """Initializes the instance by storing the path to 

46 `gpg-agent`'s socket. It verifies the socket's existence but 

47 performs no connection yet. 

48 

49 Parameters: 

50 socket_path: path to `gpg-agent`'s socket - usually `/run/user/$UID/gnupg/S.gpg-agent` 

51 home_dir: path to gpg homedir, used for computing socked path 

52 keygrip: hexadecimal representation of the keygrip 

53 

54 """ 

55 self._socket_path = socket_path 

56 if self._socket_path is None: 

57 socket_dir = compute_socket_dir(home_dir) 

58 self._socket_path = f"{socket_dir}/S.gpg-agent" 

59 if not os.path.exists(self._socket_path): 

60 raise Crypt4GHKeyException( 

61 "Cannot initialize GPGAgentKey with non-existent gpg-agent path." 

62 ) 

63 self._req_keygrip = keygrip 

64 self._public_key = None 

65 self._keygrip = None 

66 

67 def compute_ecdh(self, public_point: bytes) -> bytes: 

68 """Computes the result of finishing the ECDH key exchange. 

69 

70 Parameters: 

71 public_point: the other party public point (compressed coordinates, 32 bytes) 

72 

73 Returns: 

74 The resulting shared secret point (compressed coordinates, 32 bytes). 

75 """ 

76 self.ensure_public_key() 

77 client = self.connect_agent() 

78 expect_assuan_OK(client) 

79 

80 # SETKEY keygrip 

81 skm = b"SETKEY " + self._keygrip + b"\n" 

82 client.send(skm) 

83 expect_assuan_OK(client) 

84 

85 # PKDECRYPT 

86 client.send(b"PKDECRYPT\n") 

87 pdm = client.recv(4096) 

88 # not used, might contain S configuration messages or INQUIRE for CIPHERTEXT 

89 

90 # D send static encoded data 

91 evm = ( 

92 b"D (7:enc-val(4:ecdh(1:e33:@" 

93 + encode_assuan_buffer(public_point) 

94 + b")))\n" 

95 ) 

96 client.send(evm) 

97 

98 # END 

99 client.send(b"END\n") 

100 

101 # retrieve result - drop all messages without data 

102 msg = b"" 

103 result = None 

104 while True: 

105 if msg == b"": 

106 msg = client.recv(4096) 

107 line0, rest = line_from_dgram(msg) 

108 line = decode_assuan_buffer(line0) 

109 msg = rest 

110 if line[:4] == b"ERR ": 

111 client.close() 

112 raise Crypt4GHKeyException( 

113 "Assuan error: " + line.decode("ascii") 

114 ) 

115 if line[:2] == b"D ": 

116 data = line[2:] 

117 struct = parse_binary_sexp(line[2:]) 

118 result = struct[1][1:] 

119 break 

120 

121 # Done 

122 client.close() 

123 return result 

124 

125 def ensure_public_key(self): 

126 """Loads the public key and stores its keygrip from the 

127 OpenPGP Card. This method is a no-op if the key was loaded 

128 before. 

129 

130 """ 

131 if self._public_key is None: 

132 client = self.connect_agent() 

133 

134 # Must be "OK Message ..." 

135 expect_assuan_OK(client) 

136 

137 # Now send request for all keys 

138 client.send(b"HAVEKEY --list=1000\n") 

139 # Must be only one 

140 havekey_dgram = client.recv(4096) 

141 havekey_data, havekey_rest1 = line_from_dgram(havekey_dgram) 

142 havekey_msg, havekey_rest2 = line_from_dgram(havekey_rest1) 

143 keygrips_data = decode_assuan_buffer(havekey_data[2:]) 

144 num_keygrips = len(keygrips_data) // 20 

145 if num_keygrips * 20 != len(keygrips_data): 

146 client.close() 

147 raise Crypt4GHKeyException( 

148 f"invalid keygrips data length: {len(keygrips_data)}" 

149 ) 

150 keygrips = [ 

151 keygrip_to_hex(keygrips_data[idx * 20 : idx * 20 + 20]) 

152 for idx in range(num_keygrips) 

153 ] 

154 

155 # Get detailed information for all keygrips, find Curve25519 one 

156 for keygrip in keygrips: 

157 # Send READKEY 

158 client.send(b"READKEY " + keygrip + b"\n") 

159 

160 # Read D S-Exp 

161 key_dgram = client.recv(4096) 

162 key_line0, key_rest = line_from_dgram(key_dgram) 

163 key_line = decode_assuan_buffer(key_line0) 

164 key_struct = parse_binary_sexp(key_line[2:]) 

165 if ( 

166 (key_struct is None) 

167 or (len(key_struct) < 2) 

168 or (key_struct[0] != b"public-key") 

169 or (len(key_struct[1])) < 1 

170 or (key_struct[1][0] != b"ecc") 

171 ): 

172 continue 

173 curve_struct = next( 

174 v for v in key_struct[1][1:] if v[0] == b"curve" 

175 ) 

176 q_struct = next(v for v in key_struct[1][1:] if v[0] == b"q") 

177 if ( 

178 (curve_struct is None) 

179 or (len(curve_struct) < 2) 

180 or (curve_struct[1] != b"Curve25519") 

181 or (q_struct is None) 

182 or (len(q_struct) < 2) 

183 ): 

184 continue 

185 if (self._req_keygrip is not None) and ( 

186 self._req_keygrip != keygrip 

187 ): 

188 continue 

189 self._public_key = q_struct[1][1:] 

190 self._keygrip = keygrip 

191 break 

192 # Done 

193 client.close() 

194 

195 # Error handling 

196 if self._public_key is None: 

197 raise Crypt4GHKeyException("Cannot determine public key") 

198 

199 @property 

200 def public_key(self) -> bytes: 

201 """Returns the underlying public key.""" 

202 self.ensure_public_key() 

203 return self._public_key 

204 

205 def connect_agent(self) -> IO: 

206 """Establishes connection to gpg-agent.""" 

207 try: 

208 client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 

209 client.connect(self._socket_path) 

210 return client 

211 except: 

212 raise Crypt4GHKeyException( 

213 "Cannot establish connection to gpg-agent." 

214 ) 

215 

216 

217def line_from_dgram(dgram: bytes) -> (bytes, bytes): 

218 """Reads single line from given raw data and returns two values: 

219 the line read and the remaining data. 

220 

221 Parameters: 

222 dgram: raw bytes with input 

223 """ 

224 lf_idx = dgram.find(b"\n") 

225 if (lf_idx == -1) or (lf_idx == (len(dgram) - 1)): 

226 return dgram, b"" 

227 return dgram[:lf_idx], dgram[lf_idx + 1 :] 

228 

229 

230def decode_assuan_buffer(buf: bytes) -> bytes: 

231 """Decodes assuan binary buffer with "%xx" replacements for 

232 certain characters. 

233 

234 Parameters: 

235 buf: the buffer received (and encoded by `_assuan_cookie_write_data` originally) 

236 

237 Returns: 

238 The buffer with resolved escaped bytes. 

239 """ 

240 result = b"" 

241 idx = 0 

242 while idx < len(buf): 

243 if buf[idx : idx + 1] == b"%": 

244 hh = buf[idx + 1 : idx + 3].decode("ascii") 

245 result = result + int(hh, 16).to_bytes(1) 

246 idx = idx + 3 

247 else: 

248 result = result + buf[idx : idx + 1] 

249 idx = idx + 1 

250 return result 

251 

252 

253def encode_assuan_buffer(buf: bytes) -> bytes: 

254 """Encodes assuan binary buffer by replacing occurences of \r, \n 

255 and % with %0D, %0A and %25 respectively. 

256 

257 Parameters: 

258 buf: the buffer to encode (for sending typically) 

259 

260 Returns: 

261 The encoded binary data that can be directly sent to assuan server. 

262 

263 """ 

264 result = b"" 

265 idx = 0 

266 while idx < len(buf): 

267 b = buf[idx : idx + 1] 

268 if b == b"\n": 

269 result = result + b"%0A" 

270 elif b == b"\r": 

271 result = result + b"%0D" 

272 elif b == b"%": 

273 result = result + b"%25" 

274 else: 

275 result = result + b 

276 idx = idx + 1 

277 return result 

278 

279 

280def keygrip_to_hex(kg: bytes) -> bytes: 

281 """Converts to hexadecimal representation suitable for KEYINFO and 

282 READKEY commands. 

283 

284 Parameters: 

285 kg: keygrip in binary form (20 bytes) 

286 

287 Returns: 

288 Hexadecimal string as 40 bytes. 

289 """ 

290 result = b"" 

291 for b in kg: 

292 result = result + hex(0x100 + b)[3:].upper().encode("ascii") 

293 return result 

294 

295 

296def parse_binary_sexp(data: bytes) -> list: 

297 """Reads libassuan binary S-Expression data into a nested lists 

298 structure. 

299 

300 Parameters: 

301 data: binary encoding of S-Expressions 

302 

303 Returns: 

304 List of bytes and lists. 

305 

306 """ 

307 root = [] 

308 stack = [root] 

309 idx = 0 

310 while idx < len(data): 

311 if data[idx : idx + 1] == b"(": 

312 lst = [] 

313 stack[len(stack) - 1].append(lst) 

314 stack.append(lst) 

315 idx = idx + 1 

316 elif data[idx : idx + 1] == b")": 

317 stack = stack[: len(stack) - 1] 

318 idx = idx + 1 

319 else: 

320 sep_idx = data.find(b":", idx) 

321 if sep_idx < 0: 

322 break 

323 len_str = data[idx:sep_idx].decode("ascii") 

324 if len(len_str) == 0: 

325 break 

326 token_len = int(len_str) 

327 stack[len(stack) - 1].append( 

328 data[sep_idx + 1 : sep_idx + 1 + token_len] 

329 ) 

330 idx = sep_idx + token_len + 1 

331 if len(root) == 0: 

332 return None 

333 return root[0] 

334 

335 

336def expect_assuan_OK(client: IO) -> None: 

337 """If the next message received does not start with b"OK", signals 

338 an error. 

339 

340 Parameters: 

341 client: active assuan socket connection 

342 

343 """ 

344 ok_dgram = client.recv(4096) 

345 ok_msg, ok_rest = line_from_dgram(ok_dgram) 

346 if ok_msg[0:2] != b"OK": 

347 client.close() 

348 raise Crypt4GHKeyException("Expected Assuan OK message") 

349 if len(ok_rest) > 0: 

350 client.close() 

351 raise Crypt4GHKeyException("Line noise after Assuan OK") 

352 

353 

354gen_b32 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" 

355gpg_b32 = "ybndrfg8ejkmcpqxot1uwisza345h769" 

356 

357gpg_trans = str.maketrans(gen_b32, gpg_b32) 

358 

359 

360def compute_socket_dir_hash(path: str) -> str: 

361 """Computes partial message digest of given path to be used as 

362 shortened path component in the base socket directory path. The 

363 implemenation is compatible with gnupg's homedir.c and zb32.c as 

364 well as with libgcrypt's SHA1 message digest. 

365 

366 Parameters: 

367 path: canonical (as understood by gnupg) path to the original directory 

368 

369 """ 

370 bpath = path.encode() 

371 md = sha1(path.encode()).digest() 

372 md15 = md[:15] 

373 b32 = b32encode(md15) 

374 s32 = b32.decode("ascii") 

375 z32 = s32.translate(gpg_trans) 

376 return z32 

377 

378 

379def compute_run_gnupg_base( 

380 bases: List[str] = ["/run/gnupg", "/run", "/var/run/gnupg", "/var/run"] 

381) -> str: 

382 """Computes possible gnupg's run directories and verifies their 

383 existence. 

384 

385 Returns: 

386 The actual gnupg's run directory of current user. 

387 

388 """ 

389 uid = os.getuid() 

390 ubases = [f"{base}/user/{uid}" for base in bases] 

391 for ubase in ubases: 

392 if os.path.isdir(ubase): 

393 return f"{ubase}/gnupg" 

394 raise ArgumentError("Cannot find GnuPG run base directory") 

395 

396 

397def compute_socket_dir(homedir: str = None) -> str: 

398 """Computes the actual socket dir used by gpg-agent based on given 

399 homedir (the private key storage directory). 

400 

401 If given directory is None, returns the root run directory for 

402 gnupg (as required by gpg-agent). 

403 

404 Parameters: 

405 homedir: canonical path to the directory 

406 

407 Returns: 

408 Socket base directory. 

409 

410 """ 

411 base = compute_run_gnupg_base() 

412 if homedir is not None: 

413 dhash = compute_socket_dir_hash(homedir) 

414 return f"{base}/d.{dhash}" 

415 return base