better and clean;

Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
Jess Frazelle
2023-05-08 12:58:35 -07:00
parent c209414459
commit 8cf515b39f
92 changed files with 2619 additions and 1373 deletions

View File

@ -5,7 +5,7 @@ import logging
import os
import random
import re
from typing import List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
import black
import isort
@ -130,22 +130,35 @@ def generatePaths(cwd: str, parser: dict) -> dict:
def generateTypeAndExamplePython(
name: str, schema: dict, data: dict
name: str, schema: dict, data: dict, import_path: Optional[str]
) -> Tuple[str, str, str]:
parameter_type = ""
parameter_example = ""
example_imports = ""
ip: str = ""
if import_path is not None:
ip = import_path
if "type" in schema:
if "format" in schema and schema["format"] == "uuid":
if name != "":
parameter_type = name
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
if import_path is None:
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
else:
example_imports = example_imports + (
"from kittycad.models."
+ ip
+ " import "
+ parameter_type
+ "\n"
)
parameter_example = parameter_type + '("<uuid>")'
else:
parameter_type = "str"
@ -159,13 +172,18 @@ def generateTypeAndExamplePython(
parameter_type = name
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
if import_path is None:
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
else:
example_imports = example_imports + (
"from kittycad.models." + ip + " import " + parameter_type + "\n"
)
parameter_example = (
parameter_type + "." + camel_to_screaming_snake(schema["enum"][0])
@ -173,13 +191,22 @@ def generateTypeAndExamplePython(
elif schema["type"] == "string":
if name != "":
parameter_type = name
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
if import_path is None:
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
else:
example_imports = example_imports + (
"from kittycad.models."
+ ip
+ " import "
+ parameter_type
+ "\n"
)
parameter_example = parameter_type + '("<string>")'
else:
parameter_type = "str"
@ -196,7 +223,7 @@ def generateTypeAndExamplePython(
parameter_example = "3.14"
elif schema["type"] == "array" and "items" in schema:
items_type, items_example, items_imports = generateTypeAndExamplePython(
"", schema["items"], data
"", schema["items"], data, None
)
example_imports = example_imports + items_imports
parameter_type = "List[" + items_type + "]"
@ -207,6 +234,8 @@ def generateTypeAndExamplePython(
parameter_example = parameter_example + "]"
else:
parameter_example = "[" + items_example + "]"
example_imports = example_imports + ("from typing import List\n")
elif schema["type"] == "object" and "properties" in schema:
if name == "":
logging.error("schema: %s", json.dumps(schema, indent=4))
@ -214,13 +243,18 @@ def generateTypeAndExamplePython(
parameter_type = name
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
if import_path is None:
example_imports = example_imports + (
"from kittycad.models."
+ camel_to_snake(parameter_type)
+ " import "
+ parameter_type
+ "\n"
)
else:
example_imports = example_imports + (
"from kittycad.models." + ip + " import " + parameter_type + "\n"
)
parameter_example = name + "("
for property_name in schema["properties"]:
prop = schema["properties"][property_name]
@ -232,7 +266,7 @@ def generateTypeAndExamplePython(
prop_type,
prop_example,
prop_imports,
) = generateTypeAndExamplePython("", prop, data)
) = generateTypeAndExamplePython("", prop, data, None)
example_imports = example_imports + prop_imports
parameter_example = parameter_example + (
"\n" + property_name + "=" + prop_example + ",\n"
@ -245,7 +279,7 @@ def generateTypeAndExamplePython(
and schema["additionalProperties"] is not False
):
items_type, items_example, items_imports = generateTypeAndExamplePython(
"", schema["additionalProperties"], data
"", schema["additionalProperties"], data, None
)
example_imports = example_imports + items_imports
parameter_type = "Dict[str, " + items_type + "]"
@ -258,16 +292,18 @@ def generateTypeAndExamplePython(
if isNestedObjectOneOf(schema):
properties = schema["oneOf"][0]["properties"]
for prop in properties:
return generateTypeAndExamplePython(prop, properties[prop], data)
return generateTypeAndExamplePython(
prop, properties[prop], data, camel_to_snake(name)
)
break
return generateTypeAndExamplePython(name, schema["oneOf"][0], data)
return generateTypeAndExamplePython(name, schema["oneOf"][0], data, None)
elif "$ref" in schema:
parameter_type = schema["$ref"].replace("#/components/schemas/", "")
# Get the schema for the reference.
ref_schema = data["components"]["schemas"][parameter_type]
return generateTypeAndExamplePython(parameter_type, ref_schema, data)
return generateTypeAndExamplePython(parameter_type, ref_schema, data, None)
else:
logging.error("schema: %s", json.dumps(schema, indent=4))
raise Exception("Unknown parameter type")
@ -297,7 +333,12 @@ def generatePath(path: str, name: str, method: str, endpoint: dict, data: dict)
success_type = ""
if len(endpoint_refs) > 0:
success_type = endpoint_refs[0]
if len(endpoint_refs) > 2:
er = getEndpointRefs(endpoint, data)
er.remove("Error")
success_type = "Union[" + ", ".join(er) + "]"
else:
success_type = endpoint_refs[0]
if fn_name == "get_file_conversion" or fn_name == "create_file_conversion":
fn_name += "_with_base64_helper"
@ -314,6 +355,15 @@ from kittycad.types import Response
"""
)
if fn_name.endswith("_with_base64_helper"):
example_imports += (
"""from kittycad.api."""
+ tag_name
+ """ import """
+ fn_name.replace("_with_base64_helper", "")
+ "\n"
)
# Iterate over the parameters.
params_str = ""
if "parameters" in endpoint:
@ -325,7 +375,7 @@ from kittycad.types import Response
parameter_type,
parameter_example,
more_example_imports,
) = generateTypeAndExamplePython("", parameter["schema"], data)
) = generateTypeAndExamplePython("", parameter["schema"], data, None)
example_imports = example_imports + more_example_imports
if "nullable" in parameter["schema"] and parameter["schema"]["nullable"]:
@ -349,30 +399,42 @@ from kittycad.types import Response
params_str += "body='<string>',\n"
elif request_body_type == "bytes":
params_str += "body=bytes('some bytes', 'utf-8'),\n"
else:
elif request_body_schema:
# Generate an example for the schema.
rbs: dict = request_body_schema
rbs: Dict[Any, Any] = request_body_schema
(
body_type,
body_example,
more_example_imports,
) = generateTypeAndExamplePython(request_body_type, rbs, data)
) = generateTypeAndExamplePython(request_body_type, rbs, data, None)
params_str += "body=" + body_example + ",\n"
example_imports = example_imports + more_example_imports
example_variable = ""
example_variable_response = ""
response_type = getFunctionResultType(endpoint, endpoint_refs)
detailed_response_type = getDetailedFunctionResultType(endpoint, endpoint_refs)
if (
success_type != "str"
and success_type != "dict"
and success_type != "None"
and success_type != ""
):
example_imports = example_imports + (
"""from kittycad.models import """
+ success_type.replace("List[", "").replace("]", "")
)
example_variable = "fc: " + success_type + " = "
"response: Response[" + success_type + "] = "
for endpoint_ref in endpoint_refs:
if endpoint_ref == "Error":
continue
example_imports = example_imports + (
"""from kittycad.models import """
+ endpoint_ref.replace("List[", "").replace("]", "")
+ "\n"
)
example_imports = example_imports + "from typing import Union, Any, Optional\n"
example_variable = "result: " + response_type + " = "
example_imports = example_imports + "from kittycad.types import Response\n"
example_imports = example_imports + "from kittycad.models import Error\n"
example_variable_response = "response: " + detailed_response_type + " = "
# Add some new lines.
example_imports = example_imports + "\n\n"
@ -389,9 +451,30 @@ from kittycad.types import Response
+ fn_name
+ """.sync(client=client,\n"""
+ params_str
+ """)"""
+ """)
"""
)
if (
success_type != "str"
and success_type != "dict"
and success_type != "None"
and success_type != ""
):
short_sync_example = short_sync_example + (
"""
if isinstance(result, Error) or result == None:
print(result)
raise Exception("Error in response")
body: """
+ success_type
+ """ = result
print(body)
"""
)
# This longer example we use for generating tests.
# We only show the short example in the docs since it is much more intuitive to MEs
example = (
@ -405,8 +488,8 @@ from kittycad.types import Response
# OR if you need more info (e.g. status_code)
"""
+ example_variable
+ fn_name
+ example_variable_response
+ fn_name.replace("_with_base64_helper", "")
+ """.sync_detailed(client=client,\n"""
+ params_str
+ """)
@ -430,9 +513,9 @@ async def test_"""
# OR run async with more info
"""
+ example_variable
+ example_variable_response
+ "await "
+ fn_name
+ fn_name.replace("_with_base64_helper", "")
+ """.asyncio_detailed(client=client,\n"""
+ params_str
+ """)"""
@ -588,131 +671,157 @@ async def test_"""
f.write("\n")
f.write("\n")
f.write(
"def _parse_response(*, response: httpx.Response) -> Optional[Union[Any, "
+ ", ".join(endpoint_refs)
+ "]]:\n"
)
# Iterate over the responses.
responses = endpoint["responses"]
for response_code in responses:
response = responses[response_code]
if response_code == "default":
f.write("\treturn None\n")
else:
f.write(
"\tif response.status_code == "
+ response_code.replace("XX", "00")
+ ":\n"
)
is_one_of = False
if "content" in response:
content = response["content"]
for content_type in content:
if content_type == "application/json":
json = content[content_type]["schema"]
if "$ref" in json:
ref = json["$ref"].replace("#/components/schemas/", "")
schema = data["components"]["schemas"][ref]
# Let's check if it is a oneOf.
if "oneOf" in schema:
is_one_of = True
# We want to parse each of the possible types.
f.write("\t\tdata = response.json()\n")
for index, one_of in enumerate(schema["oneOf"]):
ref = getOneOfRefType(one_of)
f.write("\t\ttry:\n")
f.write("\t\t\tif not isinstance(data, dict):\n")
f.write("\t\t\t\traise TypeError()\n")
option_name = "option_" + camel_to_snake(ref)
f.write(
"\t\t\t"
+ option_name
+ " = "
+ ref
+ ".from_dict(data)\n"
)
f.write("\t\t\treturn " + option_name + "\n")
f.write("\t\texcept ValueError:\n")
if index == len(schema["oneOf"]) - 1:
# On the last one raise the error.
f.write("\t\t\traise\n")
else:
f.write("\t\t\tpass\n")
f.write("\t\texcept TypeError:\n")
if index == len(schema["oneOf"]) - 1:
# On the last one raise the error.
f.write("\t\t\traise\n")
else:
f.write("\t\t\tpass\n")
else:
f.write(
"\t\tresponse_"
+ response_code
+ " = "
+ ref
+ ".from_dict(response.json())\n"
)
elif "type" in json:
if json["type"] == "array":
items = json["items"]
if "$ref" in items:
ref = items["$ref"].replace(
"#/components/schemas/", ""
)
f.write("\t\tresponse_" + response_code + " = [\n")
f.write("\t\t\t" + ref + ".from_dict(item)\n")
f.write("\t\t\tfor item in response.json()\n")
f.write("\t\t]\n")
else:
raise Exception("Unknown array type")
elif json["type"] == "string":
f.write(
"\t\tresponse_"
+ response_code
+ " = response.text\n"
)
else:
raise Exception("Unknown type", json["type"])
else:
f.write(
"\t\tresponse_" + response_code + " = response.json()\n"
)
if len(endpoint_refs) > 0:
f.write(
"def _parse_response(*, response: httpx.Response) -> "
+ response_type
+ ":\n"
)
else:
f.write("def _parse_response(*, response: httpx.Response):\n")
elif "$ref" in response:
schema_name = response["$ref"].replace("#/components/responses/", "")
schema = data["components"]["responses"][schema_name]
if "content" in schema:
content = schema["content"]
# Iterate over the responses.
if len(endpoint_refs) > 0:
responses = endpoint["responses"]
for response_code in responses:
response = responses[response_code]
if response_code == "default":
# This is no content.
f.write("\treturn None\n")
elif response_code == "204" or response_code == "302":
# This is no content.
f.write("\treturn None\n")
else:
f.write(
"\tif response.status_code == "
+ response_code.replace("XX", "00")
+ ":\n"
)
is_one_of = False
if "content" in response:
content = response["content"]
for content_type in content:
if content_type == "application/json":
json = content[content_type]["schema"]
if "$ref" in json:
ref = json["$ref"].replace("#/components/schemas/", "")
schema = data["components"]["schemas"][ref]
# Let's check if it is a oneOf.
if "oneOf" in schema:
is_one_of = True
# We want to parse each of the possible types.
f.write("\t\tdata = response.json()\n")
for index, one_of in enumerate(schema["oneOf"]):
ref = getOneOfRefType(one_of)
f.write("\t\ttry:\n")
f.write(
"\t\t\tif not isinstance(data, dict):\n"
)
f.write("\t\t\t\traise TypeError()\n")
option_name = "option_" + camel_to_snake(ref)
f.write(
"\t\t\t"
+ option_name
+ " = "
+ ref
+ ".from_dict(data)\n"
)
f.write("\t\t\treturn " + option_name + "\n")
f.write("\t\texcept ValueError:\n")
if index == len(schema["oneOf"]) - 1:
# On the last one raise the error.
f.write("\t\t\traise\n")
else:
f.write("\t\t\tpass\n")
f.write("\t\texcept TypeError:\n")
if index == len(schema["oneOf"]) - 1:
# On the last one raise the error.
f.write("\t\t\traise\n")
else:
f.write("\t\t\tpass\n")
else:
f.write(
"\t\tresponse_"
+ response_code
+ " = "
+ ref
+ ".from_dict(response.json())\n"
)
elif "type" in json:
if json["type"] == "array":
items = json["items"]
if "$ref" in items:
ref = items["$ref"].replace(
"#/components/schemas/", ""
)
f.write(
"\t\tresponse_" + response_code + " = [\n"
)
f.write("\t\t\t" + ref + ".from_dict(item)\n")
f.write("\t\t\tfor item in response.json()\n")
f.write("\t\t]\n")
else:
raise Exception("Unknown array type")
elif json["type"] == "string":
f.write(
"\t\tresponse_"
+ response_code
+ " = response.text\n"
)
else:
raise Exception("Unknown type", json["type"])
else:
f.write(
"\t\tresponse_"
+ response_code
+ " = "
+ ref
+ ".from_dict(response.json())\n"
+ " = response.json()\n"
)
else:
f.write("\t\tresponse_" + response_code + " = None\n")
if not is_one_of:
f.write("\t\treturn response_" + response_code + "\n")
elif "$ref" in response:
schema_name = response["$ref"].replace(
"#/components/responses/", ""
)
schema = data["components"]["responses"][schema_name]
if "content" in schema:
content = schema["content"]
for content_type in content:
if content_type == "application/json":
json = content[content_type]["schema"]
if "$ref" in json:
ref = json["$ref"].replace(
"#/components/schemas/", ""
)
f.write(
"\t\tresponse_"
+ response_code
+ " = "
+ ref
+ ".from_dict(response.json())\n"
)
else:
print(endpoint)
raise Exception("response not supported")
# End the method.
f.write("\treturn None\n")
if not is_one_of:
f.write("\t\treturn response_" + response_code + "\n")
# End the method.
f.write("\treturn Error.from_dict(response.json())\n")
else:
f.write("\treturn\n")
# Define the build response method.
f.write("\n")
f.write("\n")
f.write(
"def _build_response(*, response: httpx.Response) -> Response[Union[Any, "
+ ", ".join(endpoint_refs)
+ "]]:\n"
)
if len(endpoint_refs) > 0:
f.write(
"def _build_response(*, response: httpx.Response) -> "
+ detailed_response_type
+ ":\n"
)
else:
f.write("def _build_response(*, response: httpx.Response) -> Response[Any]:\n")
f.write("\treturn Response(\n")
f.write("\t\tstatus_code=response.status_code,\n")
f.write("\t\tcontent=response.content,\n")
@ -776,7 +885,12 @@ async def test_"""
f.write("\tclient: Client,\n")
for optional_arg in optional_args:
f.write(optional_arg)
f.write(") -> Response[Union[Any, " + ", ".join(endpoint_refs) + "]]:\n")
if len(endpoint_refs) > 0:
f.write(") -> " + detailed_response_type + ":\n")
else:
f.write(") -> Response[Any]:\n")
f.write("\tkwargs = _get_kwargs(\n")
params = get_function_parameters(endpoint, request_body_type)
for param in params:
@ -847,7 +961,12 @@ async def test_"""
f.write("\tclient: Client,\n")
for optional_arg in optional_args:
f.write(optional_arg)
f.write(") -> Optional[Union[Any, " + ", ".join(endpoint_refs) + "]]:\n")
if len(endpoint_refs) > 0:
f.write(") -> " + response_type + ":\n")
else:
f.write("):\n")
if "description" in endpoint:
f.write('\t""" ' + endpoint["description"] + ' """ # noqa: E501\n')
f.write("\n")
@ -914,7 +1033,12 @@ async def test_"""
f.write("\tclient: Client,\n")
for optional_arg in optional_args:
f.write(optional_arg)
f.write(") -> Response[Union[Any, " + ", ".join(endpoint_refs) + "]]:\n")
if len(endpoint_refs) > 0:
f.write(") -> " + detailed_response_type + ":\n")
else:
f.write(") -> Response[Any]:\n")
f.write("\tkwargs = _get_kwargs(\n")
params = get_function_parameters(endpoint, request_body_type)
for param in params:
@ -983,7 +1107,12 @@ async def test_"""
f.write("\tclient: Client,\n")
for optional_arg in optional_args:
f.write(optional_arg)
f.write(") -> Optional[Union[Any, " + ", ".join(endpoint_refs) + "]]:\n")
if len(endpoint_refs) > 0:
f.write(") -> " + response_type + ":\n")
else:
f.write("):\n")
if "description" in endpoint:
f.write('\t""" ' + endpoint["description"] + ' """ # noqa: E501\n')
f.write("\n")
@ -1819,7 +1948,10 @@ def getEndpointRefs(endpoint: dict, data: dict) -> List[str]:
# all the possible outcomes.
ref = json["$ref"].replace("#/components/schemas/", "")
schema = data["components"]["schemas"][ref]
if "oneOf" in schema:
if isNestedObjectOneOf(schema) or isEnumWithDocsOneOf(schema):
if ref not in refs:
refs.append(ref)
elif isTypedObjectOneOf(schema):
for t in schema["oneOf"]:
ref = getOneOfRefType(t)
if ref not in refs:
@ -2000,7 +2132,11 @@ def get_function_parameters(
def getOneOfRefType(schema: dict) -> str:
if "type" in schema["properties"]:
if (
"type" in schema["properties"]
and "enum" in schema["properties"]["type"]
and len(schema["properties"]["type"]["enum"]) == 1
):
t = schema["properties"]["type"]["enum"][0]
return t
@ -2008,6 +2144,9 @@ def getOneOfRefType(schema: dict) -> str:
def isNestedObjectOneOf(schema: dict) -> bool:
if "oneOf" not in schema:
return False
is_nested_object = False
for one_of in schema["oneOf"]:
# Check if each are an object w 1 property in it.
@ -2031,6 +2170,9 @@ def isNestedObjectOneOf(schema: dict) -> bool:
def isEnumWithDocsOneOf(schema: dict) -> bool:
if "oneOf" not in schema:
return False
is_enum_with_docs = False
for one_of in schema["oneOf"]:
if one_of["type"] == "string" and "enum" in one_of and len(one_of["enum"]) == 1:
@ -2042,6 +2184,53 @@ def isEnumWithDocsOneOf(schema: dict) -> bool:
return is_enum_with_docs
def isTypedObjectOneOf(schema: dict) -> bool:
if "oneOf" not in schema:
return False
is_typed_object = False
for one_of in schema["oneOf"]:
if (
"type" in one_of["properties"]
and "enum" in one_of["properties"]["type"]
and len(one_of["properties"]["type"]["enum"]) == 1
):
is_typed_object = True
else:
is_typed_object = False
break
return is_typed_object
def hasNoContentResponse(endpoint: dict) -> bool:
responses = endpoint["responses"]
for response_code in responses:
if (
response_code == "default"
or response_code == "204"
or response_code == "302"
):
return True
return False
def getFunctionResultType(endpoint: dict, endpoint_refs: List[str]) -> str:
result = ", ".join(endpoint_refs)
if len(endpoint_refs) > 1:
result = "Optional[Union[" + result + "]]"
if hasNoContentResponse(endpoint):
result = "Optional[" + result + "]"
return result
def getDetailedFunctionResultType(endpoint: dict, endpoint_refs: List[str]) -> str:
return "Response[" + getFunctionResultType(endpoint, endpoint_refs) + "]"
# generate a random letter in the range A - Z
# do not use O or I.
def randletter():