Skip to content

Key Support

This part of the library implements support for public and private keys used for reading and writing Crypt4GH containers.

Protocol

oarepo_c4gh.key.key

An abstract Base Class for Asymmetric Secret Keys

This module contains only the interface specification for all key classes implementations.

Key

Bases: Protocol

This is an abstract class, containing only abstract methods used to compute the Diffie-Hellman key exchange over the Montgomery curve Curve25519 as specified by the X25519 standard and auxilliary informative helpers.

Source code in oarepo_c4gh/key/key.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Key(Protocol):
    """This is an abstract class, containing only abstract methods
    used to compute the Diffie-Hellman key exchange over the
    Montgomery curve Curve25519 as specified by the X25519 standard
    and auxilliary informative helpers.

    """

    @property
    @abstractmethod
    def public_key(self) -> bytes:
        """The derived classes must implement providing corresponding
        public key in this method.

        Returns:
            The 32 bytes of the public key.

        """
        ...

    @abstractmethod
    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Accepts the intended reader public key and computes the
        shared secret based on the public and secret key (this key) of
        the writer particular key source implementation.

        Parameters:
            reader_public_key: the 32 bytes of the reader public key

        Returns:
            The shared secret as 32 bytes - usable as symmetric key.

        """
        ...

    @abstractmethod
    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Accepts the writer public key and computes the shared
        secret based on the public and secret key (this key) of the
        reader particular key source implementation.

        Parameters:
            writer_public_key: the 32 bytes of the writer public key

        Returns:
            The shared secret as 32 bytes - usable as symmetric key.

        """
        ...

    @property
    @abstractmethod
    def can_compute_symmetric_keys(self) -> bool:
        """A predicate returning true if this key instance can perform
        read/write key derivation. This is usually determined by
        having access to the private key (for software implementation)
        or some other means of working with the private key (for HSM).

        Returns:
            true if it can perform symmetric key derivation

        """
        return False

    def __bytes__(self) -> bytes:
        """Default converter to bytes returns the public key bytes."""
        return self.public_key

can_compute_symmetric_keys: bool property

A predicate returning true if this key instance can perform read/write key derivation. This is usually determined by having access to the private key (for software implementation) or some other means of working with the private key (for HSM).

Returns:

Type Description
bool

true if it can perform symmetric key derivation

public_key: bytes property

The derived classes must implement providing corresponding public key in this method.

Returns:

Type Description
bytes

The 32 bytes of the public key.

__bytes__() -> bytes

Default converter to bytes returns the public key bytes.

Source code in oarepo_c4gh/key/key.py
75
76
77
def __bytes__(self) -> bytes:
    """Default converter to bytes returns the public key bytes."""
    return self.public_key

compute_read_key(writer_public_key: bytes) -> bytes

Accepts the writer public key and computes the shared secret based on the public and secret key (this key) of the reader particular key source implementation.

Parameters:

Name Type Description Default
writer_public_key bytes

the 32 bytes of the writer public key

required

Returns:

Type Description
bytes

The shared secret as 32 bytes - usable as symmetric key.

Source code in oarepo_c4gh/key/key.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@abstractmethod
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Accepts the writer public key and computes the shared
    secret based on the public and secret key (this key) of the
    reader particular key source implementation.

    Parameters:
        writer_public_key: the 32 bytes of the writer public key

    Returns:
        The shared secret as 32 bytes - usable as symmetric key.

    """
    ...

compute_write_key(reader_public_key: bytes) -> bytes

Accepts the intended reader public key and computes the shared secret based on the public and secret key (this key) of the writer particular key source implementation.

Parameters:

Name Type Description Default
reader_public_key bytes

the 32 bytes of the reader public key

required

Returns:

Type Description
bytes

The shared secret as 32 bytes - usable as symmetric key.

Source code in oarepo_c4gh/key/key.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@abstractmethod
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Accepts the intended reader public key and computes the
    shared secret based on the public and secret key (this key) of
    the writer particular key source implementation.

    Parameters:
        reader_public_key: the 32 bytes of the reader public key

    Returns:
        The shared secret as 32 bytes - usable as symmetric key.

    """
    ...

Implementations

oarepo_c4gh.key.software

A base class for all software-defined keys.

This module implements the Diffie-Hellman key exchange using software keys and NaCl bindings. The class contained here also provides an interface for setting the private key instance property by derived classes that should implement particular key loaders.

SoftwareKey

Bases: Key

This class implements the actual Diffie-Hellman key exchange with locally stored private key in the class instance.

