Files
kittycad.py/kittycad/api/executor/create_executor_term.py
dependabot[bot] 779a1c3458 Bump websockets from 13.1 to 14.1 (#319)
* Bump websockets from 13.1 to 14.1

Bumps [websockets](https://github.com/python-websockets/websockets) from 13.1 to 14.1.
- [Release notes](https://github.com/python-websockets/websockets/releases)
- [Commits](https://github.com/python-websockets/websockets/compare/13.1...14.1)

---
updated-dependencies:
- dependency-name: websockets
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* use Python 3.9 only

* update deprecated websocket connection

* generate client

* ignore .vscode/

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Greg Sweeney <greg@kittycad.io>
2024-11-26 13:36:23 -05:00

66 lines
1.4 KiB
Python

from typing import Any, Dict
from websockets.asyncio.client import (
ClientConnection as ClientConnectionAsync,
connect as ws_connect_async,
)
from websockets.sync.client import (
ClientConnection as ClientConnectionSync,
connect as ws_connect,
)
from ...client import Client
def _get_kwargs(
*,
client: Client,
) -> Dict[str, Any]:
url = "{}/ws/executor/term".format(client.base_url) # noqa: E501
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def sync(
*,
client: Client,
) -> ClientConnectionSync:
"""Attach to a docker container to create an interactive terminal.""" # noqa: E501
kwargs = _get_kwargs(
client=client,
)
return ws_connect(
kwargs["url"].replace("http", "ws"),
additional_headers=kwargs["headers"],
close_timeout=120,
max_size=None,
) # type: ignore
async def asyncio(
*,
client: Client,
) -> ClientConnectionAsync:
"""Attach to a docker container to create an interactive terminal.""" # noqa: E501
kwargs = _get_kwargs(
client=client,
)
return await ws_connect_async(
kwargs["url"].replace("http", "ws"),
extra_headers=kwargs["headers"],
close_timeout=120,
max_size=None,
)