Source code for pcapkit.foundation.registry.protocols

# -*- coding: utf-8 -*-
"""Protocol Registries
=========================

.. module:: pcapkit.foundation.registry.protocols

This module provides the protocol registries for :mod:`pcapkit`.

"""
from typing import TYPE_CHECKING, cast, overload

from pcapkit.const.reg.apptype import AppType as Enum_AppType
from pcapkit.const.reg.apptype import TransportProtocol
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.protocols import __proto__ as protocol_registry
from pcapkit.protocols.application.httpv2 import HTTP as HTTPv2
from pcapkit.protocols.internet.hip import HIP
from pcapkit.protocols.internet.hopopt import HOPOPT
from pcapkit.protocols.internet.internet import Internet
from pcapkit.protocols.internet.ipv4 import IPv4
from pcapkit.protocols.internet.ipv6_opts import IPv6_Opts
from pcapkit.protocols.internet.ipv6_route import IPv6_Route
from pcapkit.protocols.internet.mh import MH
from pcapkit.protocols.link.link import Link
from pcapkit.protocols.misc.pcap.frame import Frame
from pcapkit.protocols.misc.pcapng import PCAPNG
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.protocols.schema.application.httpv2 import FrameType as Schema_HTTP_FrameType
from pcapkit.protocols.schema.internet.hip import Parameter as Schema_HIP_Parameter
from pcapkit.protocols.schema.internet.hopopt import Option as Schema_HOPOPT_Option
from pcapkit.protocols.schema.internet.ipv4 import Option as Schema_IPv4_Option
from pcapkit.protocols.schema.internet.ipv6_opts import Option as Schema_IPv6_Opts_Option
from pcapkit.protocols.schema.internet.ipv6_route import \
    RoutingType as Schema_IPv6_Route_RoutingType
from pcapkit.protocols.schema.internet.mh import CGAExtension as Schema_MH_CGAExtension
from pcapkit.protocols.schema.internet.mh import Option as Schema_MH_Option
from pcapkit.protocols.schema.internet.mh import Packet as Schema_MH_Packet
from pcapkit.protocols.schema.misc.pcapng import BlockType as Schema_PCAPNG_BlockType
from pcapkit.protocols.schema.misc.pcapng import DSBSecrets as Schema_PCAPNG_DSBSecrets
from pcapkit.protocols.schema.misc.pcapng import \
    NameResolutionRecord as Schema_PCAPNG_NameResolutionRecord
from pcapkit.protocols.schema.misc.pcapng import Option as Schema_PCAPNG_Option
from pcapkit.protocols.schema.transport.tcp import MPTCP as Schema_TCP_MPTCP
from pcapkit.protocols.schema.transport.tcp import Option as Schema_TCP_Option
from pcapkit.protocols.transport.tcp import TCP
from pcapkit.protocols.transport.udp import UDP
from pcapkit.utilities.exceptions import RegistryError
from pcapkit.utilities.logging import logger

