# -*- coding: utf-8 -*-"""PyShark Tools===================:mod:`pcapkit.toolkit.pyshark` contains all you need for:mod:`pcapkit` handy usage with `PyShark`_ engine. Allreforming functions returns with a flag to indicate ifusable for its caller... _PyShark: https://kiminewt.github.io/pyshark.. note:: Due to the lack of functionality of `PyShark`_, some functions of :mod:`pcapkit` may not be available with the `PyShark`_ engine."""importipaddressfromtypingimportTYPE_CHECKING,castfrompcapkit.const.reg.linktypeimportLinkTypeasEnum_LinkTypefrompcapkit.foundation.traceflow.data.tcpimportPacketasTF_TCP_PacketifTYPE_CHECKING:fromtypingimportAnyfrompyshark.packet.packetimportPacket__all__=['packet2dict','tcp_traceflow']
def packet2dict(packet: 'Packet') -> 'dict[str, Any]':
    """Convert a PyShark packet into a nested :obj:`dict`.

    The frame-level fields (``packet.frame_info``) populate the top level of
    the mapping. Each layer in ``packet.layers`` is then stored under its
    upper-cased ``layer_name`` *inside the mapping of the previous layer*, so
    the result mirrors the encapsulation order of the packet.

    Args:
        packet: PyShark packet.

    Returns:
        A :obj:`dict` mapping of packet data.

    """
    frame_info = packet.frame_info
    result: 'dict[str, Any]' = {
        name: getattr(frame_info, name) for name in frame_info.field_names
    }

    # ``cursor`` always refers to the innermost mapping built so far; every
    # new layer is attached there and then becomes the new innermost mapping.
    cursor = result
    for layer in packet.layers:
        nested = {name: getattr(layer, name) for name in layer.field_names}
        cursor[layer.layer_name.upper()] = nested
        cursor = nested

    return result
def tcp_traceflow(packet: 'Packet') -> 'TF_TCP_Packet | None':
    """Extract the data needed to trace the TCP flow of a packet.

    Args:
        packet: PyShark packet.

    Returns:
        A :class:`TF_TCP_Packet` (:term:`trace.tcp.packet`) built from the
        packet when it contains both an IP (``IP`` or ``IPv6``) layer and a
        ``TCP`` layer; otherwise :data:`None`.

    See Also:
        :class:`pcapkit.foundation.traceflow.tcp.TCP`

    """
    # Select the network layer that supplies the addresses; ``IP`` is checked
    # before ``IPv6``.
    if 'IP' in packet:
        network = cast('Packet', packet.ip)
    elif 'IPv6' in packet:
        network = cast('Packet', packet.ipv6)
    else:
        return None

    # Without a TCP layer there is nothing to trace.
    if 'TCP' not in packet:
        return None

    segment = cast('Packet', packet.tcp)
    return TF_TCP_Packet(  # type: ignore[type-var]
        protocol=Enum_LinkType.get(packet.layers[0].layer_name.upper()),  # data link type from global header
        index=int(packet.number),                                         # frame number
        frame=packet2dict(packet),                                        # extracted packet
        syn=bool(int(segment.flags_syn)),                                 # TCP synchronise (SYN) flag
        fin=bool(int(segment.flags_fin)),                                 # TCP finish (FIN) flag
        src=ipaddress.ip_address(network.src),                            # source IP
        dst=ipaddress.ip_address(network.dst),                            # destination IP
        srcport=int(segment.srcport),                                     # TCP source port
        dstport=int(segment.dstport),                                     # TCP destination port
        timestamp=packet.frame_info.time_epoch,                           # timestamp
    )