Source code for pcapkit.foundation.reassembly.data.tcp

# -*- coding: utf-8 -*-
"""data models for TCP reassembly"""

from typing import TYPE_CHECKING, Generic, TypeVar

from pcapkit.corekit.infoclass import Info, info_final
from pcapkit.utilities.compat import Tuple

__all__ = [
    'Packet', 'DatagramID', 'Datagram', 'HoleDiscriptor',
    'Fragment', 'Buffer', 'BufferID',
]

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Optional, overload

    from typing_extensions import Literal, TypeAlias

    from pcapkit.protocols.protocol import ProtocolBase as Protocol

_AT = TypeVar('_AT', 'IPv4Address', 'IPv6Address')

#: Buffer ID.
BufferID: 'TypeAlias' = Tuple[_AT, int, _AT, int]


[docs] @info_final class Packet(Info): """Data model for :term:`TCP <reasm.tcp.packet>` packet representation.""" #: Buffer ID. bufid: 'BufferID' #: Data sequence number. dsn: 'int' #: Acknowledgment number. ack: 'int' #: Original packet range number. num: 'int' #: Synchronise flag. syn: 'bool' #: Finish flag. fin: 'bool' #: Reset connection flag. rst: 'bool' #: Payload length, header excluded. len: 'int' #: This sequence number. first: 'int' #: Next (wanted) sequence number. last: 'int' #: Raw :obj:`bytes` type header. header: 'bytes' #: Raw :obj:`bytearray` type payload. payload: 'bytearray' if TYPE_CHECKING: def __init__(self, bufid: 'BufferID', dsn: 'int', ack: 'int', num: 'int', syn: 'bool', fin: 'bool', rst: 'bool', len: 'int', first: 'int', last: 'int', header: 'bytes', payload: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs] @info_final class DatagramID(Info, Generic[_AT]): """Data model for :term:`TCP <reasm.tcp.datagram>` original packet identifier.""" #: Source address. src: 'tuple[_AT, int]' #: Destination address. dst: 'tuple[_AT, int]' #: Original packet ACK number. ack: 'int' if TYPE_CHECKING: def __init__(self, src: 'tuple[_AT, int]', dst: 'tuple[_AT, int]', ack: 'int') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs] @info_final class Datagram(Info, Generic[_AT]): """Data model for :term:`TCP <reasm.tcp.datagram>`.""" #: Completed flag. completed: 'bool' #: Original packet identifier. id: 'DatagramID[_AT]' #: Packet numbers. index: 'tuple[int, ...]' #: Initial TCP header. header: 'bytes' #: Reassembled payload (application layer data). payload: 'bytes | tuple[bytes, ...]' #: Parsed reassembled payload. packet: 'Optional[Protocol]' if TYPE_CHECKING: @overload # pylint: disable=used-before-assignment def __init__(self, completed: 'Literal[True]', id: 'DatagramID[_AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes', packet: 'Protocol') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin @overload def __init__(self, completed: 'Literal[False]', id: 'DatagramID[_AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'tuple[bytes, ...]', packet: 'None') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin def __init__(self, completed: 'bool', id: 'DatagramID[_AT]', index: 'tuple[int, ...]', header: 'bytes', payload: 'bytes | tuple[bytes, ...]', packet: 'Optional[Protocol]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs] @info_final class HoleDiscriptor(Info): """Data model for :term:`TCP <reasm.tcp.buffer>` hole descriptor.""" #: Start of hole. first: 'int' #: Stop of hole. last: 'int' if TYPE_CHECKING: def __init__(self, first: 'int', last: 'int') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs] @info_final class Fragment(Info): """Data model for :term:`TCP <reasm.tcp.buffer>` ACK list fragment item.""" #: List of reassembled packets. ind: 'list[int]' #: ISN of payload buffer. isn: 'int' #: Length of payload buffer. len: 'int' #: Reassembled payload holes set to b'\x00'. raw: 'bytearray' if TYPE_CHECKING: def __init__(self, ind: 'list[int]', isn: 'int', len: 'int', raw: 'bytearray') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin
[docs] @info_final class Buffer(Info): """Data model for :term:`TCP <reasm.tcp.buffer>` buffer entry.""" #: Hole descriptor list. hdl: 'list[HoleDiscriptor]' #: Initial TCP header. hdr: 'bytes' #: ACK list. ack: 'dict[int, Fragment]' if TYPE_CHECKING: def __init__(self, hdl: 'list[HoleDiscriptor]', hdr: 'bytes', ack: 'dict[int, Fragment]') -> 'None': ... # pylint: disable=unused-argument,super-init-not-called,multiple-statements,line-too-long,redefined-builtin