# -*- coding: utf-8 -*-
# mypy: disable-error-code=dict-item
"""Frame Header
==================
.. module:: pcapkit.protocols.misc.pcap.frame
:mod:`pcapkit.protocols.misc.pcap.frame` contains
:class:`~pcapkit.protocols.misc.pcap.frame.Frame` only,
which implements extractor for frame headers [*]_ of PCAP,
whose structure is described as below:
.. code-block:: c
typedef struct pcaprec_hdr_s {
guint32 ts_sec; /* timestamp seconds */
guint32 ts_usec; /* timestamp microseconds */
guint32 incl_len; /* number of octets of packet saved in file */
guint32 orig_len; /* actual length of packet */
} pcaprec_hdr_t;
.. [*] https://wiki.wireshark.org/Development/LibpcapFileFormat#Record_.28Packet.29_Header
"""
import collections
import datetime
import decimal
import io
import sys
import time
from typing import TYPE_CHECKING, cast, overload
from pcapkit.const.reg.linktype import LinkType as Enum_LinkType
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.protocols.data.misc.pcap.frame import Frame as Data_Frame
from pcapkit.protocols.data.misc.pcap.frame import FrameInfo as Data_FrameInfo
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.protocols.schema.misc.pcap.frame import Frame as Schema_Frame
from pcapkit.utilities.compat import localcontext
from pcapkit.utilities.exceptions import RegistryError, UnsupportedCall, stacklevel
from pcapkit.utilities.warnings import ProtocolWarning, RegistryWarning, warn
if TYPE_CHECKING:
from datetime import datetime as dt_type
from decimal import Decimal
from typing import IO, Any, DefaultDict, Optional, Type
from typing_extensions import Literal
from pcapkit.protocols.data.misc.pcap.header import Header as Data_Header
from pcapkit.protocols.schema.schema import Schema
__all__ = ['Frame']
# check Python version
py37 = ((version_info := sys.version_info).major >= 3 and version_info.minor >= 7)
[docs]
class Frame(Protocol[Data_Frame, Schema_Frame],
schema=Schema_Frame, data=Data_Frame):
"""Per packet frame header extractor.
This class currently supports parsing of the following protocols, which are
registered in the :attr:`self.__proto__ <pcapkit.protocols.misc.pcap.frame.Frame.__proto__>`
attribute:
.. list-table::
:header-rows: 1
* - Index
- Protocol
* - :attr:`pcapkit.const.reg.linktype.LinkType.ETHERNET`
- :class:`pcapkit.protocols.link.ethernet.Ethernet`
* - :attr:`pcapkit.const.reg.linktype.LinkType.IPV4`
- :class:`pcapkit.protocols.internet.ipv4.IPv4`
* - :attr:`pcapkit.const.reg.linktype.LinkType.IPV6`
- :class:`pcapkit.protocols.internet.ipv6.IPv6`
"""
##########################################################################
# Defaults.
##########################################################################
#: DefaultDict[Enum_LinkType, ModuleDescriptor[Protocol] | Type[Protocol]]: Protocol index mapping for
#: decoding next layer, c.f. :meth:`self._decode_next_layer <pcapkit.protocols.protocol.Protocol._decode_next_layer>`
#: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`.
#: The values should be a tuple representing the module name and class name, or
#: a :class:`~pcapkit.protocols.protocol.Protocol` subclass.
__proto__ = collections.defaultdict(
lambda: ModuleDescriptor('pcapkit.protocols.misc.raw', 'Raw'),
{
Enum_LinkType.ETHERNET: ModuleDescriptor('pcapkit.protocols.link', 'Ethernet'),
Enum_LinkType.IPV4: ModuleDescriptor('pcapkit.protocols.internet', 'IPv4'),
Enum_LinkType.IPV6: ModuleDescriptor('pcapkit.protocols.internet', 'IPv6'),
},
) # type: DefaultDict[Enum_LinkType | int, ModuleDescriptor[Protocol] | Type[Protocol]]
##########################################################################
# Properties.
##########################################################################
@property
def name(self) -> 'str':
"""Name of corresponding protocol."""
return f'Frame {self._fnum}'
@property
def length(self) -> 'Literal[16]':
"""Header length of corresponding protocol."""
return 16
@property
def header(self) -> 'Data_Header':
"""Global header of the PCAP file."""
return self._ghdr
##########################################################################
# Methods.
##########################################################################
[docs]
@classmethod
def register(cls, code: 'Enum_LinkType', protocol: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': # type: ignore[override]
r"""Register a new protocol class.
Notes:
The full qualified class name of the new protocol class
should be as ``{protocol.module}.{protocol.name}``.
Arguments:
code: protocol code as in :class:`~pcapkit.const.reg.linktype.LinkType`
module: module descriptor or a
:class:`~pcapkit.protocols.protocol.Protocol` subclass
"""
if isinstance(protocol, ModuleDescriptor):
protocol = protocol.klass
if not issubclass(protocol, Protocol):
raise RegistryError(f'protocol must be a Protocol subclass, not {protocol!r}')
if code in cls.__proto__:
warn(f'protocol {code} already registered, overwriting', RegistryWarning)
cls.__proto__[code] = protocol
[docs]
def index(self, name: 'str | Protocol | Type[Protocol]') -> 'int':
"""Call :meth:`ProtoChain.index <pcapkit.corekit.protochain.ProtoChain.index>`.
Args:
name: ``name`` to be searched
Returns:
First index of ``name``.
Raises:
IndexNotFound: if ``name`` is not present
"""
return self._protos.index(name)
def pack(self, **kwargs: 'Any') -> 'bytes':
"""Pack (construct) packet data.
Args:
**kwargs: Arbitrary keyword arguments.
Returns:
Constructed packet data.
Notes:
We used a special keyword argument ``__packet__`` to pass the
global packet data to underlying methods. This is useful when
the packet data is not available in the current instance.
"""
self.__header__ = self.make(**kwargs)
packet = kwargs.get('__packet__', {}) # packet data
packet['byteorder'] = self._ghdr.magic_number.byteorder
return self.__header__.pack(packet)
[docs]
def unpack(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_Frame':
"""Unpack (parse) packet data.
Args:
length: Length of packet data.
**kwargs: Arbitrary keyword arguments.
Returns:
Parsed packet data.
Notes:
We used a special keyword argument ``__packet__`` to pass the
global packet data to underlying methods. This is useful when
the packet data is not available in the current instance.
"""
if cast('Optional[Schema_Frame]', self.__header__) is None:
packet = kwargs.get('__packet__', {}) # packet data
packet['bytesorder'] = self._ghdr.magic_number.byteorder
self.__header__ = cast('Schema_Frame', self.__schema__.unpack(self._file, length, packet)) # type: ignore[call-arg,misc]
return self.read(length, **kwargs)
[docs]
def read(self, length: 'Optional[int]' = None, *, _read: 'bool' = True, **kwargs: 'Any') -> 'Data_Frame':
r"""Read each block after global header.
Args:
length: Length of data to be read.
\_read: If the class is called in a parsing scenario.
**kwargs: Arbitrary keyword arguments.
Returns:
Data_Frame: Parsed packet data.
"""
schema = self.__header__
_tsss = schema.ts_sec
_tsus = schema.ts_usec
_ilen = schema.incl_len
_olen = schema.orig_len
with localcontext(prec=64):
if self._nsec:
_epch = _tsss + decimal.Decimal(_tsus) / 1_000_000_000
else:
_epch = _tsss + decimal.Decimal(_tsus) / 1_000_000
_irat = _epch.as_integer_ratio()
try:
_time = datetime.datetime.fromtimestamp(_irat[0] / _irat[1])
except ValueError:
warn(f'PCAP: invalid timestamp: {_epch}', ProtocolWarning, stacklevel=stacklevel())
_time = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
frame = Data_Frame(
frame_info=Data_FrameInfo(
ts_sec=_tsss,
ts_usec=_tsus,
incl_len=_ilen,
orig_len=_olen,
),
time=_time,
number=self._fnum,
time_epoch=_epch,
len=_ilen,
cap_len=_olen,
)
if not _read:
# move backward to the beginning of the packet
self._file.seek(0, io.SEEK_SET)
else:
# NOTE: We create a copy of the frame data here for parsing
# scenarios to keep the original frame data intact.
seek_cur = self._file.tell()
# move backward to the beginning of the frame
self._file.seek(-self.length, io.SEEK_CUR)
#: bytes: Raw frame data.
self._data = self._read_fileng(self.length + _ilen)
# move backward to the beginning of frame's payload
self._file.seek(seek_cur, io.SEEK_SET)
#: io.BytesIO: Source data stream.
self._file = io.BytesIO(self._data)
return self._decode_next_layer(frame, self._ghdr.network, frame.len)
[docs]
def make(self,
timestamp: 'Optional[float | Decimal | int | dt_type]' = None,
ts_sec: 'Optional[int]' = None,
ts_usec: 'Optional[int]' = None,
incl_len: 'Optional[int]' = None,
orig_len: 'Optional[int]' = None,
packet: 'bytes | Protocol | Schema' = b'',
nanosecond: 'bool' = False,
**kwargs: 'Any') -> 'Schema_Frame':
"""Make frame packet data.
Args:
timestamp: UNIX-Epoch timestamp
ts_sec: timestamp seconds
ts_usec: timestamp microseconds
incl_len: number of octets of packet saved in file
orig_len: actual length of packet
packet: raw packet data
nanosecond: nanosecond-resolution file flag
**kwargs: Arbitrary keyword arguments.
Returns:
Constructed packet data.
"""
ts_sec, ts_usec = self._make_timestamp(timestamp, ts_sec, ts_usec, nanosecond)
if incl_len is None:
incl_len = min(len(packet), self._ghdr.snaplen)
if orig_len is None:
orig_len = len(packet)
return Schema_Frame(
ts_sec=ts_sec,
ts_usec=ts_usec,
incl_len=incl_len,
orig_len=orig_len,
packet=packet,
)
##########################################################################
# Data models.
##########################################################################
@overload
def __post_init__(self, file: 'IO[bytes] | bytes', length: 'Optional[int]' = ..., *, # pylint: disable=arguments-differ
num: 'int', header: 'Data_Header', **kwargs: 'Any') -> 'None': ...
@overload
def __post_init__(self, *, num: 'int', header: 'Data_Header', # pylint: disable=arguments-differ
**kwargs: 'Any') -> 'None': ...
[docs]
def __post_init__(self, file: 'Optional[IO[bytes] | bytes]' = None, length: 'Optional[int]' = None, *, # pylint: disable=arguments-differ
num: 'int', header: 'Data_Header', **kwargs: 'Any') -> 'None':
"""Initialisation.
Args:
file: Source packet stream.
length: Length of packet data.
num: Frame index number.
header: Global header of the PCAP file.
**kwargs: Arbitrary keyword arguments.
See Also:
For construction argument, please refer to :meth:`make`.
"""
#: int: frame index number
self._fnum = num
#: pcapkit.protocols.misc.pcap.header.Header: Global header of the PCAP file.
self._ghdr = header
#: pcapkit.const.reg.linktype.LinkType: next layer protocol index
self._prot = header.network
#: bool: nanosecond-timestamp PCAP flag
self._nsec = header.magic_number.nanosecond
if file is None:
_read = False
#: bytes: Raw packet data.
self._data = self.pack(**kwargs)
#: io.BytesIO: Source packet stream.
self._file = io.BytesIO(self._data)
else:
_read = True
#: io.BytesIO: Source packet stream.
self._file = io.BytesIO(file) if isinstance(file, bytes) else file
#: pcapkit.corekit.infoclass.Info: Parsed packet data.
self._info = self.unpack(length, _read=_read, **kwargs)
def __length_hint__(self) -> 'Literal[16]':
"""Return an estimated length for the object."""
return 16
# NOTE: This is a hack to make the ``__index__`` method work both as a
# class method and an instance method.
[docs]
def __index__(self: 'Optional[Frame]' = None) -> 'int': # type: ignore[override]
"""Index of the frame.
Args:
self: :class:`Frame` object or :obj:`None`.
Returns:
If the object is initiated, i.e. :attr:`self._fnum <pcapkit.protocols.misc.pcap.frame.Frame._fnum>`
exists, returns the frame index number of itself; else raises :exc:`UnsupportedCall`.
Raises:
UnsupportedCall: This protocol has no registry entry.
"""
if self is None:
raise UnsupportedCall("'Frame' object cannot be interpreted as an integer")
return self._fnum
##########################################################################
# Utilities.
##########################################################################
[docs]
@classmethod
def _make_data(cls, data: 'Data_Frame') -> 'dict[str, Any]': # type: ignore[override]
"""Create key-value pairs from ``data`` for protocol construction.
Args:
data: protocol data
Returns:
Key-value pairs for protocol construction.
"""
return {
'ts_src': data.frame_info.ts_sec,
'ts_usec': data.frame_info.ts_usec,
'incl_len': data.frame_info.incl_len,
'orig_len': data.frame_info.orig_len,
'packet': cls._make_payload(data),
}
def _make_timestamp(self, timestamp: 'Optional[float | Decimal | dt_type | int]' = None, ts_sec: 'Optional[int]' = None,
ts_usec: 'Optional[int]' = None, nanosecond: 'bool' = False) -> 'tuple[int, int]':
"""Make timestamp.
Args:
timestamp: UNIX-Epoch timestamp
ts_sec: timestamp seconds
ts_usec: timestamp microseconds
nanosecond: nanosecond-resolution file flag
Returns:
Second and microsecond/nanosecond value of timestamp.
"""
with localcontext(prec=64):
if timestamp is None:
if py37 and nanosecond:
timestamp = decimal.Decimal(time.time_ns()) / 1_000_000_000
else:
timestamp = decimal.Decimal(time.time())
else:
if isinstance(timestamp, datetime.datetime):
timestamp = timestamp.timestamp()
timestamp = decimal.Decimal(timestamp)
if ts_sec is None:
ts_sec = int(timestamp)
if ts_usec is None:
ts_usec = int((timestamp - ts_sec) * (1_000_000_000 if nanosecond else 1_000_000))
return ts_sec, ts_usec
def _decode_next_layer(self, dict_: 'Data_Frame', proto: 'Optional[int]' = None,
length: 'Optional[int]' = None, *, packet: 'Optional[dict[str, Any]]' = None) -> 'Data_Frame': # pylint: disable=arguments-differ
r"""Decode next layer protocol.
Arguments:
dict\_: info buffer
proto: next layer protocol index
length: valid (*non-padding*) length
packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`)
Returns:
Current protocol with packet extracted.
Notes:
We added a new key ``__next_type__`` to ``dict_`` to store the
next layer protocol type, and a new key ``__next_name__`` to
store the next layer protocol name. These two keys will **NOT**
be included when :meth:`Info.to_dict <pcapkit.corekit.infoclass.Info.to_dict>` is called.
We also added a new key ``protocols`` to ``dict_`` to store the
protocol chain of the current packet (frame).
"""
next_ = cast('Protocol', self._import_next_layer(proto, length, packet=packet)) # type: ignore[misc,call-arg,redundant-cast]
info, chain = next_.info, next_.protochain
# make next layer protocol name
layer = next_.info_name
# proto = next_.__class__.__name__
# write info and protocol chain into dict
dict_.__update__({
layer: info,
'protocols': chain.chain if chain else '',
'__next_type__': type(next_),
'__next_name__': layer,
})
self._next = next_ # pylint: disable=attribute-defined-outside-init
self._protos = chain # pylint: disable=attribute-defined-outside-init
return dict_