Skip to content

Crypt4GH Container

The documentation of these modules and classes should NOT be considered a stable API and is subject to change in the future. However, it should always be reflecting the current internal implementation.

Common Interfaces

oarepo_c4gh.crypt4gh.common

A convenience module providing all common interfaces in one bundle.

DataBlock

This class represents single data block - either successfully decrypted or opaque.

Source code in oarepo_c4gh/crypt4gh/common/data_block.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class DataBlock:
    """This class represents single data block - either successfully
    decrypted or opaque.

    """

    def __init__(
        self,
        enc: bytes,
        clear: Optional[bytes],
        idx: Optional[int],
        off: Optional[int],
    ) -> None:
        """Initializes all the data block instance properties.

        Parameters:
            enc: encrypted data of the packet including nonce and MAC
            clear: decrypted packet data - if available

        """
        self._ciphertext = enc
        self._cleartext = clear
        self._dek_index = idx
        self._offset = off

    @property
    def ciphertext(self) -> bytes:
        """The encrypted data of the whole packet accessor.

        Returns:
            The ecrypted packet as-is.

        """
        return self._ciphertext

    @property
    def cleartext(self) -> Optional[bytes]:
        """The decrypted data of the packet accessor.

        Returns:
           The cleartext of the packet contents if available, None otherwise.

        """
        return self._cleartext

    @property
    def is_deciphered(self) -> bool:
        """Predicate to test whether the cleartext contents of this
        packet can be read.

        """
        return self._cleartext is not None

    @property
    def dek_index(self):
        """Returns the DEK index (to avoid leaking the actual key)"""
        return self._dek_index

    @property
    def offset(self):
        """Returns the offset this block starts at (in original
        cleartext data)"""
        return self._offset

    @property
    def size(self):
        """Returns the size of cleartext data of this packet -
        regardless of whether it was deciphered."""
        return len(self._ciphertext) - 16

ciphertext: bytes property

The encrypted data of the whole packet accessor.

Returns:

Type Description
bytes

The ecrypted packet as-is.

cleartext: Optional[bytes] property

The decrypted data of the packet accessor.

Returns:

Type Description
Optional[bytes]

The cleartext of the packet contents if available, None otherwise.

dek_index property

Returns the DEK index (to avoid leaking the actual key)

is_deciphered: bool property

Predicate to test whether the cleartext contents of this packet can be read.

offset property

Returns the offset this block starts at (in original cleartext data)

size property

Returns the size of cleartext data of this packet - regardless of whether it was deciphered.

__init__(enc: bytes, clear: Optional[bytes], idx: Optional[int], off: Optional[int]) -> None

Initializes all the data block instance properties.

Parameters:

Name Type Description Default
enc bytes

encrypted data of the packet including nonce and MAC

required
clear Optional[bytes]

decrypted packet data - if available

required
Source code in oarepo_c4gh/crypt4gh/common/data_block.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(
    self,
    enc: bytes,
    clear: Optional[bytes],
    idx: Optional[int],
    off: Optional[int],
) -> None:
    """Initializes all the data block instance properties.

    Parameters:
        enc: encrypted data of the packet including nonce and MAC
        clear: decrypted packet data - if available

    """
    self._ciphertext = enc
    self._cleartext = clear
    self._dek_index = idx
    self._offset = off

Header

Bases: Protocol

This is a protocol class which guarantees that a header packets collection is available by its descendants. The properties provided are a list of packets - both readable and unreadable - and header metadata fields magic_bytes and version.

Source code in oarepo_c4gh/crypt4gh/common/header.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Header(Protocol):
    """This is a protocol class which guarantees that a header packets
    collection is available by its descendants. The properties
    provided are a list of packets - both readable and unreadable -
    and header metadata fields magic_bytes and version.

    """

    @property
    @abstractmethod
    def packets(self) -> list:
        """Must return original or transformed list of header packets."""
        ...

    @property
    @abstractmethod
    def magic_bytes(self) -> bytes:
        """Must return the original magic bytes."""
        ...

    @property
    @abstractmethod
    def version(self) -> int:
        """Must return the version of the loaded/transformer
        container. Must always return 1.

        """
        ...

magic_bytes: bytes property

Must return the original magic bytes.

packets: list property

Must return original or transformed list of header packets.

version: int property

Must return the version of the loaded/transformer container. Must always return 1.

HeaderPacket

Represents a single Crypt4GH header packet. If it was possible to decrypt it, the parsed contents are made available as well.

Source code in oarepo_c4gh/crypt4gh/common/header_packet.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class HeaderPacket:
    """Represents a single Crypt4GH header packet. If it was possible
    to decrypt it, the parsed contents are made available as well.

    """

    def __init__(
        self,
        packet_length,
        packet_data,
        content,
        reader_key,
        packet_type,
        data_encryption_method,
        data_encryption_key,
    ):
        """Initializes the packet structure with all fields given."""
        self._packet_length = packet_length
        self._packet_data = packet_data
        self._content = content
        self._reader_key = reader_key
        self._packet_type = packet_type
        self._data_encryption_method = data_encryption_method
        self._data_encryption_key = data_encryption_key

    @property
    def is_data_encryption_parameters(self) -> bool:
        """A predicate for checking whether this packet contains DEK.

        Returns:
            True if this packet was successfully decrypted and it is
            an encryption parameters type packet.

        """
        return self._content is not None and self._packet_type == 0

    @property
    def data_encryption_key(self) -> bytes:
        """Getter for the symmetric encryption key.

        Returns:
            32 bytes of the symmetric key.

        Raises:
            Crypt4GHHeaderPacketException: if this packet does not contain DEK

        """
        if not self.is_data_encryption_parameters:
            raise Crypt4GHHeaderPacketException("No encryption key available.")
        return self._data_encryption_key

    @property
    def is_edit_list(self) -> bool:
        """A predicate for checking whether this packet contains edit
        list.

        Returns:
            True if it is a successfully decrypted edit list packet.

        """
        return self._content is not None and self._packet_type == 1

    @property
    def is_readable(self) -> bool:
        """A predicate for checking whether the packet was
        successfully decrypted.

        """
        return self._content is not None

    @property
    def reader_key(self) -> bytes:
        """Returns public key used for decrypting this header packet
        or None if the decryption was not successful.

        """
        return self._reader_key

    @property
    def packet_data(self) -> bytes:
        """Returns the original packet data (for serialization)."""
        return self._packet_data

    @property
    def packet_type(self) -> int:
        """Returns the numerical representation of packet type."""
        return self._packet_type

    @property
    def content(self) -> bytes:
        """Returns the encrypted packet content."""
        return self._content

    @property
    def length(self) -> int:
        """Returns the packet length in bytes - including the packet
        length 4-byte value at the beginning.

        """
        return self._packet_length

content: bytes property

Returns the encrypted packet content.

data_encryption_key: bytes property

Getter for the symmetric encryption key.

Returns:

Type Description
bytes

32 bytes of the symmetric key.

Raises:

Type Description
Crypt4GHHeaderPacketException

if this packet does not contain DEK

is_data_encryption_parameters: bool property

A predicate for checking whether this packet contains DEK.

Returns:

Type Description
bool

True if this packet was successfully decrypted and it is

bool

an encryption parameters type packet.

is_edit_list: bool property

A predicate for checking whether this packet contains edit list.

Returns:

Type Description
bool

True if it is a successfully decrypted edit list packet.

is_readable: bool property

A predicate for checking whether the packet was successfully decrypted.

length: int property

Returns the packet length in bytes - including the packet length 4-byte value at the beginning.

packet_data: bytes property

Returns the original packet data (for serialization).

packet_type: int property

Returns the numerical representation of packet type.

reader_key: bytes property

Returns public key used for decrypting this header packet or None if the decryption was not successful.

__init__(packet_length, packet_data, content, reader_key, packet_type, data_encryption_method, data_encryption_key)

Initializes the packet structure with all fields given.

Source code in oarepo_c4gh/crypt4gh/common/header_packet.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self,
    packet_length,
    packet_data,
    content,
    reader_key,
    packet_type,
    data_encryption_method,
    data_encryption_key,
):
    """Initializes the packet structure with all fields given."""
    self._packet_length = packet_length
    self._packet_data = packet_data
    self._content = content
    self._reader_key = reader_key
    self._packet_type = packet_type
    self._data_encryption_method = data_encryption_method
    self._data_encryption_key = data_encryption_key

Proto4GH

Bases: Protocol

A protocol ensuring a header and data packets are available.

Source code in oarepo_c4gh/crypt4gh/common/proto4gh.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Proto4GH(Protocol):
    """A protocol ensuring a header and data packets are available.

    """

    @property
    @abstractmethod
    def header(self) -> Header:
        """Must return an implementaiton of abstract header."""
        ...

    @property
    @abstractmethod
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Must be a single-use iterator for data blocks."""
        ...

data_blocks: Generator[DataBlock, None, None] property

Must be a single-use iterator for data blocks.

header: Header property

Must return an implementaiton of abstract header.

Protocols

oarepo_c4gh.crypt4gh.common.header

Protocol for header implementation.

Header

Bases: Protocol

This is a protocol class which guarantees that a header packets collection is available by its descendants. The properties provided are a list of packets - both readable and unreadable - and header metadata fields magic_bytes and version.

Source code in oarepo_c4gh/crypt4gh/common/header.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Header(Protocol):
    """This is a protocol class which guarantees that a header packets
    collection is available by its descendants. The properties
    provided are a list of packets - both readable and unreadable -
    and header metadata fields magic_bytes and version.

    """

    @property
    @abstractmethod
    def packets(self) -> list:
        """Must return original or transformed list of header packets."""
        ...

    @property
    @abstractmethod
    def magic_bytes(self) -> bytes:
        """Must return the original magic bytes."""
        ...

    @property
    @abstractmethod
    def version(self) -> int:
        """Must return the version of the loaded/transformer
        container. Must always return 1.

        """
        ...

magic_bytes: bytes property

Must return the original magic bytes.

packets: list property

Must return original or transformed list of header packets.

version: int property

Must return the version of the loaded/transformer container. Must always return 1.

oarepo_c4gh.crypt4gh.common.proto4gh

Protocol for container implementation.

Proto4GH

Bases: Protocol

A protocol ensuring a header and data packets are available.

Source code in oarepo_c4gh/crypt4gh/common/proto4gh.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Proto4GH(Protocol):
    """A protocol ensuring a header and data packets are available.

    """

    @property
    @abstractmethod
    def header(self) -> Header:
        """Must return an implementaiton of abstract header."""
        ...

    @property
    @abstractmethod
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Must be a single-use iterator for data blocks."""
        ...

