Coverage for oarepo_c4gh/crypt4gh/stream/stream4gh.py: 100%
48 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""A module containing the Crypt4GH stream loading class.
3"""
5from ...key import Key, KeyCollection
6import io
7from .header import StreamHeader
8from ...exceptions import Crypt4GHProcessedException
9from ..common.data_block import DataBlock
10from ..analyzer import Analyzer
11from typing import Generator, Union
12from ..common.proto4gh import Proto4GH
15class Stream4GH(Proto4GH):
16 """An instance of this class represents a Crypt4GH container and
17 provides stream processing capabilities of both header packets and
18 data blocks. The input is processed lazily as needed and the
19 header packets are stored for future processing within the
20 instance. The data blocks stream can be used only once.
22 """
24 def __init__(
25 self,
26 istream: io.RawIOBase,
27 reader_key: Union[Key, KeyCollection],
28 decrypt: bool = True,
29 analyze: bool = False,
30 ) -> None:
31 """Initializes the instance by storing the reader_key and the
32 input stream. Verifies whether the reader key can perform
33 symmetric key derivation.
35 Parameters:
36 istream: the container input stream
37 reader_key: the key (or collection) used for reading the container
38 decrypt: if True, attempt to decrypt the data blocks
40 """
41 self._istream = istream
42 self._analyzer = Analyzer() if analyze else None
43 self._header = StreamHeader(reader_key, istream, self._analyzer)
44 self._consumed = False
45 self._decrypt = decrypt
47 @property
48 def header(self) -> StreamHeader:
49 """Accessor for the container header object.
51 Returns:
52 The contents of the parsed header.
54 """
55 return self._header
57 @property
58 def data_blocks(self) -> Generator[DataBlock, None, None]:
59 """Single-use iterator for data blocks.
61 Raises:
62 Crypt4GHProcessedException: if called second time
64 """
65 assert self.header.packets is not None
66 if self._consumed:
67 raise Crypt4GHProcessedException("Already processed once")
68 offset = 0
69 while True:
70 if self._decrypt:
71 enc, clear, idx = self._header.deks.decrypt_packet(
72 self._istream
73 )
74 else:
75 enc = self._istream.read(12 + 65536 + 16)
76 if len(enc) == 0:
77 enc = None
78 clear = None
79 idx = None
80 if enc is None:
81 break
82 block = DataBlock(enc, clear, idx, offset)
83 offset = offset + block.size
84 if self._analyzer is not None:
85 self._analyzer.analyze_block(block)
86 yield (block)
87 self._consumed = True
89 @property
90 def analyzer(self):
91 """For direct access to analyzer and its results."""
92 return self._analyzer
94 @property
95 def clear_blocks(self) -> Generator[DataBlock, None, None]:
96 """Single-use iterator for deciphered blocks only."""
97 for block in self.data_blocks:
98 if block.is_deciphered:
99 yield block