Coverage for oarepo_c4gh/crypt4gh/filter/add_recipient_header.py: 100%
34 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""The actual recipient adding implementation in a header filter.
3"""
5from .header import FilterHeader
6from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
7from ...key.software import SoftwareKey
8import io
9import secrets
10from ..common.header_packet import HeaderPacket
11from ..common.header import Header
12from typing import List
13from ...key import Key
class AddRecipientHeader(FilterHeader):
    """Header filter that exposes the original packets together with
    copies of every readable packet re-encrypted for additional
    recipient(s).

    """

    def __init__(self, original: Header, recipients: List[Key]):
        """Initializes the base header filter and remembers the
        recipients. The new packets are only built when the `packets`
        property is read.

        Parameters:
            original: the original container header
            recipients: a list of recipients' public keys to add

        """
        super().__init__(original)
        self._recipients_to_add = recipients

    @property
    def packets(self) -> list:
        """Returns the original packets followed by newly encrypted
        copies of all readable DEK and edit list packets, one copy per
        added recipient.

        The writer key is generated at most once per access and shared
        by all packets produced during that access; each new packet
        gets its own random nonce.

        """
        writer_key = None
        result = self._original.packets.copy()
        for recipient_key in self._recipients_to_add:
            for source in self._original.packets:
                # Only readable packets of type 0 or 1 are re-encrypted.
                if not source.is_readable or source.packet_type not in (0, 1):
                    continue
                # Create the writer key lazily - only when there is
                # actually something to encrypt.
                if writer_key is None:
                    writer_key = SoftwareKey.generate()
                length_field = source.length.to_bytes(4, "little")
                method_field = (0).to_bytes(4, "little")
                writer_public = writer_key.public_key
                shared_key = writer_key.compute_write_key(recipient_key)
                nonce = secrets.token_bytes(12)
                ciphertext = crypto_aead_chacha20poly1305_ietf_encrypt(
                    source.content, None, nonce, shared_key
                )
                # Serialized layout: length, method, writer public key,
                # nonce, encrypted payload.
                serialized = b"".join(
                    (
                        length_field,
                        method_field,
                        writer_public,
                        nonce,
                        ciphertext,
                    )
                )
                # This packet is useful only for serialization, the
                # remaining fields are left unset.
                result.append(
                    HeaderPacket(
                        source.length,
                        serialized,
                        None,
                        None,
                        None,
                        None,
                        None,
                    )
                )
        return result