data_blocks: Generator[DataBlock, None, None] property

Must be a single-use iterator for data blocks.

header: Header property

Must return an implementaiton of abstract header.

Data Structures

oarepo_c4gh.crypt4gh.common.header_packet

Header packet data structure.

HeaderPacket

Represents a single Crypt4GH header packet. If it was possible to decrypt it, the parsed contents are made available as well.

Source code in oarepo_c4gh/crypt4gh/common/header_packet.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class HeaderPacket:
    """Represents a single Crypt4GH header packet. If it was possible
    to decrypt it, the parsed contents are made available as well.

    """

    def __init__(
        self,
        packet_length,
        packet_data,
        content,
        reader_key,
        packet_type,
        data_encryption_method,
        data_encryption_key,
    ):
        """Initializes the packet structure with all fields given."""
        self._packet_length = packet_length
        self._packet_data = packet_data
        self._content = content
        self._reader_key = reader_key
        self._packet_type = packet_type
        self._data_encryption_method = data_encryption_method
        self._data_encryption_key = data_encryption_key

    @property
    def is_data_encryption_parameters(self) -> bool:
        """A predicate for checking whether this packet contains DEK.

        Returns:
            True if this packet was successfully decrypted and it is
            an encryption parameters type packet.

        """
        return self._content is not None and self._packet_type == 0

    @property
    def data_encryption_key(self) -> bytes:
        """Getter for the symmetric encryption key.

        Returns:
            32 bytes of the symmetric key.

        Raises:
            Crypt4GHHeaderPacketException: if this packet does not contain DEK

        """
        if not self.is_data_encryption_parameters:
            raise Crypt4GHHeaderPacketException("No encryption key available.")
        return self._data_encryption_key

    @property
    def is_edit_list(self) -> bool:
        """A predicate for checking whether this packet contains edit
        list.

        Returns:
            True if it is a successfully decrypted edit list packet.

        """
        return self._content is not None and self._packet_type == 1

    @property
    def is_readable(self) -> bool:
        """A predicate for checking whether the packet was
        successfully decrypted.

        """
        return self._content is not None

    @property
    def reader_key(self) -> bytes:
        """Returns public key used for decrypting this header packet
        or None if the decryption was not successful.

        """
        return self._reader_key

    @property
    def packet_data(self) -> bytes:
        """Returns the original packet data (for serialization)."""
        return self._packet_data

    @property
    def packet_type(self) -> int:
        """Returns the numerical representation of packet type."""
        return self._packet_type

    @property
    def content(self) -> bytes:
        """Returns the encrypted packet content."""
        return self._content

    @property
    def length(self) -> int:
        """Returns the packet length in bytes - including the packet
        length 4-byte value at the beginning.

        """
        return self._packet_length

content: bytes property

Returns the encrypted packet content.

data_encryption_key: bytes property

Getter for the symmetric encryption key.

Returns:

Type Description
bytes

32 bytes of the symmetric key.

Raises:

Type Description
Crypt4GHHeaderPacketException

if this packet does not contain DEK

is_data_encryption_parameters: bool property

A predicate for checking whether this packet contains DEK.

Returns:

Type Description
bool

True if this packet was successfully decrypted and it is

bool

an encryption parameters type packet.

is_edit_list: bool property

A predicate for checking whether this packet contains edit list.

Returns:

Type Description
bool

True if it is a successfully decrypted edit list packet.

is_readable: bool property

A predicate for checking whether the packet was successfully decrypted.

length: int property

Returns the packet length in bytes - including the packet length 4-byte value at the beginning.

packet_data: bytes property

Returns the original packet data (for serialization).

packet_type: int property

Returns the numerical representation of packet type.

reader_key: bytes property

Returns public key used for decrypting this header packet or None if the decryption was not successful.

__init__(packet_length, packet_data, content, reader_key, packet_type, data_encryption_method, data_encryption_key)

Initializes the packet structure with all fields given.

Source code in oarepo_c4gh/crypt4gh/common/header_packet.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self,
    packet_length,
    packet_data,
    content,
    reader_key,
    packet_type,
    data_encryption_method,
    data_encryption_key,
):
    """Initializes the packet structure with all fields given."""
    self._packet_length = packet_length
    self._packet_data = packet_data
    self._content = content
    self._reader_key = reader_key
    self._packet_type = packet_type
    self._data_encryption_method = data_encryption_method
    self._data_encryption_key = data_encryption_key

oarepo_c4gh.crypt4gh.common.data_block

This module implements thin layer on top of data blocks read from the container.

DataBlock

This class represents single data block - either successfully decrypted or opaque.

Source code in oarepo_c4gh/crypt4gh/common/data_block.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class DataBlock:
    """This class represents single data block - either successfully
    decrypted or opaque.

    """

    def __init__(
        self,
        enc: bytes,
        clear: Optional[bytes],
        idx: Optional[int],
        off: Optional[int],
    ) -> None:
        """Initializes all the data block instance properties.

        Parameters:
            enc: encrypted data of the packet including nonce and MAC
            clear: decrypted packet data - if available

        """
        self._ciphertext = enc
        self._cleartext = clear
        self._dek_index = idx
        self._offset = off

    @property
    def ciphertext(self) -> bytes:
        """The encrypted data of the whole packet accessor.

        Returns:
            The ecrypted packet as-is.

        """
        return self._ciphertext

    @property
    def cleartext(self) -> Optional[bytes]:
        """The decrypted data of the packet accessor.

        Returns:
           The cleartext of the packet contents if available, None otherwise.

        """
        return self._cleartext

    @property
    def is_deciphered(self) -> bool:
        """Predicate to test whether the cleartext contents of this
        packet can be read.

        """
        return self._cleartext is not None

    @property
    def dek_index(self):
        """Returns the DEK index (to avoid leaking the actual key)"""
        return self._dek_index

    @property
    def offset(self):
        """Returns the offset this block starts at (in original
        cleartext data)"""
        return self._offset

    @property
    def size(self):
        """Returns the size of cleartext data of this packet -
        regardless of whether it was deciphered."""
        return len(self._ciphertext) - 16

ciphertext: bytes property

The encrypted data of the whole packet accessor.

Returns:

Type Description
bytes

The ecrypted packet as-is.

cleartext: Optional[bytes] property

The decrypted data of the packet accessor.

Returns:

Type Description
Optional[bytes]

The cleartext of the packet contents if available, None otherwise.

dek_index property

Returns the DEK index (to avoid leaking the actual key)

is_deciphered: bool property

Predicate to test whether the cleartext contents of this packet can be read.

offset property

Returns the offset this block starts at (in original cleartext data)

size property

Returns the size of cleartext data of this packet - regardless of whether it was deciphered.

__init__(enc: bytes, clear: Optional[bytes], idx: Optional[int], off: Optional[int]) -> None

Initializes all the data block instance properties.

Parameters:

Name Type Description Default
enc bytes

encrypted data of the packet including nonce and MAC

required
clear Optional[bytes]

decrypted packet data - if available

required
Source code in oarepo_c4gh/crypt4gh/common/data_block.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(
    self,
    enc: bytes,
    clear: Optional[bytes],
    idx: Optional[int],
    off: Optional[int],
) -> None:
    """Initializes all the data block instance properties.

    Parameters:
        enc: encrypted data of the packet including nonce and MAC
        clear: decrypted packet data - if available

    """
    self._ciphertext = enc
    self._cleartext = clear
    self._dek_index = idx
    self._offset = off

Container Stream

oarepo_c4gh.crypt4gh.crypt4gh

This module implements a simple convenience wrapper Crypt4GH on top of actual Stream4GH implementation.

Crypt4GH

Bases: Stream4GH

This class differs only in its name from the underlying Stream4GH.

Source code in oarepo_c4gh/crypt4gh/crypt4gh.py
 9
10
11
12
class Crypt4GH(Stream4GH):
    """This class differs only in its name from the underlying
    Stream4GH."""
    pass

oarepo_c4gh.crypt4gh.stream

A convenience module providing all stream classes in one bundle.

Stream4GH

Bases: Proto4GH

An instance of this class represents a Crypt4GH container and provides stream processing capabilities of both header packets and data blocks. The input is processed lazily as needed and the header packets are stored for future processing within the instance. The data blocks stream can be used only once.