if TYPE_CHECKING:
    from typing import Optional, Type

    from pcapkit.const.hip.parameter import Parameter as HIP_Parameter
    from pcapkit.const.http.frame import Frame as HTTP_Frame
    from pcapkit.const.ipv4.option_number import OptionNumber as IPv4_OptionNumber
    from pcapkit.const.ipv6.option import Option as IPv6_Option
    from pcapkit.const.ipv6.routing import Routing as IPv6_Routing
    from pcapkit.const.mh.cga_extension import CGAExtension as MH_CGAExtension
    from pcapkit.const.mh.option import Option as MH_Option
    from pcapkit.const.mh.packet import Packet as MH_Packet
    from pcapkit.const.pcapng.block_type import BlockType as PCAPNG_BlockType
    from pcapkit.const.pcapng.option_type import OptionType as PCAPNG_OptionType
    from pcapkit.const.pcapng.record_type import RecordType as PCAPNG_RecordType
    from pcapkit.const.pcapng.secrets_type import SecretsType as PCAPNG_SecretsType
    from pcapkit.const.reg.ethertype import EtherType
    from pcapkit.const.reg.linktype import LinkType
    from pcapkit.const.reg.transtype import TransType
    from pcapkit.const.tcp.mp_tcp_option import MPTCPOption as TCP_MPTCPOption
    from pcapkit.const.tcp.option import Option as TCP_Option
    from pcapkit.protocols.application.httpv2 import FrameConstructor as HTTP_FrameConstructor
    from pcapkit.protocols.application.httpv2 import FrameParser as HTTP_FrameParser
    from pcapkit.protocols.internet.hip import ParameterConstructor as HIP_ParameterConstructor
    from pcapkit.protocols.internet.hip import ParameterParser as HIP_ParameterParser
    from pcapkit.protocols.internet.hopopt import OptionConstructor as HOPOPT_OptionConstructor
    from pcapkit.protocols.internet.hopopt import OptionParser as HOPOPT_OptionParser
    from pcapkit.protocols.internet.ipv4 import OptionConstructor as IPv4_OptionConstructor
    from pcapkit.protocols.internet.ipv4 import OptionParser as IPv4_OptionParser
    from pcapkit.protocols.internet.ipv6_opts import \
        OptionConstructor as IPv6_Opts_OptionConstructor
    from pcapkit.protocols.internet.ipv6_opts import OptionParser as IPv6_Opts_OptionParser
    from pcapkit.protocols.internet.ipv6_route import TypeConstructor as IPv6_Route_TypeConstructor
    from pcapkit.protocols.internet.ipv6_route import TypeParser as IPv6_Route_TypeParser
    from pcapkit.protocols.internet.mh import ExtensionConstructor as MH_ExtensionConstructor
    from pcapkit.protocols.internet.mh import ExtensionParser as MH_ExtensionParser
    from pcapkit.protocols.internet.mh import OptionConstructor as MH_OptionConstructor
    from pcapkit.protocols.internet.mh import OptionParser as MH_OptionParser
    from pcapkit.protocols.internet.mh import PacketConstructor as MH_PacketConstructor
    from pcapkit.protocols.internet.mh import PacketParser as MH_PacketParser
    from pcapkit.protocols.misc.pcapng import BlockConstructor as PCAPNG_BlockConstructor
    from pcapkit.protocols.misc.pcapng import BlockParser as PCAPNG_BlockParser
    from pcapkit.protocols.misc.pcapng import OptionConstructor as PCAPNG_OptionConstructor
    from pcapkit.protocols.misc.pcapng import OptionParser as PCAPNG_OptionParser
    from pcapkit.protocols.misc.pcapng import RecordConstructor as PCAPNG_RecordConstructor
    from pcapkit.protocols.misc.pcapng import RecordParser as PCAPNG_RecordParser
    from pcapkit.protocols.misc.pcapng import SecretsConstructor as PCAPNG_SecretsConstructor
    from pcapkit.protocols.misc.pcapng import SecretsParser as PCAPNG_SecretsParser
    from pcapkit.protocols.transport.tcp import MPOptionConstructor as TCP_MPOptionConstructor
    from pcapkit.protocols.transport.tcp import MPOptionParser as TCP_MPOptionParser
    from pcapkit.protocols.transport.tcp import OptionConstructor as TCP_OptionConstructor
    from pcapkit.protocols.transport.tcp import OptionParser as TCP_OptionParser

__all__ = [
    'register_protocol',

    'register_linktype',
    'register_pcap', 'register_pcapng',

    'register_ethertype',

    'register_transtype',
    'register_ipv4_option', 'register_hip_parameter', 'register_hopopt_option',
    'register_ipv6_opts_option', 'register_ipv6_route_routing',
    'register_mh_message', 'register_mh_option', 'register_mh_extension',

    'register_apptype',
    'register_tcp', 'register_udp',
    'register_tcp_option', 'register_tcp_mp_option',

    'register_http_frame',

    'register_pcapng_block', 'register_pcapng_option', 'register_pcapng_secrets',
    'register_pcapng_record',
]

NULL = '(null)'


# NOTE: pcapkit.protocols.__proto__
def register_protocol(protocol: 'Type[Protocol]') -> 'None':
    """Register a protocol class in the global protocol registry.

    The class is stored in :data:`pcapkit.protocols.__proto__` under the
    upper-cased value of its ``__name__``; an existing entry with the same
    key is overwritten.

    Args:
        protocol: Protocol class. It must be a subclass of the protocol base
            class imported in this module as ``Protocol``
            (:class:`pcapkit.protocols.protocol.ProtocolBase`).

    Raises:
        RegistryError: If ``protocol`` is not a subclass of the protocol base
            class, including the case where it is not a class at all.

    """
    try:
        is_protocol = issubclass(protocol, Protocol)
    except TypeError:
        # issubclass() raises TypeError when its first argument is not a
        # class; report that through the same RegistryError as any other
        # invalid argument instead of leaking the low-level exception.
        is_protocol = False

    if not is_protocol:
        raise RegistryError(f'protocol must be a Protocol subclass, not {protocol!r}')

    protocol_registry[protocol.__name__.upper()] = protocol
    logger.info('registered protocol: %s', protocol.__name__)
###############################################################################
# Top-Level Registries
###############################################################################


