update paths

Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
Jess Frazelle
2022-02-27 21:13:56 -08:00
parent 2a9ae9dc8d
commit 45ac31914d
16 changed files with 1288 additions and 604 deletions

View File

@ -6,6 +6,7 @@ import re
package_name = 'kittycad' package_name = 'kittycad'
def main(): def main():
cwd = os.getcwd() cwd = os.getcwd()
path = os.path.join(cwd, 'spec.json') path = os.path.join(cwd, 'spec.json')
@ -20,6 +21,433 @@ def main():
generateTypes(cwd, parser) generateTypes(cwd, parser)
# Generate the paths. # Generate the paths.
generatePaths(cwd, parser)
def generatePaths(cwd: str, parser: OpenApiParser):
# Make sure we have the directory.
path = os.path.join(cwd, 'kittycad', 'api')
os.makedirs(path, exist_ok=True)
# Open the __init__.py file.
file_name = '__init__.py'
file_path = os.path.join(path, file_name)
f = open(file_path, 'w')
f.write("\"\"\" Contains methods for accessing the API \"\"\"\n")
# Close the file.
f.close()
# Generate the directory/__init__.py for each of the tags.
tags = parser.data['tags']
for tag in tags:
tag_name = tag['name']
tag_description = tag['description']
tag_path = os.path.join(path, tag_name)
# Esnure the directory exists.
os.makedirs(tag_path, exist_ok=True)
# Open the __init__.py file.
file_name = '__init__.py'
file_path = os.path.join(tag_path, file_name)
f = open(file_path, 'w')
f.write(
"\"\"\" Contains methods for accessing the " +
tag_name +
" API paths: " +
tag_description +
" \"\"\"\n")
# Close the file.
f.close()
# Generate the paths.
data = parser.data
paths = data['paths']
for p in paths:
for method in paths[p]:
endpoint = paths[p][method]
generatePath(path, p, method, endpoint)
def generatePath(path: str, name: str, method: str, endpoint: dict):
# Generate the path.
file_name = camel_to_snake(endpoint['operationId']) + '.py'
# Add the tag to the path if it exists.
if 'tags' in endpoint:
tag_name = endpoint['tags'][0]
path = os.path.join(path, tag_name)
file_path = os.path.join(path, file_name)
print("generating type: ", name, " at: ", file_path)
print(" endpoint: ", [endpoint])
f = open(file_path, "w")
endoint_refs = getEndpointRefs(endpoint)
parameter_refs = getParameterRefs(endpoint)
request_body_refs = getRequestBodyRefs(endpoint)
# Add our imports.
f.write("from typing import Any, Dict, Optional, Union\n")
f.write("\n")
f.write("import httpx\n")
f.write("\n")
f.write("from ...client import Client\n")
# Import our references for responses.
for ref in endoint_refs:
f.write(
"from ...models." +
camel_to_snake(ref) +
" import " +
ref +
"\n")
for ref in parameter_refs:
f.write(
"from ...models." +
camel_to_snake(ref) +
" import " +
ref +
"\n")
for ref in request_body_refs:
f.write(
"from ...models." +
camel_to_snake(ref) +
" import " +
ref +
"\n")
f.write("from ...types import Response\n")
f.write("\n")
# Define the method.
f.write("def _get_kwargs(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
": " +
parameter_type +
",\n")
f.write("*, client: Client) -> Dict[str, Any]:\n")
f.write("\turl = \"{}" + name + "\".format(client.base_url,\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
parameter_name +
"=" +
camel_to_snake(parameter_name) +
",\n")
f.write("\t)\n")
f.write("\n")
f.write("\theaders: Dict[str, Any] = client.get_headers()\n")
f.write("\tcookies: Dict[str, Any] = client.get_cookies()\n")
f.write("\n")
f.write("\treturn {\n")
f.write("\t\t\"url\": url,\n")
f.write("\t\t\"headers\": headers,\n")
f.write("\t\t\"cookies\": cookies,\n")
f.write("\t\t\"timeout\": client.get_timeout(),\n")
f.write("\t}\n")
# Define the parse reponse.
f.write("\n")
f.write("\n")
f.write(
"def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
# Iterate over the responses.
responses = endpoint['responses']
for response_code in responses:
response = responses[response_code]
f.write("\tif response.status_code == " + response_code + ":\n")
if 'content' in response:
content = response['content']
for content_type in content:
if content_type == 'application/json':
json = content[content_type]['schema']
if '$ref' in json:
ref = json['$ref'].replace('#/components/schemas/', '')
f.write(
"\t\tresponse_" +
response_code +
" = " +
ref +
".from_dict(response.json())\n")
else:
f.write("\t\tresponse_" + response_code + " = None\n")
f.write("\t\treturn response_" + response_code + "\n")
# End the method.
f.write("\treturn None\n")
# Define the build response method.
f.write("\n")
f.write("\n")
f.write(
"def _build_response(*, response: httpx.Response) -> Response[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
f.write("\treturn Response(\n")
f.write("\t\tstatus_code=response.status_code,\n")
f.write("\t\tcontent=response.content,\n")
f.write("\t\theaders=response.headers,\n")
f.write("\t\tparsed=_parse_response(response=response),\n")
f.write("\t)\n")
# Define the sync_detailed method.
f.write("\n")
f.write("\n")
f.write(
"def sync_detailed(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
": " +
parameter_type +
",\n")
f.write("*, client: Client) -> Response[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
f.write("\tkwargs = _get_kwargs(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
"=" +
camel_to_snake(parameter_name) +
",\n")
f.write("\t\tclient=client,\n")
f.write("\t)\n")
f.write("\n")
f.write("\tresponse = httpx." + method + "(\n")
f.write("\t\tverify=client.verify_ssl,\n")
f.write("\t\t**kwargs,\n")
f.write("\t)\n")
f.write("\n")
f.write("\treturn _build_response(response=response)\n")
# Define the sync method.
f.write("\n")
f.write("\n")
f.write(
"def sync(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
": " +
parameter_type +
",\n")
f.write("*, client: Client) -> Optional[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
if 'description' in endpoint:
f.write("\t\"\"\" " + endpoint['description'] + " \"\"\"\n")
f.write("\n")
f.write("\treturn sync_detailed(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
"=" +
camel_to_snake(parameter_name) +
",\n")
f.write("\t\tclient=client,\n")
f.write("\t).parsed\n")
# Define the asyncio_detailed method.
f.write("\n")
f.write("\n")
f.write(
"async def asyncio_detailed(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
": " +
parameter_type +
",\n")
f.write("*, client: Client) -> Response[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
f.write("\tkwargs = _get_kwargs(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
"=" +
camel_to_snake(parameter_name) +
",\n")
f.write("\t\tclient=client,\n")
f.write("\t)\n")
f.write("\n")
f.write("\tasync with httpx.AsyncClient(verify=client.verify_ssl) as _client:\n")
f.write("\t\tresponse = await _client." + method + "(**kwargs)\n")
f.write("\n")
f.write("\treturn _build_response(response=response)\n")
# Define the asyncio method.
f.write("\n")
f.write("\n")
f.write(
"async def asyncio(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
": " +
parameter_type +
",\n")
f.write("*, client: Client) -> Optional[Union[Any, " +
", ".join(endoint_refs) +
"]]:\n")
if 'description' in endpoint:
f.write("\t\"\"\" " + endpoint['description'] + " \"\"\"\n")
f.write("\n")
f.write("\treturn (await asyncio_detailed(\n")
# Iterate over the parameters.
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if 'type' in parameter['schema']:
parameter_type = parameter['schema']['type'].replace(
'string', 'str')
elif '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
else:
print(" parameter: ", parameter)
raise Exception("Unknown parameter type")
f.write(
"\t" +
camel_to_snake(parameter_name) +
"=" +
camel_to_snake(parameter_name) +
",\n")
f.write("\t\tclient=client,\n")
f.write("\t)).parsed\n")
# Close the file.
f.close()
def generateTypes(cwd: str, parser: OpenApiParser): def generateTypes(cwd: str, parser: OpenApiParser):
# Make sure we have the directory. # Make sure we have the directory.
@ -39,11 +467,12 @@ def generateTypes(cwd: str, parser: OpenApiParser):
for key in schemas: for key in schemas:
schema = schemas[key] schema = schemas[key]
generateType(path, key, schema) generateType(path, key, schema)
f.write("from ."+camel_to_snake(key)+" import " + key + "\n") f.write("from ." + camel_to_snake(key) + " import " + key + "\n")
# Close the file. # Close the file.
f.close() f.close()
def generateType(path: str, name: str, schema: dict): def generateType(path: str, name: str, schema: dict):
# Generate the type. # Generate the type.
file_name = camel_to_snake(name) + '.py' file_name = camel_to_snake(name) + '.py'
@ -65,14 +494,19 @@ def generateType(path: str, name: str, schema: dict):
refs = getRefs(schema) refs = getRefs(schema)
for ref in refs: for ref in refs:
f.write("from ..models."+camel_to_snake(ref)+" import "+ref+"\n") f.write(
"from ..models." +
camel_to_snake(ref) +
" import " +
ref +
"\n")
f.write("from ..types import UNSET, Unset\n") f.write("from ..types import UNSET, Unset\n")
f.write("\n") f.write("\n")
f.write("T = TypeVar(\"T\", bound=\""+name+"\")\n") f.write("T = TypeVar(\"T\", bound=\"" + name + "\")\n")
f.write("\n") f.write("\n")
f.write("@attr.s(auto_attribs=True)\n") f.write("@attr.s(auto_attribs=True)\n")
f.write("class "+name+":\n") f.write("class " + name + ":\n")
# Write the description. # Write the description.
f.write("\t\"\"\" \"\"\"\n") f.write("\t\"\"\" \"\"\"\n")
# Iterate over the properties. # Iterate over the properties.
@ -85,27 +519,49 @@ def generateType(path: str, name: str, schema: dict):
if property_type == 'string': if property_type == 'string':
if 'format' in property_schema: if 'format' in property_schema:
if property_schema['format'] == 'date-time': if property_schema['format'] == 'date-time':
f.write("\t"+property_name+": Union[Unset, datetime.datetime] = UNSET\n") f.write(
"\t" +
property_name +
": Union[Unset, datetime.datetime] = UNSET\n")
continue continue
f.write("\t"+property_name+": Union[Unset, str] = UNSET\n") f.write(
"\t" +
property_name +
": Union[Unset, str] = UNSET\n")
elif property_type == 'integer': elif property_type == 'integer':
f.write("\t"+property_name+": Union[Unset, int] = UNSET\n") f.write(
"\t" +
property_name +
": Union[Unset, int] = UNSET\n")
elif property_type == 'number': elif property_type == 'number':
f.write("\t"+property_name+": Union[Unset, float] = UNSET\n") f.write(
"\t" +
property_name +
": Union[Unset, float] = UNSET\n")
elif property_type == 'boolean': elif property_type == 'boolean':
f.write("\t"+property_name+": Union[Unset, bool] = False\n") f.write(
"\t" +
property_name +
": Union[Unset, bool] = False\n")
else: else:
raise (" unknown type: ", property_type) raise (" unknown type: ", property_type)
elif '$ref' in property_schema: elif '$ref' in property_schema:
ref = property_schema['$ref'].replace('#/components/schemas/', '') ref = property_schema['$ref'].replace(
f.write("\t"+property_name+": Union[Unset, "+ref+"] = UNSET\n") '#/components/schemas/', '')
f.write(
"\t" +
property_name +
": Union[Unset, " +
ref +
"] = UNSET\n")
else: else:
raise (" unknown schema: ", property_schema) raise (" unknown schema: ", property_schema)
# Finish writing the class. # Finish writing the class.
f.write("\n") f.write("\n")
f.write("\tadditional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)\n") f.write(
"\tadditional_properties: Dict[str, Any] = attr.ib(init=False, factory=dict)\n")
# Now let's write the to_dict method. # Now let's write the to_dict method.
f.write("\n") f.write("\n")
@ -120,25 +576,66 @@ def generateType(path: str, name: str, schema: dict):
if property_type == 'string': if property_type == 'string':
if 'format' in property_schema: if 'format' in property_schema:
if property_schema['format'] == 'date-time': if property_schema['format'] == 'date-time':
f.write("\t\t"+property_name+": Union[Unset, str] = UNSET\n") f.write(
f.write("\t\tif not isinstance(self."+property_name+", Unset):\n") "\t\t" +
f.write("\t\t\t"+property_name+" = self."+property_name+".isoformat()\n") property_name +
": Union[Unset, str] = UNSET\n")
f.write(
"\t\tif not isinstance(self." + property_name + ", Unset):\n")
f.write(
"\t\t\t" +
property_name +
" = self." +
property_name +
".isoformat()\n")
continue continue
f.write("\t"+property_name+" = self."+property_name+"\n") f.write(
"\t" +
property_name +
" = self." +
property_name +
"\n")
elif property_type == 'integer': elif property_type == 'integer':
f.write("\t"+property_name+" = self."+property_name+"\n") f.write(
"\t" +
property_name +
" = self." +
property_name +
"\n")
elif property_type == 'number': elif property_type == 'number':
f.write("\t"+property_name+" = self."+property_name+"\n") f.write(
"\t" +
property_name +
" = self." +
property_name +
"\n")
elif property_type == 'boolean': elif property_type == 'boolean':
f.write("\t"+property_name+" = self."+property_name+"\n") f.write(
"\t" +
property_name +
" = self." +
property_name +
"\n")
else: else:
raise (" unknown type: ", property_type) raise (" unknown type: ", property_type)
elif '$ref' in property_schema: elif '$ref' in property_schema:
ref = property_schema['$ref'].replace('#/components/schemas/', '') ref = property_schema['$ref'].replace(
f.write("\t\t"+property_name+": Union[Unset, str] = UNSET\n") '#/components/schemas/', '')
f.write("\t\tif not isinstance(self."+property_name+", Unset):\n") f.write(
f.write("\t\t\t"+property_name+" = self."+property_name+".value\n") "\t\t" +
property_name +
": Union[Unset, str] = UNSET\n")
f.write(
"\t\tif not isinstance(self." +
property_name +
", Unset):\n")
f.write(
"\t\t\t" +
property_name +
" = self." +
property_name +
".value\n")
else: else:
raise (" unknown schema: ", property_schema) raise (" unknown schema: ", property_schema)
@ -148,12 +645,16 @@ def generateType(path: str, name: str, schema: dict):
f.write("\t\tfield_dict.update(self.additional_properties)\n") f.write("\t\tfield_dict.update(self.additional_properties)\n")
f.write("\t\tfield_dict.update({})\n") f.write("\t\tfield_dict.update({})\n")
# Iternate over the properties. # Iternate over the properties.
for property_name in schema['properties']: for property_name in schema['properties']:
# Write the property. # Write the property.
f.write("\t\tif "+property_name+" is not UNSET:\n") f.write("\t\tif " + property_name + " is not UNSET:\n")
f.write("\t\t\tfield_dict['"+property_name+"'] = "+property_name+"\n") f.write(
"\t\t\tfield_dict['" +
property_name +
"'] = " +
property_name +
"\n")
f.write("\n") f.write("\n")
f.write("\t\treturn field_dict\n") f.write("\t\treturn field_dict\n")
@ -161,7 +662,8 @@ def generateType(path: str, name: str, schema: dict):
# Now let's write the from_dict method. # Now let's write the from_dict method.
f.write("\n") f.write("\n")
f.write("\t@classmethod\n") f.write("\t@classmethod\n")
f.write("\tdef from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:\n") f.write(
"\tdef from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T:\n")
f.write("\t\td = src_dict.copy()\n") f.write("\t\td = src_dict.copy()\n")
# Iternate over the properties. # Iternate over the properties.
@ -174,37 +676,78 @@ def generateType(path: str, name: str, schema: dict):
if property_type == 'string': if property_type == 'string':
if 'format' in property_schema: if 'format' in property_schema:
if property_schema['format'] == 'date-time': if property_schema['format'] == 'date-time':
f.write("\t\t_"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") f.write(
f.write("\t\t"+property_name+": Union[Unset, datetime.datetime]\n") "\t\t_" +
f.write("\t\tif not isinstance(_"+property_name+", Unset):\n") property_name +
f.write("\t\t\t"+property_name+" = UNSET\n") " = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write(
"\t\t" +
property_name +
": Union[Unset, datetime.datetime]\n")
f.write(
"\t\tif not isinstance(_" + property_name + ", Unset):\n")
f.write("\t\t\t" + property_name + " = UNSET\n")
f.write("\t\telse:\n") f.write("\t\telse:\n")
f.write("\t\t\t"+property_name+" = isoparse(_"+property_name+")\n") f.write("\t\t\t" + property_name +
" = isoparse(_" + property_name + ")\n")
f.write("\n") f.write("\n")
continue continue
f.write("\t"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") f.write(
"\t" +
property_name +
" = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write("\n") f.write("\n")
elif property_type == 'integer': elif property_type == 'integer':
f.write("\t"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") f.write(
"\t" +
property_name +
" = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write("\n") f.write("\n")
elif property_type == 'number': elif property_type == 'number':
f.write("\t"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") f.write(
"\t" +
property_name +
" = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write("\n") f.write("\n")
elif property_type == 'boolean': elif property_type == 'boolean':
f.write("\t"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") f.write(
"\t" +
property_name +
" = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write("\n") f.write("\n")
else: else:
print(" unknown type: ", property_type) print(" unknown type: ", property_type)
raise raise
elif '$ref' in property_schema: elif '$ref' in property_schema:
ref = property_schema['$ref'].replace('#/components/schemas/', '') ref = property_schema['$ref'].replace(
f.write("\t\t_"+property_name+" = d.pop(\"" +property_name+"\", UNSET)\n") '#/components/schemas/', '')
f.write("\t\t"+property_name+": Union[Unset, "+ref+"]\n") f.write(
f.write("\t\tif not isinstance(_"+property_name+", Unset):\n") "\t\t_" +
f.write("\t\t\t"+property_name+" = UNSET\n") property_name +
" = d.pop(\"" +
property_name +
"\", UNSET)\n")
f.write("\t\t" + property_name +
": Union[Unset, " + ref + "]\n")
f.write(
"\t\tif not isinstance(_" +
property_name +
", Unset):\n")
f.write("\t\t\t" + property_name + " = UNSET\n")
f.write("\t\telse:\n") f.write("\t\telse:\n")
f.write("\t\t\t"+property_name+" = "+ref+"(_"+property_name+")\n") f.write("\t\t\t" + property_name + " = " +
ref + "(_" + property_name + ")\n")
f.write("\n") f.write("\n")
else: else:
print(" unknown schema: ", property_schema) print(" unknown schema: ", property_schema)
@ -212,17 +755,17 @@ def generateType(path: str, name: str, schema: dict):
# Finish writing the from_dict method. # Finish writing the from_dict method.
f.write("\n") f.write("\n")
f.write("\t\t"+camel_to_snake(name)+" = cls(\n") f.write("\t\t" + camel_to_snake(name) + " = cls(\n")
# Iternate over the properties. # Iternate over the properties.
for property_name in schema['properties']: for property_name in schema['properties']:
# Write the property. # Write the property.
f.write("\t\t\t"+property_name+"= "+property_name+",\n") f.write("\t\t\t" + property_name + "= " + property_name + ",\n")
# Close the class. # Close the class.
f.write("\t\t)\n") f.write("\t\t)\n")
f.write("\n") f.write("\n")
f.write("\t\t"+camel_to_snake(name)+".additional_properties = d\n") f.write("\t\t" + camel_to_snake(name) + ".additional_properties = d\n")
f.write("return "+camel_to_snake(name)+"\n") f.write("return " + camel_to_snake(name) + "\n")
# write the rest of the class. # write the rest of the class.
f.write("\n") f.write("\n")
@ -248,10 +791,15 @@ def generateType(path: str, name: str, schema: dict):
elif type_name == 'string' and 'enum' in schema: elif type_name == 'string' and 'enum' in schema:
f.write("from enum import Enum\n") f.write("from enum import Enum\n")
f.write("\n") f.write("\n")
f.write("class "+name+"(str, Enum):\n") f.write("class " + name + "(str, Enum):\n")
# Iterate over the properties. # Iterate over the properties.
for value in schema['enum']: for value in schema['enum']:
f.write("\t"+camel_to_screaming_snake(value)+" = '"+value+"'\n") f.write(
"\t" +
camel_to_screaming_snake(value) +
" = '" +
value +
"'\n")
# close the enum. # close the enum.
f.write("\n") f.write("\n")
@ -264,6 +812,7 @@ def generateType(path: str, name: str, schema: dict):
# Close the file. # Close the file.
f.close() f.close()
def hasDateTime(schema: dict) -> bool: def hasDateTime(schema: dict) -> bool:
# Generate the type. # Generate the type.
if 'type' in schema: if 'type' in schema:
@ -281,6 +830,7 @@ def hasDateTime(schema: dict) -> bool:
return False return False
def getRefs(schema: dict) -> [str]: def getRefs(schema: dict) -> [str]:
refs = [] refs = []
if '$ref' in schema: if '$ref' in schema:
@ -301,14 +851,66 @@ def getRefs(schema: dict) -> [str]:
return refs return refs
def getEndpointRefs(endpoint: dict) -> [str]:
refs = []
responses = endpoint['responses']
for response_code in responses:
response = responses[response_code]
if 'content' in response:
content = response['content']
for content_type in content:
if content_type == 'application/json':
json = content[content_type]['schema']
if '$ref' in json:
ref = json['$ref'].replace('#/components/schemas/', '')
refs.append(ref)
return refs
def getParameterRefs(endpoint: dict) -> [str]:
refs = []
if 'parameters' in endpoint:
parameters = endpoint['parameters']
for parameter in parameters:
parameter_name = parameter['name']
if '$ref' in parameter['schema']:
parameter_type = parameter['schema']['$ref'].replace(
'#/components/schemas/', '')
refs.append(parameter_type)
return refs
def getRequestBodyRefs(endpoint: dict) -> [str]:
refs = []
if 'requestBody' in endpoint:
requestBody = endpoint['requestBody']
if 'content' in requestBody:
content = requestBody['content']
for content_type in content:
if content_type == 'application/json':
json = content[content_type]['schema']
if '$ref' in json:
ref = json['$ref'].replace('#/components/schemas/', '')
refs.append(ref)
return refs
def camel_to_snake(name: str): def camel_to_snake(name: str):
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower() return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
def camel_to_screaming_snake(name: str): def camel_to_screaming_snake(name: str):
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).upper() return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).upper()
if (__name__ == '__main__'): if (__name__ == '__main__'):
exit_code = main() exit_code = main()
exit(exit_code) exit(exit_code)

View File

@ -0,0 +1 @@
""" Contains methods for accessing the beta API paths: Beta API endpoints. """

View File

@ -0,0 +1 @@
""" Contains methods for accessing the file API paths: CAD file operations. """

View File

@ -1,128 +0,0 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import AuthenticatedClient
from ...models.file_conversion import FileConversion
from ...types import Response
def _get_kwargs(
id: str,
*,
client: AuthenticatedClient,
) -> Dict[str, Any]:
url = "{}/file/conversion/{id}".format(client.base_url, id=id)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, FileConversion]]:
if response.status_code == 200:
response_200 = FileConversion.from_dict(response.json())
return response_200
if response.status_code == 400:
response_400 = None
return response_400
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
if response.status_code == 404:
response_404 = None
return response_404
if response.status_code == 406:
response_406 = None
return response_406
if response.status_code == 500:
response_500 = None
return response_500
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, FileConversion]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
id: str,
*,
client: AuthenticatedClient,
) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
id=id,
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
id: str,
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, FileConversion]]:
"""Get the status of a file conversion."""
return sync_detailed(
id=id,
client=client,
).parsed
async def asyncio_detailed(
id: str,
*,
client: AuthenticatedClient,
) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
id=id,
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
id: str,
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, FileConversion]]:
"""Get the status of a file conversion."""
return (
await asyncio_detailed(
id=id,
client=client,
)
).parsed

