# -*- coding: utf-8 -*-
r"""HTTP/1.* - Hypertext Transfer Protocol
============================================
.. module:: pcapkit.protocols.application.httpv1
:mod:`pcapkit.protocols.application.httpv1` contains only
:class:`~pcapkit.protocols.application.httpv1.HTTP`,
which implements an extractor for the Hypertext Transfer
Protocol (HTTP/1.*) [*]_, whose message structure is
described below:

.. code-block:: text

   METHOD URL HTTP/VERSION\r\n    :==:    REQUEST LINE
   <key> : <value>\r\n            :==:    REQUEST HEADER
   ............  (Ellipsis)       :==:    REQUEST HEADER
   \r\n                           :==:    REQUEST SEPARATOR
   <body>                         :==:    REQUEST BODY (optional)

   HTTP/VERSION CODE DESP \r\n    :==:    RESPONSE LINE
   <key> : <value>\r\n            :==:    RESPONSE HEADER
   ............  (Ellipsis)       :==:    RESPONSE HEADER
   \r\n                           :==:    RESPONSE SEPARATOR
   <body>                         :==:    RESPONSE BODY (optional)

.. [*] https://en.wikipedia.org/wiki/Hypertext_Transfer_Protocol
"""
import re
from typing import TYPE_CHECKING
from pcapkit.const.http.method import Method as Enum_Method
from pcapkit.const.http.status_code import StatusCode as Enum_StatusCode
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.protocols.application.http import HTTP as HTTPBase
from pcapkit.protocols.data.application.httpv1 import HTTP as Data_HTTP
from pcapkit.protocols.data.application.httpv1 import RequestHeader as Data_RequestHeader
from pcapkit.protocols.data.application.httpv1 import ResponseHeader as Data_ResponseHeader
from pcapkit.protocols.schema.application.httpv1 import HTTP as Schema_HTTP
from pcapkit.utilities.compat import StrEnum
from pcapkit.utilities.exceptions import ProtocolError
if TYPE_CHECKING:
from enum import IntEnum as StdlibEnum
from typing import Any, Optional
from typing import Type as _Type
from aenum import IntEnum as AenumEnum
from typing_extensions import Literal
from pcapkit.protocols.data.application.httpv1 import Header as Data_Header
# Public API of this module: only the ``HTTP`` protocol class is exported.
__all__ = ['HTTP']

# Regular expression to match HTTP methods.
# Bytes pattern: one uppercase ASCII letter followed by any number of uppercase
# letters or hyphens, captured in the named group ``method``. The pattern has
# no anchors of its own.
_RE_METHOD = re.compile(rb"(?P<method>[A-Z][A-Z-]*)") # RFC 9110, section 16.1.1, 9.1, 5.6.2

# Regular expression to match HTTP version string.
# Bytes pattern: the literal ``HTTP/`` followed by ``<digit>.<digit>``, with the
# ``<digit>.<digit>`` part captured in the named group ``version``.
_RE_VERSION = re.compile(rb"HTTP/(?P<version>\d\.\d)")

# Regular expression to match HTTP status code.
# Bytes pattern: exactly three consecutive digits (no groups, no anchors).
_RE_STATUS = re.compile(rb'\d{3}')
[docs]
class Type(StrEnum):
"""HTTP packet type."""
#: Request packet.
REQUEST = 'request'
#: Response packet.
RESPONSE = 'response'
[docs]
class HTTP(HTTPBase[Data_HTTP, Schema_HTTP],
data=Data_HTTP, schema=Schema_HTTP):
"""This class implements Hypertext Transfer Protocol (HTTP/1.*)."""
##########################################################################
# Defaults.
##########################################################################
#: Type: Type of HTTP receipt.
_receipt: 'Type'
##########################################################################
# Properties.
##########################################################################
@property
def alias(self) -> 'Literal["HTTP/0.9", "HTTP/1.0", "HTTP/1.1"]':
    """Acronym of current protocol, i.e. ``HTTP/`` followed by :attr:`version`."""
    http_version = self.version
    return f'HTTP/{http_version}'  # type: ignore[return-value]
@property
def version(self) -> 'Literal["0.9", "1.0", "1.1"]':
    """Version of current protocol, as stored on ``self._info.receipt``."""
    receipt = self._info.receipt
    return receipt.version  # type: ignore[attr-defined]
##########################################################################
# Methods.
##########################################################################
[docs]
def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_HTTP':  # pylint: disable=unused-argument
    """Read Hypertext Transfer Protocol (HTTP/1.*).

    Structure of HTTP/1.* packet [:rfc:`7230`]:

    .. code-block:: text

       HTTP-message    :==:    start-line
                               *( header-field CRLF )
                               CRLF
                               [ message-body ]

    The raw packet is cut at the first empty line (``CRLF CRLF``); the part
    before it is handed to ``_read_http_header`` and the part after it to
    ``_read_http_body``.

    Args:
        length: Length of packet data.
        **kwargs: Arbitrary keyword arguments.

    Returns:
        Parsed packet data.

    Raises:
        ProtocolError: If the packet is malformed, in particular when it
            contains no empty line separating the header section from
            the body.

    """
    if length is None:
        # Kept from the original flow; the value is not used further below.
        length = len(self)
    schema = self.__header__

    packet = schema.data

    # ``partition`` splits on the first separator only (same cut as
    # ``split(..., maxsplit=1)``) but reports a missing separator through an
    # empty middle element instead of an unpacking ``ValueError``.
    header, separator, body = packet.partition(b'\r\n\r\n')
    if not separator:
        raise ProtocolError('HTTP: invalid format')

    header_line, header_unpacked = self._read_http_header(header)
    # An empty (falsy) body is normalised to ``None``.
    body_unpacked = self._read_http_body(body, headers=header_unpacked) or None

    http = Data_HTTP(
        receipt=header_line,
        header=header_unpacked,
        body=body_unpacked,
    )

    # Cache the message kind, version and header-section size on the instance.
    self._receipt = header_line.type
    self._version = header_line.version  # type: ignore[attr-defined]
    self._length = len(header)

    return http
