Source code for firebird.base.buffer

# SPDX-FileCopyrightText: 2020-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: firebird-base
# FILE:           firebird/base/buffer.py
# DESCRIPTION:    Memory buffer manager
# CREATED:        14.5.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________

"""Firebird Base - Memory buffer manager

This module provides a raw memory buffer manager with convenient methods to read/write
data of various data type.
"""

from __future__ import annotations
from typing import runtime_checkable, Protocol, Type, Union, Any
from ctypes import memset, create_string_buffer
from .types import Sentinel, UNLIMITED, ByteOrder

[docs] @runtime_checkable class BufferFactory(Protocol): # pragma: no cover """BufferFactory Protocol definition. """
[docs] def create(self, init_or_size: Union[int, bytes], size: int=None) -> Any: """This function must create and return a mutable character buffer. Arguments: init_or_size: Must be an integer which specifies the size of the array, or a bytes object which will be used to initialize the array items. size: Size of the array. """
[docs] def clear(self, buffer: Any) -> None: """Fills the buffer with zero. Argument: buffer: A memory buffer previously created by `BufferFactory.create()` method. """
[docs] class BytesBufferFactory: """Buffer factory for `bytearray` buffers. """
[docs] def create(self, init_or_size: Union[int, bytes], size: int=None) -> bytearray: """This function creates a mutable character buffer. The returned object is a `bytearray`. Arguments: init_or_size: Must be an integer which specifies the size of the array, or a bytes object which will be used to initialize the array items. size: Size of the array. Important: Although arguments are the same as for `ctypes.create_string_buffer`, the behavior is different when new buffer is initialized from bytes: 1. If there are more bytes than specified `size`, this function copies only `size` bytes into new buffer. The `~ctypes.create_string_buffer` raises an excpetion. 2. Unlike `~ctypes.create_string_buffer` when `size` is NOT specified, the buffer is NOT made one item larger than its length so that the last element in the array is a NUL termination character. """ if isinstance(init_or_size, int): return bytearray(init_or_size) size = len(init_or_size) if size is None else size buffer = bytearray(size) limit = min(len(init_or_size), size) buffer[:limit] = init_or_size[:limit] return buffer
[docs] def clear(self, buffer: bytearray) -> None: """Fills the buffer with zero. """ buffer[:] = b'\x00' * len(buffer)
[docs] class CTypesBufferFactory: """Buffer factory for `ctypes` array of `~ctypes.c_char` buffers. """
[docs] def create(self, init_or_size: Union[int, bytes], size: int=None) -> bytearray: """This function creates a `ctypes` mutable character buffer. The returned object is an array of `ctypes.c_char`. Arguments: init_or_size: Must be an integer which specifies the size of the array, or a bytes object which will be used to initialize the array items. size: Size of the array. Important: Although arguments are the same as for `ctypes.create_string_buffer`, the behavior is different when new buffer is initialized from bytes: 1. If there are more bytes than specified `size`, this function copies only `size` bytes into new buffer. The `~ctypes.create_string_buffer` raises an excpetion. 2. Unlike `~ctypes.create_string_buffer` when `size` is NOT specified, the buffer is NOT made one item larger than its length so that the last element in the array is a NUL termination character. """ if isinstance(init_or_size, int): return create_string_buffer(init_or_size) size = len(init_or_size) if size is None else size buffer = create_string_buffer(size) limit = min(len(init_or_size), size) buffer[:limit] = init_or_size[:limit] return buffer
[docs] def clear(self, buffer: bytearray, init: int=0) -> None: """Fills the buffer with specified value (default). """ memset(buffer, init, len(buffer))
def safe_ord(byte: Union[bytes, int]) -> int: """If `byte` argument is byte character, returns ord(byte), otherwise returns argument. """ return byte if isinstance(byte, int) else ord(byte)
[docs] class MemoryBuffer: """Generic memory buffer manager. """
[docs] def __init__(self, init: Union[int, bytes], size: int = None, *, factory: Type[BufferFactory]=BytesBufferFactory, eof_marker: int = None, max_size: Union[int, Sentinel]=UNLIMITED, byteorder: ByteOrder=ByteOrder.LITTLE): """ Arguments: init: Must be an integer which specifies the size of the array, or a `bytes` object which will be used to initialize the array items. size: Size of the array. The argument value is used only when `init` is a `bytes` object. factory: Factory object used to create/resize the internal memory buffer. eof_marker: Value that indicates the end of data. Could be None. max_size: If specified, the buffer couldn't grow beyond specified number of bytes. byteorder: The byte order used to read/write numbers. """ #: Buffer factory instance used by manager [default: `BytesBufferFactory`]. self.factory: BufferFactory = factory() #: The memory buffer. The actual data type of buffer depends on `buffer factory`, #: but it must provide direct acces to cells, slices and length like `bytearray`. self.raw: bytearray = self.factory.create(init, size) #: Current position in buffer, i.e. the next read/writen byte would be at this position. self.pos: int = 0 #: Value that indicates the end of data. Could be None. self.eof_marker: int = eof_marker #: The buffer couldn't grow beyond specified number of bytes [default: `.UNLIMITED`]. self.max_size: Union[int, Sentinel] = max_size #: The byte order used to read/write numbers [default: `.LITTLE`]. self.byteorder: ByteOrder = byteorder
def _ensure_space(self, size: int) -> None: if len(self.raw) < self.pos + size: self.resize(self.pos + size) def _check_space(self, size: int): if len(self.raw) < self.pos + size: raise IOError("Insufficient buffer size")
[docs] def clear(self) -> None: """Fills the buffer with zeros and resets the position in buffer to zero. """ self.factory.clear(self.raw) self.pos = 0
[docs] def resize(self, size: int) -> None: """Resize buffer to specified length. """ if self.max_size is not UNLIMITED and self.max_size < size: raise IOError(f"Cannot resize buffer past max. size {self.max_size} bytes") self.raw = self.factory.create(self.raw, size)
[docs] def is_eof(self) -> bool: """Return True when positioned past the end of buffer or on `.eof_marker` (if defined). """ if self.pos >= len(self.raw): return True if self.eof_marker is not None and safe_ord(self.raw[self.pos]) == self.eof_marker: return True return False
[docs] def write(self, data: bytes) -> None: """Write bytes. """ size = len(data) self._ensure_space(size) self.raw[self.pos:self.pos + size] = data self.pos += size
[docs] def write_byte(self, byte: int) -> None: """Write one byte. """ self._ensure_space(1) self.raw[self.pos] = byte self.pos += 1
[docs] def write_number(self, value: int, size: int, *, signed: bool=False) -> None: """Write number with specified size (in bytes). """ self.write(value.to_bytes(size, self.byteorder.value, signed=signed))
[docs] def write_short(self, value: int) -> None: """Write 2 byte number (c_ushort). """ self.write_number(value, 2)
[docs] def write_int(self, value: int) -> None: """Write 4 byte number (c_uint). """ self.write_number(value, 4)
[docs] def write_bigint(self, value: int) -> None: """Write tagged 8 byte number (c_ulonglong). """ self.write_number(value, 8)
[docs] def write_string(self, value: str, *, encoding: str='ascii', errors: str='strict') -> None: """Write zero-terminated string. """ self.write(value.encode(encoding, errors)) self.write_byte(0)
[docs] def write_pascal_string(self, value: str, *, encoding: str='ascii', errors: str='strict') -> None: """Write tagged Pascal string (2 byte length followed by data). """ value = value.encode(encoding, errors) self.write_byte(len(value)) self.write(value)
[docs] def write_sized_string(self, value: str, *, encoding: str='ascii', errors: str='strict') -> None: """Write string (2 byte length followed by data). """ value = value.encode(encoding, errors) self.write_short(len(value)) self.write(value)
[docs] def read(self, size: int=-1) -> bytes: """Reads specified number of bytes, or all remaining data. """ if size < 0: size = self.buffer_size - self.pos self._check_space(size) result = self.raw[self.pos: self.pos + size] self.pos += size return result
[docs] def read_number(self, size: int, *, signed=False) -> int: """Read number with specified size in bytes. """ self._check_space(size) result = (0).from_bytes(self.raw[self.pos: self.pos + size], self.byteorder.value, signed=signed) self.pos += size return result
[docs] def read_byte(self, *, signed: bool=False) -> int: """Read 1 byte number (c_ubyte). """ return self.read_number(1, signed=signed)
[docs] def read_short(self, *, signed: bool=False) -> int: """Read 2 byte number (c_ushort). """ return self.read_number(2, signed=signed)
[docs] def read_int(self, *, signed: bool=False) -> int: """Read 4 byte number (c_uint). """ return self.read_number(4, signed=signed)
[docs] def read_bigint(self, *, signed: bool=False) -> int: """Read 8 byte number (c_ulonglong). """ return self.read_number(8, signed=signed)
[docs] def read_sized_int(self, *, signed: bool=False) -> int: """Read number cluster (2 byte length followed by data). """ return self.read_number(self.read_short(), signed=signed)
[docs] def read_string(self, *, encoding: str='ascii', errors: str='strict') -> str: """Read null-terminated string. """ i = self.pos while i < self.buffer_size and safe_ord(self.raw[i]) != 0: i += 1 result = self.read(i - self.pos).decode(encoding, errors) self.pos += 1 return result
[docs] def read_pascal_string(self, *, encoding: str='ascii', errors: str='strict') -> str: """Read Pascal string (1 byte length followed by string data). """ return self.read(self.read_byte()).decode(encoding, errors)
[docs] def read_sized_string(self, *, encoding: str='ascii', errors: str='strict') -> str: """Read string (2 byte length followed by data). """ return self.read(self.read_short()).decode(encoding, errors)
[docs] def read_bytes(self) -> bytes: """Read content of binary cluster (2 bytes data length followed by data). """ return self.read(self.read_short())
# Properties @property def buffer_size(self) -> int: """Current buffer size in bytes. """ return len(self.raw) @property def last_data(self) -> int: """Index of first non-zero byte when searched from the end of buffer. """ i = len(self.raw) - 1 while i >= 0: if safe_ord(self.raw[i]) != 0: break i -= 1 return i