Source code in oarepo_c4gh/crypt4gh/stream/stream4gh.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Stream4GH(Proto4GH):
    """An instance of this class represents a Crypt4GH container and
    provides stream processing capabilities of both header packets and
    data blocks. The input is processed lazily as needed and the
    header packets are stored for future processing within the
    instance. The data blocks stream can be used only once.

    """

    def __init__(
        self,
        reader_key: Union[Key, KeyCollection],
        istream: io.RawIOBase,
        decrypt: bool = True,
        analyze: bool = False,
    ) -> None:
        """Initializes the instance by storing the reader_key and the
        input stream. Verifies whether the reader key can perform
        symmetric key derivation.

        Parameters:
            reader_key: the key used for reading the container
            istream: the container input stream
            decrypt: if True, attempt to decrypt the data blocks

        """
        self._istream = istream
        self._analyzer = Analyzer() if analyze else None
        self._header = StreamHeader(reader_key, istream, self._analyzer)
        self._consumed = False
        self._decrypt = decrypt

    @property
    def header(self) -> StreamHeader:
        """Accessor for the container header object.

        Returns:
            The contents of the parsed header.

        """
        return self._header

    @property
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for data blocks.

        Raises:
            Crypt4GHProcessedException: if called second time

        """
        assert self.header.packets is not None
        if self._consumed:
            raise Crypt4GHProcessedException("Already processed once")
        offset = 0
        while True:
            if self._decrypt:
                enc, clear, idx = self._header.deks.decrypt_packet(
                    self._istream
                )
            else:
                enc = self._istream.read(12 + 65536 + 16)
                if len(enc) == 0:
                    enc = None
                clear = None
                idx = None
            if enc is None:
                break
            block = DataBlock(enc, clear, idx, offset)
            offset = offset + block.size
            if self._analyzer is not None:
                self._analyzer.analyze_block(block)
            yield (block)
        self._consumed = True

    @property
    def analyzer(self):
        """For direct access to analyzer and its results."""
        return self._analyzer

    @property
    def clear_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for deciphered blocks only."""
        for block in self.data_blocks:
            if block.is_deciphered:
                yield block

analyzer property

For direct access to analyzer and its results.

clear_blocks: Generator[DataBlock, None, None] property

Single-use iterator for deciphered blocks only.

data_blocks: Generator[DataBlock, None, None] property

Single-use iterator for data blocks.

Raises:

Type Description
Crypt4GHProcessedException

if called second time

header: StreamHeader property

Accessor for the container header object.

Returns:

Type Description
StreamHeader

The contents of the parsed header.

__init__(reader_key: Union[Key, KeyCollection], istream: io.RawIOBase, decrypt: bool = True, analyze: bool = False) -> None

Initializes the instance by storing the reader_key and the input stream. Verifies whether the reader key can perform symmetric key derivation.

Parameters:

Name Type Description Default
reader_key Union[Key, KeyCollection]

the key used for reading the container

required
istream RawIOBase

the container input stream

required
decrypt bool

if True, attempt to decrypt the data blocks

True
Source code in oarepo_c4gh/crypt4gh/stream/stream4gh.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    reader_key: Union[Key, KeyCollection],
    istream: io.RawIOBase,
    decrypt: bool = True,
    analyze: bool = False,
) -> None:
    """Initializes the instance by storing the reader_key and the
    input stream. Verifies whether the reader key can perform
    symmetric key derivation.

    Parameters:
        reader_key: the key used for reading the container
        istream: the container input stream
        decrypt: if True, attempt to decrypt the data blocks

    """
    self._istream = istream
    self._analyzer = Analyzer() if analyze else None
    self._header = StreamHeader(reader_key, istream, self._analyzer)
    self._consumed = False
    self._decrypt = decrypt

StreamHeader

Bases: Header

The constructor of this class loads the Crypt4GH header from given stream.

Source code in oarepo_c4gh/crypt4gh/stream/header.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class StreamHeader(Header):
    """The constructor of this class loads the Crypt4GH header from
    given stream.

    """

    def __init__(
        self,
        reader_key_or_collection: Union[Key, KeyCollection],
        istream: io.RawIOBase,
        analyzer: Analyzer = None,
    ) -> None:
        """Checks the Crypt4GH container signature, version and header
        packet count. The header packets are loaded lazily when needed.

        Parameters:
            reader_key_or_collection: the key used for trying to decrypt header
                packets (must include the private part) or collection of keys
            istream: the container input stream
            analyzer: analyzer for storing packet readability information

        """
        self._magic_bytes = istream.read(8)
        check_crypt4gh_magic(self._magic_bytes)
        self._version = read_crypt4gh_stream_le_uint32(istream, "version")
        if self._version != 1:
            raise Crypt4GHHeaderException(
                f"Invalid Crypt4GH version {self._version}"
            )
        self._packet_count = read_crypt4gh_stream_le_uint32(
            istream, "packet count"
        )
        if isinstance(reader_key_or_collection, KeyCollection):
            self._reader_keys = reader_key_or_collection
        else:
            self._reader_keys = KeyCollection(reader_key_or_collection)
        self._istream = istream
        self._packets = None
        self._deks = DEKCollection()
        self._analyzer = analyzer

    def load_packets(self) -> None:
        """Loads the packets from the input stream and discards the
        key. It populates the internal Data Encryption Key collection
        for later use during this process.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                        derivation

        """
        self._packets = []
        for idx in range(self._packet_count):
            packet = StreamHeaderPacket(self._reader_keys, self._istream)
            if packet.is_data_encryption_parameters:
                self._deks.add_dek(
                    DEK(packet.data_encryption_key, packet.reader_key)
                )
            self._packets.append(packet)
            if self._analyzer is not None:
                self._analyzer.analyze_packet(packet)
        self._reader_keys = None

    @property
    def packets(self) -> list:
        """The accessor to the direct list of header packets.

        Returns:
            List of header packets.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                        derivation

        """
        if self._packets is None:
            self.load_packets()
        return self._packets

    @property
    def deks(self) -> DEKCollection:
        """Returns the collection of Data Encryption Keys obtained by
        processing all header packets. Ensures the header packets were
        actually processed before returning the reference.

        Returns:
            The DEK Collection.

        Raises:
            Crypt4GHHeaderException: if packets needed to be loaded and
                something went wrong

        """
        if self._packets is None:
            self.load_packets()
        return self._deks

    @property
    def magic_bytes(self) -> bytes:
        """Returns the original magic bytes from the beginning of the
        container.

        """
        return self._magic_bytes

    @property
    def version(self) -> int:
        """Returns the version of this container format (must always
        return 1).

        """
        return self._version

    @property
    def reader_keys_used(self) -> list[bytes]:
        """Returns all reader public keys successfully used in any
        packets decryption.

        """
        return list(
            set(
                packet.reader_key
                for packet in self.packets
                if packet.reader_key is not None
            )
        )

deks: DEKCollection property

Returns the collection of Data Encryption Keys obtained by processing all header packets. Ensures the header packets were actually processed before returning the reference.

Returns:

Type Description
DEKCollection

The DEK Collection.

Raises:

Type Description
Crypt4GHHeaderException

if packets needed to be loaded and something went wrong

magic_bytes: bytes property

Returns the original magic bytes from the beginning of the container.

packets: list property

The accessor to the direct list of header packets.

Returns:

Type Description
list

List of header packets.

Raises:

Type Description
Crypt4GHHeaderException

if the reader key cannot perform symmetric key derivation

reader_keys_used: list[bytes] property

Returns all reader public keys successfully used in any packets decryption.

version: int property

Returns the version of this container format (must always return 1).

__init__(reader_key_or_collection: Union[Key, KeyCollection], istream: io.RawIOBase, analyzer: Analyzer = None) -> None

Checks the Crypt4GH container signature, version and header packet count. The header packets are loaded lazily when needed.

Parameters:

Name Type Description Default
reader_key_or_collection Union[Key, KeyCollection]

the key used for trying to decrypt header packets (must include the private part) or collection of keys

required
istream RawIOBase

the container input stream

required
analyzer Analyzer

analyzer for storing packet readability information

None
Source code in oarepo_c4gh/crypt4gh/stream/header.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    reader_key_or_collection: Union[Key, KeyCollection],
    istream: io.RawIOBase,
    analyzer: Analyzer = None,
) -> None:
    """Checks the Crypt4GH container signature, version and header
    packet count. The header packets are loaded lazily when needed.

    Parameters:
        reader_key_or_collection: the key used for trying to decrypt header
            packets (must include the private part) or collection of keys
        istream: the container input stream
        analyzer: analyzer for storing packet readability information

    """
    self._magic_bytes = istream.read(8)
    check_crypt4gh_magic(self._magic_bytes)
    self._version = read_crypt4gh_stream_le_uint32(istream, "version")
    if self._version != 1:
        raise Crypt4GHHeaderException(
            f"Invalid Crypt4GH version {self._version}"
        )
    self._packet_count = read_crypt4gh_stream_le_uint32(
        istream, "packet count"
    )
    if isinstance(reader_key_or_collection, KeyCollection):
        self._reader_keys = reader_key_or_collection
    else:
        self._reader_keys = KeyCollection(reader_key_or_collection)
    self._istream = istream
    self._packets = None
    self._deks = DEKCollection()
    self._analyzer = analyzer

load_packets() -> None

Loads the packets from the input stream and discards the key. It populates the internal Data Encryption Key collection for later use during this process.

Raises:

Type Description
Crypt4GHHeaderException

if the reader key cannot perform symmetric key derivation

Source code in oarepo_c4gh/crypt4gh/stream/header.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def load_packets(self) -> None:
    """Loads the packets from the input stream and discards the
    key. It populates the internal Data Encryption Key collection
    for later use during this process.

    Raises:
        Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                    derivation

    """
    self._packets = []
    for idx in range(self._packet_count):
        packet = StreamHeaderPacket(self._reader_keys, self._istream)
        if packet.is_data_encryption_parameters:
            self._deks.add_dek(
                DEK(packet.data_encryption_key, packet.reader_key)
            )
        self._packets.append(packet)
        if self._analyzer is not None:
            self._analyzer.analyze_packet(packet)
    self._reader_keys = None

StreamHeaderPacket

Bases: HeaderPacket

Loads the header packet from stream.