@overload
def register_linktype(code: 'LinkType',
                      module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_linktype(code: 'LinkType', module: 'str', class_: 'str') -> 'None': ...


def register_linktype(code: 'LinkType',
                      module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                      class_: 'str' = NULL) -> 'None':
    r"""Register a link-type protocol for both PCAP and PCAP-NG dissection.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the following registries:

    - :data:`pcapkit.protocols.misc.pcap.frame.Frame.__proto__`
    - :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__proto__`

    Afterwards the protocol class itself is passed to
    :func:`register_protocol`.

    Arguments:
        code: protocol code as in :class:`~pcapkit.const.reg.linktype.LinkType`
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    See Also:
        * :func:`pcapkit.foundation.registry.protocols.register_pcap`
        * :func:`pcapkit.foundation.registry.protocols.register_pcapng`

    """
    # a plain module name is turned into a descriptor first
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    for registry in (Frame, PCAPNG):
        registry.register(code, target)
    logger.info('registered linktype protocol: %s', code.name)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
@overload
def register_pcap(code: 'LinkType',
                  module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_pcap(code: 'LinkType', module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.misc.pcap.frame.Frame.__proto__
def register_pcap(code: 'LinkType',
                  module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                  class_: 'str' = NULL) -> 'None':
    r"""Register a link-type protocol for PCAP frame dissection.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.misc.pcap.frame.Frame.__proto__` registry and the
    protocol class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: protocol code as in :class:`~pcapkit.const.reg.linktype.LinkType`
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    """
    # a plain module name is turned into a descriptor first
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    Frame.register(code, target)
    logger.info('registered PCAP linktype protocol: %s', code.name)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
@overload
def register_pcapng(code: 'LinkType',
                    module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_pcapng(code: 'LinkType', module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.misc.pcapng.PCAPNG.__proto__
def register_pcapng(code: 'LinkType',
                    module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                    class_: 'str' = NULL) -> 'None':
    r"""Register a link-type protocol for PCAP-NG dissection.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__proto__` registry and the
    protocol class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: protocol code as in :class:`~pcapkit.const.reg.linktype.LinkType`
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    """
    # a plain module name is turned into a descriptor first
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    PCAPNG.register(code, target)
    logger.info('registered PCAP-NG linktype protocol: %s', code.name)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
###############################################################################
# Link Layer Registries
###############################################################################


@overload
def register_ethertype(code: 'EtherType',
                       module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_ethertype(code: 'EtherType', module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.link.link.Link.__proto__
def register_ethertype(code: 'EtherType',
                       module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                       class_: 'str' = NULL) -> 'None':
    r"""Register a protocol class under an EtherType code.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.link.link.Link.__proto__` registry and the
    protocol class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: protocol code as in :class:`~pcapkit.const.reg.ethertype.EtherType`
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    """
    # a plain module name is turned into a descriptor first
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    Link.register(code, target)
    logger.info('registered ethertype protocol: %s', code.name)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
###############################################################################
# Internet Layer Registries
###############################################################################


@overload
def register_transtype(code: 'TransType',
                       module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_transtype(code: 'TransType', module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.internet.internet.Internet.__proto__
def register_transtype(code: 'TransType',
                       module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                       class_: 'str' = NULL) -> 'None':
    r"""Register a protocol class under a transport-type code.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.internet.internet.Internet.__proto__` registry and
    the protocol class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: protocol code as in :class:`~pcapkit.const.reg.transtype.TransType`
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    """
    # a plain module name is turned into a descriptor first
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    Internet.register(code, target)
    logger.info('registered transtype protocol: %s', code.name)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
# NOTE: pcapkit.protocols.internet.internet.IPv4
def register_ipv4_option(code: 'IPv4_OptionNumber',
                         meth: 'str | tuple[IPv4_OptionParser, IPv4_OptionConstructor]', *,
                         schema: 'Optional[Type[Schema_IPv4_Option]]' = None) -> 'None':
    """Register an IPv4 option parser.

    The parser is handed to ``IPv4.register_option``; when a ``schema`` is
    given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.ipv4.Option`.

    Args:
        code: :class:`IPv4 <pcapkit.protocols.internet.ipv4.IPv4>` option code
            as in :class:`~pcapkit.const.ipv4.option_number.OptionNumber`.
        meth: Either a method name suffix (``IPv4`` must then expose a
            ``_read_opt_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.ipv4.Option`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``IPv4._read_opt_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_opt_{meth}'
        if not hasattr(IPv4, parser_name):
            raise RegistryError('method must be a valid IPv4 option parser function')

    IPv4.register_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_IPv4_Option.register(code, schema)

    logger.info('registered IPv4 option parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.hip.HIP
def register_hip_parameter(code: 'HIP_Parameter',
                           meth: 'str | tuple[HIP_ParameterParser, HIP_ParameterConstructor]', *,
                           schema: 'Optional[Type[Schema_HIP_Parameter]]' = None) -> 'None':
    """Register a HIP parameter parser.

    The parser is handed to ``HIP.register_parameter``; when a ``schema`` is
    given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.hip.Parameter`.

    Args:
        code: :class:`~pcapkit.protocols.internet.hip.HIP` parameter code as
            in :class:`~pcapkit.const.hip.parameter.Parameter`.
        meth: Either a method name suffix (``HIP`` must then expose a
            ``_read_param_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            parameter. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.hip.Parameter`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``HIP._read_param_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_param_{meth}'
        if not hasattr(HIP, parser_name):
            raise RegistryError('method must be a valid HIP parameter parser function')

    HIP.register_parameter(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_HIP_Parameter.register(code, schema)

    logger.info('registered HIP parameter parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.hopopt.HOPOPT.__option__
def register_hopopt_option(code: 'IPv6_Option',
                           meth: 'str | tuple[HOPOPT_OptionParser, HOPOPT_OptionConstructor]', *,
                           schema: 'Optional[Type[Schema_HOPOPT_Option]]' = None) -> 'None':
    """Register a HOPOPT option parser.

    The parser is handed to ``HOPOPT.register_option``
    (:data:`pcapkit.protocols.internet.hopopt.HOPOPT.__option__` registry);
    when a ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.hopopt.Option`.

    Args:
        code: :class:`~pcapkit.protocols.internet.hopopt.HOPOPT` option code
            as in :class:`~pcapkit.const.ipv6.option.Option`.
        meth: Either a method name suffix (``HOPOPT`` must then expose a
            ``_read_opt_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.hopopt.Option`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``HOPOPT._read_opt_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_opt_{meth}'
        if not hasattr(HOPOPT, parser_name):
            raise RegistryError('method must be a valid HOPOPT option parser function')

    HOPOPT.register_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_HOPOPT_Option.register(code, schema)

    logger.info('registered HOPOPT option parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.ipv6_opts.IPv6_Opts.__option__
def register_ipv6_opts_option(code: 'IPv6_Option',
                              meth: 'str | tuple[IPv6_Opts_OptionParser, IPv6_Opts_OptionConstructor]', *,
                              schema: 'Optional[Type[Schema_IPv6_Opts_Option]]' = None) -> 'None':
    """Register an IPv6-Opts option parser.

    The parser is handed to ``IPv6_Opts.register_option``
    (:data:`pcapkit.protocols.internet.ipv6_opts.IPv6_Opts.__option__`
    registry); when a ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.ipv6_opts.Option`.

    Args:
        code: :class:`IPv6-Opts <pcapkit.protocols.internet.ipv6_opts.IPv6_Opts>`
            option code as in :class:`~pcapkit.const.ipv6.option.Option`.
        meth: Either a method name suffix (``IPv6_Opts`` must then expose a
            ``_read_opt_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.ipv6_opts.Option`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``IPv6_Opts._read_opt_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_opt_{meth}'
        if not hasattr(IPv6_Opts, parser_name):
            raise RegistryError('method must be a valid IPv6-Opts option parser function')

    IPv6_Opts.register_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_IPv6_Opts_Option.register(code, schema)

    logger.info('registered IPv6-Opts option parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.ipv6_route.IPv6_Route.__routing__
def register_ipv6_route_routing(code: 'IPv6_Routing',
                                meth: 'str | tuple[IPv6_Route_TypeParser, IPv6_Route_TypeConstructor]', *,
                                schema: 'Optional[Type[Schema_IPv6_Route_RoutingType]]' = None) -> 'None':
    """Register an IPv6-Route routing data parser.

    The parser is handed to ``IPv6_Route.register_routing``
    (:data:`pcapkit.protocols.internet.ipv6_route.IPv6_Route.__routing__`
    registry); when a ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.ipv6_route.RoutingType`.

    Args:
        code: :class:`IPv6-Route <pcapkit.protocols.internet.ipv6_route.IPv6_Route>`
            data type code as in :class:`~pcapkit.const.ipv6.routing.Routing`.
        meth: Either a method name suffix (``IPv6_Route`` must then expose a
            ``_read_data_type_{meth}`` attribute) or a parser/constructor
            tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            routing data. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.ipv6_route.RoutingType`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``IPv6_Route._read_data_type_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_data_type_{meth}'
        if not hasattr(IPv6_Route, parser_name):
            raise RegistryError('method must be a valid IPv6-Route routing data parser function')

    IPv6_Route.register_routing(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_IPv6_Route_RoutingType.register(code, schema)

    logger.info('registered IPv6-Route routing data parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.mh.MH.__message__
def register_mh_message(code: 'MH_Packet',
                        meth: 'str | tuple[MH_PacketParser, MH_PacketConstructor]', *,
                        schema: 'Optional[Type[Schema_MH_Packet]]' = None) -> 'None':
    """Register an MH message type parser.

    The parser is handed to ``MH.register_message``
    (:data:`pcapkit.protocols.internet.mh.MH.__message__` registry); when a
    ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.mh.Packet`.

    Args:
        code: :class:`~pcapkit.protocols.internet.mh.MH` message type code as
            in :class:`~pcapkit.const.mh.packet.Packet`.
        meth: Either a method name suffix (``MH`` must then expose a
            ``_read_msg_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            message type. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.mh.Packet`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``MH._read_msg_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_msg_{meth}'
        if not hasattr(MH, parser_name):
            raise RegistryError('method must be a valid MH message type parser function')

    MH.register_message(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_MH_Packet.register(code, schema)

    logger.info('registered MH message type parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.mh.MH.__option__
def register_mh_option(code: 'MH_Option',
                       meth: 'str | tuple[MH_OptionParser, MH_OptionConstructor]', *,
                       schema: 'Optional[Type[Schema_MH_Option]]' = None) -> 'None':
    """Register an MH option parser.

    The parser is handed to ``MH.register_option``
    (:data:`pcapkit.protocols.internet.mh.MH.__option__` registry); when a
    ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.mh.Option`.

    Args:
        code: :class:`~pcapkit.protocols.internet.mh.MH` option code as in
            :class:`~pcapkit.const.mh.option.Option`.
        meth: Either a method name suffix (``MH`` must then expose a
            ``_read_opt_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.mh.Option`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``MH._read_opt_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_opt_{meth}'
        if not hasattr(MH, parser_name):
            raise RegistryError('method must be a valid MH option parser function')

    MH.register_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_MH_Option.register(code, schema)

    logger.info('registered MH option parser: %s', code.name)
# NOTE: pcapkit.protocols.internet.mh.MH.__extension__
def register_mh_extension(code: 'MH_CGAExtension',
                          meth: 'str | tuple[MH_ExtensionParser, MH_ExtensionConstructor]',
                          schema: 'Optional[Type[Schema_MH_CGAExtension]]' = None) -> 'None':
    """Register an MH CGA extension parser.

    The parser is handed to ``MH.register_extension``
    (:data:`pcapkit.protocols.internet.mh.MH.__extension__` registry); when a
    ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.internet.mh.CGAExtension`.

    Args:
        code: :class:`~pcapkit.protocols.internet.mh.MH` CGA extension code as
            in :class:`~pcapkit.const.mh.cga_extension.CGAExtension`.
        meth: Either a method name suffix (``MH`` must then expose a
            ``_read_ext_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            extension. It should be a subclass of
            :class:`pcapkit.protocols.schema.internet.mh.CGAExtension`.
            This parameter may be passed positionally or by keyword.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``MH._read_ext_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_ext_{meth}'
        if not hasattr(MH, parser_name):
            raise RegistryError('method must be a valid MH CGA extension function')

    MH.register_extension(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_MH_CGAExtension.register(code, schema)

    logger.info('registered MH CGA extension: %s', code.name)
###############################################################################
# Transport Layer Registries
###############################################################################


@overload
def register_apptype(code: 'int',
                     module: 'ModuleDescriptor[Protocol] | Type[Protocol]', *,
                     proto: 'TransportProtocol | str') -> 'None': ...
@overload
def register_apptype(code: 'Enum_AppType',
                     module: 'ModuleDescriptor[Protocol] | Type[Protocol]', *,
                     proto: 'TransportProtocol | str' = ...) -> 'None': ...
@overload
def register_apptype(code: 'int', module: 'str', class_: 'str', *,
                     proto: 'TransportProtocol | str') -> 'None': ...
@overload
def register_apptype(code: 'Enum_AppType', module: 'str', class_: 'str', *,
                     proto: 'TransportProtocol | str' = ...) -> 'None': ...


def register_apptype(code: 'int | Enum_AppType',
                     module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                     class_: 'str' = NULL, *,
                     proto: 'TransportProtocol | str' = NULL) -> 'None':
    r"""Register an application-layer protocol class for a port number.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.transport.tcp.TCP.__proto__` and/or
    :data:`pcapkit.protocols.transport.udp.UDP.__proto__` registry, depending
    on which transport protocols are contained in ``proto``. The protocol
    class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: port number, or an
            :class:`~pcapkit.const.reg.apptype.AppType` member whose ``port``
            (and, if ``proto`` is left at its default, ``proto``) is used
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)
        proto: protocol name (must be a valid transport protocol); strings
            are lower-cased and looked up via ``TransportProtocol.get``

    Raises:
        RegistryError: If ``proto`` matches neither TCP nor UDP.

    See Also:
        * :func:`pcapkit.foundation.registry.register_tcp`
        * :func:`pcapkit.foundation.registry.register_udp`

    """
    if isinstance(code, Enum_AppType):
        # fall back to the transport protocol carried by the enum member
        # only when the caller left ``proto`` at its sentinel default
        proto = code.proto if proto is NULL else proto
        code = code.port

    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    if isinstance(proto, str):
        proto = TransportProtocol.get(proto.lower())

    transports = cast('tuple[tuple[TransportProtocol, Type[Protocol]], ...]', (
        (TransportProtocol.tcp, TCP),
        (TransportProtocol.udp, UDP),
    ))

    matched = 0
    for flag, layer in transports:
        if flag in proto:
            layer.register(code, target)
            logger.info('registered %s port: %s', flag.name, code)
            matched += 1

    if matched == 0:
        raise RegistryError(f'unknown transport protocol: {proto.name}')

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
@overload
def register_tcp(code: 'int | Enum_AppType',
                 module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_tcp(code: 'int | Enum_AppType', module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.transport.tcp.TCP.__proto__
def register_tcp(code: 'int | Enum_AppType',
                 module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                 class_: 'str' = NULL) -> 'None':
    r"""Register an application-layer protocol class for a TCP port.

    Notes:
        When ``module`` is a string, the protocol class is referenced as
        ``{module}.{class_}`` through a
        :class:`~pcapkit.corekit.module.ModuleDescriptor`.

    The protocol is registered to the
    :data:`pcapkit.protocols.transport.tcp.TCP.__proto__` registry and the
    protocol class itself is then passed to :func:`register_protocol`.

    Arguments:
        code: port number, or an
            :class:`~pcapkit.const.reg.apptype.AppType` member whose ``port``
            is used
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name (only used when ``module`` is a string)

    """
    port = code.port if isinstance(code, Enum_AppType) else code
    target = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    TCP.register(port, target)
    logger.info('registered TCP port: %s', port)

    # the global protocol registry wants the class, not the descriptor
    protocol = target.klass if isinstance(target, ModuleDescriptor) else target
    register_protocol(protocol)
# NOTE: pcapkit.protocols.transport.tcp.TCP.__option__
def register_tcp_option(code: 'TCP_Option',
                        meth: 'str | tuple[TCP_OptionParser, TCP_OptionConstructor]', *,
                        schema: 'Optional[Type[Schema_TCP_Option]]' = None) -> 'None':
    """Register a TCP option parser.

    The parser is handed to ``TCP.register_option``
    (:data:`pcapkit.protocols.transport.tcp.TCP.__option__` registry); when a
    ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.transport.tcp.Option`.

    Args:
        code: :class:`~pcapkit.protocols.transport.tcp.TCP` option code as in
            :class:`~pcapkit.const.tcp.option.Option`.
        meth: Either a method name suffix (``TCP`` must then expose a
            ``_read_mode_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.transport.tcp.Option`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``TCP._read_mode_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_mode_{meth}'
        if not hasattr(TCP, parser_name):
            raise RegistryError('method must be a TCP option parser function')

    TCP.register_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_TCP_Option.register(code, schema)

    logger.info('registered TCP option parser: %s', code.name)
# NOTE: pcapkit.protocols.transport.tcp.TCP.__mp_option__
def register_tcp_mp_option(code: 'TCP_MPTCPOption',
                           meth: 'str | tuple[TCP_MPOptionParser, TCP_MPOptionConstructor]', *,
                           schema: 'Optional[Type[Schema_TCP_MPTCP]]' = None) -> 'None':
    """Register an MPTCP option parser.

    The parser is handed to ``TCP.register_mp_option``
    (:data:`pcapkit.protocols.transport.tcp.TCP.__mp_option__` registry); when
    a ``schema`` is given it is additionally registered on
    :class:`pcapkit.protocols.schema.transport.tcp.MPTCP`.

    Args:
        code: Multipath :class:`~pcapkit.protocols.transport.tcp.TCP` option
            code as in :class:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption`.
        meth: Either a method name suffix (``TCP`` must then expose a
            ``_read_mptcp_{meth}`` attribute) or a parser/constructor tuple.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class for the
            option. It should be a subclass of
            :class:`pcapkit.protocols.schema.transport.tcp.MPTCP`.

    Raises:
        RegistryError: If ``meth`` is a string that does not name an existing
            ``TCP._read_mptcp_*`` attribute.

    """
    if isinstance(meth, str):
        parser_name = f'_read_mptcp_{meth}'
        if not hasattr(TCP, parser_name):
            raise RegistryError('method must be a MPTCP option parser function')

    TCP.register_mp_option(code, meth)

    # the schema is optional; skip its registry when none was supplied
    if schema is not None:
        Schema_TCP_MPTCP.register(code, schema)

    logger.info('registered MPTCP option parser: %s', code.name)
@overload
def register_udp(code: 'int | Enum_AppType',
                 module: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': ...
@overload
def register_udp(code: 'int | Enum_AppType',
                 module: 'str', class_: 'str') -> 'None': ...


# NOTE: pcapkit.protocols.transport.udp.UDP.__proto__
def register_udp(code: 'int | Enum_AppType',
                 module: 'str | ModuleDescriptor[Protocol] | Type[Protocol]',
                 class_: 'str' = NULL) -> 'None':
    r"""Register a new protocol class for a UDP port.

    Notes:
        The full qualified class name of the new protocol class
        should be as ``{module}.{class_}``.

    The protocol class is added to the
    :data:`pcapkit.protocols.transport.udp.UDP.__proto__` registry and is
    then passed on to :func:`register_protocol`.

    Args:
        code: port number, or an ``Enum_AppType`` member whose ``port``
            attribute is used as the port number
        module: module name or module descriptor or a
            :class:`~pcapkit.protocols.protocol.Protocol` subclass
        class\_: class name, used only when ``module`` is a module name

    """
    # normalise the enumeration member to its port number
    port = code.port if isinstance(code, Enum_AppType) else code

    # a module name is wrapped together with the class name into a descriptor
    entry = (
        cast('ModuleDescriptor[Protocol]', ModuleDescriptor(module, class_))
        if isinstance(module, str) else module
    )

    UDP.register(port, entry)
    logger.info('registered UDP port: %s', port)

    # register protocol to protocol registry
    protocol = entry.klass if isinstance(entry, ModuleDescriptor) else entry
    register_protocol(protocol)
###############################################################################
# Application Layer Registries
###############################################################################

# NOTE: pcapkit.protocols.application.httpv2.HTTP.__frame__
def register_http_frame(code: 'HTTP_Frame',
                        meth: 'str | tuple[HTTP_FrameParser, HTTP_FrameConstructor]', *,
                        schema: 'Optional[Type[Schema_HTTP_FrameType]]' = None) -> 'None':
    """Register an HTTP/2 frame parser.

    The given frame parser is added to the
    :data:`pcapkit.protocols.application.httpv2.HTTP.__frame__` registry.

    Args:
        code: :class:`HTTP/2 <pcapkit.protocols.application.httpv2.HTTP>`
            frame type code as in :class:`~pcapkit.const.http.frame.Frame`.
        meth: Method name, or a tuple of parser and constructor callables,
            used to parse and/or construct the frame. A method name is
            only accepted when the HTTP/2 protocol class has a
            ``_read_http_{meth}`` attribute.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class
            for the frame. It should be a subclass of
            :class:`pcapkit.protocols.schema.application.httpv2.FrameType`.
            When :obj:`None`, no schema is registered.

    Raises:
        RegistryError: If ``meth`` is a string and the HTTP/2 protocol
            class has no ``_read_http_{meth}`` attribute.

    """
    # only string names can be validated against the protocol class
    if isinstance(meth, str):
        parser_attr = f'_read_http_{meth}'
        if not hasattr(HTTPv2, parser_attr):
            raise RegistryError('method must be a frame parser function')

    HTTPv2.register_frame(code, meth)

    # the schema is optional and registered under the same frame type code
    if schema is not None:
        Schema_HTTP_FrameType.register(code, schema)

    logger.info('registered HTTP/2 frame parser: %s', code.name)
############################################################################### # Miscellaneous Protocol Registries ############################################################################### # NOTE: pcapkit.protocols.misc.pcapng.PCAPNG.__block__
def register_pcapng_block(code: 'PCAPNG_BlockType',
                          meth: 'str | tuple[PCAPNG_BlockParser, PCAPNG_BlockConstructor]', *,
                          schema: 'Optional[Type[Schema_PCAPNG_BlockType]]' = None) -> 'None':
    """Register a PCAP-NG block parser.

    The given block parser is added to the
    :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__block__` registry.

    Args:
        code: :class:`PCAPNG <pcapkit.protocols.misc.pcapng.PCAPNG>` block
            type code as in :class:`~pcapkit.const.pcapng.block_type.BlockType`.
        meth: Method name, or a tuple of parser and constructor callables,
            used to parse and/or construct the block. A method name is
            only accepted when ``PCAPNG`` has a ``_read_block_{meth}``
            attribute.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class
            for the block. It should be a subclass of
            :class:`pcapkit.protocols.schema.misc.pcapng.BlockType`.
            When :obj:`None`, no schema is registered.

    Raises:
        RegistryError: If ``meth`` is a string and ``PCAPNG`` has no
            ``_read_block_{meth}`` attribute.

    """
    # only string names can be validated against the protocol class
    if isinstance(meth, str):
        parser_attr = f'_read_block_{meth}'
        if not hasattr(PCAPNG, parser_attr):
            raise RegistryError('method must be a block parser function')

    PCAPNG.register_block(code, meth)

    # the schema is optional and registered under the same block type code
    if schema is not None:
        Schema_PCAPNG_BlockType.register(code, schema)

    logger.info('registered PCAP-NG block parser: %s', code.name)
# NOTE: pcapkit.protocols.misc.pcapng.PCAPNG.__option__
def register_pcapng_option(code: 'PCAPNG_OptionType',
                           meth: 'str | tuple[PCAPNG_OptionParser, PCAPNG_OptionConstructor]', *,
                           schema: 'Optional[Type[Schema_PCAPNG_Option]]' = None) -> 'None':
    """Register a PCAP-NG option parser.

    The given option parser is added to the
    :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__option__` registry.

    Args:
        code: :class:`PCAPNG <pcapkit.protocols.misc.pcapng.PCAPNG>` option
            type code as in :class:`~pcapkit.const.pcapng.option_type.OptionType`.
        meth: Method name, or a tuple of parser and constructor callables,
            used to parse and/or construct the option. A method name is
            only accepted when ``PCAPNG`` has a ``_read_option_{meth}``
            attribute.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class
            for the option. It should be a subclass of
            :class:`pcapkit.protocols.schema.misc.pcapng.Option`.
            When :obj:`None`, no schema is registered.

    Raises:
        RegistryError: If ``meth`` is a string and ``PCAPNG`` has no
            ``_read_option_{meth}`` attribute.

    """
    # only string names can be validated against the protocol class
    if isinstance(meth, str):
        parser_attr = f'_read_option_{meth}'
        if not hasattr(PCAPNG, parser_attr):
            raise RegistryError('method must be a option parser function')

    PCAPNG.register_option(code, meth)

    # the schema is optional and registered under the same option type code
    if schema is not None:
        Schema_PCAPNG_Option.register(code, schema)

    logger.info('registered PCAP-NG option parser: %s', code.name)
# NOTE: pcapkit.protocols.misc.pcapng.PCAPNG.__record__
def register_pcapng_record(code: 'PCAPNG_RecordType',
                           meth: 'str | tuple[PCAPNG_RecordParser, PCAPNG_RecordConstructor]', *,
                           schema: 'Optional[Type[Schema_PCAPNG_NameResolutionRecord]]' = None) -> 'None':
    """Register a PCAP-NG name resolution record parser.

    The given name resolution record parser is added to the
    :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__record__` registry.

    Args:
        code: :class:`PCAPNG <pcapkit.protocols.misc.pcapng.PCAPNG>` name
            resolution record type code as in
            :class:`~pcapkit.const.pcapng.record_type.RecordType`.
        meth: Method name, or a tuple of parser and constructor callables,
            used to parse and/or construct the name resolution record.
            A method name is only accepted when ``PCAPNG`` has a
            ``_read_record_{meth}`` attribute.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class
            for the name resolution record. It should be a subclass of
            :class:`pcapkit.protocols.schema.misc.pcapng.NameResolutionRecord`.
            When :obj:`None`, no schema is registered.

    Raises:
        RegistryError: If ``meth`` is a string and ``PCAPNG`` has no
            ``_read_record_{meth}`` attribute.

    """
    # only string names can be validated against the protocol class
    if isinstance(meth, str):
        parser_attr = f'_read_record_{meth}'
        if not hasattr(PCAPNG, parser_attr):
            raise RegistryError('method must be a name resolution record parser function')

    PCAPNG.register_record(code, meth)

    # the schema is optional and registered under the same record type code
    if schema is not None:
        Schema_PCAPNG_NameResolutionRecord.register(code, schema)

    logger.info('registered PCAP-NG name resolution record parser: %s', code.name)
# NOTE: pcapkit.protocols.misc.pcapng.PCAPNG.__secrets__
def register_pcapng_secrets(code: 'PCAPNG_SecretsType',
                            meth: 'str | tuple[PCAPNG_SecretsParser, PCAPNG_SecretsConstructor]', *,
                            schema: 'Optional[Type[Schema_PCAPNG_DSBSecrets]]' = None) -> 'None':
    """Register a PCAP-NG decryption secrets parser.

    The given decryption secrets parser is added to the
    :data:`pcapkit.protocols.misc.pcapng.PCAPNG.__secrets__` registry.

    Args:
        code: :class:`PCAPNG <pcapkit.protocols.misc.pcapng.PCAPNG>`
            decryption secrets type code as in
            :class:`~pcapkit.const.pcapng.secrets_type.SecretsType`.
        meth: Method name, or a tuple of parser and constructor callables,
            used to parse and/or construct the decryption secrets.
            A method name is only accepted when ``PCAPNG`` has a
            ``_read_secrets_{meth}`` attribute.
        schema: :class:`~pcapkit.protocols.schema.schema.Schema` class
            for the decryption secrets. It should be a subclass of
            :class:`pcapkit.protocols.schema.misc.pcapng.DSBSecrets`.
            When :obj:`None`, no schema is registered.

    Raises:
        RegistryError: If ``meth`` is a string and ``PCAPNG`` has no
            ``_read_secrets_{meth}`` attribute.

    """
    # only string names can be validated against the protocol class
    if isinstance(meth, str):
        parser_attr = f'_read_secrets_{meth}'
        if not hasattr(PCAPNG, parser_attr):
            raise RegistryError('method must be a decryption secrets parser function')

    PCAPNG.register_secrets(code, meth)

    # the schema is optional and registered under the same secrets type code
    if schema is not None:
        Schema_PCAPNG_DSBSecrets.register(code, schema)

    logger.info('registered PCAP-NG decryption secrets parser: %s', code.name)