Source code for zonis.ws_impls
from typing import Protocol, runtime_checkable
[docs]@runtime_checkable
class WebsocketProtocol(Protocol):
"""The protocol underlying websockets should be exposed via."""
def __init__(self, ws):
raise NotImplementedError
[docs] async def send(self, content: str) -> None:
"""Send a message down the wire.
Parameters
----------
content: str
The content to send down the wire.
"""
raise NotImplementedError
[docs] async def receive(self) -> str | bytes | dict:
"""Listen for a message on the wire.
Returns
-------
str|bytes|dict
The content received
"""
raise NotImplementedError
[docs]class FastAPIWebsockets:
"""A websocket implementation wrapping FastAPI websockets"""
def __init__(self, ws):
self.ws = ws