Coverage for oarepo_c4gh/crypt4gh/stream/stream4gh.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 12:05 +0000

1"""A module containing the Crypt4GH stream loading class. 

2 

3""" 

4 

5from ...key import Key, KeyCollection 

6import io 

7from .header import StreamHeader 

8from ...exceptions import Crypt4GHProcessedException 

9from ..common.data_block import DataBlock 

10from ..analyzer import Analyzer 

11from typing import Generator, Union 

12from ..common.proto4gh import Proto4GH 

13 

14 

class Stream4GH(Proto4GH):
    """An instance of this class represents a Crypt4GH container and
    provides stream processing capabilities of both header packets and
    data blocks. The input is processed lazily as needed and the
    header packets are stored for future processing within the
    instance. The data blocks stream can be used only once.

    """

    # Number of bytes requested from the input stream for one data
    # block when decryption is disabled. Per the Crypt4GH
    # specification this should correspond to nonce (12) + maximum
    # payload (65536) + MAC (16) - NOTE(review): confirm against the
    # spec if the layout ever changes.
    _RAW_BLOCK_SIZE = 12 + 65536 + 16

    def __init__(
        self,
        istream: io.RawIOBase,
        reader_key: Union[Key, KeyCollection],
        decrypt: bool = True,
        analyze: bool = False,
    ) -> None:
        """Initializes the instance by storing the input stream and
        creating the stream header bound to the reader key.

        Parameters:
            istream: the container input stream
            reader_key: the key (or collection) used for reading the container
            decrypt: if True, attempt to decrypt the data blocks
            analyze: if True, create an analyzer that is handed to the
                     header and is given every data block produced

        """
        self._istream = istream
        self._analyzer = Analyzer() if analyze else None
        self._header = StreamHeader(reader_key, istream, self._analyzer)
        self._consumed = False
        self._decrypt = decrypt

    @property
    def header(self) -> StreamHeader:
        """Accessor for the container header object.

        Returns:
            The contents of the parsed header.

        """
        return self._header

    def _read_raw_block(self):
        """Reads the next still-encrypted data block from the input
        stream without attempting to decrypt it.

        A raw stream is allowed to return fewer bytes than requested,
        therefore reading continues until a whole block is collected
        or the input ends.

        Returns:
            The bytes of the block (the last block of the stream may
            be shorter), or None if there are no more data.

        """
        wanted = self._RAW_BLOCK_SIZE
        first = self._istream.read(wanted)
        if len(first) == 0:
            return None
        if len(first) == wanted:
            return first
        # Short read: keep asking for the remainder of this block so
        # that one block is never split across two DataBlock objects.
        pieces = [first]
        collected = len(first)
        while collected < wanted:
            chunk = self._istream.read(wanted - collected)
            if not chunk:
                break
            pieces.append(chunk)
            collected += len(chunk)
        return b"".join(pieces)

    @property
    def data_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for data blocks.

        Each block is passed to the analyzer (if one was requested)
        before it is yielded.

        Raises:
            Crypt4GHProcessedException: if called second time

        """
        # The header packets are accessed outside of the assert
        # statement: asserts are removed under ``python -O`` and the
        # access must still happen before any data block is read
        # (presumably it triggers the lazy header parsing - confirm
        # in StreamHeader).
        packets = self.header.packets
        assert packets is not None
        if self._consumed:
            raise Crypt4GHProcessedException("Already processed once")
        # Mark the stream as used as soon as reading starts. A
        # partially consumed stream cannot be restarted - a new
        # iteration would begin in the middle of the input with the
        # offset counted from zero again.
        self._consumed = True
        offset = 0
        while True:
            if self._decrypt:
                enc, clear, idx = self._header.deks.decrypt_packet(
                    self._istream
                )
            else:
                enc = self._read_raw_block()
                clear = None
                idx = None
            if enc is None:
                break
            block = DataBlock(enc, clear, idx, offset)
            offset = offset + block.size
            if self._analyzer is not None:
                self._analyzer.analyze_block(block)
            yield block

    @property
    def analyzer(self) -> Union[Analyzer, None]:
        """For direct access to analyzer and its results.

        Returns:
            The analyzer instance, or None if analysis was not
            requested when the container was created.

        """
        return self._analyzer

    @property
    def clear_blocks(self) -> Generator[DataBlock, None, None]:
        """Single-use iterator for deciphered blocks only.

        Iterates over data_blocks and skips every block that is not
        reported as deciphered.

        """
        for block in self.data_blocks:
            if block.is_deciphered:
                yield block

98 if block.is_deciphered: 

99 yield block