from typing import Any, Dict, Optional, Union, List import json import bson from websockets.sync.client import connect as ws_connect from websockets.client import connect as ws_connect_async from websockets.sync.client import ClientConnection from websockets.client import WebSocketClientProtocol from ...client import Client from ...models.error import Error from ...types import Response {% for import in imports %} {{import}} {% endfor %} def _get_kwargs( {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional == False %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} *, client: Client, {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} ) -> Dict[str, Any]: url = "{{url_template}}".format(client.base_url{% for arg in args %}{% if arg.in_url %}, {{arg.name}}={{arg.name}}{% endif %}{% endfor %}) # noqa: E501 {% for arg in args %} {% if arg.in_query %} if {{arg.name}} is not None: {% if arg.type == "bool" %} if "?" in url: url = url + "&{{arg.name}}=" + str({{arg.name}}).lower() else: url = url + "?{{arg.name}}=" + str({{arg.name}}).lower() {% else %} if "?" in url: url = url + "&{{arg.name}}=" + str({{arg.name}}) else: url = url + "?{{arg.name}}=" + str({{arg.name}}) {% endif %} {% endif %} {% endfor %} headers: Dict[str, Any] = client.get_headers() cookies: Dict[str, Any] = client.get_cookies() return { "url": url, "headers": headers, "cookies": cookies, "timeout": client.get_timeout(), } def sync( {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional == False %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} *, client: Client, {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} ) -> ClientConnection: {%if docs%}"""{{docs}}""" # noqa: E501{% endif %} kwargs = _get_kwargs( {% for arg in args %} {% if arg.in_query %} {{arg.name}}={{arg.name}}, {% endif %} {% endfor %} client=client, ) with ws_connect(kwargs["url"].replace("https://", "wss://"), additional_headers=kwargs["headers"]) as websocket: return websocket # type: ignore # Return an error if we got here. return Error(message="An error occurred while connecting to the websocket.") async def asyncio( {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional == False %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} *, client: Client, {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} ) -> WebSocketClientProtocol: {%if docs%}"""{{docs}}""" # noqa: E501{% endif %} kwargs = _get_kwargs( {% for arg in args %} {% if arg.in_query %} {{arg.name}}={{arg.name}}, {% endif %} {% endfor %} client=client, ) async with ws_connect_async(kwargs["url"].replace("https://", "wss://"), extra_headers=kwargs["headers"]) as websocket: return websocket # Return an error if we got here. return Error(message="An error occurred while connecting to the websocket.") {% if has_request_body %} class WebSocket: """A websocket connection to the API endpoint.""" ws: ClientConnection def __init__(self, {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional == False %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} client: Client, {% for arg in args %} {% if arg.in_query %} {% if arg.is_optional %} {{arg.name}}: {{arg.type}}, {% endif %} {% endif %} {% endfor %} ): self.ws = sync( {% for arg in args %} {% if arg.in_query %} {{arg.name}}, {% endif %} {% endfor %} client=client, ) def send(self, data:{% for arg in args %}{%if arg.name == "body" %}{{arg.type}}{% endif %}{% endfor %}): """Send data to the websocket.""" self.ws.send(json.dumps(data.to_dict())) def send_binary(self, data:{% for arg in args %}{%if arg.name == "body" %}{{arg.type}}{% endif %}{% endfor %}): """Send data as bson to the websocket.""" self.ws.send(bson.BSON.encode(data.to_dict())) def recv(self) -> {{response_type}}: """Receive data from the websocket.""" message = self.ws.recv() return {{response_type}}.from_dict(json.loads(message)) {%endif%}