# -*- coding: utf-8 -*-
"""Application Layer Protocol Numbers
========================================
.. module:: pcapkit.vendor.reg.apptype
This module contains the vendor crawler for **Application Layer Protocol Numbers**,
which is automatically generating :class:`pcapkit.const.reg.apptype.AppType`.
"""
import collections
import csv
import keyword
import re
import sys
import textwrap
from typing import TYPE_CHECKING, Callable
from pcapkit.vendor.default import Vendor
if TYPE_CHECKING:
    from collections import Counter, OrderedDict
    from typing import Callable
__all__ = ['AppType']
LINE = lambda NAME, DOCS, FLAG, ENUM, MISS, MODL: f'''\
# -*- coding: utf-8 -*-
# mypy: disable-error-code=assignment
# pylint: disable=line-too-long,consider-using-f-string
"""{(name := DOCS.split(' [', maxsplit=1)[0])}
{'=' * (len(name) + 6)}
.. module:: {MODL.replace('vendor', 'const')}
This module contains the constant enumeration for **{name}**,
which is automatically generated from :class:`{MODL}.{NAME}`.
"""
from collections import defaultdict
from typing import TYPE_CHECKING
from aenum import IntFlag, StrEnum, auto, extend_enum
from pcapkit.utilities.compat import show_flag_values
__all__ = ['{NAME}']
if TYPE_CHECKING:
    from typing import Any, DefaultDict, Type
class TransportProtocol(IntFlag):
    """Transport layer protocol."""
    undefined = 0
    #: Transmission Control Protocol.
    tcp = auto()
    #: User Datagram Protocol.
    udp = auto()
    #: Stream Control Transmission Protocol.
    sctp = auto()
    #: Datagram Congestion Control Protocol.
    dccp = auto()
    @staticmethod
    def get(key: 'int | str') -> 'TransportProtocol':
        """Backport support for original codes.
        Args:
            key: Key to get enum item.
        :meta private:
        """
        if isinstance(key, int):
            return TransportProtocol(key)
        if key.lower() in TransportProtocol.__members__:
            return TransportProtocol[key.lower()]  # type: ignore[misc]
        max_val = max(TransportProtocol.__members__.values())
        return extend_enum(TransportProtocol, key.lower(), max_val * 2)
class {NAME}(StrEnum):
    """[{NAME}] {DOCS}"""
    if TYPE_CHECKING:
        #: Service name.
        svc: 'str'
        #: Port number.
        port: 'int'
        #: Transport protocol.
        proto: 'TransportProtocol'
    #: Mapping of members based on transport protocol.
    __members_proto__: 'DefaultDict[TransportProtocol, dict[int, {NAME}]]' = defaultdict(dict)
    def __new__(cls, value: 'int', name: 'str' = '<null>',
                proto: 'TransportProtocol' = TransportProtocol.undefined) -> 'Type[{NAME}]':
        temp = '%s [%d - %s]' % (name, value, proto.name)
        obj = str.__new__(cls, temp)
        obj._value_ = temp
        obj.svc = name
        obj.port = value
        obj.proto = proto
        for namespace in show_flag_values(proto):
            cls.__members_proto__[TransportProtocol(namespace)][value] = obj
        if proto is TransportProtocol.undefined:
            cls.__members_proto__[proto][value] = obj
        return obj
    def __repr__(self) -> 'str':
        return "<%s.%s: %d [%s]>" % (self.__class__.__name__, self.svc, self.port, self.proto.name)
    def __str__(self) -> 'str':
        return '%s [%d - %s]' % (self.svc, self.port, self.proto.name)
    def __int__(self) -> 'int':
        return self.port
    def __lt__(self, other: '{NAME}') -> 'bool':
        return self.port < other
    def __gt__(self, other: '{NAME}') -> 'bool':
        return self.port > other
    def __le__(self, other: '{NAME}') -> 'bool':
        return self.port <= other
    def __ge__(self, other: '{NAME}') -> 'bool':
        return self.port >= other
    def __eq__(self, other: 'Any') -> 'bool':
        return self.port == other
    def __ne__(self, other: 'Any') -> 'bool':
        return self.port != other
    def __hash__(self) -> 'int':
        return hash(self.port)
    {ENUM}
    @staticmethod
    def get(key: 'int | str', default: 'int' = -1, *,
            proto: 'TransportProtocol | str' = TransportProtocol.undefined) -> '{NAME}':
        """Backport support for original codes.
        Args:
            key: Key to get enum item.
            default: Default value if not found.
            proto: Transport protocol of the enum item.
        :meta private:
        """
        if isinstance(key, int):
            if isinstance(proto, str):
                proto = TransportProtocol.get(proto.lower())
            temp_ns = {NAME}.__members_proto__.get(proto, {{}})
            if key in temp_ns:
                return temp_ns[key]
            try:
                ret = {NAME}._missing_(key)
                if ret is None:
                    raise ValueError
            except ValueError:
                ret = extend_enum({NAME}, 'PORT_%d_%s' % (key, proto.name), key, 'unknown', proto)
            return ret
        if key in {NAME}.__members_proto__:
            return getattr({NAME}, key)
        return extend_enum({NAME}, key, default, key)
    @classmethod
    def _missing_(cls, value: 'int') -> '{NAME}':
        """Lookup function used when value is not found.
        Args:
            value: Value to get enum item.
        """
        if not ({FLAG}):
            raise ValueError('%r is not a valid %s' % (value, cls.__name__))
        if value in cls.__members_proto__.get(TransportProtocol.undefined, {{}}):  # type: ignore[call-overload]
            return cls.__members_proto__[TransportProtocol.undefined][value]  # type: ignore[index]
        {MISS}
        {'' if ''.join(MISS.splitlines()[-1:]).startswith('return') else 'return super()._missing_(value)'}
'''.strip()  # type: Callable[[str, str, str, str, str, str], str]
[docs]
class AppType(Vendor):
    """Application Layer Protocol Numbers"""
    #: Value limit checker.
    FLAG = 'isinstance(value, int) and 0 <= value <= 65535'
    #: Link to registry.
    LINK = 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv'
    def count(self, data: 'list[str]') -> 'Counter[str]':
        """Count field records."""
        reader = csv.reader(data)
        next(reader)  # header
        return collections.Counter(map(lambda item: '[%s] %s' % (item[2], item[0].strip() or self.safe_name(item[3].strip())),
                                       filter(lambda item: len(item[1].split('-')) != 2, reader)))
    @staticmethod
    def wrap_comment(text: 'str') -> 'str':
        """Wraps long-length text to shorter lines of comments.
        Args:
            text: Source text.
        Returns:
            Wrapped comments.
        """
        return '\n    #:   '.join(textwrap.wrap(text.strip(), 76))
    def process(self, data: 'list[str]') -> 'tuple[list[str], list[str]]':
        """Process registry data.
        Args:
            data: Registry data.
        Returns:
            Enumeration fields and missing fields.
        """
        reader = csv.reader(data)
        next(reader)  # header
        enum = []  # type: list[str]
        miss = []  # type: list[str]
        line = collections.OrderedDict()  # type: OrderedDict[str, list[str]]
        for item in reader:
            svc = item[0].strip().lower() or self.safe_name(item[3].strip()).lower()
            port = item[1].strip() or '-1'
            proto = 'TransportProtocol.get(%r)' % (item[2].strip().lower() or 'undefined')
            desc = item[3].strip()
            temp = []  # type: list[str]
            #for rfc in filter(lambda s: 'RFC' in s, re.split(r'\[|\]', item[8])):
            #    temp.append(f'[{rfc[:3]} {rfc[3:]}]')
            for rfc in filter(None, map(lambda s: s.strip(), re.split(r'\[|\]', item[8]))):
                if 'RFC' in rfc and re.match(r'\d+', rfc[3:]):
                    match = re.fullmatch(r'RFC(?P<rfc>\d+)(, Section (?P<sec>.*?))?', rfc)
                    if match is None:
                        temp.append(f'[{rfc}]')
                    else:
                        if match.group('sec') is not None:
                            temp.append(f'[:rfc:`{match.group("rfc")}#{match.group("sec")}`]')
                        else:
                            temp.append(f'[:rfc:`{match.group("rfc")}`]')
                else:
                    temp.append(f'[{rfc}]'.replace('_', ' '))
            cmmt = self.wrap_comment(re.sub(r'\s+', r' ',
                                            '[%s] %s %s' % (item[2].strip().upper() or 'N/A',
                                                            desc, ''.join(temp))))  # pylint: disable=consider-using-f-string
            try:
                code, _ = port, int(port)
                if port == '-1':
                    code = 'null'
                renm = self.rename(svc, code)
                if f'{renm}_{code}' in line:
                    renm = f'{renm}_{code}'
                if renm in line:
                    if port == line[renm][1]:
                        line[renm][2] = f'{line[renm][2]} | {proto}'
                        if line[renm][3].startswith('-'):
                            line[renm][3] = f'{line[renm][3]}\n    #: - {cmmt}'
                        else:
                            line[renm][3] = f'- {line[renm][3]}\n    #: - {cmmt}'
                    else:
                        line[f'{renm}_{line[renm][1]}'] = line[renm]
                        line[f'{renm}_{code}'] = [svc, port, proto, cmmt]
                        del line[renm]
                else:
                    line[renm] = [svc, port, proto, cmmt]
                # if port == '-1':
                #     continue
                # proto_name = item[2].strip().lower()
                # if proto_name:
                #     renm = f'PORT_{code}_{proto_name}'
                # else:
                #     renm = f'PORT_{code}'
                # line[renm] = [svc, code, proto, cmmt]
            except ValueError:
                start, stop = port.split('-')
                miss.append(f'if {start} <= value <= {stop}:')
                miss.append(f'    #: {cmmt}')
                miss.append(f"    return extend_enum(cls, '{self.safe_name(svc)}_%d' % value, value, {svc!r}, {proto})")
        for key, (svc, code, proto, cmmt) in line.items():
            if keyword.iskeyword(key):
                key = '%s_' % key
            pres = f"{key}: 'AppType' = {code}, {svc!r}, {proto}"
            if cmmt.startswith('-'):
                sufs = f'#: {cmmt}'
            else:
                sufs = '#: %s' % cmmt.replace('    #:   ', '    #: ')
            enum.append(f'{sufs}\n    {pres}')
        return enum, miss
    def context(self, data: 'list[str]') -> 'str':
        """Generate constant context.
        Args:
            data: CSV data.
        Returns:
            Constant context.
        """
        enum, miss = self.process(data)
        ENUM = '\n\n    '.join(map(lambda s: s.rstrip(), enum)).strip()
        MISS = '\n        '.join(map(lambda s: s.rstrip(), miss)).strip()
        return LINE(self.NAME, self.DOCS, self.FLAG, ENUM, MISS, self.__module__) 
if __name__ == '__main__':
    sys.exit(AppType())  # type: ignore[arg-type]