Source code in oarepo_c4gh/crypt4gh/stream/header_packet.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class StreamHeaderPacket(HeaderPacket):
    """Loads the header packet from stream."""

    def __init__(
        self, reader_keys: KeyCollection, istream: io.RawIOBase
    ) -> None:
        """Tries parsing a single packet from given input stream and
        stores it for future processing. If it is possible to decrypt
        the packet with given reader key, the contents are parsed and
        interpreted as well.

        Parameters:
            reader_keys: the key collection used for decryption attempts
            istream: the container input stream

        Raises:
            Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

        """
        _packet_length = read_crypt4gh_stream_le_uint32(
            istream, "packet length"
        )
        _packet_data = _packet_length.to_bytes(4, "little") + istream.read(
            _packet_length - 4
        )
        if len(_packet_data) != _packet_length:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: read only {len(_packet_data)} "
                f"instead of {_packet_length}"
            )
        encryption_method = read_crypt4gh_bytes_le_uint32(
            _packet_data, 4, "encryption method"
        )
        if encryption_method != 0:
            raise Crypt4GHHeaderPacketException(
                f"Unsupported encryption method {encryption_method}"
            )
        writer_public_key = _packet_data[8:40]
        nonce = _packet_data[40:52]
        payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
        payload = _packet_data[52:]
        for maybe_reader_key in reader_keys.keys:
            symmetric_key = maybe_reader_key.compute_read_key(
                writer_public_key
            )
            _content = None
            _reader_key = None
            try:
                _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                    payload, None, nonce, symmetric_key
                )
                _reader_key = maybe_reader_key.public_key
                break
            except CryptoError as cerr:
                pass
        _data_encryption_method = None
        _packet_type = None
        _data_encryption_key = None
        if _content is not None:
            _packet_type = read_crypt4gh_bytes_le_uint32(
                _content, 0, "packet type"
            )
            if _packet_type == 0:
                _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                    _content, 4, "encryption method"
                )
                if _data_encryption_method != 0:
                    raise Crypt4GHHeaderPacketException(
                        f"Unknown data encryption method "
                        f"{_data_encryption_method}."
                    )
                _data_encryption_key = _content[8:40]
            elif _packet_type == 1:
                # Edit List
                pass
            else:
                # Report error? Warning?
                pass
        super().__init__(
            _packet_length,
            _packet_data,
            _content,
            _reader_key,
            _packet_type,
            _data_encryption_method,
            _data_encryption_key,
        )

__init__(reader_keys: KeyCollection, istream: io.RawIOBase) -> None

Tries parsing a single packet from given input stream and stores it for future processing. If it is possible to decrypt the packet with given reader key, the contents are parsed and interpreted as well.

Parameters:

Name Type Description Default
reader_keys KeyCollection

the key collection used for decryption attempts

required
istream RawIOBase

the container input stream

required

Raises:

Type Description
Crypt4GHHeaderPacketException

if any problem in parsing the packet occurs.

Source code in oarepo_c4gh/crypt4gh/stream/header_packet.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self, reader_keys: KeyCollection, istream: io.RawIOBase
) -> None:
    """Tries parsing a single packet from given input stream and
    stores it for future processing. If it is possible to decrypt
    the packet with given reader key, the contents are parsed and
    interpreted as well.

    Parameters:
        reader_keys: the key collection used for decryption attempts
        istream: the container input stream

    Raises:
        Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

    """
    _packet_length = read_crypt4gh_stream_le_uint32(
        istream, "packet length"
    )
    _packet_data = _packet_length.to_bytes(4, "little") + istream.read(
        _packet_length - 4
    )
    if len(_packet_data) != _packet_length:
        raise Crypt4GHHeaderPacketException(
            f"Header packet: read only {len(_packet_data)} "
            f"instead of {_packet_length}"
        )
    encryption_method = read_crypt4gh_bytes_le_uint32(
        _packet_data, 4, "encryption method"
    )
    if encryption_method != 0:
        raise Crypt4GHHeaderPacketException(
            f"Unsupported encryption method {encryption_method}"
        )
    writer_public_key = _packet_data[8:40]
    nonce = _packet_data[40:52]
    payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
    payload = _packet_data[52:]
    for maybe_reader_key in reader_keys.keys:
        symmetric_key = maybe_reader_key.compute_read_key(
            writer_public_key
        )
        _content = None
        _reader_key = None
        try:
            _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                payload, None, nonce, symmetric_key
            )
            _reader_key = maybe_reader_key.public_key
            break
        except CryptoError as cerr:
            pass
    _data_encryption_method = None
    _packet_type = None
    _data_encryption_key = None
    if _content is not None:
        _packet_type = read_crypt4gh_bytes_le_uint32(
            _content, 0, "packet type"
        )
        if _packet_type == 0:
            _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                _content, 4, "encryption method"
            )
            if _data_encryption_method != 0:
                raise Crypt4GHHeaderPacketException(
                    f"Unknown data encryption method "
                    f"{_data_encryption_method}."
                )
            _data_encryption_key = _content[8:40]
        elif _packet_type == 1:
            # Edit List
            pass
        else:
            # Report error? Warning?
            pass
    super().__init__(
        _packet_length,
        _packet_data,
        _content,
        _reader_key,
        _packet_type,
        _data_encryption_method,
        _data_encryption_key,
    )

oarepo_c4gh.crypt4gh.stream.header_packet

Implementation of single Crypt4GH header packet stream parser.

StreamHeaderPacket

Bases: HeaderPacket

Loads the header packet from stream.

Source code in oarepo_c4gh/crypt4gh/stream/header_packet.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class StreamHeaderPacket(HeaderPacket):
    """Loads the header packet from stream."""

    def __init__(
        self, reader_keys: KeyCollection, istream: io.RawIOBase
    ) -> None:
        """Tries parsing a single packet from given input stream and
        stores it for future processing. If it is possible to decrypt
        the packet with given reader key, the contents are parsed and
        interpreted as well.

        Parameters:
            reader_keys: the key collection used for decryption attempts
            istream: the container input stream

        Raises:
            Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

        """
        _packet_length = read_crypt4gh_stream_le_uint32(
            istream, "packet length"
        )
        _packet_data = _packet_length.to_bytes(4, "little") + istream.read(
            _packet_length - 4
        )
        if len(_packet_data) != _packet_length:
            raise Crypt4GHHeaderPacketException(
                f"Header packet: read only {len(_packet_data)} "
                f"instead of {_packet_length}"
            )
        encryption_method = read_crypt4gh_bytes_le_uint32(
            _packet_data, 4, "encryption method"
        )
        if encryption_method != 0:
            raise Crypt4GHHeaderPacketException(
                f"Unsupported encryption method {encryption_method}"
            )
        writer_public_key = _packet_data[8:40]
        nonce = _packet_data[40:52]
        payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
        payload = _packet_data[52:]
        for maybe_reader_key in reader_keys.keys:
            symmetric_key = maybe_reader_key.compute_read_key(
                writer_public_key
            )
            _content = None
            _reader_key = None
            try:
                _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                    payload, None, nonce, symmetric_key
                )
                _reader_key = maybe_reader_key.public_key
                break
            except CryptoError as cerr:
                pass
        _data_encryption_method = None
        _packet_type = None
        _data_encryption_key = None
        if _content is not None:
            _packet_type = read_crypt4gh_bytes_le_uint32(
                _content, 0, "packet type"
            )
            if _packet_type == 0:
                _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                    _content, 4, "encryption method"
                )
                if _data_encryption_method != 0:
                    raise Crypt4GHHeaderPacketException(
                        f"Unknown data encryption method "
                        f"{_data_encryption_method}."
                    )
                _data_encryption_key = _content[8:40]
            elif _packet_type == 1:
                # Edit List
                pass
            else:
                # Report error? Warning?
                pass
        super().__init__(
            _packet_length,
            _packet_data,
            _content,
            _reader_key,
            _packet_type,
            _data_encryption_method,
            _data_encryption_key,
        )

__init__(reader_keys: KeyCollection, istream: io.RawIOBase) -> None

Tries parsing a single packet from given input stream and stores it for future processing. If it is possible to decrypt the packet with given reader key, the contents are parsed and interpreted as well.

Parameters:

Name Type Description Default
reader_keys KeyCollection

the key collection used for decryption attempts

required
istream RawIOBase

the container input stream

required

Raises:

Type Description
Crypt4GHHeaderPacketException

if any problem in parsing the packet occurs.

Source code in oarepo_c4gh/crypt4gh/stream/header_packet.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self, reader_keys: KeyCollection, istream: io.RawIOBase
) -> None:
    """Tries parsing a single packet from given input stream and
    stores it for future processing. If it is possible to decrypt
    the packet with given reader key, the contents are parsed and
    interpreted as well.

    Parameters:
        reader_keys: the key collection used for decryption attempts
        istream: the container input stream

    Raises:
        Crypt4GHHeaderPacketException: if any problem in parsing the packet occurs.

    """
    _packet_length = read_crypt4gh_stream_le_uint32(
        istream, "packet length"
    )
    _packet_data = _packet_length.to_bytes(4, "little") + istream.read(
        _packet_length - 4
    )
    if len(_packet_data) != _packet_length:
        raise Crypt4GHHeaderPacketException(
            f"Header packet: read only {len(_packet_data)} "
            f"instead of {_packet_length}"
        )
    encryption_method = read_crypt4gh_bytes_le_uint32(
        _packet_data, 4, "encryption method"
    )
    if encryption_method != 0:
        raise Crypt4GHHeaderPacketException(
            f"Unsupported encryption method {encryption_method}"
        )
    writer_public_key = _packet_data[8:40]
    nonce = _packet_data[40:52]
    payload_length = _packet_length - 4 - 4 - 32 - 12 - 16
    payload = _packet_data[52:]
    for maybe_reader_key in reader_keys.keys:
        symmetric_key = maybe_reader_key.compute_read_key(
            writer_public_key
        )
        _content = None
        _reader_key = None
        try:
            _content = crypto_aead_chacha20poly1305_ietf_decrypt(
                payload, None, nonce, symmetric_key
            )
            _reader_key = maybe_reader_key.public_key
            break
        except CryptoError as cerr:
            pass
    _data_encryption_method = None
    _packet_type = None
    _data_encryption_key = None
    if _content is not None:
        _packet_type = read_crypt4gh_bytes_le_uint32(
            _content, 0, "packet type"
        )
        if _packet_type == 0:
            _data_encryption_method = read_crypt4gh_bytes_le_uint32(
                _content, 4, "encryption method"
            )
            if _data_encryption_method != 0:
                raise Crypt4GHHeaderPacketException(
                    f"Unknown data encryption method "
                    f"{_data_encryption_method}."
                )
            _data_encryption_key = _content[8:40]
        elif _packet_type == 1:
            # Edit List
            pass
        else:
            # Report error? Warning?
            pass
    super().__init__(
        _packet_length,
        _packet_data,
        _content,
        _reader_key,
        _packet_type,
        _data_encryption_method,
        _data_encryption_key,
    )

