Source code for pcapkit.foundation.traceflow.tcp

# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel
"""Follow TCP Streams
========================

.. module:: pcapkit.foundation.traceflow.tcp

:mod:`pcapkit.foundation.traceflow.tcp` is the interface to trace
TCP flows from a series of packets and connections.

"""
from typing import TYPE_CHECKING, Generic, overload

from pcapkit.foundation.traceflow.data.tcp import _AT, Buffer, BufferID, Index, Packet
from pcapkit.foundation.traceflow.traceflow import TraceFlowBase as TraceFlow
from pcapkit.protocols.transport.tcp import TCP as TCP_Protocol

__all__ = ['TCP']

if TYPE_CHECKING:
    from dictdumper.dumper import Dumper
    from typing_extensions import Literal


[docs] class TCP(TraceFlow[BufferID, Buffer, Index, Packet[_AT]], Generic[_AT]): """Trace TCP flows. Args: fout: output path format: output format byteorder: output file byte order nanosecond: output nanosecond-resolution file flag *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. """ ########################################################################## # Defaults. ########################################################################## #: Protocol name of current reassembly object. __protocol_name__ = 'TCP' #: Protocol of current reassembly object. __protocol_type__ = TCP_Protocol ########################################################################## # Methods. ##########################################################################
[docs] def dump(self, packet: 'Packet[_AT]') -> 'None': """Dump frame to output files. Arguments: packet: a flow packet (:term:`trace.tcp.packet`) """ # fetch flow label output = self.trace(packet, output=True) # dump files output(packet.frame, name=f'Frame {packet.index}') # pylint: disable=not-callable
@overload def trace(self, packet: 'Packet[_AT]', *, output: 'Literal[True]' = ...) -> 'Dumper': ... @overload def trace(self, packet: 'Packet[_AT]', *, output: 'Literal[False]' = ...) -> 'str': ...
[docs] def trace(self, packet: 'Packet[_AT]', *, output: 'bool' = False) -> 'Dumper | str': """Trace packets. Arguments: packet: a flow packet (:term:`trace.tcp.packet`) output: flag if has formatted dumper Returns: If ``output`` is :data:`True`, returns the initiated :class:`~dictdumper.dumper.Dumper` object, which will dump data to the output file named after the flow label; otherwise, returns the flow label itself. Notes: The flow label is formatted as following: .. code-block:: python f'{packet.src}_{packet.srcport}-{packet.dst}_{info.dstport}-{packet.timestamp}' """ # clear cache self.__cached__['submit'] = None # Buffer Identifier BUFID = (packet.src, packet.srcport, packet.dst, packet.dstport) # type: BufferID # SYN = packet.syn # Synchronise Flag (Establishment) FIN = packet.fin # Finish Flag (Termination) # # when SYN is set, reset buffer of this seesion # if SYN and BUFID in self._buffer: # temp = self._buffer.pop(BUFID) # temp['fpout'] = (self._fproot, self._fdpext) # temp['index'] = tuple(temp['index']) # self._stream.append(Info(temp)) # initialise buffer with BUFID if BUFID not in self._buffer: if packet.src.version == 4: label = f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}' else: label = f'{packet.src}_{packet.srcport}-{packet.dst}_{packet.dstport}-{packet.timestamp}'.replace(':', '.') self._buffer[BUFID] = Buffer( fpout=self._foutio(fname=f'{self._fproot}/{label}{self._fdpext or ""}', protocol=packet.protocol, byteorder=self._endian, nanosecond=self._nnsecd), index=[], label=label, ) # trace frame record self._buffer[BUFID].index.append(packet.index) fpout = self._buffer[BUFID].fpout label = self._buffer[BUFID].label # when FIN is set, submit buffer of this session if FIN: buf = self._buffer.pop(BUFID) # fpout, label = buf['fpout'], buf['label'] index = Index( fpout=f'{self._fproot}/{label}{self._fdpext}' if self._fdpext is not None else None, index=tuple(buf.index), label=label, ) for callback in self.__callback_fn__: callback(index) self._stream.append(index) # return label or output object return fpout if output else label
[docs] def submit(self) -> 'tuple[Index, ...]': """Submit traced TCP flows. Returns: Traced TCP flow (:term:`trace.tcp.index`). """ if (cached := self.__cached__.get('submit')) is not None: return cached ret = [] # type: list[Index] for buf in self._buffer.values(): ret.append(Index(fpout=f"{self._fproot}/{buf.label}{self._fdpext}" if self._fdpext else None, index=tuple(buf.index), label=buf.label,)) ret.extend(self._stream) ret_submit = tuple(ret) self.__cached__['submit'] = ret_submit return ret_submit