View File

@ -0,0 +1,107 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...models.file_conversion import FileConversion
from ...types import Response
def _get_kwargs(
id: str,
*, client: Client) -> Dict[str, Any]:
url = "{}/file/conversion/{id}".format(client.base_url,
id=id,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, FileConversion]]:
if response.status_code == 200:
response_200 = FileConversion.from_dict(response.json())
return response_200
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
if response.status_code == 404:
response_404 = None
return response_404
if response.status_code == 406:
response_406 = None
return response_406
if response.status_code == 500:
response_500 = None
return response_500
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, FileConversion]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
id: str,
*, client: Client) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
id=id,
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
id: str,
*, client: Client) -> Optional[Union[Any, FileConversion]]:
""" Get the status and output of an async file conversion. """
return sync_detailed(
id=id,
client=client,
).parsed
async def asyncio_detailed(
id: str,
*, client: Client) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
id=id,
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
id: str,
*, client: Client) -> Optional[Union[Any, FileConversion]]:
""" Get the status and output of an async file conversion. """
return (await asyncio_detailed(
id=id,
client=client,
)).parsed

View File

@ -1,150 +0,0 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import AuthenticatedClient
from ...models.file_conversion import FileConversion
from ...models.valid_file_type import ValidFileType
from ...types import Response
def _get_kwargs(
source_format: ValidFileType,
output_format: ValidFileType,
content: bytes,
*,
client: AuthenticatedClient,
) -> Dict[str, Any]:
url = "{}/file/conversion/{sourceFormat}/{outputFormat}".format(
client.base_url, sourceFormat=source_format, outputFormat=output_format
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
"content": content,
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, FileConversion]]:
if response.status_code == 200:
response_200 = FileConversion.from_dict(response.json())
return response_200
if response.status_code == 202:
response_202 = FileConversion.from_dict(response.json())
return response_202
if response.status_code == 400:
response_400 = None
return response_400
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
if response.status_code == 406:
response_406 = None
return response_406
if response.status_code == 500:
response_500 = None
return response_500
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, FileConversion]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
source_format: ValidFileType,
output_format: ValidFileType,
content: bytes,
*,
client: AuthenticatedClient,
) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
source_format=source_format,
output_format=output_format,
content=content,
client=client,
)
response = httpx.post(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
source_format: ValidFileType,
output_format: ValidFileType,
content: bytes,
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, FileConversion]]:
"""Convert a CAD file from one format to another. If the file being converted is larger than a certain size it will be performed asynchronously."""
return sync_detailed(
source_format=source_format,
output_format=output_format,
content=content,
client=client,
).parsed
async def asyncio_detailed(
source_format: ValidFileType,
output_format: ValidFileType,
content: bytes,
*,
client: AuthenticatedClient,
) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
source_format=source_format,
output_format=output_format,
content=content,
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.post(**kwargs)
return _build_response(response=response)
async def asyncio(
source_format: ValidFileType,
output_format: ValidFileType,
content: bytes,
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, FileConversion]]:
"""Convert a CAD file from one format to another. If the file being converted is larger than a certain size it will be performed asynchronously."""
return (
await asyncio_detailed(
source_format=source_format,
output_format=output_format,
content=content,
client=client,
)
).parsed