oarepo_c4gh.crypt4gh.stream.header

This module implements the class responsible for loading Crypt4GH from given input stream.

StreamHeader

Bases: Header

The constructor of this class loads the Crypt4GH header from given stream.

Source code in oarepo_c4gh/crypt4gh/stream/header.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class StreamHeader(Header):
    """The constructor of this class loads the Crypt4GH header from
    given stream.

    """

    def __init__(
        self,
        reader_key_or_collection: Union[Key, KeyCollection],
        istream: io.RawIOBase,
        analyzer: Analyzer = None,
    ) -> None:
        """Checks the Crypt4GH container signature, version and header
        packet count. The header packets are loaded lazily when needed.

        Parameters:
            reader_key_or_collection: the key used for trying to decrypt header
                packets (must include the private part) or collection of keys
            istream: the container input stream
            analyzer: analyzer for storing packet readability information

        """
        self._magic_bytes = istream.read(8)
        check_crypt4gh_magic(self._magic_bytes)
        self._version = read_crypt4gh_stream_le_uint32(istream, "version")
        if self._version != 1:
            raise Crypt4GHHeaderException(
                f"Invalid Crypt4GH version {self._version}"
            )
        self._packet_count = read_crypt4gh_stream_le_uint32(
            istream, "packet count"
        )
        if isinstance(reader_key_or_collection, KeyCollection):
            self._reader_keys = reader_key_or_collection
        else:
            self._reader_keys = KeyCollection(reader_key_or_collection)
        self._istream = istream
        self._packets = None
        self._deks = DEKCollection()
        self._analyzer = analyzer

    def load_packets(self) -> None:
        """Loads the packets from the input stream and discards the
        key. It populates the internal Data Encryption Key collection
        for later use during this process.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                        derivation

        """
        self._packets = []
        for idx in range(self._packet_count):
            packet = StreamHeaderPacket(self._reader_keys, self._istream)
            if packet.is_data_encryption_parameters:
                self._deks.add_dek(
                    DEK(packet.data_encryption_key, packet.reader_key)
                )
            self._packets.append(packet)
            if self._analyzer is not None:
                self._analyzer.analyze_packet(packet)
        self._reader_keys = None

    @property
    def packets(self) -> list:
        """The accessor to the direct list of header packets.

        Returns:
            List of header packets.

        Raises:
            Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                        derivation

        """
        if self._packets is None:
            self.load_packets()
        return self._packets

    @property
    def deks(self) -> DEKCollection:
        """Returns the collection of Data Encryption Keys obtained by
        processing all header packets. Ensures the header packets were
        actually processed before returning the reference.

        Returns:
            The DEK Collection.

        Raises:
            Crypt4GHHeaderException: if packets needed to be loaded and
                something went wrong

        """
        if self._packets is None:
            self.load_packets()
        return self._deks

    @property
    def magic_bytes(self) -> bytes:
        """Returns the original magic bytes from the beginning of the
        container.

        """
        return self._magic_bytes

    @property
    def version(self) -> int:
        """Returns the version of this container format (must always
        return 1).

        """
        return self._version

    @property
    def reader_keys_used(self) -> list[bytes]:
        """Returns all reader public keys successfully used in any
        packets decryption.

        """
        return list(
            set(
                packet.reader_key
                for packet in self.packets
                if packet.reader_key is not None
            )
        )

deks: DEKCollection property

Returns the collection of Data Encryption Keys obtained by processing all header packets. Ensures the header packets were actually processed before returning the reference.

Returns:

Type Description
DEKCollection

The DEK Collection.

Raises:

Type Description
Crypt4GHHeaderException

if packets needed to be loaded and something went wrong

magic_bytes: bytes property

Returns the original magic bytes from the beginning of the container.

packets: list property

The accessor to the direct list of header packets.

Returns:

Type Description
list

List of header packets.

Raises:

Type Description
Crypt4GHHeaderException

if the reader key cannot perform symmetric key derivation

reader_keys_used: list[bytes] property

Returns all reader public keys successfully used in any packets decryption.

version: int property

Returns the version of this container format (must always return 1).

__init__(reader_key_or_collection: Union[Key, KeyCollection], istream: io.RawIOBase, analyzer: Analyzer = None) -> None

Checks the Crypt4GH container signature, version and header packet count. The header packets are loaded lazily when needed.

Parameters:

Name Type Description Default
reader_key_or_collection Union[Key, KeyCollection]

the key used for trying to decrypt header packets (must include the private part) or collection of keys

required
istream RawIOBase

the container input stream

required
analyzer Analyzer

analyzer for storing packet readability information

None
Source code in oarepo_c4gh/crypt4gh/stream/header.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    reader_key_or_collection: Union[Key, KeyCollection],
    istream: io.RawIOBase,
    analyzer: Analyzer = None,
) -> None:
    """Checks the Crypt4GH container signature, version and header
    packet count. The header packets are loaded lazily when needed.

    Parameters:
        reader_key_or_collection: the key used for trying to decrypt header
            packets (must include the private part) or collection of keys
        istream: the container input stream
        analyzer: analyzer for storing packet readability information

    """
    self._magic_bytes = istream.read(8)
    check_crypt4gh_magic(self._magic_bytes)
    self._version = read_crypt4gh_stream_le_uint32(istream, "version")
    if self._version != 1:
        raise Crypt4GHHeaderException(
            f"Invalid Crypt4GH version {self._version}"
        )
    self._packet_count = read_crypt4gh_stream_le_uint32(
        istream, "packet count"
    )
    if isinstance(reader_key_or_collection, KeyCollection):
        self._reader_keys = reader_key_or_collection
    else:
        self._reader_keys = KeyCollection(reader_key_or_collection)
    self._istream = istream
    self._packets = None
    self._deks = DEKCollection()
    self._analyzer = analyzer

load_packets() -> None

Loads the packets from the input stream and discards the key. It populates the internal Data Encryption Key collection for later use during this process.

Raises:

Type Description
Crypt4GHHeaderException

if the reader key cannot perform symmetric key derivation

Source code in oarepo_c4gh/crypt4gh/stream/header.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def load_packets(self) -> None:
    """Loads the packets from the input stream and discards the
    key. It populates the internal Data Encryption Key collection
    for later use during this process.

    Raises:
        Crypt4GHHeaderException: if the reader key cannot perform symmetric key
                    derivation

    """
    self._packets = []
    for idx in range(self._packet_count):
        packet = StreamHeaderPacket(self._reader_keys, self._istream)
        if packet.is_data_encryption_parameters:
            self._deks.add_dek(
                DEK(packet.data_encryption_key, packet.reader_key)
            )
        self._packets.append(packet)
        if self._analyzer is not None:
            self._analyzer.analyze_packet(packet)
    self._reader_keys = None

check_crypt4gh_magic(magic_bytes: bytes) -> None

Checks given bytes whether they match the required Crypt4GH magic bytes.

Parameters:

Name Type Description Default
magic_bytes bytes

the bytes to check

required

Raises:

Type Description
Crypt4GHHeaderException

if not enough or incorrect bytes

Source code in oarepo_c4gh/crypt4gh/stream/header.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def check_crypt4gh_magic(magic_bytes: bytes) -> None:
    """Checks given bytes whether they match the required Crypt4GH
    magic bytes.

    Parameters:
        magic_bytes: the bytes to check

    Raises:
        Crypt4GHHeaderException: if not enough or incorrect bytes

    """
    magic_bytes_len = len(CRYPT4GH_MAGIC)
    if len(magic_bytes) != magic_bytes_len:
        raise Crypt4GHHeaderException(
            f"Cannot read enough magic bytes {magic_bytes_len}"
        )
    if magic_bytes != CRYPT4GH_MAGIC:
        raise Crypt4GHHeaderException(
            f"Incorrect Crypt4GH magic: {magic_bytes}"
        )

oarepo_c4gh.crypt4gh.stream.stream4gh

A module containing the Crypt4GH stream loading class.

Stream4GH

Bases: Proto4GH

An instance of this class represents a Crypt4GH container and provides stream processing capabilities of both header packets and data blocks. The input is processed lazily as needed and the header packets are stored for future processing within the instance. The data blocks stream can be used only once.

