Files
kittycad.py/kittycad/api/executor/create_executor_term.py
Jess Frazelle bf0710f0e6 upgrade pydantic (#266)
* upgrade pydantic

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* update other deps

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* update other deps

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* ruff

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* bump more deps

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* update

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* format

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* bump

Signed-off-by: Jess Frazelle <github@jessfraz.com>

---------

Signed-off-by: Jess Frazelle <github@jessfraz.com>
2024-09-10 12:52:57 -07:00

60 lines
1.4 KiB
Python

from typing import Any, Dict
from websockets.client import WebSocketClientProtocol, connect as ws_connect_async
from websockets.sync.client import ClientConnection, connect as ws_connect
from ...client import Client
def _get_kwargs(
*,
client: Client,
) -> Dict[str, Any]:
url = "{}/ws/executor/term".format(client.base_url) # noqa: E501
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def sync(
*,
client: Client,
) -> ClientConnection:
"""Attach to a docker container to create an interactive terminal.""" # noqa: E501
kwargs = _get_kwargs(
client=client,
)
return ws_connect(
kwargs["url"].replace("http", "ws"),
additional_headers=kwargs["headers"],
close_timeout=120,
max_size=None,
) # type: ignore
async def asyncio(
*,
client: Client,
) -> WebSocketClientProtocol:
"""Attach to a docker container to create an interactive terminal.""" # noqa: E501
kwargs = _get_kwargs(
client=client,
)
return await ws_connect_async(
kwargs["url"].replace("http", "ws"),
extra_headers=kwargs["headers"],
close_timeout=120,
max_size=None,
)