View File

@ -0,0 +1,123 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...models.file_conversion import FileConversion
from ...models.file_conversion import FileConversion
from ...models.valid_source_file_format import ValidSourceFileFormat
from ...models.valid_output_file_format import ValidOutputFileFormat
from ...types import Response
def _get_kwargs(
source_format: ValidSourceFileFormat,
output_format: ValidOutputFileFormat,
*, client: Client) -> Dict[str, Any]:
url = "{}/file/conversion/{sourceFormat}/{outputFormat}".format(client.base_url,
sourceFormat=source_format,
outputFormat=output_format,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, FileConversion, FileConversion]]:
if response.status_code == 200:
response_200 = FileConversion.from_dict(response.json())
return response_200
if response.status_code == 202:
response_202 = FileConversion.from_dict(response.json())
return response_202
if response.status_code == 400:
response_400 = None
return response_400
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
if response.status_code == 406:
response_406 = None
return response_406
if response.status_code == 500:
response_500 = None
return response_500
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, FileConversion, FileConversion]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
source_format: ValidSourceFileFormat,
output_format: ValidOutputFileFormat,
*, client: Client) -> Response[Union[Any, FileConversion, FileConversion]]:
kwargs = _get_kwargs(
source_format=source_format,
output_format=output_format,
client=client,
)
response = httpx.post(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
source_format: ValidSourceFileFormat,
output_format: ValidOutputFileFormat,
*, client: Client) -> Optional[Union[Any, FileConversion, FileConversion]]:
""" Convert a CAD file from one format to another. If the file being converted is larger than 30MB, it will be performed asynchronously. """
return sync_detailed(
source_format=source_format,
output_format=output_format,
client=client,
).parsed
async def asyncio_detailed(
source_format: ValidSourceFileFormat,
output_format: ValidOutputFileFormat,
*, client: Client) -> Response[Union[Any, FileConversion, FileConversion]]:
kwargs = _get_kwargs(
source_format=source_format,
output_format=output_format,
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.post(**kwargs)
return _build_response(response=response)
async def asyncio(
source_format: ValidSourceFileFormat,
output_format: ValidOutputFileFormat,
*, client: Client) -> Optional[Union[Any, FileConversion, FileConversion]]:
""" Convert a CAD file from one format to another. If the file being converted is larger than 30MB, it will be performed asynchronously. """
return (await asyncio_detailed(
source_format=source_format,
output_format=output_format,
client=client,
)).parsed

View File

@ -0,0 +1 @@
""" Contains methods for accessing the internal API paths: Internal API endpoints. """

View File

@ -0,0 +1,86 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...types import Response
def _get_kwargs(
*, client: Client) -> Dict[str, Any]:
url = "{}/_internal/gpu/devices".format(client.base_url,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, ]]:
if response.status_code == 200:
return response_200
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, ]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*, client: Client) -> Response[Union[Any, ]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*, client: Client) -> Optional[Union[Any, ]]:
""" Get information about GPU devices on this server. This is primarily used for debugging. This endpoint can only be used by specific KittyCAD employees. """
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*, client: Client) -> Response[Union[Any, ]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
*, client: Client) -> Optional[Union[Any, ]]:
""" Get information about GPU devices on this server. This is primarily used for debugging. This endpoint can only be used by specific KittyCAD employees. """
return (await asyncio_detailed(
client=client,
)).parsed