Source code in oarepo_c4gh/crypt4gh/stream/stream4gh.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Stream4GH(Proto4GH):
    """An instance of this class represents a Crypt4GH container and
    provides stream processing capabilities of both header packets and
    data blocks. The input is processed lazily as needed and the
    header packets are stored for future processing within the
    instance. The data blocks stream can be used only once.

    """

    def __init__(
        self,
        reader_key: Union[Key, KeyCollection],
        istream: io.RawIOBase,
        decrypt: bool = True,
        analyze: bool = False,
    ) -> None:
        """Initializes the instance by storing the reader_key and the
        input stream. Verifies whether the reader key can perform
        symmetric key derivation.

        Parameters:
            reader_key: the key used for reading the container
            istream: the container input stream
            decrypt: if True, attempt to decrypt the data blocks

        """
        self._istream = istream
        self._analyzer = Analyzer() if analyze else None
        self._header = StreamHeader(reader_key, istream, self._analyzer)
        self._consumed = False
        self._decrypt = decrypt

    @property
    def header(self) -> StreamHeader:
        """Accessor for the container header object.

        Returns:
            The contents of the parsed header.

        """
        return self._header

    @property
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for data blocks.

        Raises:
            Crypt4GHProcessedException: if called second time

        """
        assert self.header.packets is not None
        if self._consumed:
            raise Crypt4GHProcessedException("Already processed once")
        offset = 0
        while True:
            if self._decrypt:
                enc, clear, idx = self._header.deks.decrypt_packet(
                    self._istream
                )
            else:
                enc = self._istream.read(12 + 65536 + 16)
                if len(enc) == 0:
                    enc = None
                clear = None
                idx = None
            if enc is None:
                break
            block = DataBlock(enc, clear, idx, offset)
            offset = offset + block.size
            if self._analyzer is not None:
                self._analyzer.analyze_block(block)
            yield (block)
        self._consumed = True

    @property
    def analyzer(self):
        """For direct access to analyzer and its results."""
        return self._analyzer

    @property
    def clear_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for deciphered blocks only."""
        for block in self.data_blocks:
            if block.is_deciphered:
                yield block

analyzer property

For direct access to analyzer and its results.

clear_blocks: Generator[DataBlock, None, None] property

Single-use iterator for deciphered blocks only.

data_blocks: Generator[DataBlock, None, None] property

Single-use iterator for data blocks.

Raises:

Type Description
Crypt4GHProcessedException

if called second time

header: StreamHeader property

Accessor for the container header object.

Returns:

Type Description
StreamHeader

The contents of the parsed header.

__init__(reader_key: Union[Key, KeyCollection], istream: io.RawIOBase, decrypt: bool = True, analyze: bool = False) -> None

Initializes the instance by storing the reader_key and the input stream. Verifies whether the reader key can perform symmetric key derivation.

Parameters:

Name Type Description Default
reader_key Union[Key, KeyCollection]

the key used for reading the container

required
istream RawIOBase

the container input stream

required
decrypt bool

if True, attempt to decrypt the data blocks

True
Source code in oarepo_c4gh/crypt4gh/stream/stream4gh.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    reader_key: Union[Key, KeyCollection],
    istream: io.RawIOBase,
    decrypt: bool = True,
    analyze: bool = False,
) -> None:
    """Initializes the instance by storing the reader_key and the
    input stream. Verifies whether the reader key can perform
    symmetric key derivation.

    Parameters:
        reader_key: the key used for reading the container
        istream: the container input stream
        decrypt: if True, attempt to decrypt the data blocks

    """
    self._istream = istream
    self._analyzer = Analyzer() if analyze else None
    self._header = StreamHeader(reader_key, istream, self._analyzer)
    self._consumed = False
    self._decrypt = decrypt

Data Keys

oarepo_c4gh.crypt4gh.dek

Module with Data Encryption Key wrapper.

DEK

Data Encryption Key with reference to the Key that unlocked it.

Source code in oarepo_c4gh/crypt4gh/dek.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class DEK:
    """Data Encryption Key with reference to the Key that unlocked
    it.

    """

    def __init__(self, dek: bytes, key: bytes) -> None:
        """Initializes the wrapper.

        Parameters:
            dek: the symmetric Data Encryption Key
            key: public key that unlocked this DEK
        """
        if len(dek) != 32:
            raise Crypt4GHDEKException("DEK must be 32 bytes")
        self._dek = dek
        self._key = key

    @property
    def dek(self) -> bytes:
        """The Data Encryption Key - directly usable by symmetric
        cryptography functions.

        """
        return self._dek

    @property
    def key(self) -> Key:
        """Bytes representation of the public key that unlocked this
        DEK.

        """
        return self._key

dek: bytes property

The Data Encryption Key - directly usable by symmetric cryptography functions.

key: Key property

Bytes representation of the public key that unlocked this DEK.

__init__(dek: bytes, key: bytes) -> None

Initializes the wrapper.

Parameters:

Name Type Description Default
dek bytes

the symmetric Data Encryption Key

required
key bytes

public key that unlocked this DEK

required
Source code in oarepo_c4gh/crypt4gh/dek.py
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, dek: bytes, key: bytes) -> None:
    """Initializes the wrapper.

    Parameters:
        dek: the symmetric Data Encryption Key
        key: public key that unlocked this DEK
    """
    if len(dek) != 32:
        raise Crypt4GHDEKException("DEK must be 32 bytes")
    self._dek = dek
    self._key = key

oarepo_c4gh.crypt4gh.dek_collection

This module provides a persistent storage for multiple Data Encryption Keys and automates the mechanisms used for decrypting individual Data Blocks. It ensures the last working DEK is always tried first and properly reports decryption failure if no key managed to decrypt the data.

DEKCollection

This class contains a list of Data Encryption Keys and provides functionality for the Header4GH reader to add new DEKs. When fully populated it can be then used for decrypting a stream of Data Blocks.

Source code in oarepo_c4gh/crypt4gh/dek_collection.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class DEKCollection:
    """This class contains a list of Data Encryption Keys and provides
    functionality for the Header4GH reader to add new DEKs. When
    fully populated it can be then used for decrypting a stream of
    Data Blocks.

    """

    def __init__(self) -> None:
        """Initializes an empty collection."""
        self._deks = []
        self._current = 0

    @property
    def count(self) -> int:
        """The current number of DEKs in the collection."""
        return len(self._deks)

    @property
    def empty(self) -> bool:
        """True if there are no DEKs available."""
        return self.count == 0

    def contains_dek(self, dek: DEK) -> bool:
        """Check for duplicate DEKS.

        Parameters:
            dek: a Data Encryption Key to check

        Returns:
            True if given DEK is already contained.
        """
        return next((True for v in self._deks if dek.dek == v.dek), False)

    def add_dek(self, dek: DEK) -> None:
        """Adds a new dek to the collection if it is not already
        there.

        Parameters:
            dek: a Data Encryption Key to add

        """
        if not self.contains_dek(dek):
            self._deks.append(dek)

    def decrypt_packet(self, istream: io.RawIOBase) -> (bytes, bytes, int):
        """Internal procedure for decrypting single data block from
        the stream. If there is not enough data (for example at EOF),
        two None values are returned. If the block cannot be decrypted
        using known DEKs, the encrypted version is returned as-is and
        None is returned as the cleartext version. If the block can be
        decrypted, both the ciphertext and cleartext versions are
        returned.

        Updates current key upon successfull decryption so that
        subsequent attempts will try this key first.

        Tries all DEKs in the collection in circular order until all
        have been tried or one succeeded.

        Parameters:
            istream: input stream with data blocks

        Returns:
            Two values, the first representing the encrypted
                version of the data block and second one containing
                decrypted contents if possible. Both are none when no
                packet has been read.

        """
        nonce = istream.read(12)
        if len(nonce) != 12:
            return (None, None, None)
        datamac = istream.read(65536 + 16)
        if len(datamac) < 16:
            return (None, None, None)
        current = self._current
        while True:
            dek = self._deks[current]
            try:
                cleartext = crypto_aead_chacha20poly1305_ietf_decrypt(
                    datamac, None, nonce, dek.dek
                )
                self._current = current
                return (nonce + datamac, cleartext, current)
            except CryptoError as cerr:
                pass
            current = (current + 1) % self.count
            if current == self._current:
                return (nonce + datamac, None, None)

    def __getitem__(self, idx: int) -> DEK:
        """Returns DEK at given index.

        Parameters:
            idx: 0-based index (must be obtained elsewhere)

        """
        return self._deks[idx]

count: int property

The current number of DEKs in the collection.

empty: bool property

True if there are no DEKs available.

__init__() -> None

Initializes an empty collection.

Source code in oarepo_c4gh/crypt4gh/dek_collection.py
25
26
27
28
def __init__(self) -> None:
    """Initializes an empty collection."""
    self._deks = []
    self._current = 0

add_dek(dek: DEK) -> None

Adds a new dek to the collection if it is not already there.

Parameters:

Name Type Description Default
dek DEK

a Data Encryption Key to add

required
Source code in oarepo_c4gh/crypt4gh/dek_collection.py
51
52
53
54
55
56
57
58
59
60
def add_dek(self, dek: DEK) -> None:
    """Adds a new dek to the collection if it is not already
    there.

    Parameters:
        dek: a Data Encryption Key to add

    """
    if not self.contains_dek(dek):
        self._deks.append(dek)

contains_dek(dek: DEK) -> bool

Check for duplicate DEKS.

Parameters:

Name Type Description Default
dek DEK

a Data Encryption Key to check

required

Returns:

Type Description
bool

True if given DEK is already contained.

Source code in oarepo_c4gh/crypt4gh/dek_collection.py
40
41
42
43
44
45
46
47
48
49
def contains_dek(self, dek: DEK) -> bool:
    """Check for duplicate DEKS.

    Parameters:
        dek: a Data Encryption Key to check

    Returns:
        True if given DEK is already contained.
    """
    return next((True for v in self._deks if dek.dek == v.dek), False)

decrypt_packet(istream: io.RawIOBase) -> (bytes, bytes, int)

