Coverage for oarepo_c4gh/key/http_path_key_server.py: 100%
54 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:58 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 14:58 +0000
1"""This module contains an implementation of the server part of the
2Crypt4GH key network protocol. The request handler uses uwsgi
3`start_response` interface.
5"""
7from __future__ import annotations
9import binascii
10from typing import Iterable
11from wsgiref.types import StartResponse, WSGIEnvironment
13from .external import ExternalKey
14from .external_software import ExternalSoftwareKey
15from .key import Key
16from .software import SoftwareKey
19def split_and_clean(path: str) -> list[str]:
20 """Splits path by slashes and cleans up to one heading and
21 trailing empty element.
23 Parameters:
24 path: string representing a path-like entity
26 Returns:
27 An array with path components.
29 """
30 # remove leading and trailing slashes
31 path = path.strip("/")
32 return [x for x in path.split("/") if x]
35def make_not_found(start_response: StartResponse) -> list[bytes]:
36 """A common wrapper that starts a Not Found response and returns
37 empty array. This way it can be used in simple return statements
38 signalling an error. See the usage in
39 HTTPPathKeyServer.handle_request.
41 Parameters:
42 start_response: a uwsgi application-compatible procedure
44 Returns:
45 An empty list.
47 """
48 start_response("404 Not Found", [])
49 return []
52class HTTPPathKeyServer:
53 """An instance of this class behaves like a collection of keys
54 where each key is given a unique name. This name is then part of
55 the URL representing the particular key. An approach like this
56 allows supporting arbitrary number of keys by single server.
58 """
60 def __init__(
61 self, mapping: dict[str, Key], prefix: str = "", suffix: str = "x25519"
62 ) -> None:
63 """Initializes the instance and ensures all keys in the
64 mapping can perform ECDH exchange.
66 Parameters:
67 mapping: dictionary of name to key pairs.
68 prefix: path elements preceeding the key name in URL.
69 suffix: path elements succeeding the key name in URL.
71 """
72 self._prefix = split_and_clean(prefix)
73 self._suffix = split_and_clean(suffix)
74 remapping: dict[str, ExternalKey] = {}
75 for name, key in mapping.items():
76 if isinstance(key, ExternalKey):
77 remapping[name] = key
78 elif isinstance(key, SoftwareKey):
79 remapping[name] = ExternalSoftwareKey(key)
80 else:
81 raise TypeError(
82 f"Expected ExternalKey or SoftwareKey instance for key {name}, found {type(key)}"
83 )
84 self._mapping = remapping
86 # request should look like <prefix>/<key_id>/<suffix>/<public_point>
87 self._required_request_length = (
88 len(self._prefix) + 1 + len(self._suffix) + 1
89 )
91 def handle_path_request(
92 self, request_path: str, start_response: StartResponse
93 ) -> list[bytes]:
94 """All requests for key operations are uniquely identified by
95 the request path. The key name and public point to be
96 multiplied by private key are both encoded in the path and
97 therefore the actual handling depends only upon the path.
99 Parameters:
100 request_path: the path element of request URL
101 start_response: uwsgi-compatible argument
103 Returns:
104 List of single byte string of length 32 or an empty list
105 in case of error.
107 """
108 # request path structure: <prefix>/<key_id>/<suffix>/<public_point>
109 request_list = split_and_clean(request_path)
111 if len(request_list) < self._required_request_length:
112 # too short to contain prefix, key id, suffix and public point
113 return make_not_found(start_response)
115 key_pos = len(self._prefix)
116 public_point_pos = -1
117 suffix_pos = key_pos + 1
119 key_id_str = request_list[key_pos]
120 public_point_hex = request_list[public_point_pos]
122 # check for prefix
123 if request_list[:key_pos] != self._prefix:
124 return make_not_found(start_response)
126 # check for suffix
127 if request_list[suffix_pos:public_point_pos] != self._suffix:
128 return make_not_found(start_response)
130 if key_id_str not in self._mapping:
131 # key does not exist
132 return make_not_found(start_response)
134 if len(public_point_hex) != 64:
135 # incorrect public point length
136 return make_not_found(start_response)
137 try:
138 public_point_bytes = binascii.unhexlify(public_point_hex)
139 except binascii.Error:
140 return make_not_found(start_response)
142 key = self._mapping[key_id_str]
143 result = key.compute_ecdh(public_point_bytes)
144 start_response(
145 "200 OK", [("Content-Type", "application/octet-stream")]
146 )
147 return [result]
149 def handle_uwsgi_request(
150 self, env: WSGIEnvironment, start_response: StartResponse
151 ) -> Iterable[bytes]:
152 """A small wrapper that allows passing the uwsgi arguents
153 directly to this key server implementation.
155 Parameters:
156 env: HTTP environment sent by uwsgi
157 start_response: uwsgi's start_response argument
159 Returns:
160 List of one byte string of length 32 or an empty list in
161 case of error.
162 """
163 return self.handle_path_request(env["PATH_INFO"], start_response)