# -*- coding: utf-8 -*-
"""Scapy Tools
=================
.. module:: pcapkit.toolkit.scapy
:mod:`pcapkit.toolkit.scapy` contains all you need for
:mod:`pcapkit` handy usage with `Scapy`_ engine. All reforming
functions returns with a flag to indicate if usable for
its caller.
.. _Scapy: https://scapy.net
.. warning::
This module requires installed `Scapy`_ engine.
"""
import ipaddress
import time
from typing import TYPE_CHECKING, cast
from pcapkit.const.reg.linktype import LinkType as Enum_LinkType
from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.foundation.reassembly.data.ip import Packet as IP_Packet
from pcapkit.foundation.reassembly.data.tcp import Packet as TCP_Packet
from pcapkit.foundation.traceflow.data.tcp import Packet as TF_TCP_Packet
from pcapkit.utilities.compat import ModuleNotFoundError # pylint: disable=redefined-builtin
from pcapkit.utilities.exceptions import ModuleNotFound, stacklevel
from pcapkit.utilities.warnings import ScapyWarning, warn
try:
import scapy
except ModuleNotFoundError:
scapy = None
warn("dependency package 'Scapy' not found",
ScapyWarning, stacklevel=stacklevel())
if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv6Address
from typing import Any
from scapy.layers.inet import IP, TCP
from scapy.layers.inet6 import IPv6
from scapy.packet import Packet
__all__ = [
'packet2chain', 'packet2dict',
'ipv4_reassembly', 'ipv6_reassembly', 'tcp_reassembly', 'tcp_traceflow'
]
[docs]
def packet2chain(packet: 'Packet') -> 'str':
"""Fetch Scapy packet protocol chain.
Args:
packet: Scapy packet.
Returns:
Colon (``:``) seperated list of protocol chain.
Raises:
ModuleNotFound: If `Scapy`_ is not installed.
"""
if scapy is None:
raise ModuleNotFound("No module named 'scapy'", name='scapy')
from scapy.packet import NoPayload
chain = [packet.name]
payload = packet.payload
while not isinstance(payload, NoPayload):
chain.append(payload.name)
payload = payload.payload
return ':'.join(chain)
[docs]
def packet2dict(packet: 'Packet') -> 'dict[str, Any]':
"""Convert Scapy packet into :obj:`dict`.
Args:
packet: Scapy packet.
Returns:
A :obj:`dict` mapping of packet data.
Raises:
ModuleNotFound: If `Scapy`_ is not installed.
"""
if scapy is None:
raise ModuleNotFound("No module named 'scapy'", name='scapy')
from scapy.packet import NoPayload
def wrapper(packet: 'Packet') -> 'dict[str, Any]':
dict_ = packet.fields
payload = packet.payload
if not isinstance(payload, NoPayload):
dict_[payload.name] = wrapper(payload)
return dict_
return {
'packet': bytes(packet),
packet.name: wrapper(packet),
}
[docs]
def ipv4_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'IP_Packet[IPv4Address] | None':
"""Make data for IPv4 reassembly.
Args:
packet: Scapy packet.
count: Packet index. If not provided, default to ``-1``.
Returns:
Data for IPv4 reassembly.
* If the ``packet`` can be used for IPv4 reassembly. A packet can be reassembled
if it contains IPv4 layer (:class:`scapy.layers.inet.IP`) and the **DF**
(:attr:`scapy.layers.inet.IP.flags.DF`) flag is :data:`False`.
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv4
reassembly (:term:`reasm.ipv4.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`pcapkit.foundation.reassembly.ipv4.IPv4`
"""
if 'IP' in packet:
ipv4 = cast('IP', packet['IP'])
if ipv4.flags.DF: # dismiss not fragmented packet
return None
data = IP_Packet(
bufid=(
cast('IPv4Address',
ipaddress.ip_address(ipv4.src)), # source IP address
cast('IPv4Address',
ipaddress.ip_address(ipv4.dst)), # destination IP address
ipv4.id, # identification
Enum_TransType.get(ipv4.proto), # payload protocol type
),
num=count, # original packet range number
fo=ipv4.frag, # fragment offset
ihl=ipv4.ihl, # internet header length
mf=bool(ipv4.flags.MF), # more fragment flag
tl=ipv4.len, # total length, header includes
header=ipv4.raw_packet_cache, # raw bytes type header
payload=bytearray(bytes(ipv4.payload)), # raw bytearray type payload
)
return data
return None
[docs]
def ipv6_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'IP_Packet[IPv6Address] | None':
"""Make data for IPv6 reassembly.
Args:
packet: Scapy packet.
count: Packet index. If not provided, default to ``-1``.
Returns:
Data for IPv6 reassembly.
* If the ``packet`` can be used for IPv6 reassembly. A packet can be reassembled
if it contains IPv6 layer (:class:`scapy.layers.inet6.IPv6`) and IPv6 Fragment
header (:rfc:`2460#section-4.5`, i.e., :class:`scapy.layers.inet6.IPv6ExtHdrFragment`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for IPv6
reassembly (:term:`reasm.ipv6.packet`) will be returned; otherwise, returns :data:`None`.
Raises:
ModuleNotFound: If `Scapy`_ is not installed.
See Also:
:class:`pcapkit.foundation.reassembly.ipv6.IPv6`
"""
if scapy is None:
raise ModuleNotFound("No module named 'scapy'", name='scapy')
from scapy.layers.inet6 import IPv6ExtHdrFragment
if 'IPv6' in packet:
ipv6 = cast('IPv6', packet['IPv6'])
if IPv6ExtHdrFragment not in ipv6: # pylint: disable=E1101
return None # dismiss not fragmented packet
ipv6_frag = cast('IPv6ExtHdrFragment', ipv6['IPv6ExtHdrFragment'])
data = IP_Packet(
bufid=(
cast('IPv6Address',
ipaddress.ip_address(ipv6.src)), # source IP address
cast('IPv6Address',
ipaddress.ip_address(ipv6.dst)), # destination IP address
ipv6.fl, # label
Enum_TransType.get(ipv6_frag.nh), # next header field in IPv6 Fragment Header
),
num=count, # original packet range number
fo=ipv6_frag.offset, # fragment offset
ihl=len(ipv6) - len(ipv6_frag), # header length, only headers before IPv6-Frag
mf=bool(ipv6_frag.m), # more fragment flag
tl=len(ipv6), # total length, header includes
header=bytes(ipv6)[:-len(ipv6_frag)], # raw bytes type header before IPv6-Frag
payload=bytearray(bytes(ipv6_frag.payload)), # raw bytearray type payload after IPv6-Frag
)
return data
return None
[docs]
def tcp_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'TCP_Packet | None':
"""Store data for TCP reassembly.
Args:
packet: Scapy packet.
count: Packet index. If not provided, default to ``-1``.
Returns:
Data for TCP reassembly.
* If the ``packet`` can be used for TCP reassembly. A packet can be reassembled
if it contains TCP layer (:class:`scapy.layers.inet.TCP`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
reassembly (:term:`reasm.tcp.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`pcapkit.foundation.reassembly.tcp.TCP`
"""
if 'IP' in packet:
ip = cast('IP', packet['IP'])
elif 'IPv6' in packet:
ip = cast('IPv6', packet['IPv6'])
else:
return None
if 'TCP' in packet:
tcp = cast('TCP', packet['TCP'])
raw_len = len(tcp.payload) # payload length, header excludes
data = TCP_Packet(
bufid=(
ipaddress.ip_address(ip.src), # source IP address
tcp.sport, # source port
ipaddress.ip_address(ip.dst), # destination IP address
tcp.dport, # destination port
),
num=count, # original packet range number
ack=tcp.ack, # acknowledgement
dsn=tcp.seq, # data sequence number
syn=bool(tcp.flags.S), # synchronise flag
fin=bool(tcp.flags.F), # finish flag
rst=bool(tcp.flags.R), # reset connection flag
header=tcp.raw_packet_cache, # raw bytes type header
payload=bytearray(bytes(tcp.payload)), # raw bytearray type payload
first=tcp.seq, # this sequence number
last=tcp.seq + raw_len, # next (wanted) sequence number
len=raw_len, # payload length, header excludes
)
return data
return None
[docs]
def tcp_traceflow(packet: 'Packet', *, count: 'int' = -1) -> 'TF_TCP_Packet | None':
"""Trace packet flow for TCP.
Args:
packet: Scapy packet.
count: Packet index. If not provided, default to ``-1``.
Returns:
Data for TCP reassembly.
* If the ``packet`` can be used for TCP flow tracing. A packet can be reassembled
if it contains TCP layer (:class:`scapy.layers.inet.TCP`).
* If the ``packet`` can be reassembled, then the :obj:`dict` mapping of data for TCP
flow tracing (:term:`trace.tcp.packet`) will be returned; otherwise, returns :data:`None`.
See Also:
:class:`pcapkit.foundation.traceflow.tcp.TCP`
"""
if 'TCP' in packet:
ip = cast('IP', packet['IP']) if 'IP' in packet else cast('IPv6', packet['IPv6'])
tcp = cast('TCP', packet['TCP'])
data = TF_TCP_Packet( # type: ignore[type-var]
protocol=Enum_LinkType.get(packet.name.upper()), # data link type from global header
index=count, # frame number
frame=packet2dict(packet), # extracted packet
syn=bool(tcp.flags.S), # TCP synchronise (SYN) flag
fin=bool(tcp.flags.F), # TCP finish (FIN) flag
src=ipaddress.ip_address(ip.src), # source IP
dst=ipaddress.ip_address(ip.dst), # destination IP
srcport=tcp.sport, # TCP source port
dstport=tcp.dport, # TCP destination port
timestamp=time.time(), # timestamp
)
return data
return None