Source code for pcapkit.protocols.application.httpv2

# -*- coding: utf-8 -*-
"""HTTP/2 - Hypertext Transfer Protocol

.. module:: pcapkit.protocols.application.httpv2

:mod:`pcapkit.protocols.application.httpv2` contains
only, which implements extractor for Hypertext Transfer
Protocol (HTTP/2) [*]_, whose structure is described as

======= ========= ===================== ==========================
Octets      Bits        Name                    Description
======= ========= ===================== ==========================
  0           0   ``http.length``             Length
  3          24   ``http.type``               Type
  4          32   ``http.flags``              Flags
  5          40                               Reserved
  5          41   ``http.sid``                Stream Identifier
  9          72   ``http.payload``            Frame Payload
======= ========= ===================== ==========================

.. [*]

import collections
from typing import TYPE_CHECKING, cast

from pcapkit.const.http.error_code import ErrorCode as Enum_ErrorCode
from pcapkit.const.http.frame import Frame as Enum_Frame
from pcapkit.const.http.setting import Setting as Enum_Setting
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.protocols.application.http import HTTP as HTTPBase
from import HTTP as Data_HTTP
from import ContinuationFrame as Data_ContinuationFrame
from import \
    ContinuationFrameFlags as Data_ContinuationFrameFlags
from import DataFrame as Data_DataFrame
from import DataFrameFlags as Data_DataFrameFlags
from import GoawayFrame as Data_GoawayFrame
from import HeadersFrame as Data_HeadersFrame
from import HeadersFrameFlags as Data_HeadersFrameFlags
from import PingFrame as Data_PingFrame
from import PingFrameFlags as Data_PingFrameFlags
from import PriorityFrame as Data_PriorityFrame
from import PushPromiseFrame as Data_PushPromiseFrame
from import \
    PushPromiseFrameFlags as Data_PushPromiseFrameFlags
from import RSTStreamFrame as Data_RSTStreamFrame
from import SettingsFrame as Data_SettingsFrame
from import SettingsFrameFlags as Data_SettingsFrameFlags
from import UnassignedFrame as Data_UnassignedFrame
from import WindowUpdateFrame as Data_WindowUpdateFrame
from pcapkit.protocols.schema.application.httpv2 import HTTP as Schema_HTTP
from pcapkit.protocols.schema.application.httpv2 import \
    ContinuationFrame as Schema_ContinuationFrame
from pcapkit.protocols.schema.application.httpv2 import DataFrame as Schema_DataFrame
from pcapkit.protocols.schema.application.httpv2 import FrameType as Schema_FrameType
from pcapkit.protocols.schema.application.httpv2 import GoawayFrame as Schema_GoawayFrame
from pcapkit.protocols.schema.application.httpv2 import HeadersFrame as Schema_HeadersFrame
from pcapkit.protocols.schema.application.httpv2 import PingFrame as Schema_PingFrame
from pcapkit.protocols.schema.application.httpv2 import PriorityFrame as Schema_PriorityFrame
from pcapkit.protocols.schema.application.httpv2 import PushPromiseFrame as Schema_PushPromiseFrame
from pcapkit.protocols.schema.application.httpv2 import RSTStreamFrame as Schema_RSTStreamFrame
from pcapkit.protocols.schema.application.httpv2 import SettingPair as Schema_SettingPair
from pcapkit.protocols.schema.application.httpv2 import SettingsFrame as Schema_SettingsFrame
from pcapkit.protocols.schema.application.httpv2 import UnassignedFrame as Schema_UnassignedFrame
from pcapkit.protocols.schema.application.httpv2 import \
    WindowUpdateFrame as Schema_WindowUpdateFrame
from pcapkit.protocols.schema.schema import Schema
from pcapkit.utilities.exceptions import ProtocolError
from pcapkit.utilities.warnings import ProtocolWarning, RegistryWarning, warn

    from enum import IntEnum as StdlibEnum
    from typing import Any, Callable, DefaultDict, Optional, Tuple, Type

    from aenum import IntEnum as AenumEnum
    from mypy_extensions import DefaultArg, KwArg, NamedArg
    from typing_extensions import Literal

    Flags = Schema_FrameType.Flags

    FrameParser = Callable[[Schema_FrameType, NamedArg(Schema_HTTP, 'header')], Data_HTTP]
    FrameConstructor = Callable[[Enum_Frame, DefaultArg(Optional[Data_HTTP]),
                                 KwArg(Any)], Tuple[Schema_FrameType, 'Flags']]

__all__ = ['HTTP']

[docs] class HTTP(HTTPBase[Data_HTTP, Schema_HTTP], schema=Schema_HTTP, data=Data_HTTP): """This class implements Hypertext Transfer Protocol (HTTP/2). This class currently supports parsing of the following HTTP/2 frames, which are directly mapped to the :class:`pcapkit.const.http.frame.Frame` enumeration: .. list-table:: :header-rows: 1 * - Frame Code - Frame Parser - Frame Constructor * - :attr:`~pcapkit.const.http.frame.Frame.DATA` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_data` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_data` * - :attr:`~pcapkit.const.http.frame.Frame.HEADERS` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_headers` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_headers` * - :attr:`~pcapkit.const.http.frame.Frame.PRIORITY` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_priority` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_priority` * - :attr:`~pcapkit.const.http.frame.Frame.RST_STREAM` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_rst_stream` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_rst_stream` * - :attr:`~pcapkit.const.http.frame.Frame.SETTINGS` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_settings` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_settings` * - :attr:`~pcapkit.const.http.frame.Frame.PUSH_PROMISE` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_push_promise` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_push_promise` * - :attr:`~pcapkit.const.http.frame.Frame.PING` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_ping` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_ping` * - :attr:`~pcapkit.const.http.frame.Frame.GOAWAY` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_goaway` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_goaway` * - :attr:`~pcapkit.const.http.frame.Frame.WINDOW_UPDATE` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_window_update` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_window_update` * - :attr:`~pcapkit.const.http.frame.Frame.CONTINUATION` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._read_http_continuation` - :meth:`~pcapkit.protocols.application.httpv2.HTTP._make_http_continuation` """ ########################################################################## # Defaults. ########################################################################## #: DefaultDict[Enum_Frame, str | tuple[FrameParser, FrameConstructor]]: Frame #: code to method mapping. Method names are expected to be referred to #: the class by ``_read_http_${name}`` and/or ``_make_http_${name}``, and if #: such name not found, the value should then be a method that can parse the #: frame by itself. __frame__ = collections.defaultdict( lambda: 'none', { Enum_Frame.DATA: 'data', # DATA Enum_Frame.HEADERS: 'headers', # HEADERS Enum_Frame.PRIORITY: 'priority', # PRIORITY Enum_Frame.RST_STREAM: 'rst_stream', # RST_STREAM Enum_Frame.SETTINGS: 'settings', # SETTINGS Enum_Frame.PUSH_PROMISE: 'push_promise', # PUSH_PROMISE Enum_Frame.PING: 'ping', # PING Enum_Frame.GOAWAY: 'goaway', # GOAWAY Enum_Frame.WINDOW_UPDATE: 'window_update', # WINDOW_UPDATE Enum_Frame.CONTINUATION: 'continuation', # CONTINUATION }, ) # type: DefaultDict[Enum_Frame | int, str | tuple[FrameParser, FrameConstructor]] ########################################################################## # Properties. ########################################################################## @property def alias(self) -> 'Literal["HTTP/2"]': """Acronym of current protocol.""" return 'HTTP/2' @property def length(self) -> 'Literal[9]': """Header length of current protocol.""" return 9 @property def version(self) -> 'Literal["2"]': """Version of current protocol.""" return '2' ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_HTTP': """Read Hypertext Transfer Protocol (HTTP/2). Structure of HTTP/2 packet [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +=+=============================================================+ | Frame Payload (0...) ... +---------------------------------------------------------------+ Args: length: Length of packet data. **kwargs: Arbitrary keyword arguments. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if length is None: length = len(self) schema = self.__header__ if schema.length < 9: raise ProtocolError(f'HTTP/2: [Type {schema.type}] invalid format') if schema.type in (Enum_Frame.SETTINGS, Enum_Frame.PING) and['sid'] != 0: raise ProtocolError(f'HTTP/2: [Type {schema.type}] invalid format') name = self.__frame__[schema.type] if isinstance(name, str): meth_name = f'_read_http_{name}' meth = cast('FrameParser', getattr(self, meth_name, self._read_http_none)) else: meth = name[0] http = meth(schema.frame, header=schema) return http
[docs] def make(self, # type: ignore[override] type: 'Enum_Frame | StdlibEnum | AenumEnum | str | int' = Enum_Frame.DATA, type_default: 'Optional[int]' = None, type_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long type_reversed: 'bool' = False, flags: 'Flags' = 0, # type: ignore[assignment] sid: 'int' = 0, frame: 'bytes | Data_HTTP | Schema_FrameType | dict[str, Any]' = b'', **kwargs: 'Any') -> 'Schema_HTTP': """Make (construct) packet data. Args: type: Type of HTTP/2 frame. type_default: Default frame type. type_namespace: Namespace of frame type. type_reversed: Whether to reverse the namespace. flags: Flags of HTTP/2 frame. sid: Stream ID of HTTP/2 frame. frame: Frame data of HTTP/2 frame. **kwargs: Arbitrary keyword arguments. Returns: Constructed packet data. """ type_val = cast('Enum_Frame', self._make_index(type, type_default, namespace=type_namespace, reversed=type_reversed, pack=False)) if isinstance(frame, bytes): length = len(frame) + 9 frame_val = frame # type: bytes | Schema_FrameType elif isinstance(frame, (dict, Data_HTTP)): name = self.__frame__[type_val] if isinstance(name, str): meth_name = f'_make_http_{name}' meth = cast('FrameConstructor', getattr(self, meth_name, self._make_http_none)) else: meth = name[1] if isinstance(frame, dict): frame_val, flags = meth(type_val, **frame) else: frame_val, flags = meth(type_val, frame) length = len(frame_val.pack()) + 9 elif isinstance(frame, Schema): length = len(frame.pack()) + 9 frame_val = frame else: raise ProtocolError(f'HTTP/2: [Type {type_val}] invalid format') flags_val = {} # type: dict[str, int] for bit in range(8): flags_val[f'bit_{bit}'] = (flags & (1 << bit)) >> bit return Schema_HTTP( length=length, type=type_val, flags=flags_val, # type: ignore[arg-type] stream={ 'sid': sid, }, frame=frame_val, )
[docs] @classmethod def id(cls) -> 'tuple[Literal["HTTP"], Literal["HTTPv2"]]': # type: ignore[override] """Index ID of the protocol. Returns: Index ID of the protocol. """ return (cls.__name__, 'HTTPv2') # type: ignore[return-value]
[docs] @classmethod def register_frame(cls, code: 'Enum_Frame', meth: 'str | tuple[FrameParser, FrameConstructor]') -> 'None': """Register a frame parser. Args: code: HTTP frame type code. meth: Method name or callable to parse and/or construct the frame. """ if code in cls.__frame__: warn(f'HTTP/2: [Type {code}] frame already registered', RegistryWarning) cls.__frame__[code] = meth
########################################################################## # Data models. ########################################################################## def __length_hint__(self) -> 'Literal[9]': """Total length of corresponding protocol.""" return 9 ########################################################################## # Utilities. ##########################################################################
[docs] @classmethod def _make_data(cls, data: 'Data_HTTP') -> 'dict[str, Any]': # type: ignore[override] """Create key-value pairs from ``data`` for protocol construction. Args: data: protocol data Returns: Key-value pairs for protocol construction. """ return { 'length': data.length, 'type': data.type, 'flags': data.flags.__value__ if data.flags is not None else 0, 'sid': data.sid, 'frame': data, }
[docs] def _read_http_none(self, schema: 'Schema_UnassignedFrame', *, header: 'Schema_HTTP') -> 'Data_UnassignedFrame': """Read HTTP packet with unassigned type. Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if any(header.flags): #raise ProtocolError(f'HTTP/2: [Type {frame}] invalid format') warn(f'HTTP/2: [Type {header.type}] invalid format', ProtocolWarning) data = Data_UnassignedFrame( length=header.length, type=header.type, flags=None,['sid'],, ) return data
[docs] def _read_http_data(self, schema: 'Schema_DataFrame', *, header: 'Schema_HTTP') -> 'Data_DataFrame': """Read HTTP/2 ``DATA`` frames. Structure of HTTP/2 ``DATA`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------+-----------------------------------------------+ |Pad Length? (8)| +---------------+-----------------------------------------------+ | Data (*) ... +---------------------------------------------------------------+ | Padding (*) ... +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ flag = Data_DataFrameFlags( END_STREAM=bool(header.flags['bit_0']), # bit 0 PADDED=bool(header.flags['bit_3']), # bit 3 ) flag.__update__({ '__value__': schema.__flags__, }) data = Data_DataFrame( length=header.length, type=header.type, flags=flag, pad_len=schema.pad_len if flag.PADDED else 0,['sid'],, ) return data
[docs] def _read_http_headers(self, schema: 'Schema_HeadersFrame', *, header: 'Schema_HTTP') -> 'Data_HeadersFrame': """Read HTTP/2 ``HEADERS`` frames. Structure of HTTP/2 ``HEADERS`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------+-----------------------------------------------+ |Pad Length? (8)| +-+-------------+-----------------------------------------------+ |E| Stream Dependency? (31) | +-+-------------+-----------------------------------------------+ | Weight? (8) | +-+-------------+-----------------------------------------------+ | Header Block Fragment (*) ... +---------------------------------------------------------------+ | Padding (*) ... +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ flag = Data_HeadersFrameFlags( END_STREAM=bool(header.flags['bit_0']), # bit 0 END_HEADERS=bool(header.flags['bit_2']), # bit 2 PADDED=bool(header.flags['bit_3']), # bit 3 PRIORITY=bool(header.flags['bit_5']), # bit 5 ) flag.__update__({ '__value__': schema.__flags__, }) data = Data_HeadersFrame( length=header.length, type=header.type, flags=flag, pad_len=schema.pad_len if flag.PADDED else 0,['sid'], excl_dependency=bool(schema.stream_dep['exclusive']) if flag.PRIORITY else False, stream_dependency=schema.stream_dep['sid'] if flag.PRIORITY else 0, weight=(schema.weight + 1) if flag.PRIORITY else 0, fragment=schema.fragment, ) return data
[docs] def _read_http_priority(self, schema: 'Schema_PriorityFrame', *, header: 'Schema_HTTP') -> 'Data_PriorityFrame': # pylint: disable=unused-argument """Read HTTP/2 ``PRIORITY`` frames. Structure of HTTP/2 ``PRIORITY`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +-+-------------------------------------------------------------+ |E| Stream Dependency (31) | +-+-------------+-----------------------------------------------+ | Weight (8) | +-+-------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if header.length != 9: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') data = Data_PriorityFrame( length=header.length, type=header.type, flags=None,['sid'], excl_dependency=bool(['exclusive']),['sid'], weight=schema.weight + 1, ) return data
[docs] def _read_http_rst_stream(self, schema: 'Schema_RSTStreamFrame', *, header: 'Schema_HTTP') -> 'Data_RSTStreamFrame': # pylint: disable=unused-argument """Read HTTP/2 ``RST_STREAM`` frames. Structure of HTTP/2 ``RST_STREAM`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------------------------------------------------------+ | Error Code (32) | +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if header.length != 13: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') data = Data_RSTStreamFrame( length=header.length, type=header.type, flags=None,['sid'], error=schema.error, ) return data
[docs] def _read_http_settings(self, schema: 'Schema_SettingsFrame', *, header: 'Schema_HTTP') -> 'Data_SettingsFrame': """Read HTTP/2 ``SETTINGS`` frames. Structure of HTTP/2 ``SETTINGS`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------------------------------------------------------+ | Identifier (16) | +-------------------------------+-------------------------------+ | Value (32) | +---------------------------------------------------------------+ | ...... | Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if (header.length - 9) % 6 != 0 or['sid'] != 0: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') flag = Data_SettingsFrameFlags( ACK=bool(header.flags['bit_0']), # bit 0 ) flag.__update__({ '__value__': schema.__flags__, }) if flag.ACK and header.length - 9 != 0: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') sets = OrderedMultiDict() # type: OrderedMultiDict[Enum_Setting, int] for setting in schema.settings: sets[] = setting.value data = Data_SettingsFrame( length=header.length, type=header.type, flags=flag,['sid'], settings=sets, ) return data
[docs] def _read_http_push_promise(self, schema: 'Schema_PushPromiseFrame', *, header: 'Schema_HTTP') -> 'Data_PushPromiseFrame': """Read HTTP/2 ``PUSH_PROMISE`` frames. Structure of HTTP/2 ``PUSH_PROMISE`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------+-----------------------------------------------+ |Pad Length? (8)| +-+-------------+-----------------------------------------------+ |R| Promised Stream ID (31) | +-+-----------------------------+-------------------------------+ | Header Block Fragment (*) ... +---------------------------------------------------------------+ | Padding (*) ... +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if header.length < 13: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') flag = Data_PushPromiseFrameFlags( END_HEADERS=bool(header.flags['bit_2']), # bit 2 PADDED=bool(header.flags['bit_3']), # bit 3 ) flag.__update__({ '__value__': schema.__flags__, }) data = Data_PushPromiseFrame( length=header.length, type=header.type, flags=flag,['sid'], pad_len=schema.pad_len if flag.PADDED else 0,['sid'], fragment=schema.fragment, ) return data
[docs] def _read_http_ping(self, schema: 'Schema_PingFrame', *, header: 'Schema_HTTP') -> 'Data_PingFrame': """Read HTTP/2 ``PING`` frames. Structure of HTTP/2 ``PING`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------------------------------------------------------+ | | | Opaque Data (64) | | | +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if header.length != 17: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') flag = Data_PingFrameFlags( ACK=bool(header.flags['bit_0']), # bit 0 ) flag.__update__({ '__value__': schema.__flags__, }) data = Data_PingFrame( length=header.length, type=header.type, flags=flag,['sid'],, ) return data
[docs] def _read_http_goaway(self, schema: 'Schema_GoawayFrame', *, header: 'Schema_HTTP') -> 'Data_GoawayFrame': # pylint: disable=unused-argument """Read HTTP/2 ``GOAWAY`` frames. Structure of HTTP/2 ``GOAWAY`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +-+-------------+---------------+-------------------------------+ |R| Last-Stream-ID (31) | +-+-------------------------------------------------------------+ | Error Code (32) | +---------------------------------------------------------------+ | Additional Debug Data (*) | +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ data = Data_GoawayFrame( length=header.length, type=header.type, flags=None,['sid'],['sid'], error=schema.error, debug_data=schema.debug, ) return data
[docs] def _read_http_window_update(self, schema: 'Schema_WindowUpdateFrame', *, header: 'Schema_HTTP') -> 'Data_WindowUpdateFrame': # pylint: disable=unused-argument """Read HTTP/2 ``WINDOW_UPDATE`` frames. Structure of HTTP/2 ``WINDOW_UPDATE`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +-+-------------+---------------+-------------------------------+ |R| Window Size Increment (31) | +-+-------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ if header.length != 13: raise ProtocolError(f'HTTP/2: [Type {header.type}] invalid format') data = Data_WindowUpdateFrame( length=header.length, type=header.type, flags=None,['sid'], increment=schema.size['incr'], ) return data
[docs] def _read_http_continuation(self, schema: 'Schema_ContinuationFrame', *, header: 'Schema_HTTP') -> 'Data_ContinuationFrame': """Read HTTP/2 ``CONTINUATION`` frames. Structure of HTTP/2 ``CONTINUATION`` frame [:rfc:`7540`]: .. code-block:: text +-----------------------------------------------+ | Length (24) | +---------------+---------------+---------------+ | Type (8) | Flags (8) | +-+-------------+---------------+-------------------------------+ |R| Stream Identifier (31) | +---------------------------------------------------------------+ | Header Block Fragment (*) ... +---------------------------------------------------------------+ Args: schema: Parsed frame schema. header: Parsed HTTP/2 header schema. Returns: Parsed packet data. Raises: ProtocolError: If the packet is malformed. """ flag = Data_ContinuationFrameFlags( END_HEADERS=bool(header.flags['bit_2']), # bit 2 ) flag.__update__({ '__value__': schema.__flags__, }) data = Data_ContinuationFrame( length=header.length, type=header.type, flags=flag,['sid'], fragment=schema.fragment, ) return data
[docs] def _make_http_none(self, frame: 'Optional[Data_UnassignedFrame]' = None, *, data: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_UnassignedFrame, Flags]': """Make HTTP/2 unassigned frame type. Args: frame: Frame data model. data: Unspecified frame data. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: data = return Schema_UnassignedFrame( data=data, ), Schema_UnassignedFrame.Flags(0)
[docs] def _make_http_data(self, frame: 'Optional[Data_DataFrame]' = None, *, end_stream: 'bool' = False, pad_len: 'int' = 0, data: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_DataFrame, Flags]': """Make HTTP/2 ``DATA`` frame. Args: frame: Frame data model. end_stream: End of stream flag. data: Frame data. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: pad_len = frame.pad_len data = flags = Schema_DataFrame.Flags(0) if end_stream: flags |= Schema_DataFrame.Flags.END_STREAM if pad_len: flags |= Schema_DataFrame.Flags.PADDED return Schema_DataFrame( pad_len=pad_len, data=data, ), flags
[docs] def _make_http_headers(self, frame: 'Optional[Data_HeadersFrame]' = None, *, end_stream: 'bool' = False, end_headers: 'bool' = False, pad_len: 'int' = 0, excl_dep: 'bool' = False, sid_dep: 'Optional[int]' = None, weight: 'int' = 0, fragment: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_HeadersFrame, Flags]': """Make HTTP/2 ``HEADERS`` frame. Args: frame: Frame data model. end_stream: End of stream flag. end_headers: End of headers flag. excl_dep: Exclusive dependency flag. sid_dep: Dependency stream identifier. weight: Priority weight value. fragment: Header block fragment. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: priority = frame.flags.PRIORITY end_headers = frame.flags.END_HEADERS end_stream = frame.flags.END_STREAM pad_len = frame.pad_len excl_dep = frame.excl_dependency sid_dep = frame.stream_dependency weight = frame.weight fragment = frame.fragment else: priority = sid_dep is not None sid_dep = sid_dep or 0 flags = Schema_HeadersFrame.Flags(0) if end_stream: flags |= Schema_HeadersFrame.Flags.END_STREAM if end_headers: flags |= Schema_HeadersFrame.Flags.END_HEADERS if pad_len: flags |= Schema_HeadersFrame.Flags.PADDED if priority: flags |= Schema_HeadersFrame.Flags.PRIORITY return Schema_HeadersFrame( pad_len=pad_len, stream_dep={ 'exclusive': excl_dep, 'sid': sid_dep, }, weight=weight - 1 if weight else 0, fragment=fragment, ), flags
[docs] def _make_http_priority(self, frame: 'Optional[Data_PriorityFrame]' = None, *, sid_dep: 'int' = 0, excl_dep: 'bool' = False, weight: 'int' = 0, **kwargs: 'Any') -> 'tuple[Schema_PriorityFrame, Flags]': """Make HTTP/2 ``PRIORITY`` frame. Args: frame: Frame data model. excl_dep: Exclusive dependency flag. sid_dep: Dependency stream identifier. weight: Priority weight value. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: excl_dep = frame.excl_dependency sid_dep = frame.stream_dependency weight = frame.weight return Schema_PriorityFrame( stream={ 'exclusive': excl_dep, 'sid': sid_dep, }, weight=weight - 1 if weight else 0, ), Schema_PriorityFrame.Flags(0)
[docs] def _make_http_rst_stream(self, frame: 'Optional[Data_RSTStreamFrame]' = None, *, error: 'Enum_ErrorCode | str | int | StdlibEnum | AenumEnum' = Enum_ErrorCode.HTTP_1_1_REQUIRED, error_default: 'Optional[int]' = None, error_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long error_reversed: 'bool' = False, **kwargs: 'Any') -> 'tuple[Schema_RSTStreamFrame, Flags]': """Make HTTP/2 ``RST_STREAM`` frame. Args: frame: Frame data model. error: Error code. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: error_val = frame.error else: error_val = self._make_index(error, error_default, namespace=error_namespace, # type: ignore[assignment] reversed=error_reversed, pack=False) return Schema_RSTStreamFrame( error=error_val, ), Schema_RSTStreamFrame.Flags(0)
[docs] def _make_http_settings(self, frame: 'Optional[Data_SettingsFrame]' = None, *, ack: 'bool' = False, settings: 'Optional[OrderedMultiDict[Enum_Setting, int] | bytes | list[Schema_SettingPair | tuple[Enum_Setting, int]]]' = None, # pylint: disable=line-too-long **kwargs: 'Any') -> 'tuple[Schema_SettingsFrame, Flags]': """Make HTTP/2 ``SETTINGS`` frame. Args: frame: Frame data model. ack: Acknowledge flag. settings: Settings. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: ack = frame.flags.ACK settings = frame.settings flags = Schema_SettingsFrame.Flags(0) if ack: flags |= Schema_SettingsFrame.Flags.ACK if isinstance(settings, bytes): settings_val = settings # type: bytes | list[Schema_SettingPair] elif isinstance(settings, dict): settings_val = [] for key, val in settings.items(multi=True): settings_val.append(Schema_SettingPair( id=key, value=val, )) elif isinstance(settings, list): settings_val = [] for item in settings: if isinstance(item, Schema_SettingPair): temp = item else: id, value = item temp = Schema_SettingPair( id=id, value=value, ) settings_val.append(temp) else: raise ProtocolError(f'HTTP/2 : [Type {Enum_Frame.SETTINGS}] invalid settings') return Schema_SettingsFrame( settings=settings_val, ), flags
[docs] def _make_http_push_promise(self, frame: 'Optional[Data_PushPromiseFrame]' = None, *, end_headers: 'bool' = False, pad_len: 'int' = 0, promised_sid: 'int' = 0, fragment: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_PushPromiseFrame, Flags]': """Make HTTP/2 ``PUSH_PROMISE`` frame. Args: frame: Frame data model. end_headers: End of headers flag. pad_len: Padding length. promised_sid: Promised stream identifier. fragment: Header block fragment. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: end_headers = frame.flags.END_HEADERS pad_len = frame.pad_len promised_sid = frame.promised_sid fragment = frame.fragment flags = Schema_PushPromiseFrame.Flags(0) if end_headers: flags |= Schema_PushPromiseFrame.Flags.END_HEADERS if pad_len: flags |= Schema_PushPromiseFrame.Flags.PADDED return Schema_PushPromiseFrame( pad_len=pad_len, stream={ 'sid': promised_sid, }, fragment=fragment, ), flags
[docs] def _make_http_ping(self, frame: 'Optional[Data_PingFrame]' = None, *, ack: 'bool' = False, opaque_data: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_PingFrame, Flags]': """Make HTTP/2 ``PING`` frame. Args: frame: Frame data model. ack: Acknowledge flag. opaque_data: Opaque data. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: ack = frame.flags.ACK opaque_data = flags = Schema_PingFrame.Flags(0) if ack: flags |= Schema_PingFrame.Flags.ACK return Schema_PingFrame( data=opaque_data, ), flags
[docs] def _make_http_goaway(self, frame: 'Optional[Data_GoawayFrame]' = None, *, last_sid: 'int' = 0, error: 'Enum_ErrorCode | str | int | StdlibEnum | AenumEnum' = Enum_ErrorCode.HTTP_1_1_REQUIRED, error_default: 'Optional[int]' = None, error_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long error_reversed: 'bool' = False, debug_data: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_GoawayFrame, Flags]': """Make HTTP/2 ``GOAWAY`` frame. Args: frame: Frame data model. last_sid: Last stream identifier. error: Error code. error_default: Default value of error code. error_namespace: Namespace of error code. error_reversed: Reversed namespace of error code. debug_data: Additional debug data. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: last_sid = frame.last_sid error_val = frame.error debug = frame.debug_data else: error_val = self._make_index(error, error_default, namespace=error_namespace, # type: ignore[assignment] reversed=error_reversed, pack=False) return Schema_GoawayFrame( stream={ 'sid': last_sid, }, error=error_val, debug=debug, ), Schema_GoawayFrame.Flags(0)
[docs] def _make_http_window_update(self, frame: 'Optional[Data_WindowUpdateFrame]' = None, *, incr: 'int' = 0, **kwargs: 'Any') -> 'tuple[Schema_WindowUpdateFrame, Flags]': """Make HTTP/2 ``WINDOW_UPDATE`` frame. Args: frame: Frame data model. incr: Window size increment. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: incr = frame.increment return Schema_WindowUpdateFrame( size={ 'incr': incr, } ), Schema_WindowUpdateFrame.Flags(0)
[docs] def _make_http_continuation(self, frame: 'Optional[Data_ContinuationFrame]' = None, *, end_headers: 'bool' = False, fragment: 'bytes' = b'', **kwargs: 'Any') -> 'tuple[Schema_ContinuationFrame, Flags]': """Make HTTP/2 ``CONTINUATION`` frame. Args: frame: Frame data model. end_headers: End of headers flag. fragment: Header block fragment. **kwargs: Arbitrary keyword arguments. Returns: Constructed frame schema and updated flags. """ if frame is not None: end_headers = frame.flags.END_HEADERS fragment = frame.fragment flags = Schema_ContinuationFrame.Flags(0) if end_headers: flags |= Schema_ContinuationFrame.Flags.END_HEADERS return Schema_ContinuationFrame( fragment=fragment, ), flags