Coverage for oarepo_c4gh/key/http.py: 100%

36 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 14:58 +0000

1"""The Crypt4GH key network protocol client implementation. The class 

2provided by this module can be used as any other key implementation 

3allowing computing the reader and writer keys using server's private 

4key. 

5 

6""" 

7 

8from urllib.parse import urlparse 

9from urllib.request import urlopen 

10from .external import ExternalKey 

11from .key import key_x25519_generator_point 

12from ..exceptions import Crypt4GHKeyException 

13from binascii import hexlify 

14 

15 

class HTTPKey(ExternalKey):
    """Client for the Crypt4GH key network protocol.

    Instead of holding a private key locally, instances ask a server
    (identified by a plain-HTTP URL) to perform the scalar
    multiplication by its private key. The public key is obtained
    through the very same operation and cached on first access.

    """

    def __init__(self, url: str) -> None:
        """Initializes the key instance and performs rudimentary
        validation of arguments given.

        Parameters:
            url: URL for requesting scalar multiplication by the private key.

        Raises:
            AssertionError: if the URL scheme is anything other than
                plain ``http`` (``https`` is reported separately as
                not supported yet).

        """
        pu = urlparse(url)
        # Raise explicitly rather than via ``assert`` so the validation
        # is not stripped when Python runs with -O; AssertionError and
        # the messages are kept for callers that rely on them.
        if pu.scheme == "https":
            raise AssertionError("HTTPS is not supported yet")
        if pu.scheme != "http":
            raise AssertionError(
                f"invalid scheme '{pu.scheme}', only HTTP is supported"
            )
        self._url = url
        # Filled in lazily by the ``public_key`` property.
        self._public_key = None

    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Computes the result of finishing the ECDH key exchange.

        The hex-encoded point is appended to the configured URL as the
        last path component and the raw response body is returned.

        Parameters:
            public_point: the other party public point (compressed coordinates, 32 bytes)

        Returns:
            The resulting shared secret point (compressed coordinates, 32 bytes).

        Raises:
            Crypt4GHKeyException: if the input is not 32 bytes long, if
                opening the URL fails for any reason, if the response
                status is not 200 or if the response body is not 32
                bytes long.

        """
        if len(public_point) != 32:
            raise Crypt4GHKeyException(
                f"Invalid public point coordinate size {len(public_point)} != 32"
            )
        requrl = self._url
        if not requrl.endswith("/"):
            requrl += "/"
        requrl += hexlify(public_point).decode("ascii")
        try:
            resp = urlopen(requrl)
        except Exception as ex:
            # Keep the original failure attached as the explicit cause.
            raise Crypt4GHKeyException(f"urllib exception {ex}") from ex
        # The response is a context manager; leaving the block closes
        # the underlying connection on every path, including errors.
        with resp:
            if resp.status != 200:
                raise Crypt4GHKeyException(f"Invalid response {resp.status}")
            result = resp.read()
        if len(result) != 32:
            raise Crypt4GHKeyException(
                f"Invalid result point size {len(result)} != 32"
            )
        return result

    @property
    def public_key(self) -> bytes:
        """Returns the underlying public key.

        As the network protocol does not provide any functionality
        which is not strictly necessary, the public key is simply
        computed as the curve generator multiplied by the private
        key. This approach ensures that any implementation backing the
        server will allow the user to retrieve the public key
        independently of any vendor API that might be needed for such
        retrieval.

        The value is requested from the server on first access only
        and cached on the instance afterwards.

        Returns:
            32 bytes of compressed public key point (the X coordinate).

        Raises:
            Crypt4GHKeyException: if the request made on first access
                fails (see ``compute_ecdh``).

        """
        if self._public_key is None:
            self._public_key = self.compute_ecdh(key_x25519_generator_point)
        return self._public_key