Internal procedure for decrypting single data block from the stream. If there is not enough data (for example at EOF), two None values are returned. If the block cannot be decrypted using known DEKs, the encrypted version is returned as-is and None is returned as the cleartext version. If the block can be decrypted, both the ciphertext and cleartext versions are returned.

Updates current key upon successfull decryption so that subsequent attempts will try this key first.

Tries all DEKs in the collection in circular order until all have been tried or one succeeded.

Parameters:

Name Type Description Default
istream RawIOBase

input stream with data blocks

required

Returns:

Type Description
(bytes, bytes, int)

Two values, the first representing the encrypted version of the data block and second one containing decrypted contents if possible. Both are none when no packet has been read.

Source code in oarepo_c4gh/crypt4gh/dek_collection.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def decrypt_packet(self, istream: io.RawIOBase) -> (bytes, bytes, int):
    """Internal procedure for decrypting single data block from
    the stream. If there is not enough data (for example at EOF),
    two None values are returned. If the block cannot be decrypted
    using known DEKs, the encrypted version is returned as-is and
    None is returned as the cleartext version. If the block can be
    decrypted, both the ciphertext and cleartext versions are
    returned.

    Updates current key upon successfull decryption so that
    subsequent attempts will try this key first.

    Tries all DEKs in the collection in circular order until all
    have been tried or one succeeded.

    Parameters:
        istream: input stream with data blocks

    Returns:
        Two values, the first representing the encrypted
            version of the data block and second one containing
            decrypted contents if possible. Both are none when no
            packet has been read.

    """
    nonce = istream.read(12)
    if len(nonce) != 12:
        return (None, None, None)
    datamac = istream.read(65536 + 16)
    if len(datamac) < 16:
        return (None, None, None)
    current = self._current
    while True:
        dek = self._deks[current]
        try:
            cleartext = crypto_aead_chacha20poly1305_ietf_decrypt(
                datamac, None, nonce, dek.dek
            )
            self._current = current
            return (nonce + datamac, cleartext, current)
        except CryptoError as cerr:
            pass
        current = (current + 1) % self.count
        if current == self._current:
            return (nonce + datamac, None, None)

Auxilliary Functions and Analyzer

oarepo_c4gh.crypt4gh.util

Miscellaneous helper functions for Crypt4GH stream processing.

parse_crypt4gh_bytes_le_uint(number_bytes: bytes, name: str, size: int) -> int

Parses size-byte little-endian binary number from given bytes handling insufficient data errors with customizable error message.

Parameters:

Name Type Description Default
number_bytes bytes

the bytes to parse

required
name str

optional name of the number in the error message

required
size int

number of bytes the encoding should contain

required

Raises:

Type Description
ValueError

if the bytes given are too short

Source code in oarepo_c4gh/crypt4gh/util.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def parse_crypt4gh_bytes_le_uint(
    number_bytes: bytes, name: str, size: int
) -> int:
    """Parses size-byte little-endian binary number from given bytes
    handling insufficient data errors with customizable error message.

    Parameters:
        number_bytes: the bytes to parse
        name: optional name of the number in the error message
        size: number of bytes the encoding should contain

    Raises:
        ValueError: if the bytes given are too short

    """
    number_bytes_len = len(number_bytes)
    if number_bytes_len != size:
        raise ValueError(
            f"Only {number_bytes_len} bytes for reading le_uint({size}) {name}"
        )
    return int.from_bytes(number_bytes, byteorder="little")

read_crypt4gh_bytes_le_uint32(ibytes: bytes, offset: int, name: str = 'number') -> int

Extracts little-endian integer from given bytes object handling errors with customizable message.

Parameters:

Name Type Description Default
ibytes bytes

bytes with the binary structure

required
offset int

starting byte of the encoded number

required
name str

optional name of the number in the error message

'number'

Raises:

Type Description
ValueError

if not enough data given

Source code in oarepo_c4gh/crypt4gh/util.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def read_crypt4gh_bytes_le_uint32(
    ibytes: bytes, offset: int, name: str = "number"
) -> int:
    """Extracts little-endian integer from given bytes object handling
    errors with customizable message.

    Parameters:
        ibytes: bytes with the binary structure
        offset: starting byte of the encoded number
        name: optional name of the number in the error message

    Raises:
        ValueError: if not enough data given

    """
    number_bytes = ibytes[offset : offset + 4]
    return parse_crypt4gh_bytes_le_uint(number_bytes, name, 4)

read_crypt4gh_stream_le_uint32(istream: io.RawIOBase, name: str = 'number') -> int

Reads little-endian integer from given stream handling read errors with customizable error message.

Parameters:

Name Type Description Default
istream RawIOBase

the container input stream

required
name str

optional name of the number in the error message

'number'

Raises:

Type Description
ValueError

if not enough data can be read

Source code in oarepo_c4gh/crypt4gh/util.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def read_crypt4gh_stream_le_uint32(
    istream: io.RawIOBase, name: str = "number"
) -> int:
    """Reads little-endian integer from given stream handling read
    errors with customizable error message.

    Parameters:
        istream: the container input stream
        name: optional name of the number in the error message

    Raises:
        ValueError: if not enough data can be read

    """
    number_bytes = istream.read(4)
    return parse_crypt4gh_bytes_le_uint(number_bytes, name, 4)

oarepo_c4gh.crypt4gh.analyzer

Module with the Crypt4GH container analyzer.

Analyzer

The instance of this class keeps track of readable header packets and accessible data blocks and provides summary results about these.

Source code in oarepo_c4gh/crypt4gh/analyzer.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Analyzer:
    """The instance of this class keeps track of readable header
    packets and accessible data blocks and provides summary results
    about these.

    """

    def __init__(self):
        """Initializes the instance with empty lists and no key
        information.
        """
        self._packet_info = []
        self._block_info = []
        self._public_keys = []

    def analyze_packet(self, packet: HeaderPacket) -> None:
        """Analyzes single header packet and adds the result into the
        packet_info list.

        Parameters:
            packet: single header packet instance

        """
        if packet.is_readable:
            self._packet_info.append(packet.reader_key)
            if not packet.reader_key in self._public_keys:
                self._public_keys.append(packet.reader_key)
        else:
            self._packet_info.append(False)

    def analyze_block(self, block: DataBlock) -> None:
        """Analyzes single data block and adds the result into the
        block_info list.

        Parameters:
            block: data block information class instance

        """
        if block.is_deciphered:
            self._block_info.append(block.dek_index)
        else:
            self._block_info.append(False)

    def to_dict(self) -> dict:
        """Returns dictionary representation of the analysis."""
        result = {}
        result["header"] = self._packet_info
        result["readers"] = self._public_keys
        result["blocks"] = self._block_info
        return result

__init__()

Initializes the instance with empty lists and no key information.

Source code in oarepo_c4gh/crypt4gh/analyzer.py
16
17
18
19
20
21
22
def __init__(self):
    """Initializes the instance with empty lists and no key
    information.
    """
    self._packet_info = []
    self._block_info = []
    self._public_keys = []

analyze_block(block: DataBlock) -> None

Analyzes single data block and adds the result into the block_info list.

Parameters:

Name Type Description Default
block DataBlock

data block information class instance

required
Source code in oarepo_c4gh/crypt4gh/analyzer.py
39
40
41
42
43
44
45
46
47
48
49
50
def analyze_block(self, block: DataBlock) -> None:
    """Analyzes single data block and adds the result into the
    block_info list.

    Parameters:
        block: data block information class instance

    """
    if block.is_deciphered:
        self._block_info.append(block.dek_index)
    else:
        self._block_info.append(False)

analyze_packet(packet: HeaderPacket) -> None

Analyzes single header packet and adds the result into the packet_info list.

Parameters:

Name Type Description Default
packet HeaderPacket

single header packet instance

required
Source code in oarepo_c4gh/crypt4gh/analyzer.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def analyze_packet(self, packet: HeaderPacket) -> None:
    """Analyzes single header packet and adds the result into the
    packet_info list.

    Parameters:
        packet: single header packet instance

    """
    if packet.is_readable:
        self._packet_info.append(packet.reader_key)
        if not packet.reader_key in self._public_keys:
            self._public_keys.append(packet.reader_key)
    else:
        self._packet_info.append(False)

to_dict() -> dict

Returns dictionary representation of the analysis.

Source code in oarepo_c4gh/crypt4gh/analyzer.py
52
53
54
55
56
57
58
def to_dict(self) -> dict:
    """Returns dictionary representation of the analysis."""
    result = {}
    result["header"] = self._packet_info
    result["readers"] = self._public_keys
    result["blocks"] = self._block_info
    return result

Stream Filtering

oarepo_c4gh.crypt4gh.filter

Empty

Base Filter

oarepo_c4gh.crypt4gh.filter.filter

This module implements a filtered Crypt4GH container backed by other Crypt4GH container but presenting filtered (added, changed and/or removed) header packets.

Filter

Bases: Proto4GH

The whole container filter which actually filters only header packets but for the writer the whole interface is needed.

