import asyncio
import logging
import signal
from typing import Optional, Dict, Any
from zonis import (
Packet,
Router,
RouteHandler,
UnknownPacket,
UnhandledWebsocketType,
)
from zonis.packet import (
RequestPacket,
IdentifyDataPacket,
ClientToServerPacket,
)
log = logging.getLogger(__name__)
[docs]class Client(RouteHandler):
"""
Parameters
----------
reconnect_attempt_count: :class:`int`
The number of times that the :class:`Client` should
attempt to reconnect.
url: :class:`str`
Defaults to ``ws://localhost``.
port: Optional[:class:`int`]
The port that the :class:`Client` should use.
"""
def __init__(
self,
*,
reconnect_attempt_count: int = 1,
url: str = "ws://localhost",
port: Optional[int] = None,
identifier: str = "DEFAULT",
secret_key: str = "",
override_key: Optional[str] = None,
) -> None:
super().__init__()
url = f"{url}:{port}" if port else url
url = (
f"ws://{url}"
if not url.startswith("ws://") and not url.startswith("wss://")
else url
)
self._url: str = url
self.identifier: str = identifier
self._reconnect_attempt_count: int = reconnect_attempt_count
self._connection_future: asyncio.Future = asyncio.Future()
self._secret_key: str = secret_key
self._override_key: str = override_key
self.__is_open: bool = True
self.__current_ws = None
self.__task: Optional[asyncio.Task] = None
self._instance_mapping: Dict[str, Any] = {}
self.router: Router = Router(self.identifier, None).register_receiver(
self._request_handler
)
# https://github.com/gearbot/GearBot/blob/live/GearBot/GearBot.py
try:
for signame in ("SIGINT", "SIGTERM", "SIGKILL"):
asyncio.get_event_loop().add_signal_handler(
getattr(signal, signame),
lambda: asyncio.ensure_future(self.close()),
)
except Exception as e:
pass # doesn't work on windows
[docs] async def block_until_closed(self):
"""A blocking call which releases when the WS closes."""
await self.router.block_until_closed()
[docs] async def start(self) -> None:
"""Start the IPC client."""
self.load_routes()
await self.router.connect_client(
self._url,
idp=IdentifyDataPacket(
secret_key=self._secret_key, override_key=self._override_key
),
)
log.info(
"Successfully connected to the server with identifier %s",
self.identifier,
)
[docs] async def close(self) -> None:
"""Stop the IPC client."""
await self.router.close()
log.info("Successfully closed the client")
async def _request_handler(self, packet_data, resolution_handler):
data: RequestPacket = packet_data["data"]
route_name = data["route"]
if route_name not in self._routes:
await resolution_handler(
data=Packet(
identifier=self.identifier,
type="FAILURE_RESPONSE",
data=f"{route_name} is not a valid route name.",
)
)
return
if route_name in self._instance_mapping:
result = await self._routes[route_name](
self._instance_mapping[route_name],
**data["arguments"],
)
else:
result = await self._routes[route_name](**data["arguments"])
await resolution_handler(
data=Packet(
identifier=self.identifier,
type="SUCCESS_RESPONSE",
data=result,
)
)
[docs] async def request(self, route: str, **kwargs):
"""Make a request to the server"""
request_future: asyncio.Future = await self.router.send(
ClientToServerPacket(
identifier=self.identifier,
type="CLIENT_REQUEST",
data=RequestPacket(route=route, arguments=kwargs),
)
)
data: Packet = await request_future
if "type" not in data:
log.debug("Failed to resolve packet type for %s", data)
raise UnknownPacket
if "data" not in data:
log.debug(
"Failed to resolve packet as it was missing the 'data' field: %s",
data,
)
raise UnknownPacket
if data["type"] != "SUCCESS_RESPONSE":
raise UnhandledWebsocketType(
f"Client.request expected a packet of type "
f"SUCCESS_RESPONSE. Received {data['type']}"
)
return data["data"]