View File

@ -0,0 +1,91 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...models.file_conversion import FileConversion
from ...types import Response
def _get_kwargs(
*, client: Client) -> Dict[str, Any]:
url = "{}/_internal/async/conversions/stop".format(client.base_url,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, FileConversion]]:
if response.status_code == 200:
response_200 = FileConversion.from_dict(response.json())
return response_200
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
if response.status_code == 404:
response_404 = None
return response_404
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, FileConversion]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*, client: Client) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.post(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*, client: Client) -> Optional[Union[Any, FileConversion]]:
""" Stop all async conversions that are currently running. This endpoint can only be used by specific KittyCAD employees. """
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*, client: Client) -> Response[Union[Any, FileConversion]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.post(**kwargs)
return _build_response(response=response)
async def asyncio(
*, client: Client) -> Optional[Union[Any, FileConversion]]:
""" Stop all async conversions that are currently running. This endpoint can only be used by specific KittyCAD employees. """
return (await asyncio_detailed(
client=client,
)).parsed

View File

@ -0,0 +1 @@
""" Contains methods for accessing the meta API paths: Meta information about servers, instances, and sessions. """

View File

@ -0,0 +1,88 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...models.auth_session import AuthSession
from ...types import Response
def _get_kwargs(
*, client: Client) -> Dict[str, Any]:
url = "{}/_meta/debug/session".format(client.base_url,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, AuthSession]]:
if response.status_code == 200:
response_200 = AuthSession.from_dict(response.json())
return response_200
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, AuthSession]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*, client: Client) -> Response[Union[Any, AuthSession]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*, client: Client) -> Optional[Union[Any, AuthSession]]:
""" Get information about your API request session. This is primarily used for debugging. """
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*, client: Client) -> Response[Union[Any, AuthSession]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
*, client: Client) -> Optional[Union[Any, AuthSession]]:
""" Get information about your API request session. This is primarily used for debugging. """
return (await asyncio_detailed(
client=client,
)).parsed

