Source code for websocket.stream.fragment

"""
You should not make an instance of the FragmentContext class yourself, rather you should only 
get instances through :meth:`websocket.stream.writer.WebSocketWriter.fragment`

>>> async with client.writer.fragment() as stream:
...     stream.send('Hello ')
...     stream.send("World!")
"""
import asyncio
import logging

from ..enums import DataType

logger = logging.getLogger(__name__)


[docs]class FragmentContext: """A context manager that can send fragments to a client.""" class Break(Exception): pass def __init__(self, writer, loop): self.loop = loop self.writer = writer self.data_type = None self.previous_fragment = None # We need to track this so that we can set the fin bit on the last fragment. self.push_task = None self.first_write = True async def _push(self, fragment, fin=False): if self.push_task is not None and not self.push_task.done(): await self.push_task self.push_task = asyncio.ensure_future(self._write(fragment, fin), loop=self.loop) async def _write(self, fragment, fin): op_code = 0 if self.first_write: logger.debug(f"Start fragment _write: fin = {fin}") op_code = self.data_type.value self.first_write = False else: logger.debug(f"Fragment continuation _write: fin = {fin}") self.writer.write_frame((op_code | fin << 7).to_bytes(1, 'big'), fragment, len(fragment)) await self.writer.writer.drain()
[docs] async def send(self, data, force=False): """Que a message to be sent. :param data: The data you with to send, must be either :class:`str` or :class:`bytes`. :type data: either str or bytes :param force: If true send message even if the connection is closing e.g. we got valid message after having previously been sent a close frame from the client or after having received invalid frame(s) :type force: bool """ if not self.writer.ensure_open(force): raise self.Break if self.data_type is None: if isinstance(data, str): self.data_type = DataType.TEXT else: self.data_type = DataType.BINARY if self.previous_fragment is not None: await self._push(self.previous_fragment) logger.debug("Queuing fragment") self.previous_fragment = data
async def finish_send(self): if self.previous_fragment is not None: await self._push(self.previous_fragment, fin=True) await self.push_task async def __aenter__(self): """Enter the context manager""" await self.writer.write_lock return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit the context manager""" self.writer.write_lock.release() if exc_type == self.Break: return True await self.finish_send()