Customisation & Extensions¶
pcapkit
is designed with extensive ability in customisation and
extension. It is easy to add new protocols, layers, fields, and even new file
formats. This section will introduce how to customise and extend pcapkit
to meet your own needs.
What’s in for Protocols?¶
Protocol
is the most essential concept
and class in pcapkit
. Every protocol is represented by a
Protocol
subclass, which is responsible
for parsing and/or constructing the protocol packets, as in the network stack.
The following table shows all available protocol classes in pcapkit
:
Protocol Type |
Protocol Class |
|||
---|---|---|---|---|
Link Layer
( |
ARP Family |
|||
RARP Family |
||||
Internet Layer
( |
IP Family |
|||
IPv6 Extension Header |
||||
IPsec Family |
||||
Transport Layer
( |
||||
Application Layer
( |
FTP Family |
|||
HTTP Family |
||||
Miscellaneous Protocols |
PCAP Format |
|||
PCAP-NG Format |
||||
New Protocol¶
To add a new protocol, you need to create a new class inherited from
Protocol
, or its subclasses, specifically,
the Link
,
Internet
,
Transport
, and
Application
classes, which
are the base classes for link, internet, transport, and application layer
protocols, respectively.
Important
The Protocol
class is the base class
for all protocols, and it is not recommended to inherit from it directly,
unless you are going to create a new protocol stack, e.g., a new
miscellaneous protocol stack as in pcapkit.protocols.misc
module.
Besides, you need to implement the corresponding schema and data model for the
new protocol. The schema is a Schema
subclass that defines the field structure of the protocol, and the data model
is a Data
subclass that defines the data
attributes of the protocol.
Note
The schema classes are used for parsing and/or constructing the protocol
packets, each field should be defined as a class attribute with the value of
a Field
subclass instance.
The data classes are used for storing the parsed protocol packets. Each data attribute should be defined as a class attribute with no value besides the type annotations of the corresponding field.
Once the protocol class is implemented, you need to register it to the protocol
registry, which is managed by the APIs provided by
pcapkit.foundation.registry.protocols
module. Depending on the protocol
type, you need to register the protocol class to the corresponding registry, e.g.,
for a link layer protocol, you need to register it to the link layer protocol
registry thru register_linktype()
function.
The following table shows the type of protocols and the corresponding registry functions:
Protocol Type |
Registry Function |
Notes |
|
---|---|---|---|
Link Layer |
Registry functions on the left column are combined calls to those on the right columns. It is recommended to use those on the left columns when registering new protocols. |
||
Internet Layer |
|||
Transport Layer |
|||
Application Layer |
|||
Samples¶
The following code snippet shows how to create a new protocol class:
Note
Following is a PoC implementation of a new internet layer protocol. For
demonstration purpose, we use the IPv4 protocol as an example, where the
comprehensive implementation can be found in the
pcapkit.protocols.internet.ipv4
module.
from typing import TYPE_CHECKING
from pcapkit.protocols.internet.internet import Internet # import base class
from pcapkit.protocols.data.data import Data # import data class
from pcapkit.protocols.schema.schema import Schema # import schema class
from pcapkit.corekit.fields.ipaddress import IPv4AddressField # import field class
from pcapkit.const.reg.ethertype import EtherType # import protocol code registry
from pcapkit.foundation.registry.protocols import register_ethertype # import protocol registry API
if TYPE_CHECKING:
from ipaddress import IPv4Address
from typing import Any, Optional
class IPv4Data(Data): # define data model
...
src: 'IPv4Address' # source address
dst: 'IPv4Address' # destination address
...
class IPv4Schema(Schema): # define fields
...
src = IPv4AddressField(...)
dst = IPv4AddressField(...)
...
class MyIPv4(Internet[IPv4Data, IPv4Schema],
schema=IPv4Schema, data=IPv4Data): # define protocol class
def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'IPv4Data':
if length is None:
length = len(self) # infer length of packet from data
schema = self.__header__ # get schema
return IPv4Data(...)
def make(self, ..., **kwargs: 'Any') -> 'IPv4Schema':
...
return IPv4Schema(...)
# register protocol class
register_ethertype(EtherType.Internet_Protocol_version_4, MyIPv4)
Extending Existing Protocol¶
In many cases, existing protocols have customisable attributes, e.g., the option headers in the IPv4 protocol. To extend an existing protocol with additional methods for handling option headers, and etc., you need to first redirect to the corresponding protocol class, and check if the protocol class supports the expected extension. If so, you can add the extension methods to the protocol class, as it may be discussed in the protocol documentation.
Warning
It is not recommended to directly inherit from the protocol class, as it may cause unexpected errors. Instead, you should use the provided helper functions to extend the protocol class.
See pcapkit.foundation.registry.protocols
module for available
helper registry functions and the corresponding protocol classes.
The following table shows the protocol classes with their corresponding available extensible items and the helper registry functions:
Protocol Type |
Protocol Class |
Extensable Registry |
Registry Function |
---|---|---|---|
Internet Layer |
|||
Transport Layer |
|||
Application Layer |
|||
Miscellaneous Protocols |
|||
Important
All extensable items are to be implemented as class methods, whose first argument is the protocol class itself, and the rest arguments are the same as the other built-in methods’ signatures.
And Speaking of Library Foundations…¶
The pcapkit
library is built on top of the pcapkit.foundation
module, which provides the basic functionalities for the library. The
pcapkit.foundation
module is designed to be extensible, and it is
easy to add new features to the library.
Extractor Engines¶
The pcapkit.foundation.engines
module provides several built-in engines
for extracting network packets from the intput PCAP and/or PCAP-NG, etc., files.
All engines are implemented as Engine
subclasses, which are responsible for parsing the input files and extracting
the network packets for further processing.
The following table shows the available engines and the corresponding supported file formats:
Engine Type |
Engine Class |
Supported File Formats |
---|---|---|
Built-in Engines |
PCAP only |
|
PCAP-NG only |
||
Third-party Engines |
all formats supported by Scapy |
|
all formats supported by DPKT |
||
all formats supported by PyShark |
Samples¶
The following code snippet shows how to create a new engine class:
Note
Following is a PoC implementation of a new engine based on scapy
. For documentation
purposes, we use the Scapy
engine as an example,
where the comprehensive implementation can be found in the pcapkit.foundation.engines.scapy
module.
from typing import TYPE_CHECKING
from pcapkit.foundation.engines.engine import Engine # import base class
if TYPE_CHECKING:
from pcapkit.foundation.extraction import Extractor
from scapy.packet import Packet
class MyScapy(Engine['Packet']):
__engine_name__ = 'Scapy' # friendly name of the engine
__engine_module__ = 'scapy' # module name that the engine is based on
def __init__(self, extractor: 'Extractor') -> 'None':
# NOTE: the API entry point of the underlying engine module should
# be imported here, instead of at the top of the module, to avoid
# dependency issues.
from scapy import sendrecv # import API entry point
self._expkg = sendrecv # store API entry point
self._extmp = None # intermediate storage for the interator
# generated by the API entry point
super().__init__(extractor) # initialise base class
# NOTE: The following methods are the core methods of the engine class,
# which are responsible for parsing the input file and extracting the
# network packets for further processing. It is expected to create an
# iterator instance that yields the network packets, and store it in
# the intermediate storage, i.e., self._extmp for further processing,
# using the API entry point as in the self._expkg attribute.
def run(self) -> 'None':
ext = self._extractor # get extractor instance
# NOTE: You may wish to do some pre-processing here, e.g., to
# check if there's any extraction protocol and/or layer thresholds
# set by the user (some engines may not support this feature); or
# to set up the verbose handler for the underlying engine module,
# as in the following code snippet.
if ext._flag_v: # check if verbose mode is enabled
# NOTE: PyPCAPKit has its own bundled supporting functions for
# third-party libraries, e.g., the `packet2chain` function is
# to convert a given Packet instance of the Scapy library to a
# human-readable string, which is used for verbose output.
from pcapkit.toolkit.scapy import pack2chain # import utility function
ext._vfunc = lambda extractor, packet: print(
f'Frame {extractor._frnum:>3d}: {pack2chain(packet)}'
) # print verbose message as `Frame XXX: Ethernet:IP:TCP:...`
# NOTE: Here we use the API entry point, i.e., scapy.sendrecv.sniff,
# to read the input file and extract the network packets. The API
# entry point returns an iterable instance, which is then converted
# as an iterator and stored in the intermediate storage for further
# processing.
self._extmp = iter(self._expkg.sniff(offline=ext._ifnm))
# NOTE: The following methods are the helper methods of the engine class,
# which are responsible for extracting the network packets from the
# intermediate storage, i.e., self._extmp, and returning the extracted
# network packets for further processing. It is also expected to handle
# necessary actions, including but not limited to, verbose output, file
# output, reassembly and flow tracing, etc.
def read_frame(self) -> 'Packet':
from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly, packet2dict,
tcp_reassembly, tcp_traceflow) # import utility functions
ext = self._extractor # get extractor instance
packet = next(self._extmp) # get next packet from the iterator
ext._frnum += 1 # increment frame number
ext._vfunc(ext, packet) # print verbose message
# NOTE: The following code snippet is to handle the file output,
# i.e., to write the packet to the output file if the output file
# is specified by the user.
frnum = f'Frame {ext._frnum}'
if not ext._flag_q:
info = packet2dict(packet) # convert packet to dict-like object
# NOTE: Now, we need to check if output file is to be a single
# file. If so, we can directly write the packet to the output
# file, i.e., ext._ofile; otherwise, we need to create a new
# dumper instance by initialising ext._ofile, and write the
# packet to the dumper instance. For more information, please
# refer to the documentation of DictDumper library.
if ext._flag_f:
ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}')
ofile(info, name=frnum)
else:
ext._ofile(info, name=frnum)
# NOTE: The following code snippet is to handle the reassembly of
# the network packets, i.e., to reassemble the fragmented packets
# into a complete packet. Before reassembly, we need to convert
# the packet to a tailored format, i.e., a dict-like object, which
# is then passed to the reassembly function. For more information,
# please refer to the documentation of pcapkit.foundation.reassembly
# module.
if ext._flag_r:
if ext._ipv4:
data_ipv4 = ipv4_reassembly(packet, count=ext._frnum)
if data_ipv4 is not None:
ext._reasm.ipv4(data_ipv4)
if ext._ipv6:
data_ipv6 = ipv6_reassembly(packet, count=ext._frnum)
if data_ipv6 is not None:
ext._reasm.ipv6(data_ipv6)
if ext._tcp:
data_tcp = tcp_reassembly(packet, count=ext._frnum)
if data_tcp is not None:
ext._reasm.tcp(data_tcp)
# NOTE: The following code snippet is to handle the flow tracing of
# the network packets, i.e., to trace a series of packets as they
# will compose as a flow and/or stream. Before reassembly, we need
# to convert the packet to a tailored format, i.e., a dict-like object,
# which is then passed to the flow tracing function. For more
# information, please refer to the documentation of
# pcapkit.foundation.traceflow module.
if ext._flag_t:
if ext._tcp:
data_tf_tcp = tcp_traceflow(packet, count=ext._frnum)
if data_tf_tcp is not None:
ext._trace.tcp(data_tf_tcp)
# NOTE: The following code snippet is to record extracted frames
# into the internal storage, i.e., ext._frame, for further processing.
if ext._flag_d:
ext._frame.append(packet)
return packet # return the extracted packet
Output Dumpers¶
The pcapkit.dumpkit
module wraps the DictDumper
library, which provides the basic functionalities for dumping the extracted
network packets to the output file. The pcapkit.dumpkit
module is
designed to be extensible, and it is easy to add new output formats to the
library, based on the extensibility of the DictDumper
library.
See also
Please refer to the documentation of DictDumper
library
for more information about the output dumpers.
The following table shows the available output dumpers and the corresponding formats:
Dumper Format |
Dumper Class |
Output Extension |
---|---|---|
No Output |
N/A |
|
PCAP ( |
|
|
XML ( |
|
|
JSON ( |
|
|
Text ( |
|
Samples¶
The following code snippet shows how to create a new dumper class:
Note
Following is a PoC implementation of a new dumper for the PCAP format. For
documentation purposes, we use the PCAPIO
dumper as an example, where the comprehensive implementation can be found
in the pcapkit.dumpkit.pcap
module.
from typing import TYPE_CHECKING
from pcapkit.dumpkit.common import Dumper # import base class
from pcapkit.protocols.misc.pcap.header import Header # import PCAP header class
from pcapkit.protocols.misc.pcap.frame import Frame # import PCAP frame class
if TYPE_CHECKING:
from typing import Optional
# NOTE: We need to specify the output file format and extension here at the
# class definition, as in the following code snippet. Such that PyPCAPKit
# will be able to automatically register our dumper class to the corresponding
# dumper registries, and use it to dump the extracted network packets.
class MyDumper(Dumper, fmt='pcap', ext='.pcap'):
# NOTE: This property is to define the file format of the dumper.
# It is expected to be a string, which is used as the file extension
# of the output file.
@property
def kind(self) -> 'str':
return 'pcap'
# NOTE: We need to collect some necessary information here thru the necessary
# arguments, e.g., the file name, link-layer protocol, etc., which will be
# used to generate the top-level header of the output file.
def __init__(self, fname: 'str', *, ...) -> 'None':
# NOTE: We can do some pre-processing here if necessary, e.g., to
# save the nanosecond-resolution flag, link-layer protocol type, etc.,
# such that we can use them later when dumping the network packets.
self._fnum = 1 # initialise frame number counter to 1
super().__init__(fname, ...) # initialise base class with the file name
# and other necessary arguments
# NOTE: We need to overwrite the original ``__call__`` method, cause the
# PCAP file format is a binary format, thus we need to re-open the output
# file in binary mode, instead of the default text mode.
def __call__(self, value: 'Frame', name: 'Optional[str]' = None) -> 'MyDumper':
with open(self._file, 'ab') as file:
self._append_value(value, file, name or f'Frame {self._fnum}') # append value to the file
return self
# NOTE: The following method will be called at the initialisation of the
# dumper class, which is expected to write the top-level header of the
# output file.
def _dump_header(self, *, ...) -> 'None':
packet = Header(...) # create PCAP header instance based on the given arguments
with open(self._file, 'wb') as file:
file.write(packet.data) # write the header to the output file
# NOTE: The following method is to write the network packets to the output
# file. It is expected to convert the network packet to a tailored format,
# i.e., a Frame instance, which is then converted to the binary format and
# written to the output file.
def _append_value(self, value: 'Frame', file: 'IO[bytes]', name: 'str') -> 'None':
packet = Frame(...) # create PCAP frame instance based on the given arguments
file.write(packet.data) # write the frame to the output file
# NOTE: We can do some post-processing here if necessary, e.g., to
# update the frame number counter, etc., such that we can use them
# later when dumping other network packets.
self._fnum += 1 # increment frame number counter
Reassembly and Flow Tracing¶
The pcapkit.foundation.reassembly
module provides several built-in
reassembly classes for reassembling the fragmented network packets; and the
pcapkit.foundation.traceflow
module provides several built-in flow
tracing classes for tracing the network packets as they will compose as a
flow and/or stream. All reassembly and flow tracing classes are implemented
as Reassembly
and
TraceFlow
subclasses,
respectively, which are responsible for reassembling and tracing the network
packets for further processing.
The following table shows the available reassembly and flow tracing classes per the supported protocols:
Protocol |
Reassembly Class |
Flow Tracing Class |
---|---|---|
See also
Please refer to the documentation of pcapkit.foundation.reassembly
and
pcapkit.foundation.traceflow
modules for more information about the
reassembly and flow tracing classes, as well as the underlying algorithms.
New Implementation¶
To add a new reassembly or flow tracing class, you need to create a new class
inherited from Reassembly
or TraceFlow
class, which
is responsible for reassembling or tracing the network packets for further
processing.
The following code snippet shows how to create a new reassembly class:
Note
Following is a PoC implementation of a new reassembly class for the IPv4
protocol. For documentation purposes, we use the
IPv4
reassembly class as an
example, where the comprehensive implementation can be found in the
pcapkit.foundation.reassembly.ipv4
module.
from typing import TYPE_CHECKING
from pcapkit.corekit.infoclass import Info, info_final # import Info class for data model definitions
from pcapkit.protocols.internet.ipv4 import IPv4 # import protocol class for attribute definitions
from pcapkit.foundation.reassembly.reassembly import Reassembly # import base class
if TYPE_CHECKING:
from typing import TypeAlias
# NOTE: We need to specify some useful data structure before creating the
# reassembly class, e.g., the data model for the reassembled packet, the
# data model for the reassembly table, etc., which will be used to store
# the reassembled packet and the reassembly table, respectively.
BufferID: 'TypeAlias' = ... # a tuple-like object to identify the reassembly
# table entry's ID, e.g., (src, dst, id, proto)
# for IPv4, (src, dst, spi) for IPv6, etc.
@info_final
class Packet(Info):
# NOTE: This is the data model for the extracted packet, which is
# expected to be a dict-like object, and will be used to store the
# information from the extracted packets to be reassembled.
...
@info_final
class DatagramID(Info):
# NOTE: This is the data model for the reassembled datagram's ID,
# which is expected to be a dict-like object, and will be used to
# store the information to identify the reassembled datagram.
...
@info_final
class Datagram(Info):
# NOTE: This is the data model for the reassembled datagram, which
# is expected to be a dict-like object, and will be used to store
# the information of the reassembled datagram.
...
@info_final
class Buffer(Info):
# NOTE: This is the data model for the reassembly table, which is
# expected to be a dict-like object, and will be used to store the
# information of each buffered entry in the reassembly table, that
# is to be reassembled with the following fragmented packets.
...
# NOTE: Now we can create the reassembly class, which is expected to be
# a subclass of the base class, i.e., Reassembly, and implement the core
# methods, i.e., reassembly and submit, for reassembling the fragmented
# packets and submitting the reassembled datagram, respectively.
class MyReassembly(Reassembly[Packet, Datagram, BufferID, Buffer]):
__protocol_name__ = 'IPv4' # name of the protocol
__protocol_type__ = IPv4 # type of the protocol
# NOTE: This method is to reassemble the fragmented packets, i.e.,
# ``info`` formatted as a ``Packet`` instance, into the reassembly
# table, i.e., ``self._buffer`` formatted as a dict of ``BufferID``
# mapped to ``Buffer`` instances.
def reassembly(self, info: 'Packet') -> 'None':
... # reassembly algorithm implementation
# NOTE: Once the fragmented packets are reassembled, we need to
# submit the reassembled datagram, i.e., ``Datagram`` instance,
# to the storage, i.e., ``self._dtgram``, for further processing.
self._dtgram.extend(
self.submit(...) # submit reassembled datagram
)
# NOTE: This method is to convert the reassembled datagram, i.e.,
# ``buf`` formatted as a ``Buffer`` instance, to a ``Datagram``
# instance, which is then submitted to the storage, i.e.,
# ``self._dtgram``.
def submit(self, buf: 'Buffer', * , bufid: 'BufferID') -> 'list[Datagram]':
datagram = ... # submit algorithm implementation, e.g., to convert
# the reassembled datagram to a list of ``Datagram``
# instances, which is then returned for further
# processing by the caller
# NOTE: Before we submit the reassembled datagram, we need to
# run the registered callback functions, if any, to process the
# reassembled datagram, e.g., to check if the reassembled datagram
# is to be discarded, etc.
for callback in self.__callback_fn__:
callback(datagram) # run callback functions
return datagram
Callback Functions¶
It is possible to register callback functions to the reassembly and flow tracing classes, which will be called at the end of the reassembly and flow tracing process, respectively. This feature is designed to create the ability to process the reassembled datagrams and/or flows, e.g., to check if the datagram and/or flows are to be discarded, etc.
See also
For more information, you may refer to the documentation of
pcapkit.foundation.registry.foundation
for the callback
registry functions:
All callback functions are expected to be a callback function, which
accepts a single argument, i.e., the list of reassembled datagrams
and/or flows, and returns None
. Any return value will be ignored.
It is possible to modify the reassembled datagrams and/or flows in the callback functions, e.e., to discard certain reassembled datagrams and/or flows, etc. However, it is not recommended to modify the reassembled datagrams and/or flows directly. Should that is the intended behaviour, you should create a new reassembly and/or flow tracing class, and modify the corresponding reassembly and/or flow tracing algorithm.