View File

@ -0,0 +1,88 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import Client
from ...models.instance import Instance
from ...types import Response
def _get_kwargs(
*, client: Client) -> Dict[str, Any]:
url = "{}/_meta/debug/instance".format(client.base_url,
)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, Instance]]:
if response.status_code == 200:
response_200 = Instance.from_dict(response.json())
return response_200
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, Instance]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*, client: Client) -> Response[Union[Any, Instance]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*, client: Client) -> Optional[Union[Any, Instance]]:
""" Get information about this specific API server instance. This is primarily used for debugging. """
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*, client: Client) -> Response[Union[Any, Instance]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
*, client: Client) -> Optional[Union[Any, Instance]]:
""" Get information about this specific API server instance. This is primarily used for debugging. """
return (await asyncio_detailed(
client=client,
)).parsed

View File

@ -1,107 +0,0 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import AuthenticatedClient
from ...models.instance_metadata import InstanceMetadata
from ...types import Response
def _get_kwargs(
*,
client: AuthenticatedClient,
) -> Dict[str, Any]:
url = "{}/_meta/debug/instance".format(client.base_url)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, InstanceMetadata]]:
if response.status_code == 200:
response_200 = InstanceMetadata.from_dict(response.json())
return response_200
if response.status_code == 400:
response_400 = None
return response_400
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, InstanceMetadata]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*,
client: AuthenticatedClient,
) -> Response[Union[Any, InstanceMetadata]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, InstanceMetadata]]:
"""Get information about this specific API server instance. This is primarily used for debugging."""
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*,
client: AuthenticatedClient,
) -> Response[Union[Any, InstanceMetadata]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, InstanceMetadata]]:
"""Get information about this specific API server instance. This is primarily used for debugging."""
return (
await asyncio_detailed(
client=client,
)
).parsed

