Coverage for oarepo_c4gh/crypt4gh/filter/add_recipient_header.py: 100%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""The actual recipient adding implementation in a header filter. 

2 

3""" 

4 

5from .header import FilterHeader 

6from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt 

7from ...key.software import SoftwareKey 

8import io 

9import secrets 

10from ..common.header_packet import HeaderPacket 

11from ..common.header import Header 

12from typing import List 

13from ...key import Key 

14 

15 

class AddRecipientHeader(FilterHeader):
    """Header filter that keeps every packet of the original header
    and appends copies of its readable packets re-encrypted for the
    additional recipient(s).

    """

    def __init__(self, original: Header, recipients: List[Key]):
        """Sets up the underlying header filter and remembers the
        recipients. No re-encryption happens here, it is deferred
        until the packets are requested.

        Parameters:
            original: the original container header
            recipients: a list of recipients' public keys to add

        """
        super().__init__(original)
        self._recipients_to_add = recipients

    @property
    def packets(self) -> list:
        """Returns the original packets followed by newly encrypted
        packets for the added recipients. Every readable packet of
        type 0 or 1 (these cover both DEKs and edit lists) is
        re-encrypted once per recipient. The list is rebuilt, with
        fresh nonces, on every access.

        """
        # The ephemeral writer key is created lazily - only when
        # there is at least one packet to re-encrypt - and then
        # shared by all recipients and packets of this call.
        writer_key = None
        result = self._original.packets.copy()
        for recipient in self._recipients_to_add:
            for source in self._original.packets:
                if not (source.is_readable and source.packet_type in (0, 1)):
                    continue
                if writer_key is None:
                    writer_key = SoftwareKey.generate()
                # The source packet length is reused as-is for the
                # new packet.
                length_field = source.length.to_bytes(4, "little")
                # Encryption method identifier 0 - presumably the
                # X25519 + ChaCha20-IETF-Poly1305 scheme of the
                # Crypt4GH specification (TODO confirm).
                method_id = 0
                method_field = method_id.to_bytes(4, "little")
                writer_public = writer_key.public_key
                shared_key = writer_key.compute_write_key(recipient)
                # Fresh random 12-byte nonce for every packet.
                nonce = secrets.token_bytes(12)
                sealed = crypto_aead_chacha20poly1305_ietf_encrypt(
                    source.content, None, nonce, shared_key
                )
                serialized = b"".join(
                    (
                        length_field,
                        method_field,
                        writer_public,
                        nonce,
                        sealed,
                    )
                )
                # Only the length and the raw bytes are filled in,
                # which is all that serialization needs.
                new_packet = HeaderPacket(
                    source.length,
                    serialized,
                    None,
                    None,
                    None,
                    None,
                    None,
                )
                result.append(new_packet)
        return result