from typing import Iterator, Any, Dict, Optional, Union, List

import json
import bson

from websockets.sync.client import connect as ws_connect
from websockets.asyncio.client import connect as ws_connect_async
from websockets.sync.client import ClientConnection as ClientConnectionSync
from websockets.asyncio.client import ClientConnection as ClientConnectionAsync

from ...client import Client
from ...models.error import Error
from ...types import Response

{% for import in imports %}
{{import}}
{% endfor %}

{#
  Template notes
  --------------
  * Argument ordering in every generated signature: required query arguments
    first, then `client`, then optional query arguments.
  * Jinja block tags that sit between Python *statements* are kept at column 0
    so the rendered indentation does not depend on the environment's
    trim_blocks / lstrip_blocks settings. Inside parentheses the tags are
    indented freely because Python ignores whitespace there.
#}

def _get_kwargs(
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional == False %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
    *,
    client: Client,
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
) -> Dict[str, Any]:
    """Build the connection settings for this endpoint.

    The URL is formatted from the endpoint template and ``client.base_url``;
    every query argument that is not ``None`` is appended to it (booleans are
    rendered lowercase). Returns a dict with the keys ``url``, ``headers``,
    ``cookies`` and ``timeout``, the last three taken from ``client``.
    """
    url = "{{url_template}}".format(client.base_url{% for arg in args %}{% if arg.in_url %}, {{arg.name}}={{arg.name}}{% endif %}{% endfor %})  # noqa: E501

{% for arg in args %}
{% if arg.in_query %}
    if {{arg.name}} is not None:
{% if arg.type == "bool" %}
        if "?" in url:
            url = url + "&{{arg.name}}=" + str({{arg.name}}).lower()
        else:
            url = url + "?{{arg.name}}=" + str({{arg.name}}).lower()
{% else %}
        if "?" in url:
            url = url + "&{{arg.name}}=" + str({{arg.name}})
        else:
            url = url + "?{{arg.name}}=" + str({{arg.name}})
{% endif %}

{% endif %}
{% endfor %}
    headers: Dict[str, Any] = client.get_headers()
    cookies: Dict[str, Any] = client.get_cookies()

    return {
        "url": url,
        "headers": headers,
        "cookies": cookies,
        "timeout": client.get_timeout(),
    }


{# Blocking entry point: opens the websocket and returns the raw sync connection. #}
def sync(
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional == False %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
    *,
    client: Client,
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
) -> ClientConnectionSync:
{% if docs %}
    """{{docs}}"""  # noqa: E501
{% endif %}

    kwargs = _get_kwargs(
        {% for arg in args %}
        {% if arg.in_query %}
        {{arg.name}}={{arg.name}},
        {% endif %}
        {% endfor %}
        client=client,
    )

    # Rewrite only the first "http" (the scheme when the base URL starts with
    # http/https, giving ws/wss); later occurrences in the URL are left alone.
    return ws_connect(
        kwargs["url"].replace("http", "ws", 1),
        additional_headers=kwargs["headers"],
        close_timeout=120,
        max_size=None,
    )  # type: ignore


{# Coroutine entry point: awaits the handshake and returns the asyncio connection. #}
async def asyncio(
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional == False %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
    *,
    client: Client,
    {% for arg in args %}
    {% if arg.in_query %}
    {% if arg.is_optional %}
    {{arg.name}}: {{arg.type}},
    {% endif %}
    {% endif %}
    {% endfor %}
) -> ClientConnectionAsync:
{% if docs %}
    """{{docs}}"""  # noqa: E501
{% endif %}

    kwargs = _get_kwargs(
        {% for arg in args %}
        {% if arg.in_query %}
        {{arg.name}}={{arg.name}},
        {% endif %}
        {% endfor %}
        client=client,
    )

    # websockets.asyncio.client.connect takes `additional_headers`; the
    # `extra_headers` keyword belongs to the legacy client and would be
    # forwarded to loop.create_connection(), which rejects it.
    # Rewrite only the first "http" (the scheme when the base URL starts with
    # http/https, giving ws/wss); later occurrences in the URL are left alone.
    return await ws_connect_async(
        kwargs["url"].replace("http", "ws", 1),
        additional_headers=kwargs["headers"],
        close_timeout=120,
        max_size=None,
    )


{# The typed wrapper is only generated for endpoints that accept a request body. #}
{% if has_request_body %}
class WebSocket:
    """A websocket connection to the API endpoint."""

    # Underlying blocking connection, opened in __init__ via sync().
    ws: ClientConnectionSync

    def __init__(
        self,
        {% for arg in args %}
        {% if arg.in_query %}
        {% if arg.is_optional == False %}
        {{arg.name}}: {{arg.type}},
        {% endif %}
        {% endif %}
        {% endfor %}
        client: Client,
        {% for arg in args %}
        {% if arg.in_query %}
        {% if arg.is_optional %}
        {{arg.name}}: {{arg.type}},
        {% endif %}
        {% endif %}
        {% endfor %}
    ):
        """Open the websocket immediately using the module-level ``sync``."""
        self.ws = sync(
            {% for arg in args %}
            {% if arg.in_query %}
            {% if arg.is_optional == False %}
            {{arg.name}},
            {% endif %}
            {% endif %}
            {% endfor %}
            client=client,
            {% for arg in args %}
            {% if arg.in_query %}
            {% if arg.is_optional %}
            {{arg.name}}={{arg.name}},
            {% endif %}
            {% endif %}
            {% endfor %}
        )

    def __enter__(self) -> "WebSocket":
        """Return this wrapper so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when the ``with`` block exits."""
        self.close()

    def __iter__(self) -> Iterator[{{response_type}}]:
        """
        Iterate on incoming messages.

        The iterator calls :meth:`recv` and yields messages in an infinite loop.

        It exits when the connection is closed normally. It raises a
        :exc:`~websockets.exceptions.ConnectionClosedError` exception after a
        protocol error or a network failure.
        """
        for message in self.ws:
            yield {{response_type}}(**json.loads(message))

    def send(self, data:{% for arg in args %}{%if arg.name == "body" %}{{arg.type}}{% endif %}{% endfor %}) -> None:
        """Send data to the websocket."""
        self.ws.send(json.dumps(data.model_dump()))

    def send_binary(self, data:{% for arg in args %}{%if arg.name == "body" %}{{arg.type}}{% endif %}{% endfor %}) -> None:
        """Send data as bson to the websocket."""
        self.ws.send(bson.encode(data.model_dump()))  # type: ignore

    def recv(self) -> {{response_type}}:
        """Receive data from the websocket."""
        # Waits at most 60 seconds for the next message before giving up.
        message = self.ws.recv(timeout=60)
        return {{response_type}}(**json.loads(message))

    def close(self) -> None:
        """Close the websocket."""
        self.ws.close()
{% endif %}