View File

@ -1,107 +0,0 @@
from typing import Any, Dict, Optional, Union
import httpx
from ...client import AuthenticatedClient
from ...models.auth_session import AuthSession
from ...types import Response
def _get_kwargs(
*,
client: AuthenticatedClient,
) -> Dict[str, Any]:
url = "{}/_meta/debug/session".format(client.base_url)
headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies()
return {
"url": url,
"headers": headers,
"cookies": cookies,
"timeout": client.get_timeout(),
}
def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, AuthSession]]:
if response.status_code == 200:
response_200 = AuthSession.from_dict(response.json())
return response_200
if response.status_code == 400:
response_400 = None
return response_400
if response.status_code == 401:
response_401 = None
return response_401
if response.status_code == 403:
response_403 = None
return response_403
return None
def _build_response(*, response: httpx.Response) -> Response[Union[Any, AuthSession]]:
return Response(
status_code=response.status_code,
content=response.content,
headers=response.headers,
parsed=_parse_response(response=response),
)
def sync_detailed(
*,
client: AuthenticatedClient,
) -> Response[Union[Any, AuthSession]]:
kwargs = _get_kwargs(
client=client,
)
response = httpx.get(
verify=client.verify_ssl,
**kwargs,
)
return _build_response(response=response)
def sync(
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, AuthSession]]:
"""Get information about your API request session. This is primarily used for debugging."""
return sync_detailed(
client=client,
).parsed
async def asyncio_detailed(
*,
client: AuthenticatedClient,
) -> Response[Union[Any, AuthSession]]:
kwargs = _get_kwargs(
client=client,
)
async with httpx.AsyncClient(verify=client.verify_ssl) as _client:
response = await _client.get(**kwargs)
return _build_response(response=response)
async def asyncio(
*,
client: AuthenticatedClient,
) -> Optional[Union[Any, AuthSession]]:
"""Get information about your API request session. This is primarily used for debugging."""
return (
await asyncio_detailed(
client=client,
)
).parsed

