Coverage for oarepo_c4gh/key/http_path_key_server.py: 100%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 14:58 +0000

1"""This module contains an implementation of the server part of the 

2Crypt4GH key network protocol. The request handler uses uwsgi 

3`start_response` interface. 

4 

5""" 

6 

7from __future__ import annotations 

8 

9import binascii 

10from typing import Iterable 

11from wsgiref.types import StartResponse, WSGIEnvironment 

12 

13from .external import ExternalKey 

14from .external_software import ExternalSoftwareKey 

15from .key import Key 

16from .software import SoftwareKey 

17 

18 

19def split_and_clean(path: str) -> list[str]: 

20 """Splits path by slashes and cleans up to one heading and 

21 trailing empty element. 

22 

23 Parameters: 

24 path: string representing a path-like entity 

25 

26 Returns: 

27 An array with path components. 

28 

29 """ 

30 # remove leading and trailing slashes 

31 path = path.strip("/") 

32 return [x for x in path.split("/") if x] 

33 

34 

35def make_not_found(start_response: StartResponse) -> list[bytes]: 

36 """A common wrapper that starts a Not Found response and returns 

37 empty array. This way it can be used in simple return statements 

38 signalling an error. See the usage in 

39 HTTPPathKeyServer.handle_request. 

40 

41 Parameters: 

42 start_response: a uwsgi application-compatible procedure 

43 

44 Returns: 

45 An empty list. 

46 

47 """ 

48 start_response("404 Not Found", []) 

49 return [] 

50 

51 

52class HTTPPathKeyServer: 

53 """An instance of this class behaves like a collection of keys 

54 where each key is given a unique name. This name is then part of 

55 the URL representing the particular key. An approach like this 

56 allows supporting arbitrary number of keys by single server. 

57 

58 """ 

59 

60 def __init__( 

61 self, mapping: dict[str, Key], prefix: str = "", suffix: str = "x25519" 

62 ) -> None: 

63 """Initializes the instance and ensures all keys in the 

64 mapping can perform ECDH exchange. 

65 

66 Parameters: 

67 mapping: dictionary of name to key pairs. 

68 prefix: path elements preceeding the key name in URL. 

69 suffix: path elements succeeding the key name in URL. 

70 

71 """ 

72 self._prefix = split_and_clean(prefix) 

73 self._suffix = split_and_clean(suffix) 

74 remapping: dict[str, ExternalKey] = {} 

75 for name, key in mapping.items(): 

76 if isinstance(key, ExternalKey): 

77 remapping[name] = key 

78 elif isinstance(key, SoftwareKey): 

79 remapping[name] = ExternalSoftwareKey(key) 

80 else: 

81 raise TypeError( 

82 f"Expected ExternalKey or SoftwareKey instance for key {name}, found {type(key)}" 

83 ) 

84 self._mapping = remapping 

85 

86 # request should look like <prefix>/<key_id>/<suffix>/<public_point> 

87 self._required_request_length = ( 

88 len(self._prefix) + 1 + len(self._suffix) + 1 

89 ) 

90 

91 def handle_path_request( 

92 self, request_path: str, start_response: StartResponse 

93 ) -> list[bytes]: 

94 """All requests for key operations are uniquely identified by 

95 the request path. The key name and public point to be 

96 multiplied by private key are both encoded in the path and 

97 therefore the actual handling depends only upon the path. 

98 

99 Parameters: 

100 request_path: the path element of request URL 

101 start_response: uwsgi-compatible argument 

102 

103 Returns: 

104 List of single byte string of length 32 or an empty list 

105 in case of error. 

106 

107 """ 

108 # request path structure: <prefix>/<key_id>/<suffix>/<public_point> 

109 request_list = split_and_clean(request_path) 

110 

111 if len(request_list) < self._required_request_length: 

112 # too short to contain prefix, key id, suffix and public point 

113 return make_not_found(start_response) 

114 

115 key_pos = len(self._prefix) 

116 public_point_pos = -1 

117 suffix_pos = key_pos + 1 

118 

119 key_id_str = request_list[key_pos] 

120 public_point_hex = request_list[public_point_pos] 

121 

122 # check for prefix 

123 if request_list[:key_pos] != self._prefix: 

124 return make_not_found(start_response) 

125 

126 # check for suffix 

127 if request_list[suffix_pos:public_point_pos] != self._suffix: 

128 return make_not_found(start_response) 

129 

130 if key_id_str not in self._mapping: 

131 # key does not exist 

132 return make_not_found(start_response) 

133 

134 if len(public_point_hex) != 64: 

135 # incorrect public point length 

136 return make_not_found(start_response) 

137 try: 

138 public_point_bytes = binascii.unhexlify(public_point_hex) 

139 except binascii.Error: 

140 return make_not_found(start_response) 

141 

142 key = self._mapping[key_id_str] 

143 result = key.compute_ecdh(public_point_bytes) 

144 start_response( 

145 "200 OK", [("Content-Type", "application/octet-stream")] 

146 ) 

147 return [result] 

148 

149 def handle_uwsgi_request( 

150 self, env: WSGIEnvironment, start_response: StartResponse 

151 ) -> Iterable[bytes]: 

152 """A small wrapper that allows passing the uwsgi arguents 

153 directly to this key server implementation. 

154 

155 Parameters: 

156 env: HTTP environment sent by uwsgi 

157 start_response: uwsgi's start_response argument 

158 

159 Returns: 

160 List of one byte string of length 32 or an empty list in 

161 case of error. 

162 """ 

163 return self.handle_path_request(env["PATH_INFO"], start_response)