Source code in oarepo_c4gh/key/software.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class SoftwareKey(Key):
    """This class implements the actual Diffie-Hellman key exchange
    with locally stored private key in the class instance.

    """

    def __init__(self, key_data: bytes, only_public: bool = False) -> None:
        """Performs rudimentary key data validation and initializes
        either only the public key or both the public and private key.

        Parameters:
            key_data: the 32 bytes of key material
            only_public: whether this contains only the public point

        Raises:
            AssertionError: is the key_data does not contain exactly 32 bytes

        """
        assert len(key_data) == 32, (
            f"The X25519 key must be 32 bytes long" f" ({len(key_data)})!"
        )
        if only_public:
            self._public_key = key_data
            self._private_key = None
        else:
            private_key_obj = PrivateKey(key_data)
            self._private_key = bytes(private_key_obj)
            public_key_obj = private_key_obj.public_key
            self._public_key = bytes(public_key_obj)

    @property
    def public_key(self) -> bytes:
        """Returns the public key corresponding to the private key
        used.

        """
        return self._public_key

    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Computes secret symmetric key used for writing Crypt4GH
        encrypted header packets. The instance of this class
        represents the writer key.

        Parameters:
            reader_public_key: the 32 bytes of the reader public key

        Returns:
            Writer symmetric key as 32 bytes.

        Raises:
            Crypt4GHKeyException: if only public key is available

        The algorithm used is not just a Diffie-Hellman key exchange
        to establish shared secret but it also includes derivation of
        two symmetric keys used in bi-directional connection. This
        pair of keys is derived from the shared secret concatenated
        with client public key and server public key by hashing such
        binary string with BLAKE2B-512 hash.

        For server - and therefore the writer - participant it is the
        "transmit" key of the imaginary connection.

        ```
        rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)
        ```

        The order of shared secret and client and server public keys
        in the binary string being matches must be the same on both
        sides. Therefore the same symmetric keys are derived. However
        for maintaining this ordering, each party must know which one
        it is - otherwise even with correctly computed shared secret
        the resulting pair of keys would be different.

        """
        if self._private_key is None:
            raise Crypt4GHKeyException(
                "Only keys with private part can be used"
                " for computing shared key"
            )
        _, shared_key = crypto_kx_server_session_keys(
            self._public_key, self._private_key, reader_public_key
        )
        return shared_key

    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Computes secret symmetric key used for reading Crypt4GH
        encrypted header packets. The instance of this class
        represents the reader key.

        See detailed description of ``compute_write_key``.

        For this function the "receive" key is used - which is the
        same as the "transmit" key of the writer.

        Parameters:
            writer_public_key: the 32 bytes of the writer public key

        Returns:
            Reader symmetric key as 32 bytes.

        Raises:
            Crypt4GHKeyException: if only public key is available

        """
        if self._private_key is None:
            raise Crypt4GHKeyException(
                "Only keys with private part can be used"
                " for computing shared key"
            )
        shared_key, _ = crypto_kx_client_session_keys(
            self._public_key, self._private_key, writer_public_key
        )
        return shared_key

    @property
    def can_compute_symmetric_keys(self) -> bool:
        """Returns True if this key contains the private part.

        Returns:
            True if private key is available.

        """
        return self._private_key is not None

    @classmethod
    def generate(self) -> None:
        return SoftwareKey(secrets.token_bytes(32))

can_compute_symmetric_keys: bool property

Returns True if this key contains the private part.

Returns:

Type Description
bool

True if private key is available.

public_key: bytes property

Returns the public key corresponding to the private key used.

__init__(key_data: bytes, only_public: bool = False) -> None

Performs rudimentary key data validation and initializes either only the public key or both the public and private key.

Parameters:

Name Type Description Default
key_data bytes

the 32 bytes of key material

required
only_public bool

whether this contains only the public point

False

Raises:

Type Description
AssertionError

is the key_data does not contain exactly 32 bytes

Source code in oarepo_c4gh/key/software.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(self, key_data: bytes, only_public: bool = False) -> None:
    """Performs rudimentary key data validation and initializes
    either only the public key or both the public and private key.

    Parameters:
        key_data: the 32 bytes of key material
        only_public: whether this contains only the public point

    Raises:
        AssertionError: is the key_data does not contain exactly 32 bytes

    """
    assert len(key_data) == 32, (
        f"The X25519 key must be 32 bytes long" f" ({len(key_data)})!"
    )
    if only_public:
        self._public_key = key_data
        self._private_key = None
    else:
        private_key_obj = PrivateKey(key_data)
        self._private_key = bytes(private_key_obj)
        public_key_obj = private_key_obj.public_key
        self._public_key = bytes(public_key_obj)

compute_read_key(writer_public_key: bytes) -> bytes

Computes secret symmetric key used for reading Crypt4GH encrypted header packets. The instance of this class represents the reader key.

See detailed description of compute_write_key.

For this function the "receive" key is used - which is the same as the "transmit" key of the writer.

Parameters:

Name Type Description Default
writer_public_key bytes

the 32 bytes of the writer public key

required

Returns:

Type Description
bytes

Reader symmetric key as 32 bytes.

Raises:

Type Description
Crypt4GHKeyException

if only public key is available

