Source code for pcapkit.corekit.fields.ipaddress

# -*- coding: utf-8 -*-
"""IP address field class"""

import abc
import ipaddress
from typing import TYPE_CHECKING, Generic, TypeVar, cast

from pcapkit.corekit.fields.field import Field, NoValue
from pcapkit.utilities.exceptions import FieldValueError

__all__ = [
    'IPv4AddressField', 'IPv6AddressField',
    'IPv4InterfaceField', 'IPv6InterfaceField',
]

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
    from typing import Any, Callable

    from typing_extensions import Literal, Self

    from pcapkit.corekit.fields.field import NoValueType


_T = TypeVar('_T', 'IPv4Address', 'IPv6Address',
             'IPv4Interface', 'IPv6Interface')
_AT = TypeVar('_AT', 'IPv4Address', 'IPv6Address')
_IT = TypeVar('_IT', 'IPv4Interface', 'IPv6Interface')


[docs] class _IPField(Field[_T], Generic[_T]): """Internal IP related value for protocol fields. Args: length: Field size (in bytes). callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property @abc.abstractmethod def version(self) -> 'int': """IP version number."""
[docs] class _IPAddressField(_IPField[_AT]): """Internal IP address value for protocol fields. Args: length: Field size (in bytes). callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """
[docs] def pre_process(self, value: '_AT | bytes | int | str', packet: 'dict[str, Any]') -> 'bytes': """Process field value before packing. Args: value: Field value. packet: Packet data. Returns: Processed field value. """ if isinstance(value, (ipaddress.IPv4Address, ipaddress.IPv6Address)): ip = value # type: IPv4Address | IPv6Address else: ip = ipaddress.ip_address(value) if ip.version != self.version: raise FieldValueError(f'IP version mismatch: {ip.version} != {self.version}') return ip.packed
[docs] def post_process(self, value: 'bytes', packet: 'dict[str, Any]') -> '_AT': """Process field value after parsing (unpacking). Args: value: Field value. packet: Packet data. Returns: Processed field value. """ val = ipaddress.ip_address(value) if val.version != self.version: raise FieldValueError(f'IP version mismatch: {val.version} != {self.version}') return val # type: ignore[return-value]
[docs] class IPv4AddressField(_IPAddressField[ipaddress.IPv4Address]): """IPv4 address value for protocol fields. Args: default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property def version(self) -> 'Literal[4]': """IP version number.""" return 4 def __init__(self, default: 'IPv4Address | NoValueType' = NoValue, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(4, default, callback) self._template = f'4s'
[docs] class IPv6AddressField(_IPAddressField[ipaddress.IPv6Address]): """IPv6 address value for protocol fields. Args: default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property def version(self) -> 'Literal[6]': """IP version number.""" return 6 def __init__(self, default: 'IPv6Address | NoValueType' = NoValue, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(16, default, callback) self._template = f'16s'
[docs] class _IPInterfaceField(_IPField[_IT]): """Internal IP interface value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """
[docs] class IPv4InterfaceField(_IPInterfaceField[ipaddress.IPv4Interface]): """IPv4 interface value for protocol fields. Args: default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property def version(self) -> 'Literal[4]': """IP version number.""" return 4 def __init__(self, default: 'IPv4Interface | NoValueType' = NoValue, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(8, default, callback) self._template = f'8s'
[docs] def pre_process(self, value: 'IPv4Interface | bytes | int | str', packet: 'dict[str, Any]') -> 'bytes': """Process field value before packing. Args: value: Field value. packet: Packet data. Returns: Processed field value. """ if isinstance(value, ipaddress.IPv4Interface): val = value else: val = ipaddress.ip_interface(value) # type: ignore[assignment] if val.version != self.version: raise FieldValueError(f'IP version mismatch: {val.version} != {self.version}') ip = val.ip mask = val.netmask return ip.packed + mask.packed
[docs] def post_process(self, value: 'bytes', packet: 'dict[str, Any]') -> 'IPv4Interface': """Process field value after parsing (unpacking). Args: value: Field value. packet: Packet data. Returns: Processed field value. """ ip = ipaddress.IPv4Address(value[:4]) mask = ipaddress.IPv4Address(value[4:]) val = ipaddress.ip_interface(f'{ip}/{mask}') if val.version != self.version: raise FieldValueError(f'IP version mismatch: {val.version} != {self.version}') return val
[docs] class IPv6InterfaceField(_IPInterfaceField[ipaddress.IPv6Interface]): """IPv6 interface value for protocol fields. Args: default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property def version(self) -> 'Literal[6]': """IP version number.""" return 6 def __init__(self, default: 'IPv6Interface | NoValueType' = NoValue, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(17, default, callback) self._template = f'17s'
[docs] def pre_process(self, value: 'IPv6Interface | bytes | int | str', packet: 'dict[str, Any]') -> 'bytes': """Process field value before packing. Args: value: Field value. packet: Packet data. Returns: Processed field value. """ if isinstance(value, ipaddress.IPv6Interface): val = value else: val = ipaddress.ip_interface(value) # type: ignore[assignment] if val.version != self.version: raise FieldValueError(f'IP version mismatch: {val.version} != {self.version}') ip = val.ip prefixlen = cast('int', val._prefixlen) # type: ignore[attr-defined] # pylint: disable=protected-access return ip.packed + prefixlen.to_bytes(1, 'big')
[docs] def post_process(self, value: 'bytes', packet: 'dict[str, Any]') -> 'IPv6Interface': """Process field value after parsing (unpacking). Args: value: Field value. packet: Packet data. Returns: Processed field value. """ ip = ipaddress.IPv6Address(value[:16]) mask = int(value[16:]) val = ipaddress.ip_interface(f'{ip}/{mask}') if val.version != self.version: raise FieldValueError(f'IP version mismatch: {val.version} != {self.version}') return val