Base Class

pcapkit.foundation.reassembly.reassembly contains Reassembly only, which is an abstract base class for all reassembly classes, bases on algorithms described in RFC 791 and RFC 815, implements datagram reassembly of IP and TCP packets.

class pcapkit.foundation.reassembly.reassembly.Reassembly(*, strict=True, store=True)[source]

Bases: ReassemblyBase[_PT, _DT, _IT, _BT], Generic[_PT, _DT, _IT, _BT]

Base flow tracing class.

Example

Use keyword argument protocol to specify the protocol name at class definition:

class MyProtocol(Reassembly, protocol='my_protocol'):
    ...
Parameters:
  • strict (bool) – if return all datagrams (including those not implemented) when submit

  • store (bool) – if store reassembled datagram in memory, i.e., self._dtgram (if not, datagram will be discarded after callback)

See also

For more information on customisation and extension, please refer to Customisation & Extensions.

property name: str

Protocol name of current reassembly class.

Note

This property is also available as a class variable. Its value can be set by __protocol_name__ class attribute.

property protocol: Type[Protocol]

Protocol of current reassembly class.

Note

This property is also available as a class variable. Its value can be set by __protocol_type__ class attribute.

property count: int

Total number of reassembled packets.

property datagram: tuple[_DT, ...]

Reassembled datagram.

Raises:

UnsupportedCall – If self._flag_d is set to False.

abstract reassembly(info)

Reassembly procedure.

Parameters:

info (TypeVar(_PT, bound= Info)) – info dict of packets to be reassembled

Return type:

None

abstract submit(buf, **kwargs)

Submit reassembled payload.

Parameters:
  • buf (TypeVar(_BT, bound= Info)) – buffer dict of reassembled packets

  • **kwargs (Any) – arbitrary keyword arguments

Return type:

list[TypeVar(_DT, bound= Info)]

fetch()

Fetch datagram.

Return type:

tuple[TypeVar(_DT, bound= Info), ...]

Returns:

Tuple of reassembled datagrams.

Fetch reassembled datagrams from self._dtgram and returns a tuple of such datagrams.

If no cache found, the method will call self.submit to forcedly obtain newly reassembled payload. Otherwise, the already calculated self._dtgram will be returned.

index(pkt_num)

Return datagram index.

Parameters:

pkt_num (int) – index of packet

Return type:

Optional[int]

Returns:

Reassembled datagram index which was from No. pkt_num packet; if not found, returns None.

run(packets)

Run automatically.

Parameters:

packets (list[TypeVar(_PT, bound= Info)]) – list of packet dicts to be reassembled

Return type:

None

classmethod register(callback, *, index=None)

Register callback function.

Parameters:
  • callback (Callable[[list[TypeVar(_DT, bound= Info)]], None]) – callback function, which will be called when reassembled datagram is obtained, with the list of reassembled datagrams as its only argument

  • index (Optional[int]) – index to be inserted in the callback list,; by default, the callback will be appended to the end of the list

Return type:

None

__callback_fn__: list[Callable[[list[TypeVar(_DT, bound= Info)]], None]]

List of callback functions upon reassembled datagram.

_flag_s: bool

Strict mode flag. If set to True, all data will be returned, including those not completely reassembled; otherwise, only completely reassembled data will be returned.

Type:

bool

_flag_d: bool

Store mode flag. If set to True, all reassembled datagram will be stored in memory, i.e., self._dtgram; otherwise, datagram will be discarded after callback.

Type:

bool

_flag_n: bool

New datagram flag. If set to True, the self._dtgram will be repopulated.

Type:

bool

_buffer: dict[TypeVar(_IT, bound= tuple), TypeVar(_BT, bound= Info)]

Dict buffer field. This field is used to store reassembled packets in the form of {bufid: buffer}.

Type:

dict[_IT, _BT]

_dtgram: list[TypeVar(_DT, bound= Info)]

List reassembled datagram. This list is used to store reassembled datagrams.

Type:

list[_DT]

__call__(packet)

Call packet reassembly.

Parameters:

packet (TypeVar(_PT, bound= Info)) – packet dict to be reassembled (detailed format described in corresponding protocol)

Return type:

None

classmethod __init_subclass__(protocol=None, *args, **kwargs)[source]

Initialise subclass.

This method is to be used for registering the engine class to Extractor class.

Parameters:
  • name – Protocol name, default to class name.

  • *args (Any) – Arbitrary positional arguments.

  • **kwargs (Any) – Arbitrary keyword arguments.

Return type:

None

See also

For more details, please refer to pcapkit.foundation.extraction.Extractor.register_reassembly().

__protocol_name__: str

Protocol name of current reassembly object.

__protocol_type__: Type[ProtocolBase]

Protocol of current reassembly object.

Internal Definitions

class pcapkit.foundation.reassembly.reassembly.ReassemblyBase(*, strict=True, store=True)[source]

Bases: Generic[_PT, _DT, _IT, _BT]

Base class for reassembly procedure.

Parameters:
  • strict (bool) – if return all datagrams (including those not implemented) when submit

  • store (bool) – if store reassembled datagram in memory, i.e., self._dtgram (if not, datagram will be discarded after callback)

Note

This class is for internal use only. For customisation, please use TraceFlow instead.

class pcapkit.foundation.reassembly.reassembly.ReassemblyMeta(name, bases, namespace, /, **kwargs)[source]

Bases: ABCMeta

Meta class to add dynamic support to Reassembly.

This meta class is used to generate necessary attributes for the Reassembly class. It can be useful to reduce unnecessary registry calls and simplify the customisation process.

Type Variables

pcapkit.foundation.reassembly.reassembly._PT: pcapkit.corekit.infoclass.Info

Packet data structure.

pcapkit.foundation.reassembly.reassembly._DT: pcapkit.corekit.infoclass.Info

Datagram data structure.

pcapkit.foundation.reassembly.reassembly._IT: pcapkit.corekit.infoclass.Info

Buffer ID data structure.

pcapkit.foundation.reassembly.reassembly._BT: pcapkit.corekit.infoclass.Info

Buffer data structure.