Source code in oarepo_c4gh/key/software.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Computes secret symmetric key used for reading Crypt4GH
    encrypted header packets. The instance of this class
    represents the reader key.

    See detailed description of ``compute_write_key``.

    For this function the "receive" key is used - which is the
    same as the "transmit" key of the writer.

    Parameters:
        writer_public_key: the 32 bytes of the writer public key

    Returns:
        Reader symmetric key as 32 bytes.

    Raises:
        Crypt4GHKeyException: if only public key is available

    """
    if self._private_key is None:
        raise Crypt4GHKeyException(
            "Only keys with private part can be used"
            " for computing shared key"
        )
    shared_key, _ = crypto_kx_client_session_keys(
        self._public_key, self._private_key, writer_public_key
    )
    return shared_key

compute_write_key(reader_public_key: bytes) -> bytes

Computes secret symmetric key used for writing Crypt4GH encrypted header packets. The instance of this class represents the writer key.

Parameters:

Name Type Description Default
reader_public_key bytes

the 32 bytes of the reader public key

required

Returns:

Type Description
bytes

Writer symmetric key as 32 bytes.

Raises:

Type Description
Crypt4GHKeyException

if only public key is available

The algorithm used is not just a Diffie-Hellman key exchange to establish shared secret but it also includes derivation of two symmetric keys used in bi-directional connection. This pair of keys is derived from the shared secret concatenated with client public key and server public key by hashing such binary string with BLAKE2B-512 hash.

For server - and therefore the writer - participant it is the "transmit" key of the imaginary connection.

rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)

The order of shared secret and client and server public keys in the binary string being matches must be the same on both sides. Therefore the same symmetric keys are derived. However for maintaining this ordering, each party must know which one it is - otherwise even with correctly computed shared secret the resulting pair of keys would be different.

Source code in oarepo_c4gh/key/software.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Computes secret symmetric key used for writing Crypt4GH
    encrypted header packets. The instance of this class
    represents the writer key.

    Parameters:
        reader_public_key: the 32 bytes of the reader public key

    Returns:
        Writer symmetric key as 32 bytes.

    Raises:
        Crypt4GHKeyException: if only public key is available

    The algorithm used is not just a Diffie-Hellman key exchange
    to establish shared secret but it also includes derivation of
    two symmetric keys used in bi-directional connection. This
    pair of keys is derived from the shared secret concatenated
    with client public key and server public key by hashing such
    binary string with BLAKE2B-512 hash.

    For server - and therefore the writer - participant it is the
    "transmit" key of the imaginary connection.

    ```
    rx || tx = BLAKE2B-512(p.n || client_pk || server_pk)
    ```

    The order of shared secret and client and server public keys
    in the binary string being matches must be the same on both
    sides. Therefore the same symmetric keys are derived. However
    for maintaining this ordering, each party must know which one
    it is - otherwise even with correctly computed shared secret
    the resulting pair of keys would be different.

    """
    if self._private_key is None:
        raise Crypt4GHKeyException(
            "Only keys with private part can be used"
            " for computing shared key"
        )
    _, shared_key = crypto_kx_server_session_keys(
        self._public_key, self._private_key, reader_public_key
    )
    return shared_key

oarepo_c4gh.key.c4gh

Class for loading the Crypt4GH reference key format.

C4GHKey

Bases: SoftwareKey

This class implements the loader for Crypt4GH key file format.