Source code in oarepo_c4gh/crypt4gh/filter/filter.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Filter(Proto4GH):
    """The whole container filter which actually filters only header
    packets but for the writer the whole interface is needed.

    """

    def __init__(self, original: Proto4GH) -> None:
        """Only prepares the filtered header and original container
        with original blocks.

        Parameters:
            original: the original container to be filtered.

        """
        self._original = original

    @property
    def header(self) -> Header:
        """Returns the filtered header instance."""
        return self._original._header

    @property
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Returns the iterator for the original data blocks."""
        return self._original.data_blocks

data_blocks: Generator[DataBlock, None, None] property

Returns the iterator for the original data blocks.

header: Header property

Returns the filtered header instance.

__init__(original: Proto4GH) -> None

Only prepares the filtered header and original container with original blocks.

Parameters:

Name Type Description Default
original Proto4GH

the original container to be filtered.

required
Source code in oarepo_c4gh/crypt4gh/filter/filter.py
20
21
22
23
24
25
26
27
28
def __init__(self, original: Proto4GH) -> None:
    """Only prepares the filtered header and original container
    with original blocks.

    Parameters:
        original: the original container to be filtered.

    """
    self._original = original

oarepo_c4gh.crypt4gh.filter.header

This module implements filtered header on top of other Header implementation. All filters should be derived from this class.

FilterHeader

Bases: Header

As the header has its own interface, this class implements such interface for filtered header.

Source code in oarepo_c4gh/crypt4gh/filter/header.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class FilterHeader(Header):
    """As the header has its own interface, this class implements such
    interface for filtered header.

    """

    def __init__(self, original: Header) -> None:
        """Setup to match original.

        Parameters:
            original: The original container header.

        """
        self._original = original

    @property
    def magic_bytes(self) -> bytes:
        """Returns the original data."""
        return self._original.magic_bytes

    @property
    def version(self) -> int:
        """Returns the original version."""
        return self._original.version

magic_bytes: bytes property

Returns the original data.

version: int property

Returns the original version.

__init__(original: Header) -> None

Setup to match original.

Parameters:

Name Type Description Default
original Header

The original container header.

required
Source code in oarepo_c4gh/crypt4gh/filter/header.py
14
15
16
17
18
19
20
21
def __init__(self, original: Header) -> None:
    """Setup to match original.

    Parameters:
        original: The original container header.

    """
    self._original = original

Add Recipient Filter

oarepo_c4gh.crypt4gh.filter.add_recipient

This module implements a filtered Crypt4GH container backed by other Crypt4GH container but presenting added packets based on recipients to be added.

AddRecipientFilter

Bases: Filter

The whole container filter which actually filters only header packets but for the writer the whole interface is needed.

Source code in oarepo_c4gh/crypt4gh/filter/add_recipient.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class AddRecipientFilter(Filter):
    """The whole container filter which actually filters only header
    packets but for the writer the whole interface is needed.

    """

    def __init__(self, original: Proto4GH, *recipients: List[Key]) -> None:
        """Only prepares the filtered header and original container
        with original blocks.

        Parameters:
            original: the original container to be filtered.

        """
        super().__init__(original)
        self._header = AddRecipientHeader(original.header, recipients)

    @property
    def header(self) -> FilterHeader:
        """Returns the filtered header instance."""
        return self._header

header: FilterHeader property

Returns the filtered header instance.

__init__(original: Proto4GH, *recipients: List[Key]) -> None

Only prepares the filtered header and original container with original blocks.

Parameters:

Name Type Description Default
original Proto4GH

the original container to be filtered.

required
Source code in oarepo_c4gh/crypt4gh/filter/add_recipient.py
23
24
25
26
27
28
29
30
31
32
def __init__(self, original: Proto4GH, *recipients: List[Key]) -> None:
    """Only prepares the filtered header and original container
    with original blocks.

    Parameters:
        original: the original container to be filtered.

    """
    super().__init__(original)
    self._header = AddRecipientHeader(original.header, recipients)

oarepo_c4gh.crypt4gh.filter.add_recipient_header

The actual recipient adding implementation in a header filter.

AddRecipientHeader

Bases: FilterHeader

This class implements a simple filter that adds all readable packets to the packet list - but encrypted for new recipient(s).

Source code in oarepo_c4gh/crypt4gh/filter/add_recipient_header.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class AddRecipientHeader(FilterHeader):
    """This class implements a simple filter that adds all readable
    packets to the packet list - but encrypted for new recipient(s).

    """
    def __init__(self, original: Header, recipients: List[Key]):
        """Just initializes the baseline header filter and stores the
        list of recipients for actual processing later.

        Parameters:
            original: the original container header
            recipients: a list of recipients' public keys to add

        """
        super().__init__(original)
        self._recipients_to_add = recipients


    @property
    def packets(self) -> list:
        """Returns the filtered packets with added recipients. Both
        edit lists and DEKs are added.

        """
        ekey = None
        temp_packets = self._original.packets.copy()
        for public_key in self._recipients_to_add:
            for packet in self._original.packets:
                if packet.is_readable and packet.packet_type in (0, 1):
                    if ekey is None:
                        ekey = SoftwareKey.generate()
                    data = io.BytesIO()
                    data.write(packet.length.to_bytes(4, "little"))
                    enc_method = 0
                    data.write(enc_method.to_bytes(4, "little"))
                    data.write(ekey.public_key)
                    symmetric_key = ekey.compute_write_key(public_key)
                    nonce = secrets.token_bytes(12)
                    data.write(nonce)
                    content = crypto_aead_chacha20poly1305_ietf_encrypt(
                        packet.content, None, nonce, symmetric_key
                    )
                    data.write(content)
                    # This packet is useful only for serialization
                    temp_packets.append(
                        HeaderPacket(
                            packet.length,
                            data.getvalue(),
                            None,
                            None,
                            None,
                            None,
                            None,
                        )
                    )
        return temp_packets

packets: list property

Returns the filtered packets with added recipients. Both edit lists and DEKs are added.

__init__(original: Header, recipients: List[Key])

Just initializes the baseline header filter and stores the list of recipients for actual processing later.

Parameters:

Name Type Description Default
original Header

the original container header

required
recipients List[Key]

a list of recipients' public keys to add

required
Source code in oarepo_c4gh/crypt4gh/filter/add_recipient_header.py
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, original: Header, recipients: List[Key]):
    """Just initializes the baseline header filter and stores the
    list of recipients for actual processing later.

    Parameters:
        original: the original container header
        recipients: a list of recipients' public keys to add

    """
    super().__init__(original)
    self._recipients_to_add = recipients

Only Readable Filter

oarepo_c4gh.crypt4gh.filter.only_readable

A module implementing container filter that removes all non-readable packets from its header.

OnlyReadableFilter

Bases: Filter

This class implements a container filter that filters out all non-readable packets from the header.

Source code in oarepo_c4gh/crypt4gh/filter/only_readable.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class OnlyReadableFilter(Filter):
    """This class implements a container filter that filters out all
    non-readable packets from the header.

    """

    def __init__(self, original: Proto4GH):
        """Initializes with original container and sets filtering
        header instance up.

        Parameters:
            original: the original container

        """
        super().__init__(original)
        self._header = OnlyReadableHeader(original.header)

    @property
    def header(self) -> FilterHeader:
        """Returns the filtered header instance."""
        return self._header

header: FilterHeader property

Returns the filtered header instance.

__init__(original: Proto4GH)

Initializes with original container and sets filtering header instance up.

Parameters:

Name Type Description Default
original Proto4GH

the original container

required
Source code in oarepo_c4gh/crypt4gh/filter/only_readable.py
17
18
19
20
21
22
23
24
25
26
def __init__(self, original: Proto4GH):
    """Initializes with original container and sets filtering
    header instance up.

    Parameters:
        original: the original container

    """
    super().__init__(original)
    self._header = OnlyReadableHeader(original.header)

oarepo_c4gh.crypt4gh.filter.only_readable_header

This module implements a container header filter that passes through only readable header packets.

OnlyReadableHeader

Bases: FilterHeader

This class wraps original container header and passes on only readable packets.

Source code in oarepo_c4gh/crypt4gh/filter/only_readable_header.py
 8
 9
10
11
12
13
14
15
16
17
18
class OnlyReadableHeader(FilterHeader):
    """This class wraps original container header and passes on only
    readable packets.
    """

    @property
    def packets(self) -> list:
        """Returns only readable packets.

        """
        return [x for x in self._original.packets if x.is_readable]

packets: list property

Returns only readable packets.

Container Writer

oarepo_c4gh.crypt4gh.writer

Wrapper around container that performs stream serialization.

Crypt4GHWriter

Simple writer which performs just one operation.

Source code in oarepo_c4gh/crypt4gh/writer.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Crypt4GHWriter:
    """Simple writer which performs just one operation."""

    def __init__(self, container: Proto4GH, ostream: io.RawIOBase) -> None:
        """Can be wrapped around originally loaded Crypt4GH container
        or something compatible (like filtered container).

        """
        self._container = container
        self._stream = ostream

    def write(self) -> None:
        """Performs the write operation."""
        self._stream.write(self._container.header.magic_bytes)
        self._stream.write(
            self._container.header.version.to_bytes(4, "little")
        )
        self._stream.write(
            len(self._container.header.packets).to_bytes(4, "little")
        )
        for packet in self._container.header.packets:
            self._stream.write(packet.packet_data)
        for block in self._container.data_blocks:
            self._stream.write(block.ciphertext)

__init__(container: Proto4GH, ostream: io.RawIOBase) -> None

Can be wrapped around originally loaded Crypt4GH container or something compatible (like filtered container).

Source code in oarepo_c4gh/crypt4gh/writer.py
12
13
14
15
16
17
18
def __init__(self, container: Proto4GH, ostream: io.RawIOBase) -> None:
    """Can be wrapped around originally loaded Crypt4GH container
    or something compatible (like filtered container).

    """
    self._container = container
    self._stream = ostream

write() -> None

Performs the write operation.

Source code in oarepo_c4gh/crypt4gh/writer.py
20
21
22
23
24
25
26
27
28
29
30
31
32
def write(self) -> None:
    """Performs the write operation."""
    self._stream.write(self._container.header.magic_bytes)
    self._stream.write(
        self._container.header.version.to_bytes(4, "little")
    )
    self._stream.write(
        len(self._container.header.packets).to_bytes(4, "little")
    )
    for packet in self._container.header.packets:
        self._stream.write(packet.packet_data)
    for block in self._container.data_blocks:
        self._stream.write(block.ciphertext)