Source code for pcapkit.protocols.application.ftp

# -*- coding: utf-8 -*-
"""file transfer protocol

.. module:: pcapkit.protocols.application.ftp

:mod:`pcapkit.protocols.application.ftp` contains
:class:`~pcapkit.protocols.application.ftp.FTP` only,
which implements extractor for File Transfer Protocol
(FTP) [*]_.

.. [*] https://en.wikipedia.org/wiki/File_Transfer_Protocol

"""
import re
from typing import TYPE_CHECKING

from pcapkit.const.ftp.command import Command as Enum_Command
from pcapkit.const.ftp.return_code import ReturnCode as Enum_ReturnCode
from pcapkit.protocols.application.application import Application
from pcapkit.protocols.data.application.ftp import FTP as Data_FTP
from pcapkit.protocols.data.application.ftp import Request as Data_Request
from pcapkit.protocols.data.application.ftp import Response as Data_Response
from pcapkit.protocols.misc.raw import Raw
from pcapkit.protocols.schema.application.ftp import FTP as Schema_FTP
from pcapkit.utilities.compat import StrEnum
from pcapkit.utilities.exceptions import ProtocolError, UnsupportedCall

if TYPE_CHECKING:
    from typing import Any, NoReturn, Optional

    from typing_extensions import Literal

__all__ = ['FTP', 'FTP_DATA']

# regex for FTP
FTP_REQUEST = re.compile(rb'^(?P<cmmd>[A-Z]{3,4})( +(?P<args>.*))?\r\n$', re.I)
FTP_RESPONSE = re.compile(rb'^(?P<code>[0-9]{3})(?P<more>\-)?( +(?P<args>.*))?\r\n$', re.I)


[docs] class Type(StrEnum): """FTP packet type.""" #: Request packet. REQUEST = 'request' #: Response packet. RESPONSE = 'response'
[docs] class FTP(Application[Data_FTP, Schema_FTP], data=Data_FTP, schema=Schema_FTP): """This class implements File Transfer Protocol.""" ########################################################################## # Properties. ########################################################################## @property def name(self) -> 'Literal["File Transfer Protocol"]': """Name of current protocol.""" return 'File Transfer Protocol' @property def length(self) -> 'NoReturn': """Header length of current protocol. Raises: UnsupportedCall: This protocol doesn't support :attr:`length`. """ raise UnsupportedCall(f"'{self.__class__.__name__}' object has no attribute 'length'") ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_FTP': # pylint: disable=unused-argument """Read File Transfer Protocol (FTP). Args: length: Length of packet data. **kwargs: Arbitrary keyword arguments. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if length is None: length = len(self) schema = self.__header__ data = schema.data if (match := FTP_REQUEST.match(data)) is not None: cmmd = match.group('cmmd').decode() args = match.group('args') cmmd_val = Enum_Command.get(cmmd) args_val = self.decode(args) ftp = Data_Request( type=Type.REQUEST, cmmd=cmmd_val, args=args_val, ) # type: Data_FTP elif (match := FTP_RESPONSE.match(data)) is not None: code = int(match.group('code')) more = bool(match.group('more')) args = match.group('args') code_val = Enum_ReturnCode.get(code) args_val = self.decode(args) ftp = Data_Response( type=Type.RESPONSE, code=code_val, more=more, args=args_val, ) else: raise ProtocolError('FTP: invalid packet format') return ftp
[docs] def make(self, cmmd: 'Optional[Enum_Command | str | bytes]' = None, code: 'Optional[Enum_ReturnCode | int | str | bytes]' = None, args: 'Optional[str | bytes]' = None, more: 'bool' = False, **kwargs: 'Any') -> 'Schema_FTP': """Make (construct) packet data. Args: cmmd: FTP command. code: FTP status code. args: Optional FTP command arguments and/or status messages. more: More status messages to follow for response packets. **kwargs: Arbitrary keyword arguments. Returns: Constructed packet data. """ if cmmd is not None and code is None: if isinstance(cmmd, bytes): prefix = cmmd elif isinstance(cmmd, str): prefix = cmmd.encode() else: prefix = cmmd.value mf = b'' elif cmmd is None and code is not None: code_val = int(code) prefix = str(code_val).encode() mf = b'-' if more else b'' else: raise ProtocolError('FTP: invalid packet type') if args is None: suffix = b'' elif isinstance(args, bytes): suffix = args else: suffix = args.encode() return Schema_FTP( data=b'%s%s %s' % (prefix, mf, suffix), )
########################################################################## # Utilities. ##########################################################################
[docs] @classmethod def _make_data(cls, data: 'Data_FTP') -> 'dict[str, Any]': # type: ignore[override] """Create key-value pairs from ``data`` for protocol construction. Args: data: protocol data Returns: Key-value pairs for protocol construction. """ return { 'cmmd': getattr(data, 'cmmd', None), 'code': getattr(data, 'code', None), 'args': getattr(data, 'args', None), 'more': getattr(data, 'more', False), }
[docs] class FTP_DATA(Raw): """This class implements FTP data channel transmission.""" ########################################################################## # Properties. ########################################################################## # name of current protocol @property def name(self) -> 'Literal["FTP_DATA"]': # type: ignore[override] """Name of current protocol.""" return 'FTP_DATA'