Source code for websocket.stream.buffer

import asyncio

import logging

logger = logging.getLogger(__name__)


[docs]class Buffer: """ """ def __init__(self, limit, loop): """Creates a new asynchronous buffer for reading and writing. :param limit: The maxsize if the buffer :type limit: int :param loop: The running event loop """ self.backing = bytearray(limit) self._loop = loop self.read_available = 0 self.write_available = limit self.limit = limit self.read_head = 0 self.write_head = 0 self.read_signal = asyncio.Event(loop=loop) self.write_signal = asyncio.Event(loop=loop) self.eof = False self.exc = None
[docs] async def write(self, data): """Write some data to the buffer, length must not exceed the buffer limit, or else it will block forever. :param data: The data to write :type data: iterable of bytes """ length = len(data) while self.write_available < length: await self.read_signal.wait() tail = self.limit - self.write_head if tail < length: self.backing[self.write_head:] = data[:tail] self.backing[:length - tail] = data[tail:] self.write_head = length - tail else: self.backing[self.write_head:self.write_head + length] = data self.write_head += length self.read_available += length self.write_available -= length self.write_signal.set() self.write_signal.clear()
[docs] async def read(self, n=-1, chunksize=None): """Read data from the buffer. Reads until eof or n bytes. :param n: The amount of data to read. :return: A :class:`bytearray` with the data """ if chunksize is None: chunksize = self.limit//8 if n < 0: buffer = bytearray(chunksize) result = bytearray() while not self.eof: read = await self.read_into(buffer, chunksize) result.extend(buffer[:read]) return result else: read = 0 result = bytearray(n) while n - read > chunksize: read += await self.read_into(result, chunksize, offset=read) if n > 0: read += await self.read_into(result, n, offset=read) return result
[docs] async def read_into_exactly(self, buffer, n, offset=0): """Read data from the buffer into a bytearray. Reads n bytes or throws an exception if reading eof. n must not exceed the buffer limit, or else it will block forever. :param buffer: The bytearray to write the data into. :param n: The amount of data to read. """ await self._wait_for_read(n) if self.eof: if self.read_available < n: raise IncompleteReadError(f"{self.read_available} bytes available of {n} expected bytes") await self._read_into(buffer, n, offset=offset)
[docs] async def read_into(self, buffer, n, offset=0): """Read data from the buffer into a bytearray. Reads until eof or n bytes. n must not exceed the buffer limit, or else it will block forever. :param buffer: The bytearray to write the data into. :param n: The amount of data to read. :param offset: The index into buffer to write to. """ await self._wait_for_read(n) if self.eof: n = min(self.read_available, n) return await self._read_into(buffer, n, offset=offset)
async def _wait_for_read(self, n): while self.read_available < n and not self.eof and not self.exc: await self.write_signal.wait() if self.exc: raise self.exc async def _read_into(self, buffer, n, offset=0): if n == 0: return n tail = self.read_head + n if tail > self.limit: remaining = self.limit - self.read_head buffer[offset:offset + remaining] = self.backing[self.read_head:self.limit] buffer[offset + remaining:n] = self.backing[:n - remaining] self.read_head = n - remaining else: buffer[offset:offset + n] = self.backing[self.read_head:tail] self.read_head = tail self.read_available -= n self.write_available += n self.read_signal.set() self.read_signal.clear() return n
[docs] def feed_eof(self): """Feed the buffer with `end of file`""" self.eof = True self.write_available = 0 self.write_signal.set()
[docs] def empty(self): """ :return: True iff there is no more data to read. """ return self.read_available == 0
[docs] def at_eof(self): """ :return: True iff there is no more data to read AND we have been fed `end of file`. """ return self.eof and self.read_available == 0
[docs] def set_exception(self, exc): """Set an exception to raise at next read.""" self.exc = exc self.write_available = 0 self.write_signal.set()
class IncompleteReadError(Exception): pass