Source code in oarepo_c4gh/key/c4gh.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
class C4GHKey(SoftwareKey):
    """This class implements the loader for Crypt4GH key file format."""

    @classmethod
    def from_file(
        self, file_name: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens file stream and loads the Crypt4GH key from it.

        Parameters:
            file_name: path to the file with the key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_stream(open(file_name, "rb"), callback)

    @classmethod
    def from_string(
        self, contents: str, callback: callable = default_passphrase_callback
    ) -> Self:
        """Converts string to bytes which is opened as binary stream
        and loads the Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_bytes(bytes(contents, "ASCII"), callback)

    @classmethod
    def from_bytes(
        self, contents: bytes, callback: callable = default_passphrase_callback
    ) -> Self:
        """Opens the contents bytes as binary stream and loads the
        Crypt4GH key from it.

        Parameters:
            contents: complete contents of the file with Crypt4GH key.
            callback: must return passphrase for decryption if called.

        Returns:
            Initialized C4GHKey instance.

        """
        return C4GHKey.from_stream(BytesIO(contents), callback)

    @classmethod
    def from_stream(
        self,
        istream: RawIOBase,
        callback: callable = default_passphrase_callback,
    ) -> Self:
        """Parses the stream with stored key.

        Parameters:
            istream: input stream with the key file contents.
            callback: must return passphrase for decryption if called

        Returns:
            The newly constructed key instance.
        """
        slabel, sdata = decode_b64_envelope(istream)
        istream.close()
        if slabel == b"CRYPT4GH PUBLIC KEY":
            return C4GHKey(sdata, True)
        else:
            istreamb = BytesIO(sdata)
            check_c4gh_stream_magic(istreamb)
            kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
            cipher_name = decode_c4gh_bytes(istreamb)
            if cipher_name == b"none":
                secret_data = decode_c4gh_bytes(istreamb)
                return C4GHKey(secret_data, False)
            if cipher_name != b"chacha20_poly1305":
                raise Crypt4GHKeyException(
                    f"Unsupported cipher: {cipher_name}"
                )
            assert callable(
                callback
            ), "Invalid passphrase callback (non-callable)"
            passphrase = callback().encode()
            symmetric_key = derive_c4gh_key(
                kdf_name, passphrase, kdf_salt, kdf_rounds
            )
            nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
            nonce = nonce_and_encrypted_data[:12]
            encrypted_data = nonce_and_encrypted_data[12:]
            decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
                nonce, encrypted_data, None
            )
            return C4GHKey(decrypted_data, False)

from_bytes(contents: bytes, callback: callable = default_passphrase_callback) -> Self classmethod

Opens the contents bytes as binary stream and loads the Crypt4GH key from it.

Parameters:

Name Type Description Default
contents bytes

complete contents of the file with Crypt4GH key.

required
callback callable

must return passphrase for decryption if called.

default_passphrase_callback

Returns:

Type Description
Self

Initialized C4GHKey instance.

Source code in oarepo_c4gh/key/c4gh.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@classmethod
def from_bytes(
    self, contents: bytes, callback: callable = default_passphrase_callback
) -> Self:
    """Opens the contents bytes as binary stream and loads the
    Crypt4GH key from it.

    Parameters:
        contents: complete contents of the file with Crypt4GH key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_stream(BytesIO(contents), callback)

from_file(file_name: str, callback: callable = default_passphrase_callback) -> Self classmethod

Opens file stream and loads the Crypt4GH key from it.

Parameters:

Name Type Description Default
file_name str

path to the file with the key.

required
callback callable

must return passphrase for decryption if called.

default_passphrase_callback

Returns:

Type Description
Self

Initialized C4GHKey instance.

Source code in oarepo_c4gh/key/c4gh.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@classmethod
def from_file(
    self, file_name: str, callback: callable = default_passphrase_callback
) -> Self:
    """Opens file stream and loads the Crypt4GH key from it.

    Parameters:
        file_name: path to the file with the key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_stream(open(file_name, "rb"), callback)

from_stream(istream: RawIOBase, callback: callable = default_passphrase_callback) -> Self classmethod

Parses the stream with stored key.

Parameters:

Name Type Description Default
istream RawIOBase

input stream with the key file contents.

required
callback callable

must return passphrase for decryption if called

default_passphrase_callback

Returns:

Type Description
Self

The newly constructed key instance.

Source code in oarepo_c4gh/key/c4gh.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
@classmethod
def from_stream(
    self,
    istream: RawIOBase,
    callback: callable = default_passphrase_callback,
) -> Self:
    """Parses the stream with stored key.

    Parameters:
        istream: input stream with the key file contents.
        callback: must return passphrase for decryption if called

    Returns:
        The newly constructed key instance.
    """
    slabel, sdata = decode_b64_envelope(istream)
    istream.close()
    if slabel == b"CRYPT4GH PUBLIC KEY":
        return C4GHKey(sdata, True)
    else:
        istreamb = BytesIO(sdata)
        check_c4gh_stream_magic(istreamb)
        kdf_name, kdf_rounds, kdf_salt = parse_c4gh_kdf_options(istreamb)
        cipher_name = decode_c4gh_bytes(istreamb)
        if cipher_name == b"none":
            secret_data = decode_c4gh_bytes(istreamb)
            return C4GHKey(secret_data, False)
        if cipher_name != b"chacha20_poly1305":
            raise Crypt4GHKeyException(
                f"Unsupported cipher: {cipher_name}"
            )
        assert callable(
            callback
        ), "Invalid passphrase callback (non-callable)"
        passphrase = callback().encode()
        symmetric_key = derive_c4gh_key(
            kdf_name, passphrase, kdf_salt, kdf_rounds
        )
        nonce_and_encrypted_data = decode_c4gh_bytes(istreamb)
        nonce = nonce_and_encrypted_data[:12]
        encrypted_data = nonce_and_encrypted_data[12:]
        decrypted_data = ChaCha20Poly1305(symmetric_key).decrypt(
            nonce, encrypted_data, None
        )
        return C4GHKey(decrypted_data, False)

from_string(contents: str, callback: callable = default_passphrase_callback) -> Self classmethod

Converts string to bytes which is opened as binary stream and loads the Crypt4GH key from it.

Parameters:

Name Type Description Default
contents str

complete contents of the file with Crypt4GH key.

required
callback callable

must return passphrase for decryption if called.

default_passphrase_callback

Returns:

Type Description
Self

Initialized C4GHKey instance.

Source code in oarepo_c4gh/key/c4gh.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
@classmethod
def from_string(
    self, contents: str, callback: callable = default_passphrase_callback
) -> Self:
    """Converts string to bytes which is opened as binary stream
    and loads the Crypt4GH key from it.

    Parameters:
        contents: complete contents of the file with Crypt4GH key.
        callback: must return passphrase for decryption if called.

    Returns:
        Initialized C4GHKey instance.

    """
    return C4GHKey.from_bytes(bytes(contents, "ASCII"), callback)

check_c4gh_kdf(kdf_name: bytes) -> bool

Returns true if given KDF is supported.

Parameters:

Name Type Description Default
kdf_name bytes

KDF name string as bytes

required

Returns:

Type Description
bool

True if the KDF is supported.

Source code in oarepo_c4gh/key/c4gh.py
19
20
21
22
23
24
25
26
27
28
def check_c4gh_kdf(kdf_name: bytes) -> bool:
    """Returns true if given KDF is supported.

    Parameters:
        kdf_name: KDF name string as bytes

    Returns:
        True if the KDF is supported.
    """
    return kdf_name in C4GH_KDFS

check_c4gh_stream_magic(istreamb: RawIOBase) -> None

Reads enough bytes from given input stream and checks whether they contain the correct Crypt4GH signature. Raises error if it doesn't.

Parameters:

Name Type Description Default
istreamb RawIOBase

input stream with the raw Crypt4GH binary key stream.

required

Raises:

Type Description
Crypt4GHKeyException

if the signature does not match.

Source code in oarepo_c4gh/key/c4gh.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def check_c4gh_stream_magic(istreamb: RawIOBase) -> None:
    """Reads enough bytes from given input stream and checks whether
    they contain the correct Crypt4GH signature. Raises error if it
    doesn't.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary key stream.

    Raises:
        Crypt4GHKeyException: if the signature does not match.

    """
    magic_to_check = istreamb.read(len(C4GH_MAGIC_WORD))
    if magic_to_check != C4GH_MAGIC_WORD:
        raise Crypt4GHKeyException("Not a Crypt4GH private key!")

decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes)

Reads PEM-like format and returns its label and decoded bytes.

Parameters:

Name Type Description Default
istream RawIOBase

input stream with the data.

required

Returns:

Type Description
(bytes, bytes)

Label of the envelope and decoded content bytes.

Source code in oarepo_c4gh/key/c4gh.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def decode_b64_envelope(istream: RawIOBase) -> (bytes, bytes):
    """Reads PEM-like format and returns its label and decoded bytes.

    Parameters:
        istream: input stream with the data.

    Returns:
        Label of the envelope and decoded content bytes.

    """
    lines = list(
        filter(
            lambda line: line,
            map(lambda raw_line: raw_line.strip(), istream.readlines()),
        )
    )
    assert (
        len(lines) >= 3
    ), "At least 3 lines are needed - 2 for envelope and 1 with data."
    assert lines[0].startswith(
        b"-----BEGIN "
    ), f"Must start with BEGIN line {lines[0]}."
    assert lines[-1].startswith(
        b"-----END "
    ), f"Must end with END line {lines[-1]}."
    data = b64decode(b"".join(lines[1:-1]))
    begin_label = lines[0][11:-1].strip(b"-")
    end_label = lines[-1][9:-1].strip(b"-")
    assert (
        begin_label == end_label
    ), f"BEGIN {begin_label} not END {end_label}!"
    return begin_label, data

decode_c4gh_bytes(istream: RawIOBase) -> bytes

Decodes binary string encoded as two-byte big-endian integer length and the actual data that follows this length field.

Parameters:

Name Type Description Default
istream RawIOBase

input stream from which to decode the bytes string.

required

Returns:

Type Description
bytes

The decoded bytes string.

Raises:

Type Description
Crypt4GHKeyException

if there is not enough data in the stream

Source code in oarepo_c4gh/key/c4gh.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def decode_c4gh_bytes(istream: RawIOBase) -> bytes:
    """Decodes binary string encoded as two-byte big-endian integer
    length and the actual data that follows this length field.

    Parameters:
        istream: input stream from which to decode the bytes string.

    Returns:
        The decoded bytes string.

    Raises:
        Crypt4GHKeyException: if there is not enough data in the stream

    """
    lengthb = istream.read(2)
    lengthb_length = len(lengthb)
    if len(lengthb) != 2:
        raise Crypt4GHKeyException(
            f"Binary string read - not enought data to read the length: "
            f"{lengthb_length} != 2"
        )
    length = int.from_bytes(lengthb, byteorder="big")
    string = istream.read(length)
    read_length = len(string)
    if read_length != length:
        raise Crypt4GHKeyException(
            f"Binary string read - not enough data: {read_length} != {length}"
        )
    return string

default_passphrase_callback() -> None

By default the constructor has no means of obtaining the passphrase and therefore this function unconditionally raises an exception when called.

Source code in oarepo_c4gh/key/c4gh.py
31
32
33
34
35
36
37
def default_passphrase_callback() -> None:
    """By default the constructor has no means of obtaining the
    passphrase and therefore this function unconditionally raises an
    exception when called.

    """
    raise Crypt4GHKeyException("No password callback provided!")

derive_c4gh_key(algo: bytes, passphrase: bytes, salt: bytes, rounds: int) -> bytes

Derives the symmetric key for decrypting the private key.

Parameters:

Name Type Description Default
algo bytes

the algorithm for key derivation

required
passphrase bytes

the passphrase from which to derive the key

required
rounds int

number of hashing rounds

required

Returns:

Type Description
bytes

The derived symmetric key.

Raises:

Type Description
Crypt4GHKeyException

if given KDF algorithm is not supported (should not happen as this is expected to be called after parse_c4gh_kdf_options).

Source code in oarepo_c4gh/key/c4gh.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def derive_c4gh_key(
    algo: bytes, passphrase: bytes, salt: bytes, rounds: int
) -> bytes:
    """Derives the symmetric key for decrypting the private key.

    Parameters:
        algo: the algorithm for key derivation
        passphrase: the passphrase from which to derive the key
        rounds: number of hashing rounds

    Returns:
        The derived symmetric key.

    Raises:
        Crypt4GHKeyException: if given KDF algorithm is not supported (should not happen
            as this is expected to be called after parse_c4gh_kdf_options).
    """
    if algo == b"scrypt":
        from hashlib import scrypt

        return scrypt(passphrase, salt=salt, n=1 << 14, r=8, p=1, dklen=32)
    if algo == b"bcrypt":
        import bcrypt

        return bcrypt.kdf(
            passphrase,
            salt=salt,
            desired_key_bytes=32,
            rounds=rounds,
            ignore_few_rounds=True,
        )
    if algo == b"pbkdf2_hmac_sha256":
        from hashlib import pbkdf2_hmac

        return pbkdf2_hmac("sha256", passphrase, salt, rounds, dklen=32)
    raise Crypt4GHKeyException(f"Unsupported KDF: {algo}")

parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes)

Parses KDF name and options (if applicable) from given input stream.

Parameters:

Name Type Description Default
istreamb RawIOBase

input stream with the raw Crypt4GH binary stream.

required

Returns:

Name Type Description
kdf_name (bytes, int, bytes)

the name of the KDF as binary string

kdf_rounds (bytes, int, bytes)

number of hashing rounds for KDF

kdf_salt (bytes, int, bytes)

salt for initializing the hashing

Raises:

Type Description
Crypt4GHKeyException

if parsed KDF name is not supported

Source code in oarepo_c4gh/key/c4gh.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def parse_c4gh_kdf_options(istreamb: RawIOBase) -> (bytes, int, bytes):
    """Parses KDF name and options (if applicable) from given input
    stream.

    Parameters:
        istreamb: input stream with the raw Crypt4GH binary stream.

    Returns:
        kdf_name: the name of the KDF as binary string
        kdf_rounds: number of hashing rounds for KDF
        kdf_salt: salt for initializing the hashing

    Raises:
        Crypt4GHKeyException: if parsed KDF name is not supported

    """
    kdf_name = decode_c4gh_bytes(istreamb)
    if kdf_name == b"none":
        return (kdf_name, None, None)
    elif check_c4gh_kdf(kdf_name):
        kdf_options = decode_c4gh_bytes(istreamb)
        kdf_rounds = int.from_bytes(kdf_options[:4], byteorder="big")
        kdf_salt = kdf_options[4:]
        return (kdf_name, kdf_rounds, kdf_salt)
    else:
        raise Crypt4GHKeyException(f"Unsupported KDF {kdf_name}")

oarepo_c4gh.key.key_collection

This module implements a key collection that is to be used when reading the container header packets instead to support multiple available reader keys.

KeyCollection

This class implements a simple storage for a collection of reader keys and gives a reusable iterator which is guaranteed to iterate over all the keys at most once. Each round of iterations starts with the last key was used in the previous round. This ensures that if a reader key successfully reads a packet, it will always be the first to try for the very next packet.

Source code in oarepo_c4gh/key/key_collection.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class KeyCollection:
    """This class implements a simple storage for a collection of
    reader keys and gives a reusable iterator which is guaranteed to
    iterate over all the keys at most once. Each round of iterations
    starts with the last key was used in the previous round. This
    ensures that if a reader key successfully reads a packet, it will
    always be the first to try for the very next packet.

    """

    def __init__(self, *keys: List[Key]) -> None:
        """Initializes the collection with a list of keys.

        Parameters:
            keys: list of instances of classes implementing the Key Protocol

        Raises:
            Crypt4GHKeyException: if some key(s) do not have access to
                                  private part or no keys were given

        """
        if len(keys) == 0:
            raise Crypt4GHKeyException("Collection needs at least one key")
        for key in keys:
            if not key.can_compute_symmetric_keys:
                raise Crypt4GHKeyException(
                    "KeyCollection is only for keys with access to private key"
                )
        self._keys = keys
        self._current = 0

    @property
    def count(self) -> int:
        """Returns the number of keys in this collection."""
        return len(self._keys)

    @property
    def keys(self) -> Generator[Key, None, None]:
        """Multiple-use iterator that yields each key at most
        once. When re-used, the iteration always starts with the most
        recently yielded key.

        """
        first_current = self._current
        while True:
            yield self._keys[self._current]
            self._current = (self._current + 1) % self.count
            if self._current == first_current:
                break

count: int property

Returns the number of keys in this collection.

keys: Generator[Key, None, None] property

Multiple-use iterator that yields each key at most once. When re-used, the iteration always starts with the most recently yielded key.

__init__(*keys: List[Key]) -> None

Initializes the collection with a list of keys.

Parameters:

Name Type Description Default
keys List[Key]

list of instances of classes implementing the Key Protocol

()

Raises:

Type Description
Crypt4GHKeyException

if some key(s) do not have access to private part or no keys were given

Source code in oarepo_c4gh/key/key_collection.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, *keys: List[Key]) -> None:
    """Initializes the collection with a list of keys.

    Parameters:
        keys: list of instances of classes implementing the Key Protocol

    Raises:
        Crypt4GHKeyException: if some key(s) do not have access to
                              private part or no keys were given

    """
    if len(keys) == 0:
        raise Crypt4GHKeyException("Collection needs at least one key")
    for key in keys:
        if not key.can_compute_symmetric_keys:
            raise Crypt4GHKeyException(
                "KeyCollection is only for keys with access to private key"
            )
    self._keys = keys
    self._current = 0

External Keys

oarepo_c4gh.key.external

This module provides partial implementation of external (hardware or network) private keys that allow for computing symmetric keys. It assumes a derived class will implement the actual ECDH finalization.

ExternalKey

Bases: Key

This class implements the Crypt4GH symmetric key derivation from ECDH result. The actual ECDH computation must be implemented by derived class.

Source code in oarepo_c4gh/key/external.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class ExternalKey(Key):
    """This class implements the Crypt4GH symmetric key derivation
    from ECDH result. The actual ECDH computation must be implemented
    by derived class.

    """

    @abstractmethod
    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Given a public point on the curve, this function must
        multiply it by the private key and return the resulting point
        in compressed format (32 bytes).

        Parameters:
            public_point: the public point generated by the other party in compressed format

        Returns:
            The resulting point in compressed format (32 bytes).

        """
        ...

    def compute_write_key(self, reader_public_key: bytes) -> bytes:
        """Computes the write key using this instance's private key
        and the provided reader public key. See
        [`Software.compute_write_key`][oarepo_c4gh.key.software.SoftwareKey.compute_write_key]
        for details.

        Parameters:
            reader_public_key: the reader public key (point) in compressed format

        Returns:
            The writer symmetric key as raw 32 bytes.

        """
        shared_secret = self.compute_ecdh(reader_public_key)
        hash_source = shared_secret + reader_public_key + self.public_key
        the_hash = blake2b(digest_size=64)
        the_hash.update(hash_source)
        digest = the_hash.digest()
        return digest[:32]

    def compute_read_key(self, writer_public_key: bytes) -> bytes:
        """Computes the reader key using this instance's private key
        and provided writer public key. See
        [`Software.compute_read_key`][oarepo_c4gh.key.software.SoftwareKey.compute_read_key]
        for details.

        Parameters:
            writer_public_key: the writer public key (point) in compressed format

        Returns:
            The reader symmetric key as raw 32 bytes.

        """
        shared_secret = self.compute_ecdh(writer_public_key)
        hash_source = shared_secret + self.public_key + writer_public_key
        the_hash = blake2b(digest_size=64)
        the_hash.update(hash_source)
        digest = the_hash.digest()
        return digest[:32]

    @property
    def can_compute_symmetric_keys(self) -> bool:
        """External keys always have private key and therefore can
        always compute the symmetric keys.

        Returns:
            Always True.

        """
        return True

can_compute_symmetric_keys: bool property

External keys always have private key and therefore can always compute the symmetric keys.

Returns:

Type Description
bool

Always True.

compute_ecdh(public_point: bytes) -> bytes

Given a public point on the curve, this function must multiply it by the private key and return the resulting point in compressed format (32 bytes).

Parameters:

Name Type Description Default
public_point bytes

the public point generated by the other party in compressed format

required

Returns:

Type Description
bytes

The resulting point in compressed format (32 bytes).

Source code in oarepo_c4gh/key/external.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@abstractmethod
def compute_ecdh(self, public_point: bytes) -> bytes:
    """Given a public point on the curve, this function must
    multiply it by the private key and return the resulting point
    in compressed format (32 bytes).

    Parameters:
        public_point: the public point generated by the other party in compressed format

    Returns:
        The resulting point in compressed format (32 bytes).

    """
    ...

compute_read_key(writer_public_key: bytes) -> bytes

Computes the reader key using this instance's private key and provided writer public key. See Software.compute_read_key for details.

Parameters:

Name Type Description Default
writer_public_key bytes

the writer public key (point) in compressed format

required

Returns:

Type Description
bytes

The reader symmetric key as raw 32 bytes.

Source code in oarepo_c4gh/key/external.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def compute_read_key(self, writer_public_key: bytes) -> bytes:
    """Computes the reader key using this instance's private key
    and provided writer public key. See
    [`Software.compute_read_key`][oarepo_c4gh.key.software.SoftwareKey.compute_read_key]
    for details.

    Parameters:
        writer_public_key: the writer public key (point) in compressed format

    Returns:
        The reader symmetric key as raw 32 bytes.

    """
    shared_secret = self.compute_ecdh(writer_public_key)
    hash_source = shared_secret + self.public_key + writer_public_key
    the_hash = blake2b(digest_size=64)
    the_hash.update(hash_source)
    digest = the_hash.digest()
    return digest[:32]

compute_write_key(reader_public_key: bytes) -> bytes

Computes the write key using this instance's private key and the provided reader public key. See Software.compute_write_key for details.

Parameters:

Name Type Description Default
reader_public_key bytes

the reader public key (point) in compressed format

required

Returns:

Type Description
bytes

The writer symmetric key as raw 32 bytes.

Source code in oarepo_c4gh/key/external.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def compute_write_key(self, reader_public_key: bytes) -> bytes:
    """Computes the write key using this instance's private key
    and the provided reader public key. See
    [`Software.compute_write_key`][oarepo_c4gh.key.software.SoftwareKey.compute_write_key]
    for details.

    Parameters:
        reader_public_key: the reader public key (point) in compressed format

    Returns:
        The writer symmetric key as raw 32 bytes.

    """
    shared_secret = self.compute_ecdh(reader_public_key)
    hash_source = shared_secret + reader_public_key + self.public_key
    the_hash = blake2b(digest_size=64)
    the_hash.update(hash_source)
    digest = the_hash.digest()
    return digest[:32]

oarepo_c4gh.key.external_software

This module implemens a virtual external key that is actually backed by SoftwareKey and uses its private key directly.

This module is intended ONLY for testing related functionality and should NEVER be used in production.

ExternalSoftwareKey

Bases: ExternalKey

This is a virtual external key backed by any SoftwareKey implementation.

Do NOT use this class in any production code.

Source code in oarepo_c4gh/key/external_software.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class ExternalSoftwareKey(ExternalKey):
    """This is a virtual external key backed by any SoftwareKey
    implementation.

    Do NOT use this class in any production code.

    """

    def __init__(self, softkey: SoftwareKey) -> None:
        """Gets its backing private+public key pair from the provided
        SoftwareKey implementation.

        Do NOT use in production code.

        Parameters:
            softkey: the backing key which must include private key

        """
        if not softkey.can_compute_symmetric_keys:
            raise Crypt4GHKeyException(
                "ExternalSoftwareKey needs a private key"
            )
        self._private_key = softkey._private_key
        self._public_key = softkey._public_key

    def compute_ecdh(self, public_point: bytes) -> bytes:
        """Computes directly the final result of ECDH from given
        public point. This implementation is using crypto_scalarmult
        from nacl.bindings.

        Do NOT use in production code.

        """
        return crypto_scalarmult(self._private_key, public_point)

    @property
    def public_key(self) -> bytes:
        """Returns the underlying public key."""
        return self._public_key

public_key: bytes property

Returns the underlying public key.

__init__(softkey: SoftwareKey) -> None

Gets its backing private+public key pair from the provided SoftwareKey implementation.

Do NOT use in production code.

Parameters:

Name Type Description Default
softkey SoftwareKey

the backing key which must include private key

required
Source code in oarepo_c4gh/key/external_software.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, softkey: SoftwareKey) -> None:
    """Gets its backing private+public key pair from the provided
    SoftwareKey implementation.

    Do NOT use in production code.

    Parameters:
        softkey: the backing key which must include private key

    """
    if not softkey.can_compute_symmetric_keys:
        raise Crypt4GHKeyException(
            "ExternalSoftwareKey needs a private key"
        )
    self._private_key = softkey._private_key
    self._public_key = softkey._public_key

compute_ecdh(public_point: bytes) -> bytes

Computes directly the final result of ECDH from given public point. This implementation is using crypto_scalarmult from nacl.bindings.

Do NOT use in production code.

Source code in oarepo_c4gh/key/external_software.py
40
41
42
43
44
45
46
47
48
def compute_ecdh(self, public_point: bytes) -> bytes:
    """Computes directly the final result of ECDH from given
    public point. This implementation is using crypto_scalarmult
    from nacl.bindings.

    Do NOT use in production code.

    """
    return crypto_scalarmult(self._private_key, public_point)