View File

@ -1,17 +1,15 @@
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Union
import httpx import httpx
from ...client import Client from ...client import Client
from ...models.message import Message from ...models.pong_message import PongMessage
from ...types import Response from ...types import Response
def _get_kwargs( def _get_kwargs(
*, *, client: Client) -> Dict[str, Any]:
client: Client, url = "{}/ping".format(client.base_url,
) -> Dict[str, Any]: )
url = "{}/ping".format(client.base_url)
headers: Dict[str, Any] = client.get_headers() headers: Dict[str, Any] = client.get_headers()
cookies: Dict[str, Any] = client.get_cookies() cookies: Dict[str, Any] = client.get_cookies()
@ -24,15 +22,14 @@ def _get_kwargs(
} }
def _parse_response(*, response: httpx.Response) -> Optional[Message]: def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, PongMessage]]:
if response.status_code == 200: if response.status_code == 200:
response_200 = Message.from_dict(response.json()) response_200 = PongMessage.from_dict(response.json())
return response_200 return response_200
return None return None
def _build_response(*, response: httpx.Response) -> Response[Message]: def _build_response(*, response: httpx.Response) -> Response[Union[Any, PongMessage]]:
return Response( return Response(
status_code=response.status_code, status_code=response.status_code,
content=response.content, content=response.content,
@ -42,9 +39,7 @@ def _build_response(*, response: httpx.Response) -> Response[Message]:
def sync_detailed( def sync_detailed(
*, *, client: Client) -> Response[Union[Any, PongMessage]]:
client: Client,
) -> Response[Message]:
kwargs = _get_kwargs( kwargs = _get_kwargs(
client=client, client=client,
) )
@ -58,10 +53,8 @@ def sync_detailed(
def sync( def sync(
*, *, client: Client) -> Optional[Union[Any, PongMessage]]:
client: Client, """ Simple ping to the server. """
) -> Optional[Message]:
"""Simple ping to the server."""
return sync_detailed( return sync_detailed(
client=client, client=client,
@ -69,9 +62,7 @@ def sync(
async def asyncio_detailed( async def asyncio_detailed(
*, *, client: Client) -> Response[Union[Any, PongMessage]]:
client: Client,
) -> Response[Message]:
kwargs = _get_kwargs( kwargs = _get_kwargs(
client=client, client=client,
) )
@ -83,13 +74,9 @@ async def asyncio_detailed(
async def asyncio( async def asyncio(
*, *, client: Client) -> Optional[Union[Any, PongMessage]]:
client: Client, """ Simple ping to the server. """
) -> Optional[Message]:
"""Simple ping to the server."""
return ( return (await asyncio_detailed(
await asyncio_detailed(
client=client, client=client,
) )).parsed
).parsed