# -*- coding: utf-8 -*-
"""Application Layer Protocol Numbers
========================================
.. module:: pcapkit.vendor.reg.apptype
This module contains the vendor crawler for **Application Layer Protocol Numbers**,
which is automatically generating :class:`pcapkit.const.reg.apptype.AppType`.
"""
import collections
import csv
import keyword
import re
import sys
import textwrap
from typing import TYPE_CHECKING, Callable
from pcapkit.vendor.default import Vendor
if TYPE_CHECKING:
from collections import Counter, OrderedDict
from typing import Callable
__all__ = ['AppType']
LINE = lambda NAME, DOCS, FLAG, ENUM, MISS, MODL: f'''\
# -*- coding: utf-8 -*-
# mypy: disable-error-code=assignment
# pylint: disable=line-too-long,consider-using-f-string
"""{(name := DOCS.split(' [', maxsplit=1)[0])}
{'=' * (len(name) + 6)}
.. module:: {MODL.replace('vendor', 'const')}
This module contains the constant enumeration for **{name}**,
which is automatically generated from :class:`{MODL}.{NAME}`.
"""
from collections import defaultdict
from typing import TYPE_CHECKING
from aenum import IntFlag, StrEnum, auto, extend_enum
from pcapkit.utilities.compat import show_flag_values
__all__ = ['{NAME}']
if TYPE_CHECKING:
from typing import Any, DefaultDict, Type
class TransportProtocol(IntFlag):
"""Transport layer protocol."""
undefined = 0
#: Transmission Control Protocol.
tcp = auto()
#: User Datagram Protocol.
udp = auto()
#: Stream Control Transmission Protocol.
sctp = auto()
#: Datagram Congestion Control Protocol.
dccp = auto()
@staticmethod
def get(key: 'int | str') -> 'TransportProtocol':
"""Backport support for original codes.
Args:
key: Key to get enum item.
:meta private:
"""
if isinstance(key, int):
return TransportProtocol(key)
if key.lower() in TransportProtocol.__members__:
return TransportProtocol[key.lower()] # type: ignore[misc]
max_val = max(TransportProtocol.__members__.values())
return extend_enum(TransportProtocol, key.lower(), max_val * 2)
class {NAME}(StrEnum):
"""[{NAME}] {DOCS}"""
if TYPE_CHECKING:
#: Service name.
svc: 'str'
#: Port number.
port: 'int'
#: Transport protocol.
proto: 'TransportProtocol'
#: Mapping of members based on transport protocol.
__members_proto__: 'DefaultDict[TransportProtocol, dict[int, {NAME}]]' = defaultdict(dict)
def __new__(cls, value: 'int', name: 'str' = '<null>',
proto: 'TransportProtocol' = TransportProtocol.undefined) -> 'Type[{NAME}]':
temp = '%s [%d - %s]' % (name, value, proto.name)
obj = str.__new__(cls, temp)
obj._value_ = temp
obj.svc = name
obj.port = value
obj.proto = proto
for namespace in show_flag_values(proto):
cls.__members_proto__[TransportProtocol(namespace)][value] = obj
if proto is TransportProtocol.undefined:
cls.__members_proto__[proto][value] = obj
return obj
def __repr__(self) -> 'str':
return "<%s.%s: %d [%s]>" % (self.__class__.__name__, self.svc, self.port, self.proto.name)
def __str__(self) -> 'str':
return '%s [%d - %s]' % (self.svc, self.port, self.proto.name)
def __int__(self) -> 'int':
return self.port
def __lt__(self, other: '{NAME}') -> 'bool':
return self.port < other
def __gt__(self, other: '{NAME}') -> 'bool':
return self.port > other
def __le__(self, other: '{NAME}') -> 'bool':
return self.port <= other
def __ge__(self, other: '{NAME}') -> 'bool':
return self.port >= other
def __eq__(self, other: 'Any') -> 'bool':
return self.port == other
def __ne__(self, other: 'Any') -> 'bool':
return self.port != other
def __hash__(self) -> 'int':
return hash(self.port)
{ENUM}
@staticmethod
def get(key: 'int | str', default: 'int' = -1, *,
proto: 'TransportProtocol | str' = TransportProtocol.undefined) -> '{NAME}':
"""Backport support for original codes.
Args:
key: Key to get enum item.
default: Default value if not found.
proto: Transport protocol of the enum item.
:meta private:
"""
if isinstance(key, int):
if isinstance(proto, str):
proto = TransportProtocol.get(proto.lower())
temp_ns = {NAME}.__members_proto__.get(proto, {{}})
if key in temp_ns:
return temp_ns[key]
try:
ret = {NAME}._missing_(key)
if ret is None:
raise ValueError
except ValueError:
ret = extend_enum({NAME}, 'PORT_%d_%s' % (key, proto.name), key, 'unknown', proto)
return ret
if key in {NAME}.__members_proto__:
return getattr({NAME}, key)
return extend_enum({NAME}, key, default, key)
@classmethod
def _missing_(cls, value: 'int') -> '{NAME}':
"""Lookup function used when value is not found.
Args:
value: Value to get enum item.
"""
if not ({FLAG}):
raise ValueError('%r is not a valid %s' % (value, cls.__name__))
if value in cls.__members_proto__.get(TransportProtocol.undefined, {{}}): # type: ignore[call-overload]
return cls.__members_proto__[TransportProtocol.undefined][value] # type: ignore[index]
{MISS}
{'' if ''.join(MISS.splitlines()[-1:]).startswith('return') else 'return super()._missing_(value)'}
'''.strip() # type: Callable[[str, str, str, str, str, str], str]
[docs]
class AppType(Vendor):
"""Application Layer Protocol Numbers"""
#: Value limit checker.
FLAG = 'isinstance(value, int) and 0 <= value <= 65535'
#: Link to registry.
LINK = 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv'
def count(self, data: 'list[str]') -> 'Counter[str]':
"""Count field records."""
reader = csv.reader(data)
next(reader) # header
return collections.Counter(map(lambda item: '[%s] %s' % (item[2], item[0].strip() or self.safe_name(item[3].strip())),
filter(lambda item: len(item[1].split('-')) != 2, reader)))
@staticmethod
def wrap_comment(text: 'str') -> 'str':
"""Wraps long-length text to shorter lines of comments.
Args:
text: Source text.
Returns:
Wrapped comments.
"""
return '\n #: '.join(textwrap.wrap(text.strip(), 76))
def process(self, data: 'list[str]') -> 'tuple[list[str], list[str]]':
"""Process registry data.
Args:
data: Registry data.
Returns:
Enumeration fields and missing fields.
"""
reader = csv.reader(data)
next(reader) # header
enum = [] # type: list[str]
miss = [] # type: list[str]
line = collections.OrderedDict() # type: OrderedDict[str, list[str]]
for item in reader:
svc = item[0].strip().lower() or self.safe_name(item[3].strip()).lower()
port = item[1].strip() or '-1'
proto = 'TransportProtocol.get(%r)' % (item[2].strip().lower() or 'undefined')
desc = item[3].strip()
temp = [] # type: list[str]
#for rfc in filter(lambda s: 'RFC' in s, re.split(r'\[|\]', item[8])):
# temp.append(f'[{rfc[:3]} {rfc[3:]}]')
for rfc in filter(None, map(lambda s: s.strip(), re.split(r'\[|\]', item[8]))):
if 'RFC' in rfc and re.match(r'\d+', rfc[3:]):
match = re.fullmatch(r'RFC(?P<rfc>\d+)(, Section (?P<sec>.*?))?', rfc)
if match is None:
temp.append(f'[{rfc}]')
else:
if match.group('sec') is not None:
temp.append(f'[:rfc:`{match.group("rfc")}#{match.group("sec")}`]')
else:
temp.append(f'[:rfc:`{match.group("rfc")}`]')
else:
temp.append(f'[{rfc}]'.replace('_', ' '))
cmmt = self.wrap_comment(re.sub(r'\s+', r' ',
'[%s] %s %s' % (item[2].strip().upper() or 'N/A',
desc, ''.join(temp)))) # pylint: disable=consider-using-f-string
try:
code, _ = port, int(port)
if port == '-1':
code = 'null'
renm = self.rename(svc, code)
if f'{renm}_{code}' in line:
renm = f'{renm}_{code}'
if renm in line:
if port == line[renm][1]:
line[renm][2] = f'{line[renm][2]} | {proto}'
if line[renm][3].startswith('-'):
line[renm][3] = f'{line[renm][3]}\n #: - {cmmt}'
else:
line[renm][3] = f'- {line[renm][3]}\n #: - {cmmt}'
else:
line[f'{renm}_{line[renm][1]}'] = line[renm]
line[f'{renm}_{code}'] = [svc, port, proto, cmmt]
del line[renm]
else:
line[renm] = [svc, port, proto, cmmt]
# if port == '-1':
# continue
# proto_name = item[2].strip().lower()
# if proto_name:
# renm = f'PORT_{code}_{proto_name}'
# else:
# renm = f'PORT_{code}'
# line[renm] = [svc, code, proto, cmmt]
except ValueError:
start, stop = port.split('-')
miss.append(f'if {start} <= value <= {stop}:')
miss.append(f' #: {cmmt}')
miss.append(f" return extend_enum(cls, '{self.safe_name(svc)}_%d' % value, value, {svc!r}, {proto})")
for key, (svc, code, proto, cmmt) in line.items():
if keyword.iskeyword(key):
key = '%s_' % key
pres = f"{key}: 'AppType' = {code}, {svc!r}, {proto}"
if cmmt.startswith('-'):
sufs = f'#: {cmmt}'
else:
sufs = '#: %s' % cmmt.replace(' #: ', ' #: ')
enum.append(f'{sufs}\n {pres}')
return enum, miss
def context(self, data: 'list[str]') -> 'str':
"""Generate constant context.
Args:
data: CSV data.
Returns:
Constant context.
"""
enum, miss = self.process(data)
ENUM = '\n\n '.join(map(lambda s: s.rstrip(), enum)).strip()
MISS = '\n '.join(map(lambda s: s.rstrip(), miss)).strip()
return LINE(self.NAME, self.DOCS, self.FLAG, ENUM, MISS, self.__module__)
if __name__ == '__main__':
sys.exit(AppType()) # type: ignore[arg-type]