
Key Support

This part of the library implements support for public and private keys used for reading and writing Crypt4GH containers.

Protocol

oarepo_c4gh.key.key

An abstract Base Class for Asymmetric Secret Keys

This module contains only the interface specification that all key class implementations must follow.

Key

Bases: Protocol

This is an abstract class, containing only abstract methods used to compute the Diffie-Hellman key exchange over the Montgomery curve Curve25519 as specified by the X25519 standard, plus auxiliary informative helpers.

Source code in oarepo_c4gh/key/key.py
class Key(Protocol):
    """This is an abstract class, containing only abstract methods
    used to compute the Diffie-Hellman key exchange over the
    Montgomery curve Curve25519 as specified by the X25519 standard
    and auxiliary informative helpers.

    """

    @property
    @abstractmethod
    def public_key(self) -> bytes:
        """The derived classes must implement providing corresponding
        public key in this method.

        Returns:
            The 32 bytes of the public key.

        """
        ...

    @abstractmethod
    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Accepts the intended reader public key and computes the
        shared secret based on the public and secret key (this key) of
        the writer particular key source implementation.

        Parameters:
            reader_public_key: the 32 bytes of the reader public key

        Returns:
            The shared secret as 32 bytes - usable as symmetric key.

        """
        ...

    @abstractmethod
    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Accepts the writer public key and computes the shared
        secret based on the public and secret key (this key) of the
        reader particular key source implementation.

        Parameters:
            writer_public_key: the 32 bytes of the writer public key

        Returns:
            The shared secret as 32 bytes - usable as symmetric key.

        """
        ...

    @property
    @abstractmethod
    def can_compute_symmetric_keys(self) -> bool:
        """A predicate returning true if this key instance can perform
        read/write key derivation. This is usually determined by
        having access to the private key (for software implementation)
        or some other means of working with the private key (for HSM).

        Returns:
            true if it can perform symmetric key derivation

        """
        return False

    def __bytes__(self) -> bytes:
        """Default converter to bytes returns the public key bytes."""
        return self.public_key
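
As an illustration of what the interface asks of an implementation, here is a minimal sketch of a key source that knows only a public key. It is not part of the library: `KeyLike` is a local stand-in that mirrors the `Key` protocol so the example runs on its own, and `PublicOnlyKey` and its error type are hypothetical names chosen for this sketch.

```python
from abc import abstractmethod
from typing import Protocol


class KeyLike(Protocol):
    """Local stand-in mirroring the Key interface (hypothetical name)."""

    @property
    @abstractmethod
    def public_key(self) -> bytes: ...

    @abstractmethod
    def compute_write_key(self, reader_public_key: bytes) -> bytes: ...

    @abstractmethod
    def compute_read_key(self, writer_public_key: bytes) -> bytes: ...

    @property
    @abstractmethod
    def can_compute_symmetric_keys(self) -> bool: ...

    def __bytes__(self) -> bytes:
        # Same default as in the interface: bytes(key) is the public key.
        return self.public_key


class PublicOnlyKey(KeyLike):
    """Hypothetical key source that holds only a 32-byte public key."""

    def __init__(self, public_key: bytes) -> None:
        if len(public_key) != 32:
            raise ValueError("an X25519 public key is 32 bytes long")
        self._public_key = public_key

    @property
    def public_key(self) -> bytes:
        return self._public_key

    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        # Without a private key no shared secret can be derived.
        raise RuntimeError("no private key available")

    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        raise RuntimeError("no private key available")

    @property
    def can_compute_symmetric_keys(self) -> bool:
        return False


key = PublicOnlyKey(bytes(range(32)))
```

Callers can check `can_compute_symmetric_keys` before asking for a read or write key, and `bytes(key)` yields the public key through the inherited `__bytes__`.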

can_compute_symmetric_keys: bool property

A predicate returning true if this key instance can perform read/write key derivation. This is usually determined by having access to the private key (for software implementation) or some other means of working with the private key (for HSM).

Returns:

| Type | Description |
| --- | --- |
| bool | true if it can perform symmetric key derivation |

public_key: bytes property

Derived classes must implement this property so that it returns the corresponding public key.

Returns:

| Type | Description |
| --- | --- |
| bytes | The 32 bytes of the public key. |

__bytes__() -> bytes

Default converter to bytes returns the public key bytes.

Source code in oarepo_c4gh/key/key.py
def __bytes__(self) -> bytes:
    """Default converter to bytes returns the public key bytes."""
    return self.public_key

compute_read_key(writer_public_key: bytes) -> bytes

Accepts the writer's public key and computes the shared secret from it and the reader's public and secret key (this key), as provided by the particular key source implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| writer_public_key | bytes | the 32 bytes of the writer public key | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | The shared secret as 32 bytes - usable as symmetric key. |

Source code in oarepo_c4gh/key/key.py
@abstractmethod
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Accepts the writer public key and computes the shared
    secret based on the public and secret key (this key) of the
    reader particular key source implementation.

    Parameters:
        writer_public_key: the 32 bytes of the writer public key

    Returns:
        The shared secret as 32 bytes - usable as symmetric key.

    """
    ...

compute_write_key(reader_public_key: bytes) -> bytes

Accepts the intended reader's public key and computes the shared secret from it and the writer's public and secret key (this key), as provided by the particular key source implementation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reader_public_key | bytes | the 32 bytes of the reader public key | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | The shared secret as 32 bytes - usable as symmetric key. |

Source code in oarepo_c4gh/key/key.py
@abstractmethod
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Accepts the intended reader public key and computes the
    shared secret based on the public and secret key (this key) of
    the writer particular key source implementation.

    Parameters:
        reader_public_key: the 32 bytes of the reader public key

    Returns:
        The shared secret as 32 bytes - usable as symmetric key.

    """
    ...

Implementations

oarepo_c4gh.key.software

A base class for all software-defined keys.

This module implements the Diffie-Hellman key exchange using software keys and NaCl bindings. The class contained here also provides an interface for setting the private key instance property by derived classes that should implement particular key loaders.

SoftwareKey

Bases: Key

This class implements the actual Diffie-Hellman key exchange with locally stored private key in the class instance.

Source code in oarepo_c4gh/key/software.py
class SoftwareKey(Key):
    """This class implements the actual Diffie-Hellman key exchange
    with locally stored private key in the class instance.

    """

    def __init__(self, key_data: bytes, only_public: bool = False) -> None:
        """Performs rudimentary key data validation and initializes
        either only the public key or both the public and private key.

        Parameters:
            key_data: the 32 bytes of key material
            only_public: whether this contains only the public point

        Raises:
            AssertionError: if the key_data does not contain exactly 32 bytes

        """
        assert len(key_data) == 32, (
            f"The X25519 key must be 32 bytes long" f" ({len(key_data)})!"
        )
        if only_public:
            self._public_key = key_data
            self._private_key = None
        else:
            private_key_obj = PrivateKey(key_data)
            self._private_key = bytes(private_key_obj)
            public_key_obj = private_key_obj.public_key
            self._public_key = bytes(public_key_obj)

    @property
    def public_key(self) -> bytes:
        """Returns the public key corresponding to the private key
        used.

        """
        return self._public_key

    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Computes secret symmetric key used for writing Crypt4GH
        encrypted header packets. The instance of this class
        represents the writer key.

        Parameters:
            reader_public_key: the 32 bytes of the reader public key

        Returns:
            Writer symmetric key as 32 bytes.

        Raises:
            Crypt4GHKeyException: if only public key is available

        The algorithm used is not just a Diffie-Hellman key exchange
        to establish shared secret but it also includes derivation of
        two symmetric keys used in bi-directional connection. This
        pair of keys is derived from the shared secret concatenated
        with client public key and server public key by hashing such
        binary string with BLAKE2B-512 hash.

        For server - and therefore the writer - participant it is the
        "transmit" key of the imaginary connection.

        ```
        rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)
        ```

        The order of shared secret and client and server public keys
        in the binary string being hashed must be the same on both
        sides. Therefore the same symmetric keys are derived. However
        for maintaining this ordering, each party must know which one
        it is - otherwise even with correctly computed shared secret
        the resulting pair of keys would be different.

        """
        if self._private_key is None:
            raise Crypt4GHKeyException(
                "Only keys with private part can be used"
                " for computing shared key"
            )
        _, shared_key = crypto_kx_server_session_keys(
            self._public_key, self._private_key, reader_public_key
        )
        return shared_key

    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Computes secret symmetric key used for reading Crypt4GH
        encrypted header packets. The instance of this class
        represents the reader key.

        See detailed description of ``compute_write_key``.

        For this function the "receive" key is used - which is the
        same as the "transmit" key of the writer.

        Parameters:
            writer_public_key: the 32 bytes of the writer public key

        Returns:
            Reader symmetric key as 32 bytes.

        Raises:
            Crypt4GHKeyException: if only public key is available

        """
        if self._private_key is None:
            raise Crypt4GHKeyException(
                "Only keys with private part can be used"
                " for computing shared key"
            )
        shared_key, _ = crypto_kx_client_session_keys(
            self._public_key, self._private_key, writer_public_key
        )
        return shared_key

    @property
    def can_compute_symmetric_keys(self) -> bool:
        """Returns True if this key contains the private part.

        Returns:
            True if private key is available.

        """
        return self._private_key is not None

    @classmethod
    def generate(cls) -> "SoftwareKey":
        """Generates a new key from 32 random bytes."""
        token = secrets.token_bytes(32)
        return SoftwareKey(token)

can_compute_symmetric_keys: bool property

Returns True if this key contains the private part.

Returns:

| Type | Description |
| --- | --- |
| bool | True if private key is available. |

public_key: bytes property

Returns the public key corresponding to the private key used.

__init__(key_data: bytes, only_public: bool = False) -> None

Performs rudimentary key data validation and initializes either only the public key or both the public and private key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key_data | bytes | the 32 bytes of key material | required |
| only_public | bool | whether this contains only the public point | False |

Raises:

| Type | Description |
| --- | --- |
| AssertionError | if the key_data does not contain exactly 32 bytes |

Source code in oarepo_c4gh/key/software.py
def __init__(self, key_data: bytes, only_public: bool = False) -> None:
    """Performs rudimentary key data validation and initializes
    either only the public key or both the public and private key.

    Parameters:
        key_data: the 32 bytes of key material
        only_public: whether this contains only the public point

    Raises:
        AssertionError: if the key_data does not contain exactly 32 bytes

    """
    assert len(key_data) == 32, (
        f"The X25519 key must be 32 bytes long" f" ({len(key_data)})!"
    )
    if only_public:
        self._public_key = key_data
        self._private_key = None
    else:
        private_key_obj = PrivateKey(key_data)
        self._private_key = bytes(private_key_obj)
        public_key_obj = private_key_obj.public_key
        self._public_key = bytes(public_key_obj)

compute_read_key(writer_public_key: bytes) -> bytes

Computes secret symmetric key used for reading Crypt4GH encrypted header packets. The instance of this class represents the reader key.

See detailed description of compute_write_key.

For this function the "receive" key is used - which is the same as the "transmit" key of the writer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| writer_public_key | bytes | the 32 bytes of the writer public key | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | Reader symmetric key as 32 bytes. |

Raises:

| Type | Description |
| --- | --- |
| Crypt4GHKeyException | if only public key is available |

Source code in oarepo_c4gh/key/software.py
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Computes secret symmetric key used for reading Crypt4GH
    encrypted header packets. The instance of this class
    represents the reader key.

    See detailed description of ``compute_write_key``.

    For this function the "receive" key is used - which is the
    same as the "transmit" key of the writer.

    Parameters:
        writer_public_key: the 32 bytes of the writer public key

    Returns:
        Reader symmetric key as 32 bytes.

    Raises:
        Crypt4GHKeyException: if only public key is available

    """
    if self._private_key is None:
        raise Crypt4GHKeyException(
            "Only keys with private part can be used"
            " for computing shared key"
        )
    shared_key, _ = crypto_kx_client_session_keys(
        self._public_key, self._private_key, writer_public_key
    )
    return shared_key

compute_write_key(reader_public_key: bytes) -> bytes

Computes secret symmetric key used for writing Crypt4GH encrypted header packets. The instance of this class represents the writer key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| reader_public_key | bytes | the 32 bytes of the reader public key | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | Writer symmetric key as 32 bytes. |

Raises:

| Type | Description |
| --- | --- |
| Crypt4GHKeyException | if only public key is available |

The algorithm is not just a Diffie-Hellman key exchange that establishes a shared secret; it also derives two symmetric keys for use in a bi-directional connection. This pair of keys is derived by concatenating the shared secret with the client public key and the server public key and hashing the resulting binary string with BLAKE2B-512.

For the server participant, and therefore the writer, it is the "transmit" key of the imaginary connection.

rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)

The order of the shared secret and the client and server public keys in the binary string being hashed must be the same on both sides, so that both sides derive the same symmetric keys. To maintain this ordering, each party must know which role it plays; otherwise, even with a correctly computed shared secret, the resulting pair of keys would differ.
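
The derivation step can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code path (which delegates to the NaCl bindings): the shared secret is a placeholder rather than a real X25519 result, `derive_session_keys` is a hypothetical helper, and the split assumes the layout of the formula above, with the first 32 bytes as the client's receive key.

```python
import hashlib


def derive_session_keys(shared_secret: bytes, client_pk: bytes, server_pk: bytes):
    """Return (rx, tx) as seen by the client: rx || tx = BLAKE2B-512(...)."""
    digest = hashlib.blake2b(
        shared_secret + client_pk + server_pk, digest_size=64
    ).digest()
    return digest[:32], digest[32:]


# Placeholder inputs: a real exchange obtains shared_secret from X25519.
shared_secret = bytes([1]) * 32
reader_pk = bytes([2]) * 32  # the reader plays the "client" role
writer_pk = bytes([3]) * 32  # the writer plays the "server" role

client_rx, client_tx = derive_session_keys(shared_secret, reader_pk, writer_pk)

# The server derives the same 64 bytes but reads the halves the other way
# round, so its transmit key is the client's receive key.
server_rx, server_tx = client_tx, client_rx
write_key = server_tx
read_key = client_rx

# Swapping the roles changes the hashed string and therefore the keys.
swapped_rx, _ = derive_session_keys(shared_secret, writer_pk, reader_pk)
```

Here `write_key` and `read_key` are identical, while `swapped_rx` differs, which is why each party must know whether it acts as client or server.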

Source code in oarepo_c4gh/key/software.py
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Computes secret symmetric key used for writing Crypt4GH
    encrypted header packets. The instance of this class
    represents the writer key.

    Parameters:
        reader_public_key: the 32 bytes of the reader public key

    Returns:
        Writer symmetric key as 32 bytes.

    Raises:
        Crypt4GHKeyException: if only public key is available

    The algorithm used is not just a Diffie-Hellman key exchange
    to establish shared secret but it also includes derivation of
    two symmetric keys used in bi-directional connection. This
    pair of keys is derived from the shared secret concatenated
    with client public key and server public key by hashing such
    binary string with BLAKE2B-512 hash.

    For server - and therefore the writer - participant it is the
    "transmit" key of the imaginary connection.

    ```
    rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)
    ```

    The order of shared secret and client and server public keys
    in the binary string being hashed must be the same on both
    sides. Therefore the same symmetric keys are derived. However
    for maintaining this ordering, each party must know which one
    it is - otherwise even with correctly computed shared secret
    the resulting pair of keys would be different.

    """
    if self._private_key is None:
        raise Crypt4GHKeyException(
            "Only keys with private part can be used"
            " for computing shared key"
        )
    _, shared_key = crypto_kx_server_session_keys(
        self._public_key, self._private_key, reader_public_key
    )
    return shared_key

oarepo_c4gh.key.c4gh

Class for loading the Crypt4GH reference key format.

C4GHKey

Bases: SoftwareKey

This class implements the loader for Crypt4GH key file format.

Source code in oarepo_c4gh/key/c4gh.py
class C4GHKey(SoftwareKey):
    """This class implements the loader for Crypt4GH key file format."""

    @classmethod
    def from_file(
        self, file_name: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens file stream and loads the Crypt4GH key from it.

        Parameters:
            file_name: path to the file with the key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_stream(open(file_name, "rb"), callback)

    @classmethod
    def from_string(
        self, contents: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Converts string to bytes which is opened as binary stream
        and loads the Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_bytes(bytes(contents, "ASCII"), callback)

    @classmethod
    def from_bytes(
        self, contents: bytes, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens the contents bytes as binary stream and loads the
        Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_stream(BytesIO(contents), callback)

    @classmethod
    def from_stream(
        self,
        istream: RawIOBase,
        callback: callable = default_passphrase_callback,
    ) -> Self:
        """Parses the stream with stored key.

        Parameters:
            istream: input stream with the key file contents.
            callback: must return passphrase for decryption if called

        Returns:
            The newly constructed key instance.
        """
        slabel, sdata = decode_b64_envelope(istream)
        istream.close()
        if slabel == b"CRYPT4GH PUBLIC KEY":
            return C4GHKey(sdata, True)
        else:
            istreamb = BytesIO(sdata)
            check_c4gh_stream_magic(istreamb)
            kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
            cipher_name = decode_c4gh_bytes(istreamb)
            if cipher_name == b"none":
                secret_data = decode_c4gh_bytes(istreamb)
                return C4GHKey(secret_data, False)
            if cipher_name != b"chacha20_poly1305":
                raise Crypt4GHKeyException(
                    f"Unsupported cipher: {cipher_name}"
                )
            assert callable(
                callback
            ), "Invalid passphrase callback (non-callable)"
            passphrase = callback().encode()
            symmetric_key = derive_c4gh_key(
                kdf_name, passphrase, kdf_salt, kdf_rounds
            )
            nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
            nonce = nonce_and_encrypted_data[:12]
            encrypted_data = nonce_and_encrypted_data[12:]
            decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
                nonce, encrypted_data, None
            )
            return C4GHKey(decrypted_data, False)

from_bytes(contents: bytes, callback: callable = default_passphrase_callback) -> Self classmethod

Opens the contents bytes as binary stream and loads the Crypt4GH key from it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| contents | bytes | complete contents of the file with Crypt4GH key. | required |
| callback | callable | must return passphrase for decryption if called. | default_passphrase_callback |

Returns:

| Type | Description |
| --- | --- |
| Self | Initialized C4GHKey instance. |

Source code in oarepo_c4gh/key/c4gh.py
@classmethod
def from_bytes(
    self, contents: bytes, callback: callable = default_passphrase_callback
) -> Self:
    """Opens the contents bytes as binary stream and loads the
    Crypt4GH key from it.

    Parameters:
        contents: complete contents of the file with Crypt4GH key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_stream(BytesIO(contents), callback)

from_file(file_name: str, callback: callable = default_passphrase_callback) -> Self classmethod

Opens file stream and loads the Crypt4GH key from it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_name | str | path to the file with the key. | required |
| callback | callable | must return passphrase for decryption if called. | default_passphrase_callback |

Returns:

| Type | Description |
| --- | --- |
| Self | Initialized C4GHKey instance. |

Source code in oarepo_c4gh/key/c4gh.py
@classmethod
def from_file(
    self, file_name: str, callback: callable = default_passphrase_callback
) -> Self:
    """Opens file stream and loads the Crypt4GH key from it.

    Parameters:
        file_name: path to the file with the key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_stream(open(file_name, "rb"), callback)

from_stream(istream: RawIOBase, callback: callable = default_passphrase_callback) -> Self classmethod

Parses the stream with stored key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| istream | RawIOBase | input stream with the key file contents. | required |
| callback | callable | must return passphrase for decryption if called | default_passphrase_callback |

Returns:

| Type | Description |
| --- | --- |
| Self | The newly constructed key instance. |

Source code in oarepo_c4gh/key/c4gh.py
@classmethod
def from_stream(
    self,
    istream: RawIOBase,
    callback: callable = default_passphrase_callback,
) -> Self:
    """Parses the stream with stored key.

    Parameters:
        istream: input stream with the key file contents.
        callback: must return passphrase for decryption if called

    Returns:
        The newly constructed key instance.
    """
    slabel, sdata = decode_b64_envelope(istream)
    istream.close()
    if slabel == b"CRYPT4GH PUBLIC KEY":
        return C4GHKey(sdata, True)
    else:
        istreamb = BytesIO(sdata)
        check_c4gh_stream_magic(istreamb)
        kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
        cipher_name = decode_c4gh_bytes(istreamb)
        if cipher_name == b"none":
            secret_data = decode_c4gh_bytes(istreamb)
            return C4GHKey(secret_data, False)
        if cipher_name != b"chacha20_poly1305":
            raise Crypt4GHKeyException(
                f"Unsupported cipher: {cipher_name}"
            )
        assert callable(
            callback
        ), "Invalid passphrase callback (non-callable)"
        passphrase = callback().encode()
        symmetric_key = derive_c4gh_key(
            kdf_name, passphrase, kdf_salt, kdf_rounds
        )
        nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
        nonce = nonce_and_encrypted_data[:12]
        encrypted_data = nonce_and_encrypted_data[12:]
        decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
            nonce, encrypted_data, None
        )
        return C4GHKey(decrypted_data, False)

from_string(contents: str, callback: callable = default_passphrase_callback) -> Self classmethod

Converts string to bytes which is opened as binary stream and loads the Crypt4GH key from it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| contents | str | complete contents of the file with Crypt4GH key. | required |
| callback | callable | must return passphrase for decryption if called. | default_passphrase_callback |

Returns:

| Type | Description |
| --- | --- |
| Self | Initialized C4GHKey instance. |

Source code in oarepo_c4gh/key/c4gh.py
@classmethod
def from_string(
    self, contents: str, callback: callable = default_passphrase_callback
) -> Self:
    """Converts string to bytes which is opened as binary stream
    and loads the Crypt4GH key from it.

    Parameters:
        contents: complete contents of the file with Crypt4GH key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_bytes(bytes(contents, "ASCII"), callback)

check_c4gh_kdf(kdf_name: bytes) -> bool

Returns true if given KDF is supported.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| kdf_name | bytes | KDF name string as bytes | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the KDF is supported. |

Source code in oarepo_c4gh/key/c4gh.py
def check_c4gh_kdf(kdf_name: bytes) -> bool:
    """Returns true if given KDF is supported.

    Parameters:
        kdf_name: KDF name string as bytes

    Returns:
        True if the KDF is supported.
    """
    return kdf_name in C4GH_KDFS

check_c4gh_stream_magic(istreamb: RawIOBase) -> None

Reads enough bytes from the given input stream and checks whether they contain the correct Crypt4GH signature. Raises an error if they do not.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| istreamb | RawIOBase | input stream with the raw Crypt4GH binary key stream. | required |

Raises:

| Type | Description |
| --- | --- |
| Crypt4GHKeyException | if the signature does not match. |

Source code in oarepo_c4gh/key/c4gh.py
def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
    """Reads enough bytes from given input stream and checks whether
    they contain the correct Crypt4GH signature. Raises error if it
    doesn't.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary key stream.

    Raises:
        Crypt4GHKeyException: if the signature does not match.

    """
    magic_to_check = istreamb.read(len(C4GH_MAGIC_WORD))
    if magic_to_check != C4GH_MAGIC_WORD:
        raise Crypt4GHKeyException("Not a Crypt4GH private key!")

decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes)

Reads PEM-like format and returns its label and decoded bytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| istream | RawIOBase | input stream with the data. | required |

Returns:

| Type | Description |
| --- | --- |
| (bytes, bytes) | Label of the envelope and decoded content bytes. |

Source code in oarepo_c4gh/key/c4gh.py
def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
    """Reads PEM-like format and returns its label and decoded bytes.

    Parameters:
        istream: input stream with the data.

    Returns:
        Label of the envelope and decoded content bytes.

    """
    lines = list(
        filter(
            lambda line: line,
            map(lambda raw_line: raw_line.strip(), istream.readlines()),
        )
    )
    assert (
        len(lines) >= 3
    ), "At least 3 lines are needed - 2 for envelope and 1 with data."
    assert lines[0].startswith(
        b"-----BEGIN "
    ), f"Must start with BEGIN line {lines[0]}."
    assert lines[-1].startswith(
        b"-----END "
    ), f"Must end with END line {lines[-1]}."
    data = b64decode(b"".join(lines[1:-1]))
    begin_label = lines[0][11:-1].strip(b"-")
    end_label = lines[-1][9:-1].strip(b"-")
    assert (
        begin_label == end_label
    ), f"BEGIN {begin_label} not END {end_label}!"
    return begin_label, data
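
To make the expected input concrete, the following sketch builds a PEM-like envelope and reads it back. It uses only the standard library; `build_envelope` and `read_envelope` are hypothetical helpers written for this example (the reader is a simplified variant of the function above that raises `ValueError` instead of asserting), and the 32 payload bytes are dummy data.

```python
from base64 import b64decode, b64encode
from io import BytesIO


def build_envelope(label: bytes, payload: bytes) -> bytes:
    """Hypothetical helper: wrap payload in BEGIN/END lines as Base64."""
    return b"\n".join(
        [
            b"-----BEGIN " + label + b"-----",
            b64encode(payload),
            b"-----END " + label + b"-----",
            b"",
        ]
    )


def read_envelope(stream) -> tuple:
    """Simplified reader returning (label, decoded payload)."""
    # Drop surrounding whitespace and skip empty lines.
    lines = [line.strip() for line in stream.readlines() if line.strip()]
    if len(lines) < 3:
        raise ValueError("need a BEGIN line, data and an END line")
    begin, end = lines[0], lines[-1]
    if not begin.startswith(b"-----BEGIN ") or not end.startswith(b"-----END "):
        raise ValueError("missing BEGIN or END line")
    begin_label = begin[len(b"-----BEGIN "):].rstrip(b"-")
    end_label = end[len(b"-----END "):].rstrip(b"-")
    if begin_label != end_label:
        raise ValueError("BEGIN and END labels differ")
    return begin_label, b64decode(b"".join(lines[1:-1]))


public_key = bytes(range(32))  # dummy key material
text = build_envelope(b"CRYPT4GH PUBLIC KEY", public_key)
label, data = read_envelope(BytesIO(text))
```

The label is what `from_stream` inspects to tell a public key envelope from a private one.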

decode_c4gh_bytes(istream: RawIOBase) -> bytes

Decodes binary string encoded as two-byte big-endian integer length and the actual data that follows this length field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| istream | RawIOBase | input stream from which to decode the bytes string. | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | The decoded bytes string. |

Raises:

| Type | Description |
| --- | --- |
| Crypt4GHKeyException | if there is not enough data in the stream |

Source code in oarepo_c4gh/key/c4gh.py
def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
    """Decodes binary string encoded as two-byte big-endian integer
    length and the actual data that follows this length field.

    Parameters:
        istream: input stream from which to decode the bytes string.

    Returns:
        The decoded bytes string.

    Raises:
        Crypt4GHKeyException: if there is not enough data in the stream

    """
    lengthb = istream.read(2)
    lengthb_length = len(lengthb)
    if len(lengthb) != 2:
        raise Crypt4GHKeyException(
            f"Binary string read - not enough data to read the length: "
            f"{lengthb_length} != 2"
        )
    length = int.from_bytes(lengthb, byteorder="big")
    string = istream.read(length)
    read_length = len(string)
    if read_length != length:
        raise Crypt4GHKeyException(
            f"Binary string read - not enough data: {read_length} != {length}"
        )
    return string
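
The length-prefixed encoding is easiest to see as a round trip. The sketch below is standalone and makes a few assumptions: `encode_c4gh_bytes` is a hypothetical inverse that is not shown in this module, `read_length_prefixed` is a simplified reader that raises `ValueError` rather than the library's exception, and the two fields are sample data.

```python
from io import BytesIO


def encode_c4gh_bytes(data: bytes) -> bytes:
    """Hypothetical inverse: prefix data with its 2-byte big-endian length."""
    return len(data).to_bytes(2, byteorder="big") + data


def read_length_prefixed(stream) -> bytes:
    """Read one length-prefixed byte string from the stream."""
    header = stream.read(2)
    if len(header) != 2:
        raise ValueError("truncated length field")
    length = int.from_bytes(header, byteorder="big")
    body = stream.read(length)
    if len(body) != length:
        raise ValueError("truncated data")
    return body


# Two consecutive fields, read back in order from the same stream.
stream = BytesIO(encode_c4gh_bytes(b"none") + encode_c4gh_bytes(b"\x01" * 32))
first = read_length_prefixed(stream)
second = read_length_prefixed(stream)
```

Because the length field is two bytes wide, a single field can carry at most 65535 bytes.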

default_passphrase_callback() -> None

By default the constructor has no means of obtaining the passphrase and therefore this function unconditionally raises an exception when called.

Source code in oarepo_c4gh/key/c4gh.py
def default_passphrase_callback() -> None:
    """By default the constructor has no means of obtaining the
    passphrase and therefore this function unconditionally raises an
    exception when called.

    """
    raise Crypt4GHKeyException("No password callback provided!")

derive_c4gh_key(algo: bytes, passphrase: bytes, salt: bytes, rounds: int) -> bytes

Derives the symmetric key for decrypting the private key.

Parameters:

Name Type Description Default
algo bytes

the algorithm for key derivation

required
passphrase bytes

the passphrase from which to derive the key

required
salt bytes

the salt used for key derivation

required
rounds int

number of hashing rounds

required

Returns:

Type Description
bytes

The derived symmetric key.

Raises:

Type Description
Crypt4GHKeyException

if given KDF algorithm is not supported (should not happen as this is expected to be called after parse_c4gh_kdf_options).

Source code in oarepo_c4gh/key/c4gh.py
def derive_c4gh_key(
    algo: bytes, passphrase: bytes, salt: bytes, rounds: int
) -> bytes:
    """Derives the symmetric key for decrypting the private key.

    Parameters:
        algo: the algorithm for key derivation
        passphrase: the passphrase from which to derive the key
        salt: the salt used for key derivation
        rounds: number of hashing rounds (not used by scrypt)

    Returns:
        The derived symmetric key.

    Raises:
        Crypt4GHKeyException: if given KDF algorithm is not supported (should not happen
            as this is expected to be called after parse_c4gh_kdf_options).
    """
    if algo == b"scrypt":
        from hashlib import scrypt

        return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32)
    if algo == b"bcrypt":
        import bcrypt

        return bcrypt.kdf(
            passphrase,
            salt=salt,
            desired_key_bytes=32,
            rounds=rounds,
            ignore_few_rounds=True,
        )
    if algo == b"pbkdf2_hmac_sha256":
        from hashlib import pbkdf2_hmac

        return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
    raise Crypt4GHKeyException(f"Unsupported KDF: {algo}")
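
As a rough illustration of what the derivation yields, the sketch below calls the same standard-library function that the `pbkdf2_hmac_sha256` branch uses. The passphrase, salt and round count are made-up values, not taken from any real key file.

```python
from hashlib import pbkdf2_hmac

# Illustrative inputs only; in practice the salt and rounds come from
# the KDF options stored in the private key file.
passphrase = b"correct horse battery staple"
salt = bytes(range(16))
rounds = 1000

key_a = pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
key_b = pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
# A different salt gives an unrelated key for the same passphrase.
other = pbkdf2_hmac("sha256", passphrase, salt[::-1], rounds, dklen=32)
```

The derivation is deterministic: the same passphrase, salt and round count always give the same 32-byte key, which is what allows the private key to be decrypted later.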

parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes)

Parses KDF name and options (if applicable) from given input stream.

Parameters:

Name Type Description Default
istreamb RawIOBase

input stream with the raw Crypt4GH binary stream.

required

Returns:

Name Type Description
kdf_name (bytes, int, bytes)

the name of the KDF as binary string

kdf_rounds (bytes, int, bytes)

number of hashing rounds for KDF

kdf_salt (bytes, int, bytes)

salt for initializing the hashing

Raises:

Type Description
Crypt4GHKeyException

if parsed KDF name is not supported

Source code in oarepo_c4gh/key/c4gh.py
def parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes):
    """Parses KDF name and options (if applicable) from given input
    stream.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary stream.

    Returns:
        kdf_name: the name of the KDF as binary string
        kdf_rounds: number of hashing rounds for KDF
        kdf_salt: salt for initializing the hashing

    Raises:
        Crypt4GHKeyException: if parsed KDF name is not supported

    """
    kdf_name = decode_c4gh_bytes(istreamb)
    if kdf_name == b"none":
        return (kdf_name, None, None)
    elif check_c4gh_kdf(kdf_name):
        kdf_options = decode_c4gh_bytes(istreamb)
        kdf_rounds = int.from_bytes(kdf_options[:4], byteorder="big")
        kdf_salt = kdf_options[4:]
        return (kdf_name, kdf_rounds, kdf_salt)
    else:
        raise Crypt4GHKeyException(f"Unsupported KDF {kdf_name}")
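
The byte layout consumed above can be sketched with the standard library alone. In this example `length_prefixed` is a hypothetical helper, and the KDF name, round count and salt are illustrative values; the parsing steps mirror the source: a length-prefixed name, then a length-prefixed options blob holding a 4-byte big-endian round count followed by the salt.

```python
import io


def length_prefixed(data: bytes) -> bytes:
    """Hypothetical helper: 2-byte big-endian length followed by the data."""
    return len(data).to_bytes(2, byteorder="big") + data


# Build an illustrative KDF section: name, then rounds + salt as options.
rounds = 100
salt = b"\x01" * 16
kdf_options = rounds.to_bytes(4, byteorder="big") + salt
raw = length_prefixed(b"bcrypt") + length_prefixed(kdf_options)

# Parse it back the same way the function above does.
stream = io.BytesIO(raw)
name_length = int.from_bytes(stream.read(2), byteorder="big")
kdf_name = stream.read(name_length)
options_length = int.from_bytes(stream.read(2), byteorder="big")
options = stream.read(options_length)
parsed_rounds = int.from_bytes(options[:4], byteorder="big")
parsed_salt = options[4:]
```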

oarepo_c4gh.key.key_collection

This module implements a key collection that is used in place of a single key when reading the container header packets, in order to support multiple available reader keys.

KeyCollection

This class implements a simple storage for a collection of reader keys and gives a reusable iterator which is guaranteed to iterate over all the keys at most once. Each round of iteration starts with the key last used in the previous round. This ensures that if a reader key successfully reads a packet, it will always be the first to try for the very next packet.

Source code in oarepo_c4gh/key/key_collection.py
class KeyCollection:
    """This class implements a simple storage for a collection of
    reader keys and gives a reusable iterator which is guaranteed to
    iterate over all the keys at most once. Each round of iteration
    starts with the key last used in the previous round. This
    ensures that if a reader key successfully reads a packet, it will
    always be the first to try for the very next packet.

    """

    def __init__(self, *keys: List[Key]) -> None:
        """Initializes the collection with a list of keys.

        Parameters:
            keys: list of instances of classes implementing the Key Protocol

        Raises:
            Crypt4GHKeyException: if some key(s) do not have access to
                                  private part or no keys were given

        """
        if len(keys) == 0:
            raise Crypt4GHKeyException("Collection needs at least one key")
        for key in keys:
            if not key.can_compute_symmetric_keys:
                raise Crypt4GHKeyException(
                    "KeyCollection is only for keys with access to private key"
                )
        self._keys = keys
        self._current = 0

    @property
    def count(self) -> int:
        """Returns the number of keys in this collection."""
        return len(self._keys)

    @property
    def keys(self) -> Generator[Key, None, None]:
        """Multiple-use iterator that yields each key at most
        once. When re-used, the iteration always starts with the most
        recently yielded key.

        """
        first_current = self._current
        while True:
            yield self._keys[self._current]
            self._current = (self._current + 1) % self.count
            if self._current == first_current:
                break

count: int property

Returns the number of keys in this collection.

keys: Generator[Key, None, None] property

Multiple-use iterator that yields each key at most once. When re-used, the iteration always starts with the most recently yielded key.
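
The rotation behaviour is easiest to see on plain strings. The sketch below is not the library class: `RotatingCollection` is a hypothetical stand-in that copies only the iteration logic of `keys`, with strings in place of `Key` instances.

```python
from typing import Generator


class RotatingCollection:
    """Hypothetical stand-in reproducing the iteration logic of `keys`."""

    def __init__(self, *items: str) -> None:
        if len(items) == 0:
            raise ValueError("need at least one item")
        self._items = items
        self._current = 0

    @property
    def items(self) -> Generator[str, None, None]:
        first_current = self._current
        while True:
            yield self._items[self._current]
            # Only reached when the consumer asks for another item.
            self._current = (self._current + 1) % len(self._items)
            if self._current == first_current:
                break


collection = RotatingCollection("key-a", "key-b", "key-c")

# Simulate a packet that only "key-b" can read: stop at the first match.
for item in collection.items:
    if item == "key-b":
        break

# The next round starts with the key that matched last time.
next_round = list(collection.items)
```

Because the position advances only when the consumer requests another item, stopping at a matching key leaves that key as the starting point of the next round.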

__init__(*keys: List[Key]) -> None

Initializes the collection with a list of keys.

Parameters:

Name Type Description Default
keys List[Key]

list of instances of classes implementing the Key Protocol

()

Raises:

Type Description
Crypt4GHKeyException

if some key(s) do not have access to private part or no keys were given

Source code in oarepo_c4gh/key/key_collection.py
def __init__(self, *keys: List[Key]) -> None:
    """Initializes the collection with a list of keys.

    Parameters:
        keys: list of instances of classes implementing the Key Protocol

    Raises:
        Crypt4GHKeyException: if some key(s) do not have access to
                              private part or no keys were given

    """
    if len(keys) == 0:
        raise Crypt4GHKeyException("Collection needs at least one key")
    for key in keys:
        if not key.can_compute_symmetric_keys:
            raise Crypt4GHKeyException(
                "KeyCollection is only for keys with access to private key"
            )
    self._keys = keys
    self._current = 0

External Keys

oarepo_c4gh.key.external

This module provides a partial implementation of external (hardware or network) private keys that allow for computing symmetric keys. It assumes a derived class will implement the actual ECDH finalization.

ExternalKey

Bases: Key

This class implements the Crypt4GH symmetric key derivation from ECDH result. The actual ECDH computation must be implemented by derived class.

Source code in oarepo_c4gh/key/external.py
class ExternalKey(Key):
    """This class implements the Crypt4GH symmetric key derivation
    from ECDH result. The actual ECDH computation must be implemented
    by derived class.

    """

    @abstractmethod
    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Given a public point on the curve, this function must
        multiply it by the private key and return the resulting point
        in compressed format (32 bytes).

        Parameters:
            public_point: the public point generated by the other party in compressed format

        Returns:
            The resulting point in compressed format (32 bytes).

        """
        ...

    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Computes the write key using this instance's private key
        and the provided reader public key. See
        [`SoftwareKey.compute_write_key`][oarepo_c4gh.key.software.SoftwareKey.compute_write_key]
        for details.

        Parameters:
            reader_public_key: the reader public key (point) in compressed format

        Returns:
            The writer symmetric key as raw 32 bytes.

        """
        shared_secret = self.compute_ecdh(reader_public_key)
        hash_source = shared_secret + reader_public_key + self.public_key
        the_hash = blake2b(digest_size=64)
        the_hash.update(hash_source)
        digest = the_hash.digest()
        return digest[:32]

    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Computes the reader key using this instance's private key
        and provided writer public key. See
        [`SoftwareKey.compute_read_key`][oarepo_c4gh.key.software.SoftwareKey.compute_read_key]
        for details.

        Parameters:
            writer_public_key: the writer public key (point) in compressed format

        Returns:
            The reader symmetric key as raw 32 bytes.

        """
        shared_secret = self.compute_ecdh(writer_public_key)
        hash_source = shared_secret + self.public_key + writer_public_key
        the_hash = blake2b(digest_size=64)
        the_hash.update(hash_source)
        digest = the_hash.digest()
        return digest[:32]

    @property
    def can_compute_symmetric_keys(self) -> bool:
        """External keys always have private key and therefore can
        always compute the symmetric keys.

        Returns:
            Always True.

        """
        return True

can_compute_symmetric_keys: bool property

External keys always have private key and therefore can always compute the symmetric keys.

Returns:

Type Description
bool

Always True.

compute_ecdh(public_point: bytes) -> bytes

Given a public point on the curve, this function must multiply it by the private key and return the resulting point in compressed format (32 bytes).

Parameters:

Name Type Description Default
public_point bytes

the public point generated by the other party in compressed format

required

Returns:

Type Description
bytes

The resulting point in compressed format (32 bytes).

Source code in oarepo_c4gh/key/external.py
@abstractmethod
def compute_ecdh(self, public_point: bytes) -> bytes:
    """Given a public point on the curve, this function must
    multiply it by the private key and return the resulting point
    in compressed format (32 bytes).

    Parameters:
        public_point: the public point generated by the other party in compressed format

    Returns:
        The resulting point in compressed format (32 bytes).

    """
    ...

compute_read_key(writer_public_key: bytes) -> bytes

Computes the reader key using this instance's private key and the provided writer public key. See SoftwareKey.compute_read_key for details.

Parameters:

Name Type Description Default
writer_public_key bytes

the writer public key (point) in compressed format

required

Returns:

Type Description
bytes

The reader symmetric key as raw 32 bytes.

Source code in oarepo_c4gh/key/external.py
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Computes the reader key using this instance's private key
    and provided writer public key. See
    [`SoftwareKey.compute_read_key`][oarepo_c4gh.key.software.SoftwareKey.compute_read_key]
    for details.

    Parameters:
        writer_public_key: the writer public key (point) in compressed format

    Returns:
        The reader symmetric key as raw 32 bytes.

    """
    shared_secret = self.compute_ecdh(writer_public_key)
    hash_source = shared_secret + self.public_key + writer_public_key
    the_hash = blake2b(digest_size=64)
    the_hash.update(hash_source)
    digest = the_hash.digest()
    return digest[:32]

compute_write_key(reader_public_key: bytes) -> bytes

Computes the write key using this instance's private key and the provided reader public key. See SoftwareKey.compute_write_key for details.

Parameters:

Name Type Description Default
reader_public_key bytes

the reader public key (point) in compressed format

required

Returns:

Type Description
bytes

The writer symmetric key as raw 32 bytes.

Source code in oarepo_c4gh/key/external.py
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Computes the write key using this instance's private key
    and the provided reader public key. See
    [`SoftwareKey.compute_write_key`][oarepo_c4gh.key.software.SoftwareKey.compute_write_key]
    for details.

    Parameters:
        reader_public_key: the reader public key (point) in compressed format

    Returns:
        The writer symmetric key as raw 32 bytes.

    """
    shared_secret = self.compute_ecdh(reader_public_key)
    hash_source = shared_secret + reader_public_key + self.public_key
    the_hash = blake2b(digest_size=64)
    the_hash.update(hash_source)
    digest = the_hash.digest()
    return digest[:32]
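
Both methods hash the same concatenation (shared secret, reader public key, writer public key), so the two parties arrive at the same symmetric key. The sketch below illustrates this with placeholder byte strings; no real ECDH is performed, and `write_key` and `read_key` are hypothetical free functions mirroring the two methods.

```python
from hashlib import blake2b


def write_key(shared: bytes, own_public: bytes, reader_public: bytes) -> bytes:
    """Writer side: hash shared secret + reader key + own (writer) key."""
    return blake2b(shared + reader_public + own_public, digest_size=64).digest()[:32]


def read_key(shared: bytes, own_public: bytes, writer_public: bytes) -> bytes:
    """Reader side: hash shared secret + own (reader) key + writer key."""
    return blake2b(shared + own_public + writer_public, digest_size=64).digest()[:32]


# Placeholders only: a real shared secret is the ECDH result, which is
# identical for both parties, and real public keys are curve points.
shared_secret = bytes(range(32))
reader_public = b"R" * 32
writer_public = b"W" * 32

key_for_writing = write_key(shared_secret, writer_public, reader_public)
key_for_reading = read_key(shared_secret, reader_public, writer_public)
```

Swapping the order of the two public keys in either function would produce a different digest, which is why each side places its own key in a different position.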

oarepo_c4gh.key.external_software

This module implements a virtual external key that is actually backed by SoftwareKey and uses its private key directly.

This module is intended ONLY for testing related functionality and should NEVER be used in production.

ExternalSoftwareKey

Bases: ExternalKey

This is a virtual external key backed by any SoftwareKey implementation.

Do NOT use this class in any production code.

Source code in oarepo_c4gh/key/external_software.py
class ExternalSoftwareKey(ExternalKey):
    """This is a virtual external key backed by any SoftwareKey
    implementation.

    Do NOT use this class in any production code.

    """

    def __init__(self, softkey: SoftwareKey) -> None:
        """Gets its backing private+public key pair from the provided
        SoftwareKey implementation.

        Do NOT use in production code.

        Parameters:
            softkey: the backing key which must include private key

        """
        if not softkey.can_compute_symmetric_keys:
            raise Crypt4GHKeyException(
                "ExternalSoftwareKey needs a private key"
            )
        self._private_key = softkey._private_key
        self._public_key = softkey._public_key

    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Computes directly the final result of ECDH from given
        public point. This implementation is using crypto_scalarmult
        from nacl.bindings.

        Do NOT use in production code.

        """
        return crypto_scalarmult(self._private_key, public_point)

    @property
    def public_key(self) -> bytes:
        """Returns the underlying public key."""
        return self._public_key

public_key: bytes property

Returns the underlying public key.

__init__(softkey: SoftwareKey) -> None

Gets its backing private+public key pair from the provided SoftwareKey implementation.

Do NOT use in production code.

Parameters:

Name Type Description Default
softkey SoftwareKey

the backing key which must include private key

required
Source code in oarepo_c4gh/key/external_software.py
def __init__(self, softkey: SoftwareKey) -> None:
    """Gets its backing private+public key pair from the provided
    SoftwareKey implementation.

    Do NOT use in production code.

    Parameters:
        softkey: the backing key which must include private key

    """
    if not softkey.can_compute_symmetric_keys:
        raise Crypt4GHKeyException(
            "ExternalSoftwareKey needs a private key"
        )
    self._private_key = softkey._private_key
    self._public_key = softkey._public_key

compute_ecdh(public_point: bytes) -> bytes

Computes directly the final result of ECDH from given public point. This implementation is using crypto_scalarmult from nacl.bindings.

Do NOT use in production code.

Source code in oarepo_c4gh/key/external_software.py
def compute_ecdh(self, public_point: bytes) -> bytes:
    """Computes directly the final result of ECDH from given
    public point. This implementation is using crypto_scalarmult
    from nacl.bindings.

    Do NOT use in production code.

    """
    return crypto_scalarmult(self._private_key, public_point)

oarepo_c4gh.key.gpg_agent

This module provides an "HSM" implementation of a private key usable with Crypt4GH. It uses an off-the-shelf YubiKey with its OpenPGP Card application through gpg-agent's protocol.

This is not a "real" HSM and it is provided only for testing purposes in a non-production environment without actual HSM.

There are many assumptions:

  • compatible YubiKey must be present in the system
  • gpg-agent must be configured and running
  • there must not be any other key configured in gpg
  • no other application should be accessing gpg-agent
  • works only with gpg 2.4.x

GPGAgentKey

Bases: ExternalKey

An instance of this class uses gpg-agent to finalize the ECDH computation. The actual key derivation is then performed by ExternalKey's methods.

Source code in oarepo_c4gh/key/gpg_agent.py
class GPGAgentKey(ExternalKey):
    """An instance of this class uses `gpg-agent` to finalize the
    ECDH computation. The actual key derivation is then performed by
    ExternalKey's methods.

    """

    def __init__(
        self,
        socket_path: str = None,
        home_dir: str = None,
        keygrip: string = None,
    ) -> None:
        """Initializes the instance by storing the path to
        `gpg-agent`'s socket. It verifies the socket's existence but
        performs no connection yet.

        Parameters:
            socket_path: path to `gpg-agent`'s socket - usually `/run/user/$UID/gnupg/S.gpg-agent`
            home_dir: path to gpg homedir, used for computing socket path
            keygrip: hexadecimal representation of the keygrip

        """
        self._socket_path = socket_path
        if self._socket_path is None:
            socket_dir = compute_socket_dir(home_dir)
            self._socket_path = f"{socket_dir}/S.gpg-agent"
        if not os.path.exists(self._socket_path):
            raise Crypt4GHKeyException(
                "Cannot initialize GPGAgentKey with non-existent gpg-agent path."
            )
        self._req_keygrip = keygrip
        self._public_key = None
        self._keygrip = None

    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Computes the result of finishing the ECDH key exchange.

        Parameters:
            public_point: the other party public point (compressed coordinates, 32 bytes)

        Returns:
            The resulting shared secret point (compressed coordinates, 32 bytes).
        """
        self.ensure_public_key()
        client = self.connect_agent()
        expect_assuan_OK(client)

        # SETKEY keygrip
        skm = b"SETKEY " + self._keygrip + b"\n"
        client.send(skm)
        expect_assuan_OK(client)

        # PKDECRYPT
        client.send(b"PKDECRYPT\n")
        pdm = client.recv(4096)
        # not used, might contain S configuration messages or INQUIRE for CIPHERTEXT

        # D send static encoded data
        evm = (
            b"D (7:enc-val(4:ecdh(1:e33:@"
            + encode_assuan_buffer(public_point)
            + b")))\n"
        )
        client.send(evm)

        # END
        client.send(b"END\n")

        # retrieve result - drop all messages without data
        msg = b""
        result = None
        while True:
            if msg == b"":
                msg = client.recv(4096)
            line0, rest = line_from_dgram(msg)
            line = decode_assuan_buffer(line0)
            msg = rest
            if line[:4] == b"ERR ":
                client.close()
                raise Crypt4GHKeyException(
                    "Assuan error: " + line.decode("ascii")
                )
            if line[:2] == b"D ":
                data = line[2:]
                struct = parse_binary_sexp(line[2:])
                result = struct[1][1:]
                break

        # Done
        client.close()
        return result

    def ensure_public_key(self):
        """Loads the public key and stores its keygrip from the
        OpenPGP Card. This method is a no-op if the key was loaded
        before.

        """
        if self._public_key is None:
            client = self.connect_agent()

            # Must be "OK Message ..."
            expect_assuan_OK(client)

            # Now send request for all keys
            client.send(b"HAVEKEY --list=1000\n")
            # Must be only one
            havekey_dgram = client.recv(4096)
            havekey_data, havekey_rest1 = line_from_dgram(havekey_dgram)
            havekey_msg, havekey_rest2 = line_from_dgram(havekey_rest1)
            keygrips_data = decode_assuan_buffer(havekey_data[2:])
            num_keygrips = len(keygrips_data) // 20
            if num_keygrips * 20 != len(keygrips_data):
                client.close()
                raise Crypt4GHKeyException(
                    f"invalid keygrips data length: {len(keygrips_data)}"
                )
            keygrips = [
                keygrip_to_hex(keygrips_data[idx * 20 : idx * 20 + 20])
                for idx in range(num_keygrips)
            ]

            # Get detailed information for all keygrips, find Curve25519 one
            for keygrip in keygrips:
                # Send READKEY
                client.send(b"READKEY " + keygrip + b"\n")

                # Read D S-Exp
                key_dgram = client.recv(4096)
                key_line0, key_rest = line_from_dgram(key_dgram)
                key_line = decode_assuan_buffer(key_line0)
                key_struct = parse_binary_sexp(key_line[2:])
                if (
                    (key_struct is None)
                    or (len(key_struct) < 2)
                    or (key_struct[0] != b"public-key")
                    or (len(key_struct[1])) < 1
                    or (key_struct[1][0] != b"ecc")
                ):
                    continue
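                # Default to None so that a missing field skips this key
                # instead of raising StopIteration.
                curve_struct = next(
                    (v for v in key_struct[1][1:] if v[0] == b"curve"), None
                )
                q_struct = next(
                    (v for v in key_struct[1][1:] if v[0] == b"q"), None
                )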
                if (
                    (curve_struct is None)
                    or (len(curve_struct) < 2)
                    or (curve_struct[1] != b"Curve25519")
                    or (q_struct is None)
                    or (len(q_struct) < 2)
                ):
                    continue
                if (self._req_keygrip is not None) and (
                    self._req_keygrip != keygrip
                ):
                    continue
                self._public_key = q_struct[1][1:]
                self._keygrip = keygrip
                break
            # Done
            client.close()

            # Error handling
            if self._public_key is None:
                raise Crypt4GHKeyException("Cannot determine public key")

    @property
    def public_key(self) -> bytes:
        """Returns the underlying public key."""
        self.ensure_public_key()
        return self._public_key

    def connect_agent(self) -> IO:
        """Establishes connection to gpg-agent."""
        try:
            client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            client.connect(self._socket_path)
            return client
        except Exception as ex:
            raise Crypt4GHKeyException(
                "Cannot establish connection to gpg-agent."
            ) from ex

public_key: bytes property

Returns the underlying public key.

__init__(socket_path: str = None, home_dir: str = None, keygrip: string = None) -> None

Initializes the instance by storing the path to gpg-agent's socket. It verifies the socket's existence but performs no connection yet.

Parameters:

Name Type Description Default
socket_path str

path to gpg-agent's socket - usually /run/user/$UID/gnupg/S.gpg-agent

None
home_dir str

path to gpg homedir, used for computing socket path

None
keygrip string

hexadecimal representation of the keygrip

None
Source code in oarepo_c4gh/key/gpg_agent.py
def __init__(
    self,
    socket_path: str = None,
    home_dir: str = None,
    keygrip: string = None,
) -> None:
    """Initializes the instance by storing the path to
    `gpg-agent`'s socket. It verifies the socket's existence but
    performs no connection yet.

    Parameters:
        socket_path: path to `gpg-agent`'s socket - usually `/run/user/$UID/gnupg/S.gpg-agent`
        home_dir: path to gpg homedir, used for computing socket path
        keygrip: hexadecimal representation of the keygrip

    """
    self._socket_path = socket_path
    if self._socket_path is None:
        socket_dir = compute_socket_dir(home_dir)
        self._socket_path = f"{socket_dir}/S.gpg-agent"
    if not os.path.exists(self._socket_path):
        raise Crypt4GHKeyException(
            "Cannot initialize GPGAgentKey with non-existent gpg-agent path."
        )
    self._req_keygrip = keygrip
    self._public_key = None
    self._keygrip = None

compute_ecdh(public_point: bytes) -> bytes

Computes the result of finishing the ECDH key exchange.

Parameters:

Name Type Description Default
public_point bytes

the other party public point (compressed coordinates, 32 bytes)

required

Returns:

Type Description
bytes

The resulting shared secret point (compressed coordinates, 32 bytes).

Source code in oarepo_c4gh/key/gpg_agent.py
def compute_ecdh(self, public_point: bytes) -> bytes:
    """Computes the result of finishing the ECDH key exchange.

    Parameters:
        public_point: the other party public point (compressed coordinates, 32 bytes)

    Returns:
        The resulting shared secret point (compressed coordinates, 32 bytes).
    """
    self.ensure_public_key()
    client = self.connect_agent()
    expect_assuan_OK(client)

    # SETKEY keygrip
    skm = b"SETKEY " + self._keygrip + b"\n"
    client.send(skm)
    expect_assuan_OK(client)

    # PKDECRYPT
    client.send(b"PKDECRYPT\n")
    pdm = client.recv(4096)
    # not used, might contain S configuration messages or INQUIRE for CIPHERTEXT

    # D send static encoded data
    evm = (
        b"D (7:enc-val(4:ecdh(1:e33:@"
        + encode_assuan_buffer(public_point)
        + b")))\n"
    )
    client.send(evm)

    # END
    client.send(b"END\n")

    # retrieve result - drop all messages without data
    msg = b""
    result = None
    while True:
        if msg == b"":
            msg = client.recv(4096)
        line0, rest = line_from_dgram(msg)
        line = decode_assuan_buffer(line0)
        msg = rest
        if line[:4] == b"ERR ":
            client.close()
            raise Crypt4GHKeyException(
                "Assuan error: " + line.decode("ascii")
            )
        if line[:2] == b"D ":
            data = line[2:]
            struct = parse_binary_sexp(line[2:])
            result = struct[1][1:]
            break

    # Done
    client.close()
    return result

connect_agent() -> IO

Establishes connection to gpg-agent.

Source code in oarepo_c4gh/key/gpg_agent.py
def connect_agent(self) -> IO:
    """Establishes connection to gpg-agent."""
    try:
        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        client.connect(self._socket_path)
        return client
    except Exception as ex:
        raise Crypt4GHKeyException(
            "Cannot establish connection to gpg-agent."
        ) from ex

ensure_public_key()

Loads the public key and stores its keygrip from the OpenPGP Card. This method is a no-op if the key was loaded before.

Source code in oarepo_c4gh/key/gpg_agent.py
def ensure_public_key(self):
    """Loads the public key and stores its keygrip from the
    OpenPGP Card. This method is a no-op if the key was loaded
    before.

    """
    if self._public_key is None:
        client = self.connect_agent()

        # Must be "OK Message ..."
        expect_assuan_OK(client)

        # Now send request for all keys
        client.send(b"HAVEKEY --list=1000\n")
        # Must be only one
        havekey_dgram = client.recv(4096)
        havekey_data, havekey_rest1 = line_from_dgram(havekey_dgram)
        havekey_msg, havekey_rest2 = line_from_dgram(havekey_rest1)
        keygrips_data = decode_assuan_buffer(havekey_data[2:])
        num_keygrips = len(keygrips_data) // 20
        if num_keygrips * 20 != len(keygrips_data):
            client.close()
            raise Crypt4GHKeyException(
                f"invalid keygrips data length: {len(keygrips_data)}"
            )
        keygrips = [
            keygrip_to_hex(keygrips_data[idx * 20 : idx * 20 + 20])
            for idx in range(num_keygrips)
        ]

        # Get detailed information for all keygrips, find Curve25519 one
        for keygrip in keygrips:
            # Send READKEY
            client.send(b"READKEY " + keygrip + b"\n")

            # Read D S-Exp
            key_dgram = client.recv(4096)
            key_line0, key_rest = line_from_dgram(key_dgram)
            key_line = decode_assuan_buffer(key_line0)
            key_struct = parse_binary_sexp(key_line[2:])
            if (
                (key_struct is None)
                or (len(key_struct) < 2)
                or (key_struct[0] != b"public-key")
                or (len(key_struct[1])) < 1
                or (key_struct[1][0] != b"ecc")
            ):
                continue
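            # Default to None so that a key without these entries is
            # skipped below instead of raising StopIteration here.
            curve_struct = next(
                (v for v in key_struct[1][1:] if v[0] == b"curve"), None
            )
            q_struct = next(
                (v for v in key_struct[1][1:] if v[0] == b"q"), None
            )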
            if (
                (curve_struct is None)
                or (len(curve_struct) < 2)
                or (curve_struct[1] != b"Curve25519")
                or (q_struct is None)
                or (len(q_struct) < 2)
            ):
                continue
            if (self._req_keygrip is not None) and (
                self._req_keygrip != keygrip
            ):
                continue
            self._public_key = q_struct[1][1:]
            self._keygrip = keygrip
            break
        # Done
        client.close()

        # Error handling
        if self._public_key is None:
            raise Crypt4GHKeyException("Cannot determine public key")
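
The keygrip handling in the listing above can be looked at in isolation. The following is a minimal sketch, assuming the decoded HAVEKEY payload is a plain concatenation of 20-byte keygrips, as the listing implies. `split_keygrips` is a hypothetical helper name and the payload is made up; neither is part of the library.

```python
from typing import List

KEYGRIP_LEN = 20  # binary keygrip size used by the listing above


def split_keygrips(data: bytes) -> List[bytes]:
    """Split concatenated binary keygrips into upper-case hex strings."""
    if len(data) % KEYGRIP_LEN != 0:
        raise ValueError(f"invalid keygrips data length: {len(data)}")
    return [
        data[idx : idx + KEYGRIP_LEN].hex().upper().encode("ascii")
        for idx in range(0, len(data), KEYGRIP_LEN)
    ]


# Two made-up keygrips, only for illustration.
payload = bytes(range(20)) + bytes(range(20, 40))
keygrips = split_keygrips(payload)
```

Each resulting 40-character value has the form that the listing appends to the `READKEY` command.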

compute_run_gnupg_base(bases: List[str] = ['/run/gnupg', '/run', '/var/run/gnupg', '/var/run']) -> str

Computes possible gnupg's run directories and verifies their existence.

Returns:

Type Description
str

The actual gnupg's run directory of current user.

Source code in oarepo_c4gh/key/gpg_agent.py
def compute_run_gnupg_base(
    bases: List[str] = ["/run/gnupg", "/run", "/var/run/gnupg", "/var/run"]
) -> str:
    """Computes possible gnupg's run directories and verifies their
    existence.

    Returns:
        The actual gnupg's run directory of current user.

    """
    uid = os.getuid()
    ubases = [f"{base}/user/{uid}" for base in bases]
    for ubase in ubases:
        if os.path.isdir(ubase):
            return f"{ubase}/gnupg"
    raise ArgumentError("Cannot find GnuPG run base directory")
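
The lookup can be tried out without touching the real `/run` hierarchy. This is a minimal sketch under stated assumptions: `find_run_base` is a hypothetical stand-in that takes the user id as a parameter, it raises `FileNotFoundError` instead of the library's exception, and the directories live in a temporary folder.

```python
import os
import tempfile
from typing import List


def find_run_base(bases: List[str], uid: int) -> str:
    """Return '<base>/user/<uid>/gnupg' for the first existing '<base>/user/<uid>'."""
    for base in bases:
        ubase = f"{base}/user/{uid}"
        if os.path.isdir(ubase):
            return f"{ubase}/gnupg"
    raise FileNotFoundError("Cannot find GnuPG run base directory")


with tempfile.TemporaryDirectory() as tmp:
    uid = 1000  # made-up user id
    os.makedirs(os.path.join(tmp, "run", "user", str(uid)))
    candidates = [f"{tmp}/missing", f"{tmp}/run"]
    found = find_run_base(candidates, uid)
    expected = f"{tmp}/run/user/{uid}/gnupg"
```

As in the listing, only the per-user directory is checked; the `gnupg` component is appended without verifying that it exists.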

compute_socket_dir(homedir: str = None) -> str

Computes the actual socket dir used by gpg-agent based on given homedir (the private key storage directory).

If given directory is None, returns the root run directory for gnupg (as required by gpg-agent).

Parameters:

Name Type Description Default
homedir str

canonical path to the directory

None

Returns:

Type Description
str

Socket base directory.

Source code in oarepo_c4gh/key/gpg_agent.py
def compute_socket_dir(homedir: str = None) -> str:
    """Computes the actual socket dir used by gpg-agent based on given
    homedir (the private key storage directory).

    If given directory is None, returns the root run directory for
    gnupg (as required by gpg-agent).

    Parameters:
        homedir: canonical path to the directory

    Returns:
        Socket base directory.

    """
    base = compute_run_gnupg_base()
    if homedir is not None:
        dhash = compute_socket_dir_hash(homedir)
        return f"{base}/d.{dhash}"
    return base

compute_socket_dir_hash(path: str) -> str

Computes a partial message digest of the given path to be used as a shortened path component in the base socket directory path. The implementation is compatible with gnupg's homedir.c and zb32.c as well as with libgcrypt's SHA1 message digest.

Parameters:

Name Type Description Default
path str

canonical (as understood by gnupg) path to the original directory

required
Source code in oarepo_c4gh/key/gpg_agent.py
def compute_socket_dir_hash(path: str) -> str:
    """Computes partial message digest of given path to be used as
    shortened path component in the base socket directory path. The
    implementation is compatible with gnupg's homedir.c and zb32.c as
    well as with libgcrypt's SHA1 message digest.

    Parameters:
        path: canonical (as understood by gnupg) path to the original directory

    """
    bpath = path.encode()
    md = sha1(bpath).digest()
    md15 = md[:15]
    b32 = b32encode(md15)
    s32 = b32.decode("ascii")
    z32 = s32.translate(gpg_trans)
    return z32
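
The digest steps can be reproduced standalone. The sketch below assumes that the library's `gpg_trans` table (not shown in this document) maps the RFC 4648 base32 alphabet onto the z-base-32 alphabet; the table, the helper name `socket_dir_hash`, the homedir and the base directory are all illustrative assumptions.

```python
from base64 import b32encode
from hashlib import sha1

# Assumed translation from the RFC 4648 base32 alphabet to z-base-32.
# The library's own `gpg_trans` table is not shown in this document.
RFC4648_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
ZBASE32_ALPHABET = "ybndrfg8ejkmcpqxot1uwisza345h769"
ZB32_TRANS = str.maketrans(RFC4648_ALPHABET, ZBASE32_ALPHABET)


def socket_dir_hash(path: str) -> str:
    """z-base-32 encoding of the first 15 bytes of SHA-1(path)."""
    digest15 = sha1(path.encode()).digest()[:15]
    # 15 bytes are 120 bits, so base32 yields 24 characters and no padding.
    return b32encode(digest15).decode("ascii").translate(ZB32_TRANS)


dhash = socket_dir_hash("/home/alice/keys")  # made-up homedir
socket_dir = f"/run/user/1000/gnupg/d.{dhash}"  # made-up base directory
```

Truncating the digest to 15 bytes keeps the encoded component at exactly 24 characters, which is why no padding characters need to be handled.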

decode_assuan_buffer(buf: bytes) -> bytes

Decodes assuan binary buffer with "%xx" replacements for certain characters.

Parameters:

Name Type Description Default
buf bytes

the buffer received (and encoded by _assuan_cookie_write_data originally)

required

Returns:

Type Description
bytes

The buffer with resolved escaped bytes.

Source code in oarepo_c4gh/key/gpg_agent.py
def decode_assuan_buffer(buf: bytes) -> bytes:
    """Decodes assuan binary buffer with "%xx" replacements for
    certain characters.

    Parameters:
        buf: the buffer received (and encoded by `_assuan_cookie_write_data` originally)

    Returns:
        The buffer with resolved escaped bytes.
    """
    result = b""
    idx = 0
    while idx < len(buf):
        if buf[idx : idx + 1] == b"%":
            hh = buf[idx + 1 : idx + 3].decode("ascii")
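            # bytes([...]) also works on Python versions where
            # int.to_bytes() requires an explicit byteorder argument.
            result = result + bytes([int(hh, 16)])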
            idx = idx + 3
        else:
            result = result + buf[idx : idx + 1]
            idx = idx + 1
    return result

encode_assuan_buffer(buf: bytes) -> bytes

Encodes assuan binary buffer by replacing occurrences of \r, \n and % with %0D, %0A and %25 respectively.

Parameters:

Name Type Description Default
buf bytes

the buffer to encode (for sending typically)

required

Returns:

Type Description
bytes

The encoded binary data that can be directly sent to assuan server.

Source code in oarepo_c4gh/key/gpg_agent.py
def encode_assuan_buffer(buf: bytes) -> bytes:
    r"""Encodes assuan binary buffer by replacing occurrences of \r, \n
    and % with %0D, %0A and %25 respectively.

    Parameters:
        buf: the buffer to encode (for sending typically)

    Returns:
        The encoded binary data that can be directly sent to assuan server.

    """
    result = b""
    idx = 0
    while idx < len(buf):
        b = buf[idx : idx + 1]
        if b == b"\n":
            result = result + b"%0A"
        elif b == b"\r":
            result = result + b"%0D"
        elif b == b"%":
            result = result + b"%25"
        else:
            result = result + b
        idx = idx + 1
    return result
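
Encoding and decoding are inverses of each other, which a short round trip makes visible. The following is a minimal sketch with hypothetical function names (`encode_assuan`, `decode_assuan`) that restate the two listings above in compact form; it is not the library's code.

```python
def encode_assuan(buf: bytes) -> bytes:
    """Escape %, CR and LF as %25, %0D and %0A."""
    # Escape '%' first so the escapes added afterwards are not re-escaped.
    return (
        buf.replace(b"%", b"%25")
        .replace(b"\r", b"%0D")
        .replace(b"\n", b"%0A")
    )


def decode_assuan(buf: bytes) -> bytes:
    """Resolve every %XX escape back to the byte it stands for."""
    out = bytearray()
    idx = 0
    while idx < len(buf):
        if buf[idx : idx + 1] == b"%":
            out.append(int(buf[idx + 1 : idx + 3].decode("ascii"), 16))
            idx += 3
        else:
            out.append(buf[idx])
            idx += 1
    return bytes(out)


raw = b"100% done\r\n"
wire = encode_assuan(raw)
restored = decode_assuan(wire)
```

The encoded form contains no raw line terminators, so it can be embedded in a single protocol line.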

expect_assuan_OK(client: IO) -> None

If the next message received does not start with b"OK", signals an error.

Parameters:

Name Type Description Default
client IO

active assuan socket connection

required
Source code in oarepo_c4gh/key/gpg_agent.py
def expect_assuan_OK(client: IO) -> None:
    """If the next message received does not start with b"OK", signals
    an error.

    Parameters:
        client: active assuan socket connection

    """
    ok_dgram = client.recv(4096)
    ok_msg, ok_rest = line_from_dgram(ok_dgram)
    if ok_msg[0:2] != b"OK":
        client.close()
        raise Crypt4GHKeyException("Expected Assuan OK message")
    if len(ok_rest) > 0:
        client.close()
        raise Crypt4GHKeyException("Line noise after Assuan OK")
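
The check can be exercised without a running agent by using a connected socket pair. This is only a sketch: `expect_ok` is a hypothetical stand-in that raises `RuntimeError` rather than `Crypt4GHKeyException`, and both server messages are made up for the demonstration.

```python
import socket


def expect_ok(conn: socket.socket) -> None:
    """Raise if the next datagram is not a single line starting with b'OK'."""
    dgram = conn.recv(4096)
    line, _, rest = dgram.partition(b"\n")
    if not line.startswith(b"OK"):
        conn.close()
        raise RuntimeError("Expected Assuan OK message")
    if rest:
        conn.close()
        raise RuntimeError("Line noise after Assuan OK")


# A connected socket pair plays the roles of agent and client.
agent, client = socket.socketpair()
agent.sendall(b"OK made-up greeting\n")
expect_ok(client)  # passes silently

agent.sendall(b"ERR 1 made-up error\n")
try:
    expect_ok(client)
    rejected = False
except RuntimeError:
    rejected = True
agent.close()
```

Closing the connection before raising mirrors the listing, so the caller does not have to clean up after a failed handshake.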

keygrip_to_hex(kg: bytes) -> bytes

Converts to hexadecimal representation suitable for KEYINFO and READKEY commands.

Parameters:

Name Type Description Default
kg bytes

keygrip in binary form (20 bytes)

required

Returns:

Type Description
bytes

Hexadecimal string as 40 bytes.

Source code in oarepo_c4gh/key/gpg_agent.py
def keygrip_to_hex(kg: bytes) -> bytes:
    """Converts to hexadecimal representation suitable for KEYINFO and
    READKEY commands.

    Parameters:
        kg: keygrip in binary form (20 bytes)

    Returns:
        Hexadecimal string as 40 bytes.
    """
    result = b""
    for b in kg:
        result = result + hex(0x100 + b)[3:].upper().encode("ascii")
    return result
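
For comparison, the same conversion can be written with the standard `bytes.hex()` method. This is an equivalent sketch, not the library's implementation, and the keygrip value is made up.

```python
def keygrip_to_hex(kg: bytes) -> bytes:
    """Upper-case hexadecimal form of a binary keygrip, via bytes.hex()."""
    return kg.hex().upper().encode("ascii")


keygrip = bytes.fromhex("00ff10a5") * 5  # made-up 20-byte keygrip
hex_keygrip = keygrip_to_hex(keygrip)

# The hexadecimal form is what gets appended to the READKEY command.
command = b"READKEY " + hex_keygrip + b"\n"
```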

line_from_dgram(dgram: bytes) -> (bytes, bytes)

Reads single line from given raw data and returns two values: the line read and the remaining data.

Parameters:

Name Type Description Default
dgram bytes

raw bytes with input

required
Source code in oarepo_c4gh/key/gpg_agent.py
def line_from_dgram(dgram: bytes) -> (bytes, bytes):
    """Reads single line from given raw data and returns two values:
    the line read and the remaining data.

    Parameters:
        dgram: raw bytes with input
    """
    lf_idx = dgram.find(b"\n")
    if (lf_idx == -1) or (lf_idx == (len(dgram) - 1)):
        return dgram, b""
    return dgram[:lf_idx], dgram[lf_idx + 1 :]
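
Two inputs show how the split behaves. The function body is repeated from the listing above so that the example runs standalone (only the return annotation is written as `Tuple[bytes, bytes]`); the datagram contents are made up.

```python
from typing import Tuple


def line_from_dgram(dgram: bytes) -> Tuple[bytes, bytes]:
    """Copy of the listing above, repeated so the example runs standalone."""
    lf_idx = dgram.find(b"\n")
    if (lf_idx == -1) or (lf_idx == (len(dgram) - 1)):
        return dgram, b""
    return dgram[:lf_idx], dgram[lf_idx + 1 :]


# Two lines in one datagram: the first is split off, the rest is returned.
first, rest = line_from_dgram(b"D payload\nOK\n")

# A single line ending the datagram is returned whole, including its LF.
only, empty = line_from_dgram(b"OK\n")
```

Note that in the second case the returned line keeps its trailing line feed, so callers that compare the whole line need to account for it.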

parse_binary_sexp(data: bytes) -> list

Reads libassuan binary S-Expression data into a nested list structure.

Parameters:

Name Type Description Default
data bytes

binary encoding of S-Expressions

required

Returns:

Type Description
list

List of bytes and lists.

Source code in oarepo_c4gh/key/gpg_agent.py
def parse_binary_sexp(data: bytes) -> list:
    """Reads libassuan binary S-Expression data into a nested list
    structure.

    Parameters:
        data: binary encoding of S-Expressions

    Returns:
        List of bytes and lists.

    """
    root = []
    stack = [root]
    idx = 0
    while idx < len(data):
        if data[idx : idx + 1] == b"(":
            lst = []
            stack[len(stack) - 1].append(lst)
            stack.append(lst)
            idx = idx + 1
        elif data[idx : idx + 1] == b")":
            stack = stack[: len(stack) - 1]
            idx = idx + 1
        else:
            sep_idx = data.find(b":", idx)
            if sep_idx < 0:
                break
            len_str = data[idx:sep_idx].decode("ascii")
            if len(len_str) == 0:
                break
            token_len = int(len_str)
            stack[len(stack) - 1].append(
                data[sep_idx + 1 : sep_idx + 1 + token_len]
            )
            idx = sep_idx + token_len + 1
    if len(root) == 0:
        return None
    return root[0]
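
A small input makes the resulting structure concrete. The sketch below restates the parser in compact form under the hypothetical name `parse_sexp` and feeds it a made-up public key expression. The 0x40 prefix byte on the `q` value is an assumption about the encoding; the `ensure_public_key` listing only shows that the first byte of `q` is dropped.

```python
from typing import Optional


def parse_sexp(data: bytes) -> Optional[list]:
    """Compact restatement of the parser above for a standalone example."""
    root = []
    stack = [root]
    idx = 0
    while idx < len(data):
        ch = data[idx : idx + 1]
        if ch == b"(":
            lst = []
            stack[-1].append(lst)
            stack.append(lst)
            idx += 1
        elif ch == b")":
            stack.pop()
            idx += 1
        else:
            # Tokens are length-prefixed: b"5:curve" is the 5-byte token b"curve".
            sep_idx = data.find(b":", idx)
            if sep_idx < 0 or sep_idx == idx:
                break
            token_len = int(data[idx:sep_idx].decode("ascii"))
            stack[-1].append(data[sep_idx + 1 : sep_idx + 1 + token_len])
            idx = sep_idx + 1 + token_len
    return root[0] if root else None


q = b"\x40" + bytes(range(32))  # made-up point: assumed prefix + 32 bytes
data = (
    b"(10:public-key(3:ecc(5:curve10:Curve25519)(1:q"
    + str(len(q)).encode("ascii") + b":" + q + b")))"
)
key_struct = parse_sexp(data)
params = key_struct[1][1:]
curve = next((v for v in params if v[0] == b"curve"), None)
q_struct = next((v for v in params if v[0] == b"q"), None)
public_key = q_struct[1][1:]  # drop the leading prefix byte
```

Because every token carries its own length, binary values such as `q` may contain parentheses or colons without confusing the parser.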

Key Serialization

oarepo_c4gh.key.writer

This module provides (very simple) means of serializing any c4gh-compatible key into the c4gh textual representation. For example, an HSM-backed key can be exported as a c4gh public key, which can in turn be loaded into client software that will use it to encrypt data for this key.

C4GHPublicKeyWriter

Very simple writer class that can be extended in the future. At the moment it serves as a thin layer between any Key implementation and textual serialization functions.

Source code in oarepo_c4gh/key/writer.py
class C4GHPublicKeyWriter():
    """Very simple writer class that can be extended in the future. At
    the moment it serves as a thin layer between any Key
    implementation and textual serialization functions.

    """

    def __init__(self, key: Key) -> None:
        """Initializes the writer with given Key instance.

        Parameters:
            key: the key to be serialized
        """
        self._key = key

    def __str__(self) -> str:
        """Returns the string version of serialized public key in
        Crypt4GH native format.

        """
        b64key = b64encode(self._key.public_key).decode("ascii")
        return (
            f"-----BEGIN CRYPT4GH PUBLIC KEY-----\n"
            f"{b64key}\n"
            f"-----END CRYPT4GH PUBLIC KEY-----\n"
        )

    def __bytes__(self) -> bytes:
        """The same as the string conversion - this time as bytes (the
        underlying encoding is 7-bit ASCII anyway).

        """
        return str(self).encode("ascii")

    def write(self, ostream: io.RawIOBase) -> None:
        """Writes the serialized key into given IO stream.

        Parameters:
            ostream: where to write the key to
        """
        ostream.write(bytes(self))
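
The textual format produced by the class can be reproduced with the standard library alone. This is a minimal sketch, assuming a raw 32-byte public key; `serialize_public_key` is a hypothetical function mirroring `__str__` above, and the key material is made up.

```python
import io
from base64 import b64decode, b64encode


def serialize_public_key(public_key: bytes) -> str:
    """Mirror of C4GHPublicKeyWriter.__str__ for a raw public key."""
    b64key = b64encode(public_key).decode("ascii")
    return (
        "-----BEGIN CRYPT4GH PUBLIC KEY-----\n"
        f"{b64key}\n"
        "-----END CRYPT4GH PUBLIC KEY-----\n"
    )


public_key = bytes(range(32))  # made-up key material
text = serialize_public_key(public_key)

# Writing to a binary stream, as C4GHPublicKeyWriter.write does.
stream = io.BytesIO()
stream.write(text.encode("ascii"))

# The middle line decodes back to the original 32 bytes.
lines = stream.getvalue().decode("ascii").splitlines()
recovered = b64decode(lines[1])
```

With the library itself, the equivalent call would wrap any `Key` implementation in `C4GHPublicKeyWriter` and pass a binary stream to its `write` method.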

__bytes__() -> bytes

The same as the string conversion - this time as bytes (the underlying encoding is 7-bit ASCII anyway).

Source code in oarepo_c4gh/key/writer.py
def __bytes__(self) -> bytes:
    """The same as the string conversion - this time as bytes (the
    underlying encoding is 7-bit ASCII anyway).

    """
    return str(self).encode("ascii")

__init__(key: Key) -> None

Initializes the writer with given Key instance.

Parameters:

Name Type Description Default
key Key

the key to be serialized

required
Source code in oarepo_c4gh/key/writer.py
def __init__(self, key: Key) -> None:
    """Initializes the writer with given Key instance.

    Parameters:
        key: the key to be serialized
    """
    self._key = key

write(ostream: io.RawIOBase) -> None

Writes the serialized key into given IO stream.

Parameters:

Name Type Description Default
ostream RawIOBase

where to write the key to

required
Source code in oarepo_c4gh/key/writer.py
def write(self, ostream: io.RawIOBase) -> None:
    """Writes the serialized key into given IO stream.

    Parameters:
        ostream: where to write the key to
    """
    ostream.write(bytes(self))