Coverage for oarepo_c4gh/crypt4gh/writer.py: 100%
14 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-07 12:05 +0000
1"""Wrapper around container that performs stream serialization.
3"""
5from .common.proto4gh import Proto4GH
6import io
class Crypt4GHWriter:
    """Simple writer which serializes a container into a binary stream.

    The output consists of the header magic bytes, the header version
    and the number of header packets (both as 4-byte little-endian
    integers), the data of every header packet and finally the
    ciphertext of every data block, in this order.
    """

    def __init__(self, container: "Proto4GH", ostream: io.RawIOBase) -> None:
        """Can be wrapped around originally loaded Crypt4GH container
        or something compatible (like filtered container).

        Parameters:
            container: object exposing ``header`` (with ``magic_bytes``,
                ``version`` and ``packets``) and ``data_blocks``.
            ostream: binary stream the serialized container is written to.
        """
        self._container = container
        self._stream = ostream

    def _write_all(self, data: bytes) -> None:
        """Write the whole of ``data`` to the stream.

        Raw streams may accept fewer bytes than requested and report the
        accepted count as the return value of ``write``; the unwritten
        remainder is retried until everything has been accepted.

        Raises:
            OSError: if the stream reports that it accepted no bytes of
                non-empty data (retrying would never terminate).
        """
        remaining = data
        while True:
            written = self._stream.write(remaining)
            # Streams which do not report an integer count (e.g. return
            # None) are treated as having accepted everything, which
            # keeps the single-call behaviour for such streams.
            # NOTE(review): a non-blocking raw stream may also return
            # None when nothing was written - confirm whether such
            # streams need to be supported.
            if not isinstance(written, int) or written >= len(remaining):
                return
            if written <= 0:
                raise OSError("output stream accepted no data")
            remaining = remaining[written:]

    def write(self) -> None:
        """Performs the write operation.

        Raises:
            OSError: if the stream stops accepting data in the middle of
                a chunk.
        """
        header = self._container.header
        self._write_all(header.magic_bytes)
        self._write_all(header.version.to_bytes(4, "little"))
        self._write_all(len(header.packets).to_bytes(4, "little"))
        for packet in header.packets:
            self._write_all(packet.packet_data)
        for block in self._container.data_blocks:
            self._write_all(block.ciphertext)