[docs]
def make(self,  # type: ignore[override]
         http_version: 'Literal["0.9", "1.0", "1.1", b"0.9", b"1.0", b"1.1"]' = '1.1',
         method: 'Optional[Enum_Method | str | bytes]' = None,
         uri: 'Optional[str | bytes]' = None,
         status: 'Optional[Enum_StatusCode | str | bytes | int]' = None,
         status_default: 'Optional[int]' = None,
         status_namespace: 'Optional[dict[str, int] | dict[int, str] | _Type[StdlibEnum] | _Type[AenumEnum]]' = None,  # pylint: disable=line-too-long
         status_reversed: 'bool' = False,
         message: 'Optional[str | bytes]' = None,
         headers: 'Optional[OrderedMultiDict[str, str]]' = None,
         body: 'bytes' = b'',
         **kwargs: 'Any') -> 'Schema_HTTP':
    """Make (construct) packet data.

    A request is built when ``method`` is given and ``status`` is not; a
    response is built when ``status`` is given and ``method`` is not.

    Args:
        http_version: HTTP version.
        method: HTTP method.
        uri: HTTP request URI.
        status: HTTP status code.
        status_default: Default HTTP status code.
        status_namespace: Namespace of HTTP status code.
        status_reversed: Whether to reverse the namespace.
        message: HTTP status message.
        headers: HTTP headers.
        body: HTTP body; ``None`` is treated as an empty body.
        **kwargs: Arbitrary keyword arguments.

    Returns:
        Constructed packet data.

    Raises:
        ProtocolError: If a request is built without ``uri``, or if
            ``method`` and ``status`` are both given or both missing.

    """
    version = http_version.encode() if isinstance(http_version, str) else http_version

    if method is not None and status is None:
        # Request: ``METHOD URI HTTP/VERSION``
        if uri is None:
            raise ProtocolError('HTTP request must have URI.')

        if isinstance(method, bytes):
            meth = method
        elif isinstance(method, str):
            meth = method.encode()
        else:
            # Enumeration member: use its value as the method token.
            meth = method.value.encode()
        uri_val = uri.encode() if isinstance(uri, str) else uri

        header_line = b'%s %s HTTP/%s\r\n' % (meth, uri_val, version)
    elif method is None and status is not None:
        # Response: ``HTTP/VERSION CODE MESSAGE``
        status_code = self._make_index(status, status_default, namespace=status_namespace,
                                       reversed=status_reversed, pack=False)
        status_code_val = int(status_code)

        if message is None:
            # Fall back to the message carried by the status object, if any.
            msg = getattr(status_code, 'message', b'') or b''
        else:
            msg = message
        if isinstance(msg, str):
            # bytes %-formatting only accepts bytes-like operands for ``%s``.
            msg = msg.encode()

        header_line = b'HTTP/%s %s %s\r\n' % (version, str(status_code_val).encode(), msg)
    else:
        # Neither or both of ``method`` / ``status``: the start line is
        # undefined, so fail explicitly instead of hitting an unbound local.
        raise ProtocolError('HTTP packet must be either request or response.')

    header_fields = []  # type: list[bytes]
    if headers is not None:
        header_fields = [
            b'%s: %s\r\n' % (key.encode(), value.encode())
            for key, value in headers.items(multi=True)
        ]

    return Schema_HTTP(
        # ``body or b''`` tolerates a ``None`` body (``read`` stores ``None``
        # when the parsed body is empty).
        data=header_line + b''.join(header_fields) + b'\r\n' + (body or b''),
    )
[docs]
@classmethod
def id(cls) -> 'tuple[Literal["HTTP"], Literal["HTTPv1"]]':  # type: ignore[override]
    """Index ID of the protocol.

    Returns:
        Two-element tuple holding the class name followed by ``'HTTPv1'``.

    """
    class_name = cls.__name__
    return class_name, 'HTTPv1'  # type: ignore[return-value]
##########################################################################
# Utilities.
##########################################################################
[docs]
@classmethod
def _make_data(cls, data: 'Data_HTTP') -> 'dict[str, Any]':  # type: ignore[override]
    """Create key-value pairs from ``data`` for protocol construction.

    The receipt contributes ``http_version`` plus whichever of ``method``,
    ``uri``, ``status`` and ``message`` it carries (missing ones become
    ``None``); ``headers`` and ``body`` are taken from ``data`` directly.

    Args:
        data: protocol data

    Returns:
        Key-value pairs for protocol construction.

    """
    receipt = data.receipt

    fields = {'http_version': receipt.version}  # type: dict[str, Any]
    # Request-only and response-only attributes default to ``None`` when absent.
    for attr_name in ('method', 'uri', 'status', 'message'):
        fields[attr_name] = getattr(receipt, attr_name, None)
    fields['headers'] = data.header
    fields['body'] = data.body
    return fields
[docs]
def _read_http_body(self, body: 'bytes', *,
                    headers: 'OrderedMultiDict[str, str]') -> 'Any':
    """Read HTTP/1.* body.

    The body is handed back untouched; ``headers`` is accepted as a
    keyword-only argument but is not consulted by this implementation.

    Args:
        body: HTTP body data.
        headers: HTTP header fields.

    Returns:
        Raw HTTP body.

    """
    raw_body = body
    return raw_body