* YOYO NEW API SPEC! * updates Signed-off-by: Jessie Frazelle <github@jessfraz.com> * I have generated the latest API! --------- Signed-off-by: Jessie Frazelle <github@jessfraz.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Jessie Frazelle <github@jessfraz.com>
2391 lines
83 KiB
Python
Executable File
2391 lines
83 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
from typing import Any, Dict, List, Optional, Tuple, TypedDict
|
|
|
|
import black
|
|
import isort
|
|
import jinja2
|
|
import jsonpatch
|
|
from prance import BaseParser
|
|
|
|
package_name = "kittycad"
|
|
|
|
random.seed(10)
|
|
|
|
examples: List[str] = []
|
|
|
|
|
|
def main():
|
|
cwd = os.getcwd()
|
|
spec_path = os.path.join(cwd, "spec.json")
|
|
logging.info("opening spec file: ", spec_path)
|
|
parser = BaseParser(spec_path)
|
|
|
|
# Generate the types.
|
|
generateTypes(cwd, parser.specification)
|
|
|
|
# Generate the paths.
|
|
data = generatePaths(cwd, parser.specification)
|
|
|
|
# Add the client information to the generation.
|
|
data["info"]["x-python"] = {
|
|
"client": """# Create a client with your token.
|
|
from kittycad.client import Client
|
|
|
|
client = Client(token="$TOKEN")
|
|
|
|
# - OR -
|
|
|
|
# Create a new client with your token parsed from the environment variable:
|
|
# `KITTYCAD_API_TOKEN` or `ZOO_API_TOKEN`.
|
|
# Optionally, you can pass in `ZOO_HOST` to specify the host. But this only
|
|
# needs to be done if you are using a different host than the default,
|
|
# which implies you are running your own instance of the API.
|
|
from kittycad.client import ClientFromEnv
|
|
|
|
client = ClientFromEnv()
|
|
|
|
# NOTE: The python library additionally implements asyncio, however all the code samples we
|
|
# show below use the sync functions for ease of use and understanding.
|
|
# Check out the library docs at:
|
|
# https://python.api.docs.zoo.dev/_autosummary/kittycad.api.html#module-kittycad.api
|
|
# for more details.""",
|
|
"install": "pip install kittycad",
|
|
}
|
|
|
|
# Read the original spec file as a dict.
|
|
spec = open(spec_path, "r")
|
|
original = json.load(spec)
|
|
# Create the json patch document.
|
|
patch = jsonpatch.make_patch(original, data)
|
|
|
|
# Convert this to a dict.
|
|
patch = json.loads(patch.to_string())
|
|
|
|
new_patch = []
|
|
# Make sure we aren't changing any components/schemas.
|
|
for index, p in enumerate(patch):
|
|
if not p["path"].startswith("/components"):
|
|
new_patch.append(p)
|
|
|
|
# Rewrite the spec back out.
|
|
patch_file = os.path.join(cwd, "kittycad.py.patch.json")
|
|
f = open(patch_file, "w")
|
|
f.write(json.dumps(new_patch, indent=2))
|
|
f.close()
|
|
|
|
# Write all the examples to a file.
|
|
examples_test_path = os.path.join(cwd, "kittycad", "examples_test.py")
|
|
logging.info("opening examples test file: ", spec_path)
|
|
|
|
f = open(examples_test_path, "w")
|
|
f.write("import pytest\n\n")
|
|
f.write("import datetime\n\n")
|
|
f.write("\n\n".join(examples))
|
|
f.close()
|
|
|
|
|
|
def generatePaths(cwd: str, parser: dict) -> dict:
|
|
# Make sure we have the directory.
|
|
path = os.path.join(cwd, "kittycad", "api")
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
# Open the __init__.py file.
|
|
file_name = "__init__.py"
|
|
file_path = os.path.join(path, file_name)
|
|
f = open(file_path, "w")
|
|
f.write('""" Contains methods for accessing the API """\n')
|
|
# Close the file.
|
|
f.close()
|
|
|
|
# Generate the directory/__init__.py for each of the tags.
|
|
tags = parser["tags"]
|
|
for tag in tags:
|
|
tag_name = tag["name"].replace("-", "_")
|
|
tag_description = tag["description"]
|
|
tag_path = os.path.join(path, tag_name)
|
|
# Esnure the directory exists.
|
|
os.makedirs(tag_path, exist_ok=True)
|
|
# Open the __init__.py file.
|
|
file_name = "__init__.py"
|
|
file_path = os.path.join(tag_path, file_name)
|
|
f = open(file_path, "w")
|
|
f.write(
|
|
'""" Contains methods for accessing the '
|
|
+ tag_name
|
|
+ " API paths: "
|
|
+ tag_description
|
|
+ ' """ # noqa: E501\n'
|
|
)
|
|
# Close the file.
|
|
f.close()
|
|
|
|
# Generate the paths.
|
|
data = parser
|
|
paths = data["paths"]
|
|
for p in paths:
|
|
# If p starts with /oauth2 we can skip it.
|
|
# We don't care about generating methods for those.
|
|
if p.startswith("/oauth2"):
|
|
continue
|
|
else:
|
|
for method in paths[p]:
|
|
# Skip OPTIONS.
|
|
if method.upper() != "OPTIONS":
|
|
endpoint = paths[p][method]
|
|
data = generatePath(path, p, method, endpoint, data)
|
|
|
|
return data
|
|
|
|
|
|
def generateTypeAndExamplePython(
|
|
name: str,
|
|
schema: dict,
|
|
data: dict,
|
|
import_path: Optional[str],
|
|
tag: Optional[str],
|
|
wrapper: Optional[str] = None,
|
|
) -> Tuple[str, str, str]:
|
|
parameter_type = ""
|
|
parameter_example = ""
|
|
example_imports = ""
|
|
ip: str = ""
|
|
if import_path is not None:
|
|
ip = import_path
|
|
if "type" in schema:
|
|
if "format" in schema and schema["format"] == "uuid":
|
|
if name != "":
|
|
parameter_type = name
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ ip
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
|
|
parameter_example = parameter_type + '("<uuid>")'
|
|
else:
|
|
parameter_type = "str"
|
|
parameter_example = '"<uuid>"'
|
|
if "format" in schema and schema["format"] == "date-time":
|
|
if name != "":
|
|
parameter_type = name
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ ip
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
|
|
parameter_example = parameter_type + "(datetime.datetime.now())"
|
|
else:
|
|
parameter_type = "datetime"
|
|
parameter_example = "datetime.datetime.now()"
|
|
elif "format" in schema and schema["format"] == "byte":
|
|
if name != "":
|
|
parameter_type = name
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ ip
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
|
|
example_imports = (
|
|
example_imports
|
|
+ "from kittycad.models.base64data import Base64Data\n"
|
|
)
|
|
|
|
parameter_example = parameter_type + 'Base64Data(b"<bytes>")'
|
|
else:
|
|
example_imports = (
|
|
example_imports
|
|
+ "from kittycad.models.base64data import Base64Data\n"
|
|
)
|
|
parameter_type = "Base64Data"
|
|
parameter_example = 'Base64Data(b"<bytes>")'
|
|
|
|
elif (
|
|
schema["type"] == "string" and "enum" in schema and len(schema["enum"]) > 0
|
|
):
|
|
if name == "":
|
|
if len(schema["enum"]) == 1:
|
|
name = schema["enum"][0]
|
|
else:
|
|
logging.error("schema: %s", json.dumps(schema, indent=4))
|
|
raise Exception("Unknown type name for enum")
|
|
|
|
parameter_type = name
|
|
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models." + ip + " import " + parameter_type + "\n"
|
|
)
|
|
|
|
parameter_example = (
|
|
parameter_type + "." + camel_to_screaming_snake(schema["enum"][0])
|
|
)
|
|
elif schema["type"] == "string":
|
|
if name != "":
|
|
parameter_type = name
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ ip
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
parameter_example = parameter_type + '("<string>")'
|
|
else:
|
|
parameter_type = "str"
|
|
parameter_example = '"<string>"'
|
|
elif schema["type"] == "integer":
|
|
parameter_type = "int"
|
|
parameter_example = "10"
|
|
elif schema["type"] == "boolean":
|
|
parameter_type = "bool"
|
|
parameter_example = "False"
|
|
elif (
|
|
schema["type"] == "number"
|
|
and "format" in schema
|
|
and (
|
|
schema["format"] == "float"
|
|
or schema["format"] == "double"
|
|
or schema["format"] == "money-usd"
|
|
)
|
|
):
|
|
parameter_type = "float"
|
|
parameter_example = "3.14"
|
|
elif schema["type"] == "array" and "items" in schema:
|
|
items_type, items_example, items_imports = generateTypeAndExamplePython(
|
|
"", schema["items"], data, None, None
|
|
)
|
|
example_imports = example_imports + items_imports
|
|
parameter_type = "List[" + items_type + "]"
|
|
if "minItems" in schema and schema["minItems"] > 1:
|
|
parameter_example = "["
|
|
for i in range(schema["minItems"] - 1):
|
|
parameter_example = parameter_example + items_example + ", "
|
|
parameter_example = parameter_example + "]"
|
|
else:
|
|
parameter_example = "[" + items_example + "]"
|
|
|
|
example_imports = example_imports + ("from typing import List\n")
|
|
elif schema["type"] == "object" and "properties" in schema:
|
|
if name == "":
|
|
logging.error("schema: %s", json.dumps(schema, indent=4))
|
|
raise Exception("Unknown type name for object")
|
|
|
|
parameter_type = name
|
|
|
|
if import_path is None:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models."
|
|
+ camel_to_snake(parameter_type)
|
|
+ " import "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
example_imports = example_imports + (
|
|
"from kittycad.models." + ip + " import " + parameter_type + "\n"
|
|
)
|
|
parameter_example = name + "("
|
|
for property_name in schema["properties"]:
|
|
prop = schema["properties"][property_name]
|
|
if "nullable" in prop:
|
|
# We don't care if it's nullable
|
|
continue
|
|
elif property_name == tag:
|
|
# We don't care if it's the tag, since we already have it.
|
|
continue
|
|
else:
|
|
(
|
|
prop_type,
|
|
prop_example,
|
|
prop_imports,
|
|
) = generateTypeAndExamplePython("", prop, data, import_path, tag)
|
|
example_imports = example_imports + prop_imports
|
|
parameter_example = parameter_example + (
|
|
"\n"
|
|
+ clean_parameter_name(property_name)
|
|
+ "="
|
|
+ prop_example
|
|
+ ",\n"
|
|
)
|
|
|
|
parameter_example = parameter_example + ")"
|
|
|
|
if wrapper is not None:
|
|
if wrapper != "WebSocketRequest":
|
|
example_imports = example_imports + (
|
|
"from kittycad.models." + ip + " import " + wrapper + "\n"
|
|
)
|
|
parameter_example = wrapper + "(" + parameter_example + ")"
|
|
elif (
|
|
schema["type"] == "object"
|
|
and "additionalProperties" in schema
|
|
and schema["additionalProperties"] is not False
|
|
):
|
|
items_type, items_example, items_imports = generateTypeAndExamplePython(
|
|
"", schema["additionalProperties"], data, None, None
|
|
)
|
|
example_imports = example_imports + items_imports
|
|
parameter_type = "Dict[str, " + items_type + "]"
|
|
parameter_example = '{"<string>": ' + items_example + "}"
|
|
else:
|
|
logging.error("schema: %s", json.dumps(schema, indent=4))
|
|
raise Exception("Unknown parameter type")
|
|
elif "oneOf" in schema and len(schema["oneOf"]) > 0:
|
|
# Choose a random one.
|
|
index = random.randint(0, len(schema["oneOf"]) - 1)
|
|
one_of = schema["oneOf"][index]
|
|
|
|
# Check if this is a nested object.
|
|
if isNestedObjectOneOf(schema):
|
|
if "properties" in one_of:
|
|
properties = one_of["properties"]
|
|
for prop in properties:
|
|
return generateTypeAndExamplePython(
|
|
prop, properties[prop], data, camel_to_snake(name), None
|
|
)
|
|
break
|
|
elif "type" in one_of and one_of["type"] == "string":
|
|
return generateTypeAndExamplePython(
|
|
name, one_of, data, camel_to_snake(name), None
|
|
)
|
|
|
|
tag = getTagOneOf(schema)
|
|
|
|
if (
|
|
"properties" in one_of
|
|
and "type" in one_of["properties"]
|
|
and "enum" in one_of["properties"]["type"]
|
|
):
|
|
return generateTypeAndExamplePython(
|
|
snake_to_title("option_" + one_of["properties"]["type"]["enum"][0]),
|
|
one_of,
|
|
data,
|
|
camel_to_snake(name),
|
|
tag,
|
|
name,
|
|
)
|
|
else:
|
|
return generateTypeAndExamplePython(name, one_of, data, None, None)
|
|
elif "allOf" in schema and len(schema["allOf"]) == 1:
|
|
return generateTypeAndExamplePython(name, schema["allOf"][0], data, None, None)
|
|
elif "$ref" in schema:
|
|
parameter_type = schema["$ref"].replace("#/components/schemas/", "")
|
|
# Get the schema for the reference.
|
|
ref_schema = data["components"]["schemas"][parameter_type]
|
|
|
|
return generateTypeAndExamplePython(
|
|
parameter_type, ref_schema, data, None, None
|
|
)
|
|
else:
|
|
logging.error("schema: %s", json.dumps(schema, indent=4))
|
|
raise Exception("Unknown parameter type")
|
|
|
|
return parameter_type, parameter_example, example_imports
|
|
|
|
|
|
def generatePath(path: str, name: str, method: str, endpoint: dict, data: dict) -> dict:
|
|
# Generate the path.
|
|
fn_name = camel_to_snake(endpoint["operationId"])
|
|
file_name = fn_name + ".py"
|
|
tag_name = ""
|
|
# Add the tag to the path if it exists.
|
|
if "tags" in endpoint:
|
|
tag_name = endpoint["tags"][0].replace("-", "_")
|
|
path = os.path.join(path, tag_name)
|
|
file_path = os.path.join(path, file_name)
|
|
logging.info("generating path functions: ", name, " at: ", file_path)
|
|
|
|
endpoint_refs = getEndpointRefs(endpoint, data)
|
|
parameter_refs = getParameterRefs(endpoint)
|
|
request_body_refs = getRequestBodyRefs(endpoint)
|
|
(request_body_type, request_body_schema) = getRequestBodyTypeSchema(endpoint, data)
|
|
|
|
success_type = ""
|
|
if len(endpoint_refs) > 0:
|
|
if len(endpoint_refs) > 2:
|
|
er = getEndpointRefs(endpoint, data)
|
|
er.remove("Error")
|
|
success_type = "Union[" + ", ".join(er) + "]"
|
|
else:
|
|
success_type = endpoint_refs[0]
|
|
|
|
example_imports = (
|
|
"""
|
|
from kittycad.client import ClientFromEnv
|
|
from kittycad.api."""
|
|
+ tag_name
|
|
+ """ import """
|
|
+ fn_name
|
|
+ """
|
|
from kittycad.types import Response
|
|
"""
|
|
)
|
|
|
|
# Iterate over the parameters.
|
|
params_str = ""
|
|
if "parameters" in endpoint:
|
|
parameters = endpoint["parameters"]
|
|
optional_args = []
|
|
for parameter in parameters:
|
|
parameter_name = parameter["name"]
|
|
(
|
|
parameter_type,
|
|
parameter_example,
|
|
more_example_imports,
|
|
) = generateTypeAndExamplePython("", parameter["schema"], data, None, None)
|
|
example_imports = example_imports + more_example_imports
|
|
|
|
if "nullable" in parameter["schema"] and parameter["schema"]["nullable"]:
|
|
parameter_type = "Optional[" + parameter_type + "]"
|
|
optional_args.append(
|
|
clean_parameter_name(parameter_name)
|
|
+ "= None, # "
|
|
+ parameter_type
|
|
+ "\n"
|
|
)
|
|
else:
|
|
params_str += (
|
|
clean_parameter_name(parameter_name)
|
|
+ "="
|
|
+ parameter_example
|
|
+ ",\n"
|
|
)
|
|
|
|
for optional_arg in optional_args:
|
|
params_str += optional_arg
|
|
|
|
body_example = "{}"
|
|
if request_body_type:
|
|
if request_body_type == "str":
|
|
params_str += "body='<string>',\n"
|
|
elif request_body_type == "bytes":
|
|
params_str += "body=bytes('some bytes', 'utf-8'),\n"
|
|
elif request_body_schema:
|
|
# Generate an example for the schema.
|
|
rbs: Dict[Any, Any] = request_body_schema
|
|
(
|
|
body_type,
|
|
body_ex,
|
|
more_example_imports,
|
|
) = generateTypeAndExamplePython(request_body_type, rbs, data, None, None)
|
|
body_example = body_ex
|
|
if "x-dropshot-websocket" not in endpoint:
|
|
params_str += "body=" + body_example + ",\n"
|
|
else:
|
|
body_example = request_body_type + "(" + body_example + ")"
|
|
example_imports = (
|
|
example_imports
|
|
+ "from kittycad.models import "
|
|
+ request_body_type
|
|
+ "\n"
|
|
)
|
|
example_imports = example_imports + more_example_imports
|
|
|
|
example_variable = ""
|
|
example_variable_response = ""
|
|
|
|
response_type = getFunctionResultType(endpoint, endpoint_refs)
|
|
detailed_response_type = getDetailedFunctionResultType(endpoint, endpoint_refs)
|
|
if (
|
|
success_type != "str"
|
|
and success_type != "dict"
|
|
and success_type != "None"
|
|
and success_type != ""
|
|
):
|
|
for endpoint_ref in endpoint_refs:
|
|
if endpoint_ref == "Error":
|
|
continue
|
|
# For some reason, PrivacySettings is showing up twice so we want to skip
|
|
# it here. Obviously this is a hack and we should fix the root cause.
|
|
# When this happens again with another struct there might be a more obvious
|
|
# solution, but alas, I am lazy.
|
|
if endpoint_ref != "PrivacySettings" and endpoint_ref != "List[str]":
|
|
example_imports = example_imports + (
|
|
"""from kittycad.models import """
|
|
+ endpoint_ref.replace("List[", "").replace("]", "")
|
|
+ "\n"
|
|
)
|
|
example_imports = (
|
|
example_imports + "from typing import Union, Any, Optional, List, Tuple\n"
|
|
)
|
|
|
|
example_variable = "result: " + response_type + " = "
|
|
|
|
example_imports = example_imports + "from kittycad.types import Response\n"
|
|
example_imports = example_imports + "from kittycad.models import Error\n"
|
|
example_variable_response = "response: " + detailed_response_type + " = "
|
|
|
|
# Add some new lines.
|
|
example_imports = example_imports + "\n\n"
|
|
|
|
short_sync_example = (
|
|
"""def test_"""
|
|
+ fn_name
|
|
+ """():
|
|
# Create our client.
|
|
client = ClientFromEnv()
|
|
|
|
"""
|
|
+ example_variable
|
|
+ fn_name
|
|
+ """.sync(client=client,\n"""
|
|
+ params_str
|
|
+ """)
|
|
"""
|
|
)
|
|
|
|
if (
|
|
success_type != "str"
|
|
and success_type != "dict"
|
|
and success_type != "None"
|
|
and success_type != ""
|
|
):
|
|
example_success_type = success_type
|
|
|
|
short_sync_example = short_sync_example + (
|
|
"""
|
|
if isinstance(result, Error) or result is None:
|
|
print(result)
|
|
raise Exception("Error in response")
|
|
|
|
body: """
|
|
+ example_success_type
|
|
+ """ = result
|
|
print(body)
|
|
|
|
"""
|
|
)
|
|
|
|
long_example = (
|
|
"""
|
|
|
|
# OR if you need more info (e.g. status_code)
|
|
"""
|
|
+ example_variable_response
|
|
+ fn_name
|
|
+ """.sync_detailed(client=client,\n"""
|
|
+ params_str
|
|
+ """)
|
|
|
|
# OR run async
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip
|
|
async def test_"""
|
|
+ fn_name
|
|
+ """_async():
|
|
# Create our client.
|
|
client = ClientFromEnv()
|
|
|
|
"""
|
|
+ example_variable
|
|
+ "await "
|
|
+ fn_name
|
|
+ """.asyncio(client=client,\n"""
|
|
+ params_str
|
|
+ """)
|
|
|
|
# OR run async with more info
|
|
"""
|
|
+ example_variable_response
|
|
+ "await "
|
|
+ fn_name
|
|
+ """.asyncio_detailed(client=client,\n"""
|
|
+ params_str
|
|
+ """)"""
|
|
)
|
|
|
|
# Generate the websocket examples.
|
|
if "x-dropshot-websocket" in endpoint:
|
|
if request_body_type is None:
|
|
short_sync_example = (
|
|
"""def test_"""
|
|
+ fn_name
|
|
+ """():
|
|
# Create our client.
|
|
client = ClientFromEnv()
|
|
|
|
# Connect to the websocket.
|
|
with """
|
|
+ fn_name
|
|
+ """.sync(client=client,"""
|
|
+ params_str
|
|
+ """) as websocket:
|
|
|
|
# Send a message.
|
|
websocket.send("{}")
|
|
|
|
# Get the messages.
|
|
for message in websocket:
|
|
print(message)
|
|
|
|
"""
|
|
)
|
|
else:
|
|
short_sync_example = (
|
|
"""def test_"""
|
|
+ fn_name
|
|
+ """():
|
|
# Create our client.
|
|
client = ClientFromEnv()
|
|
|
|
# Connect to the websocket.
|
|
with """
|
|
+ fn_name
|
|
+ """.WebSocket(client=client,"""
|
|
+ params_str
|
|
+ """) as websocket:
|
|
|
|
# Send a message.
|
|
websocket.send("""
|
|
+ body_example
|
|
+ """)
|
|
|
|
# Get a message.
|
|
message = websocket.recv()
|
|
print(message)
|
|
|
|
"""
|
|
)
|
|
|
|
long_example = (
|
|
"""
|
|
|
|
# OR run async
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip
|
|
async def test_"""
|
|
+ fn_name
|
|
+ """_async():
|
|
# Create our client.
|
|
client = ClientFromEnv()
|
|
|
|
# Connect to the websocket.
|
|
websocket = await """
|
|
+ fn_name
|
|
+ """.asyncio(client=client,"""
|
|
+ params_str
|
|
+ """)
|
|
|
|
# Send a message.
|
|
await websocket.send("{}")
|
|
|
|
# Get the messages.
|
|
async for message in websocket:
|
|
print(message)
|
|
"""
|
|
)
|
|
|
|
# This longer example we use for generating tests.
|
|
# We only show the short example in the docs since it is much more intuitive to MEs
|
|
example = (
|
|
example_imports
|
|
+ """
|
|
|
|
@pytest.mark.skip
|
|
"""
|
|
+ short_sync_example
|
|
+ long_example
|
|
)
|
|
|
|
# Make pretty.
|
|
line_length = 82
|
|
short_sync_example = example_imports + short_sync_example
|
|
cleaned_example = black.format_str(
|
|
isort.api.sort_code_string(
|
|
short_sync_example,
|
|
),
|
|
mode=black.FileMode(line_length=line_length),
|
|
)
|
|
|
|
examples.append(example)
|
|
|
|
# Add our example to our json output.
|
|
data["paths"][name][method]["x-python"] = {
|
|
"example": cleaned_example.replace("def test_", "def example_"),
|
|
"libDocsLink": "https://python.api.docs.zoo.dev/_autosummary/kittycad.api."
|
|
+ tag_name
|
|
+ "."
|
|
+ fn_name
|
|
+ ".html",
|
|
}
|
|
|
|
# Start defining the template info.
|
|
ArgType = TypedDict(
|
|
"ArgType",
|
|
{
|
|
"name": str,
|
|
"type": str,
|
|
"in_url": bool,
|
|
"in_query": bool,
|
|
"is_optional": bool,
|
|
},
|
|
)
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"imports": List[str],
|
|
"response_type": str,
|
|
"args": List[ArgType],
|
|
"url_template": str,
|
|
"method": str,
|
|
"docs": str,
|
|
"parse_response": str,
|
|
"has_request_body": bool,
|
|
"request_body_type": str,
|
|
},
|
|
)
|
|
template_info: TemplateType = {
|
|
"imports": [],
|
|
"response_type": response_type,
|
|
"args": [],
|
|
"url_template": "{}" + name,
|
|
"method": method,
|
|
"docs": "",
|
|
"parse_response": "",
|
|
"has_request_body": False,
|
|
"request_body_type": "",
|
|
}
|
|
|
|
if len(endpoint_refs) == 0:
|
|
template_info["response_type"] = ""
|
|
|
|
if "x-dropshot-websocket" in endpoint:
|
|
template_info["response_type"] = (
|
|
template_info["response_type"].replace("Optional[", "").replace("]", "")
|
|
)
|
|
|
|
if "description" in endpoint:
|
|
template_info["docs"] = endpoint["description"]
|
|
|
|
# Import our references for responses.
|
|
for ref in endpoint_refs:
|
|
if ref.startswith("List[") and ref.endswith("]"):
|
|
ref = ref.replace("List[", "").replace("]", "")
|
|
if ref != "str" and ref != "dict":
|
|
template_info["imports"].append(
|
|
"from ...models." + camel_to_snake(ref) + " import " + ref
|
|
)
|
|
for ref in parameter_refs:
|
|
template_info["imports"].append(
|
|
"from ...models." + camel_to_snake(ref) + " import " + ref
|
|
)
|
|
for ref in request_body_refs:
|
|
template_info["imports"].append(
|
|
"from ...models." + camel_to_snake(ref) + " import " + ref
|
|
)
|
|
|
|
# Iterate over the responses.
|
|
parse_response = io.StringIO()
|
|
if len(endpoint_refs) > 0:
|
|
responses = endpoint["responses"]
|
|
for response_code in responses:
|
|
response = responses[response_code]
|
|
if response_code == "default":
|
|
# This is no content.
|
|
parse_response.write("\treturn None\n")
|
|
elif response_code == "204" or response_code == "302":
|
|
# This is no content.
|
|
parse_response.write("\treturn None\n")
|
|
else:
|
|
parse_response.write(
|
|
"\tif response.status_code == "
|
|
+ response_code.replace("XX", "00")
|
|
+ ":\n"
|
|
)
|
|
is_one_of = False
|
|
if "content" in response:
|
|
content = response["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
ref = json["$ref"].replace("#/components/schemas/", "")
|
|
schema = data["components"]["schemas"][ref]
|
|
# Let's check if it is a oneOparse_response.
|
|
if "oneOf" in schema:
|
|
is_one_of = True
|
|
# We want to parse each of the possible types.
|
|
parse_response.write("\t\tdata = response.json()\n")
|
|
for index, one_of in enumerate(schema["oneOf"]):
|
|
ref = getOneOfRefType(one_of)
|
|
parse_response.write("\t\ttry:\n")
|
|
parse_response.write(
|
|
"\t\t\tif not isinstance(data, dict):\n"
|
|
)
|
|
parse_response.write(
|
|
"\t\t\t\traise TypeError()\n"
|
|
)
|
|
option_name = "option_" + camel_to_snake(ref)
|
|
parse_response.write(
|
|
"\t\t\t"
|
|
+ option_name
|
|
+ " = "
|
|
+ snake_to_title(ref)
|
|
+ "(**data)\n"
|
|
)
|
|
parse_response.write(
|
|
"\t\t\treturn " + option_name + "\n"
|
|
)
|
|
parse_response.write("\t\texcept ValueError:\n")
|
|
if index == len(schema["oneOf"]) - 1:
|
|
# On the last one raise the error.
|
|
parse_response.write("\t\t\traise\n")
|
|
else:
|
|
parse_response.write("\t\t\tpass\n")
|
|
parse_response.write("\t\texcept TypeError:\n")
|
|
if index == len(schema["oneOf"]) - 1:
|
|
# On the last one raise the error.
|
|
parse_response.write("\t\t\traise\n")
|
|
else:
|
|
parse_response.write("\t\t\tpass\n")
|
|
else:
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = "
|
|
+ ref
|
|
+ "(**response.json())\n"
|
|
)
|
|
elif "type" in json:
|
|
if json["type"] == "array":
|
|
items = json["items"]
|
|
if "$ref" in items:
|
|
ref = items["$ref"].replace(
|
|
"#/components/schemas/", ""
|
|
)
|
|
parse_response.write(
|
|
"\t\tresponse_" + response_code + " = [\n"
|
|
)
|
|
parse_response.write(
|
|
"\t\t\t" + ref + "(**item)\n"
|
|
)
|
|
parse_response.write(
|
|
"\t\t\tfor item in response.json()\n"
|
|
)
|
|
parse_response.write("\t\t]\n")
|
|
elif "type" in items:
|
|
if items["type"] == "string":
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = [\n"
|
|
)
|
|
parse_response.write("\t\t\tstr(**item)\n")
|
|
parse_response.write(
|
|
"\t\t\tfor item in response.json()\n"
|
|
)
|
|
parse_response.write("\t\t]\n")
|
|
else:
|
|
raise Exception("Unknown array type", items)
|
|
else:
|
|
raise Exception("Unknown array type")
|
|
elif json["type"] == "string":
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = response.text\n"
|
|
)
|
|
elif (
|
|
json["type"] == "object"
|
|
and "additionalProperties" in json
|
|
):
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = response.json()\n"
|
|
)
|
|
else:
|
|
print(json)
|
|
raise Exception("Unknown type", json["type"])
|
|
else:
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = response.json()\n"
|
|
)
|
|
|
|
elif "$ref" in response:
|
|
schema_name = response["$ref"].replace(
|
|
"#/components/responses/", ""
|
|
)
|
|
schema = data["components"]["responses"][schema_name]
|
|
if "content" in schema:
|
|
content = schema["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
ref = json["$ref"].replace(
|
|
"#/components/schemas/", ""
|
|
)
|
|
parse_response.write(
|
|
"\t\tresponse_"
|
|
+ response_code
|
|
+ " = "
|
|
+ ref
|
|
+ "(**response.json())\n"
|
|
)
|
|
else:
|
|
print(endpoint)
|
|
raise Exception("response not supported")
|
|
|
|
if not is_one_of:
|
|
parse_response.write("\t\treturn response_" + response_code + "\n")
|
|
|
|
# End the method.
|
|
parse_response.write("\treturn Error(**response.json())\n")
|
|
else:
|
|
parse_response.write("\treturn\n")
|
|
|
|
template_info["parse_response"] = parse_response.getvalue()
|
|
|
|
# Iterate over the parameters.
|
|
optional_args = []
|
|
if "parameters" in endpoint:
|
|
parameters = endpoint["parameters"]
|
|
for parameter in parameters:
|
|
parameter_name = parameter["name"]
|
|
if "type" in parameter["schema"]:
|
|
parameter_type = (
|
|
parameter["schema"]["type"]
|
|
.replace("string", "str")
|
|
.replace("integer", "int")
|
|
.replace("number", "float")
|
|
.replace("boolean", "bool")
|
|
)
|
|
elif "$ref" in parameter["schema"]:
|
|
parameter_type = parameter["schema"]["$ref"].replace(
|
|
"#/components/schemas/", ""
|
|
)
|
|
else:
|
|
logging.error("parameter: ", parameter)
|
|
raise Exception("Unknown parameter type")
|
|
if "nullable" in parameter["schema"]:
|
|
if parameter["schema"]["nullable"]:
|
|
parameter_type = "Optional[" + parameter_type + "] = None"
|
|
template_info["args"].append(
|
|
{
|
|
"name": camel_to_snake(parameter_name),
|
|
"type": parameter_type,
|
|
"in_url": "in" in parameter and (parameter["in"] == "path"),
|
|
"in_query": "in" in parameter
|
|
and (parameter["in"] == "query"),
|
|
"is_optional": True,
|
|
}
|
|
)
|
|
else:
|
|
template_info["args"].append(
|
|
{
|
|
"name": camel_to_snake(parameter_name),
|
|
"type": parameter_type,
|
|
"in_url": "in" in parameter and (parameter["in"] == "path"),
|
|
"in_query": "in" in parameter
|
|
and (parameter["in"] == "query"),
|
|
"is_optional": False,
|
|
}
|
|
)
|
|
else:
|
|
template_info["args"].append(
|
|
{
|
|
"name": camel_to_snake(parameter_name),
|
|
"type": parameter_type,
|
|
"in_url": "in" in parameter and (parameter["in"] == "path"),
|
|
"in_query": "in" in parameter and (parameter["in"] == "query"),
|
|
"is_optional": False,
|
|
}
|
|
)
|
|
|
|
if request_body_type:
|
|
template_info["args"].append(
|
|
{
|
|
"name": "body",
|
|
"type": request_body_type,
|
|
"in_url": False,
|
|
"in_query": False,
|
|
"is_optional": False,
|
|
}
|
|
)
|
|
template_info["has_request_body"] = True
|
|
template_info["request_body_type"] = request_body_type
|
|
|
|
# Generate the template for the functions.
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "functions.py.jinja2"
|
|
if "x-dropshot-websocket" in endpoint:
|
|
template_file = "functions-ws.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
with open(file_path, mode="w", encoding="utf-8") as message:
|
|
message.write(content)
|
|
logging.info(f"... wrote {file_path}")
|
|
|
|
return data
|
|
|
|
|
|
def generateTypes(cwd: str, parser: dict):
|
|
# Make sure we have the directory.
|
|
path = os.path.join(cwd, "kittycad", "models")
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
# Open the __init__.py file.
|
|
file_name = "__init__.py"
|
|
file_path = os.path.join(path, file_name)
|
|
f = open(file_path, "w")
|
|
f.write('""" Contains all the data models used in inputs/outputs """\n')
|
|
f.write("\n")
|
|
|
|
# Generate the types.
|
|
data = parser
|
|
schemas = data["components"]["schemas"]
|
|
for key in schemas:
|
|
schema = schemas[key]
|
|
logging.info("generating schema: ", key)
|
|
generateType(path, key, schema, data)
|
|
f.write("from ." + camel_to_snake(key) + " import " + key + "\n")
|
|
|
|
# This is a hot fix for the empty type.
|
|
# We likely need a better way to handle this.
|
|
f.write("from .empty import Empty\n")
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def generateType(path: str, name: str, schema: dict, data: dict):
|
|
file_path = path
|
|
if path.endswith(".py") is False:
|
|
# Generate the type.
|
|
file_name = camel_to_snake(name) + ".py"
|
|
file_path = os.path.join(path, file_name)
|
|
|
|
if "type" in schema:
|
|
type_name = schema["type"]
|
|
if type_name == "object":
|
|
generateObjectType(file_path, name, schema, type_name, data)
|
|
elif type_name == "string" and "enum" in schema and schema["enum"] != [None]:
|
|
generateEnumType(file_path, name, schema, type_name, [])
|
|
elif type_name == "integer":
|
|
generateIntegerType(file_path, name, schema, type_name)
|
|
elif type_name == "number":
|
|
generateFloatType(file_path, name, schema, type_name)
|
|
elif type_name == "string":
|
|
generateStringType(file_path, name, schema, type_name)
|
|
else:
|
|
logging.error("unsupported type: ", type_name)
|
|
raise Exception("unsupported type: ", type_name)
|
|
elif "$ref" in schema:
|
|
# Skip it since we will already have generated it.
|
|
return
|
|
elif "oneOf" in schema:
|
|
generateOneOfType(file_path, name, schema, data)
|
|
elif "anyOf" in schema:
|
|
generateAnyOfType(file_path, name, schema, data)
|
|
else:
|
|
logging.error("schema: ", [schema])
|
|
logging.error("unsupported type: ", name)
|
|
raise Exception("unsupported type: ", name)
|
|
|
|
|
|
def generateStringType(path: str, name: str, schema: dict, type_name: str):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
f = open(path, "w")
|
|
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"description": str,
|
|
"name": str,
|
|
},
|
|
)
|
|
|
|
description = ""
|
|
if "description" in schema:
|
|
description = schema["description"]
|
|
|
|
template_info: TemplateType = {
|
|
"description": description,
|
|
"name": name,
|
|
}
|
|
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "str.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
|
|
f.write(content)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def generateIntegerType(path: str, name: str, schema: dict, type_name: str):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
f = open(path, "w")
|
|
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"description": str,
|
|
"name": str,
|
|
},
|
|
)
|
|
|
|
description = ""
|
|
if "description" in schema:
|
|
description = schema["description"]
|
|
|
|
template_info: TemplateType = {
|
|
"description": description,
|
|
"name": name,
|
|
}
|
|
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "int.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
|
|
f.write(content)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def generateFloatType(path: str, name: str, schema: dict, type_name: str):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
f = open(path, "w")
|
|
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"description": str,
|
|
"name": str,
|
|
},
|
|
)
|
|
|
|
description = ""
|
|
if "description" in schema:
|
|
description = schema["description"]
|
|
|
|
template_info: TemplateType = {
|
|
"description": description,
|
|
"name": name,
|
|
}
|
|
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "float.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
|
|
f.write(content)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def generateEnumType(
|
|
path: str,
|
|
name: str,
|
|
schema: dict,
|
|
type_name: str,
|
|
additional_docs: List[str],
|
|
):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
f = open(path, "w")
|
|
|
|
code = generateEnumTypeCode(name, schema, type_name, additional_docs)
|
|
f.write(code)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def generateEnumTypeCode(
|
|
name: str,
|
|
schema: dict,
|
|
type_name: str,
|
|
additional_docs: List[str],
|
|
) -> str:
|
|
f = io.StringIO()
|
|
|
|
f.write("from enum import Enum\n")
|
|
f.write("\n")
|
|
f.write("class " + name + "(str, Enum):\n")
|
|
if "description" in schema:
|
|
f.write('\t""" ' + schema["description"] + ' """ # noqa: E501\n')
|
|
# Iterate over the properties.
|
|
for num, value in enumerate(schema["enum"], start=0):
|
|
enum_name = camel_to_screaming_snake(value)
|
|
if enum_name == "":
|
|
enum_name = "EMPTY"
|
|
elif enum_name == "1":
|
|
enum_name = "ONE"
|
|
elif enum_name == "2":
|
|
enum_name = "TWO"
|
|
|
|
# Write the description if there is one.
|
|
if len(additional_docs) > 0:
|
|
additional_doc = additional_docs[num]
|
|
if additional_doc != "":
|
|
f.write('\t"""# ' + additional_docs[num] + ' """ # noqa: E501\n')
|
|
f.write("\t" + enum_name + " = '" + value + "'\n")
|
|
|
|
# close the enum.
|
|
f.write("\n")
|
|
f.write("\tdef __str__(self) -> str:\n")
|
|
f.write("\t\treturn str(self.value)\n")
|
|
|
|
value = f.getvalue()
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
return value
|
|
|
|
|
|
def generateAnyOfType(path: str, name: str, schema: dict, data: dict):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
|
|
if isEnumWithDocsOneOf(schema):
|
|
additional_docs = []
|
|
enum = []
|
|
# We want to treat this as an enum with additional docs.
|
|
for any_of in schema["anyOf"]:
|
|
enum.append(any_of["enum"][0])
|
|
if "description" in any_of:
|
|
additional_docs.append(any_of["description"])
|
|
else:
|
|
additional_docs.append("")
|
|
# Write the enum.
|
|
schema["enum"] = enum
|
|
schema["type"] = "string"
|
|
generateEnumType(path, name, schema, "string", additional_docs)
|
|
# return early.
|
|
return
|
|
|
|
# Open our file.
|
|
f = open(path, "w")
|
|
|
|
# Import the refs if there are any.
|
|
all_options = []
|
|
for any_of in schema["anyOf"]:
|
|
if "allOf" in any_of:
|
|
for all_of in any_of["allOf"]:
|
|
if "$ref" in all_of:
|
|
ref = all_of["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write(
|
|
"from ."
|
|
+ camel_to_snake(ref_name)
|
|
+ " import "
|
|
+ ref_name
|
|
+ "\n"
|
|
)
|
|
all_options.append(ref_name)
|
|
if "$ref" in any_of:
|
|
ref = any_of["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write("from ." + camel_to_snake(ref_name) + " import " + ref_name + "\n")
|
|
all_options.append(ref_name)
|
|
|
|
if isNestedObjectAnyOf(schema):
|
|
# We want to write each of the nested objects.
|
|
for any_of in schema["anyOf"]:
|
|
# Get the nested object.
|
|
if "properties" in any_of:
|
|
for prop_name in any_of["properties"]:
|
|
nested_object = any_of["properties"][prop_name]
|
|
if nested_object == {}:
|
|
f.write("from typing import Any\n")
|
|
f.write(prop_name + " = Any\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "$ref" in nested_object:
|
|
ref = nested_object["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write(
|
|
"from ."
|
|
+ camel_to_snake(ref_name)
|
|
+ " import "
|
|
+ ref_name
|
|
+ "\n"
|
|
)
|
|
f.write("\n")
|
|
if prop_name != ref_name:
|
|
f.write(prop_name + " = " + ref_name + "\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
else:
|
|
object_code = generateObjectTypeCode(
|
|
prop_name, nested_object, "object", data, None, None
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "type" in any_of and any_of["type"] == "string":
|
|
enum_code = generateEnumTypeCode(
|
|
any_of["enum"][0], any_of, "string", []
|
|
)
|
|
f.write(enum_code)
|
|
f.write("\n")
|
|
all_options.append(any_of["enum"][0])
|
|
|
|
# Check if each any_of has the same enum of one.
|
|
tag = getTagAnyOf(schema)
|
|
|
|
if tag is not None:
|
|
# Generate each of the options from the tag.
|
|
for any_of in schema["anyOf"]:
|
|
# Get the value of the tag.
|
|
object_name = any_of["properties"][tag]["enum"][0]
|
|
object_code = generateObjectTypeCode(
|
|
object_name, any_of, "object", data, tag, None
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append(object_name)
|
|
else:
|
|
# We want to write each of the nested objects.
|
|
for any_of in schema["anyOf"]:
|
|
# Get the nested object.
|
|
if "properties" in any_of:
|
|
for prop_name in any_of["properties"]:
|
|
nested_object = any_of["properties"][prop_name]
|
|
if nested_object == {}:
|
|
f.write("from typing import Any\n")
|
|
f.write(prop_name + " = Any\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "$ref" in nested_object:
|
|
ref = nested_object["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write(
|
|
"from ."
|
|
+ camel_to_snake(ref_name)
|
|
+ " import "
|
|
+ ref_name
|
|
+ "\n"
|
|
)
|
|
f.write("\n")
|
|
if prop_name != ref_name:
|
|
f.write(prop_name + " = " + ref_name + "\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
else:
|
|
object_code = generateObjectTypeCode(
|
|
prop_name, nested_object, "object", data, None, None
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "type" in any_of and any_of["type"] == "string":
|
|
enum_code = generateEnumTypeCode(
|
|
any_of["enum"][0], any_of, "string", []
|
|
)
|
|
f.write(enum_code)
|
|
f.write("\n")
|
|
all_options.append(any_of["enum"][0])
|
|
|
|
# Write the sum type.
|
|
description = getAnyOfDescription(schema)
|
|
content = generateUnionType(all_options, name, description, tag)
|
|
f.write(content)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def getAnyOfDescription(schema: dict) -> str:
|
|
if "description" in schema:
|
|
return schema["description"]
|
|
else:
|
|
return ""
|
|
|
|
|
|
def generateUnionType(
|
|
types: List[str], name: str, description: str, tag: Optional[str]
|
|
) -> str:
|
|
ArgType = TypedDict(
|
|
"ArgType",
|
|
{
|
|
"name": str,
|
|
"var0": str,
|
|
"var1": str,
|
|
"check": str,
|
|
"value": str,
|
|
},
|
|
)
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"types": List[ArgType],
|
|
"description": str,
|
|
"name": str,
|
|
"tag": Optional[str],
|
|
},
|
|
)
|
|
template_info: TemplateType = {
|
|
"types": [],
|
|
"description": description,
|
|
"name": name,
|
|
"tag": tag,
|
|
}
|
|
for type in types:
|
|
if type == "SuccessWebSocketResponse":
|
|
template_info["types"].append(
|
|
{
|
|
"name": type,
|
|
"var0": randletter(),
|
|
"var1": randletter(),
|
|
"check": "success",
|
|
"value": "True",
|
|
}
|
|
)
|
|
elif type == "FailureWebSocketResponse":
|
|
template_info["types"].append(
|
|
{
|
|
"name": type,
|
|
"var0": randletter(),
|
|
"var1": randletter(),
|
|
"check": "success",
|
|
"value": "False",
|
|
}
|
|
)
|
|
else:
|
|
template_info["types"].append(
|
|
{
|
|
"name": snake_to_title(type),
|
|
"var0": randletter(),
|
|
"var1": randletter(),
|
|
"check": "type",
|
|
"value": '"' + type + '"',
|
|
}
|
|
)
|
|
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "union-type.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
|
|
return content
|
|
|
|
|
|
def generateOneOfType(path: str, name: str, schema: dict, data: dict):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
|
|
if isEnumWithDocsOneOf(schema):
|
|
additional_docs = []
|
|
enum = []
|
|
# We want to treat this as an enum with additional docs.
|
|
for one_of in schema["oneOf"]:
|
|
enum.append(one_of["enum"][0])
|
|
if "description" in one_of:
|
|
additional_docs.append(one_of["description"])
|
|
else:
|
|
additional_docs.append("")
|
|
# Write the enum.
|
|
schema["enum"] = enum
|
|
schema["type"] = "string"
|
|
generateEnumType(path, name, schema, "string", additional_docs)
|
|
# return early.
|
|
return
|
|
|
|
# Open our file.
|
|
f = open(path, "w")
|
|
|
|
# Import the refs if there are any.
|
|
all_options = []
|
|
for one_of in schema["oneOf"]:
|
|
if "$ref" in one_of:
|
|
ref = one_of["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write(
|
|
"from ."
|
|
+ camel_to_snake(ref_name)
|
|
+ " import "
|
|
+ snake_to_title(ref_name)
|
|
+ "\n"
|
|
)
|
|
all_options.append(snake_to_title(ref_name))
|
|
|
|
if isNestedObjectOneOf(schema):
|
|
# We want to write each of the nested objects.
|
|
for one_of in schema["oneOf"]:
|
|
# Get the nested object.
|
|
if "properties" in one_of:
|
|
for prop_name in one_of["properties"]:
|
|
nested_object = one_of["properties"][prop_name]
|
|
if nested_object == {}:
|
|
f.write("from typing import Any\n")
|
|
f.write(prop_name + " = Any\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "$ref" in nested_object:
|
|
ref = nested_object["$ref"]
|
|
ref_name = ref[ref.rfind("/") + 1 :]
|
|
f.write(
|
|
"from ."
|
|
+ camel_to_snake(ref_name)
|
|
+ " import "
|
|
+ ref_name
|
|
+ "\n"
|
|
)
|
|
f.write("\n")
|
|
if prop_name != ref_name:
|
|
f.write(prop_name + " = " + ref_name + "\n")
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
else:
|
|
object_code = generateObjectTypeCode(
|
|
prop_name, nested_object, "object", data, None, None
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append(prop_name)
|
|
elif "type" in one_of and one_of["type"] == "string":
|
|
enum_code = generateEnumTypeCode(
|
|
one_of["enum"][0], one_of, "string", []
|
|
)
|
|
f.write(enum_code)
|
|
f.write("\n")
|
|
all_options.append(one_of["enum"][0])
|
|
|
|
# Check if each one_of has the same enum of one.
|
|
tag = getTagOneOf(schema)
|
|
content = getContentOneOf(schema, tag)
|
|
|
|
if tag is not None and content is not None:
|
|
# Generate each of the options from the tag.
|
|
for one_of in schema["oneOf"]:
|
|
# Get the value of the tag.
|
|
object_name = one_of["properties"][tag]["enum"][0]
|
|
# Generate the type for the object.
|
|
content_code = generateObjectTypeCode(
|
|
snake_to_title(object_name) + "Data",
|
|
one_of["properties"][content],
|
|
"object",
|
|
data,
|
|
None,
|
|
None,
|
|
)
|
|
f.write(content_code)
|
|
f.write("\n")
|
|
object_code = generateObjectTypeCode(
|
|
object_name, one_of, "object", data, tag, content, True
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append("option_" + object_name)
|
|
elif tag is not None:
|
|
# Generate each of the options from the tag.
|
|
for one_of in schema["oneOf"]:
|
|
# Get the value of the tag.
|
|
object_name = one_of["properties"][tag]["enum"][0]
|
|
object_code = generateObjectTypeCode(
|
|
object_name, one_of, "object", data, tag, None, True
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append("option_" + object_name)
|
|
elif schema["oneOf"].__len__() == 1:
|
|
description = getOneOfDescription(schema)
|
|
object_code = generateObjectTypeCode(
|
|
name,
|
|
schema["oneOf"][0],
|
|
"object",
|
|
data,
|
|
None,
|
|
None,
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
f.close()
|
|
# return early.
|
|
return
|
|
else:
|
|
# Generate each of the options from the tag.
|
|
i = 0
|
|
for one_of in schema["oneOf"]:
|
|
# Get the value of the tag.
|
|
object_name = name + str(i)
|
|
object_code = generateObjectTypeCode(
|
|
object_name,
|
|
one_of,
|
|
"object",
|
|
data,
|
|
None,
|
|
None,
|
|
)
|
|
f.write(object_code)
|
|
f.write("\n")
|
|
all_options.append(object_name)
|
|
i += 1
|
|
|
|
# Write the sum type.
|
|
description = getOneOfDescription(schema)
|
|
content = generateUnionType(all_options, name, description, tag)
|
|
f.write(content)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def getOneOfDescription(schema: dict) -> str:
|
|
if "description" in schema:
|
|
return schema["description"]
|
|
else:
|
|
return ""
|
|
|
|
|
|
def generateObjectTypeCode(
|
|
name: str,
|
|
schema: dict,
|
|
type_name: str,
|
|
data: dict,
|
|
tag: Optional[str],
|
|
content: Optional[str],
|
|
is_option: bool = False,
|
|
) -> str:
|
|
FieldType = TypedDict(
|
|
"FieldType",
|
|
{
|
|
"name": str,
|
|
"type": str,
|
|
"value": str,
|
|
},
|
|
)
|
|
TemplateType = TypedDict(
|
|
"TemplateType",
|
|
{
|
|
"fields": List[FieldType],
|
|
"description": str,
|
|
"name": str,
|
|
"imports": List[str],
|
|
},
|
|
)
|
|
|
|
description = ""
|
|
if "description" in schema:
|
|
description = schema["description"].replace('"', '\\"')
|
|
|
|
imports = []
|
|
refs = getRefs(schema)
|
|
for ref in refs:
|
|
imports.append("from ..models." + camel_to_snake(ref) + " import " + ref + "\n")
|
|
|
|
required = []
|
|
if "required" in schema:
|
|
required = schema["required"]
|
|
|
|
fields = []
|
|
if "properties" in schema:
|
|
for property_name in schema["properties"]:
|
|
property_schema = schema["properties"][property_name]
|
|
if property_name == tag:
|
|
field0: FieldType = {
|
|
"name": property_name,
|
|
"type": "str",
|
|
"value": '"' + name + '"',
|
|
}
|
|
fields.append(field0)
|
|
elif property_name == content:
|
|
field1: FieldType = {
|
|
"name": property_name,
|
|
"type": snake_to_title(name) + "Data",
|
|
"value": "",
|
|
}
|
|
fields.append(field1)
|
|
else:
|
|
field_type = getTypeName(property_schema)
|
|
if property_name not in required:
|
|
if "default" in property_schema:
|
|
if field_type == "str":
|
|
field_type += ' = "' + property_schema["default"] + '"'
|
|
elif isinstance(property_schema["default"], str):
|
|
field_type += (
|
|
' = "' + property_schema["default"] + '" # type: ignore'
|
|
)
|
|
elif "allOf" in property_schema:
|
|
field_type += (
|
|
" = "
|
|
+ str(property_schema["default"])
|
|
+ " # type: ignore"
|
|
)
|
|
else:
|
|
field_type += " = " + str(property_schema["default"])
|
|
else:
|
|
field_type = "Optional[" + field_type + "] = None"
|
|
field2: FieldType = {
|
|
"name": property_name,
|
|
"type": field_type,
|
|
"value": "",
|
|
}
|
|
fields.append(field2)
|
|
|
|
name = snake_to_title(name)
|
|
if is_option:
|
|
name = "Option" + name
|
|
template_info: TemplateType = {
|
|
"fields": fields,
|
|
"description": description,
|
|
"name": name,
|
|
"imports": imports,
|
|
}
|
|
|
|
# Iterate over the properties.
|
|
|
|
environment = jinja2.Environment(loader=jinja2.FileSystemLoader("generate/"))
|
|
template_file = "object.py.jinja2"
|
|
template = environment.get_template(template_file)
|
|
content = template.render(**template_info)
|
|
|
|
return content
|
|
|
|
|
|
def generateObjectType(path: str, name: str, schema: dict, type_name: str, data: dict):
|
|
logging.info("generating type: ", name, " at: ", path)
|
|
|
|
f = open(path, "w")
|
|
|
|
code = generateObjectTypeCode(name, schema, type_name, data, None, None)
|
|
f.write(code)
|
|
|
|
# Close the file.
|
|
f.close()
|
|
|
|
|
|
def getRefs(schema: dict) -> List[str]:
|
|
refs = []
|
|
if "$ref" in schema:
|
|
refs.append(schema["$ref"].replace("#/components/schemas/", ""))
|
|
|
|
else:
|
|
# Generate the type.
|
|
if "type" not in schema:
|
|
if "allOf" in schema:
|
|
for sub_schema in schema["allOf"]:
|
|
refs.extend(getRefs(sub_schema))
|
|
else:
|
|
type_name = schema["type"]
|
|
if type_name == "object":
|
|
if "properties" in schema:
|
|
# Iternate over the properties.
|
|
for property_name in schema["properties"]:
|
|
property_schema = schema["properties"][property_name]
|
|
schema_refs = getRefs(property_schema)
|
|
for ref in schema_refs:
|
|
if ref not in refs:
|
|
refs.append(ref)
|
|
elif "additionalProperties" in schema:
|
|
schema_refs = getRefs(schema["additionalProperties"])
|
|
for ref in schema_refs:
|
|
if ref not in refs:
|
|
refs.append(ref)
|
|
elif schema == {"type": "object"}:
|
|
# do nothing
|
|
pass
|
|
else:
|
|
# This is likely an empty object like above but with a description
|
|
# so we will just skip it.
|
|
pass
|
|
elif type_name == "array":
|
|
if "items" in schema:
|
|
schema_refs = getRefs(schema["items"])
|
|
for ref in schema_refs:
|
|
if ref not in refs:
|
|
refs.append(ref)
|
|
|
|
return refs
|
|
|
|
|
|
def getEndpointRefs(endpoint: dict, data: dict) -> List[str]:
|
|
refs = []
|
|
|
|
responses = endpoint["responses"]
|
|
for response_code in responses:
|
|
response = responses[response_code]
|
|
if "content" in response:
|
|
content = response["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
# If the reference is to a oneOf type, we want to return
|
|
# all the possible outcomes.
|
|
ref = json["$ref"].replace("#/components/schemas/", "")
|
|
schema = data["components"]["schemas"][ref]
|
|
if isNestedObjectOneOf(schema) or isEnumWithDocsOneOf(schema):
|
|
if snake_to_title(ref) not in refs:
|
|
refs.append(snake_to_title(ref))
|
|
elif isTypedObjectOneOf(schema):
|
|
for t in schema["oneOf"]:
|
|
ref = getOneOfRefType(t)
|
|
if snake_to_title(ref) not in refs:
|
|
refs.append(snake_to_title(ref))
|
|
else:
|
|
if ref not in refs:
|
|
refs.append(ref)
|
|
elif "type" in json:
|
|
if json["type"] == "array":
|
|
items = json["items"]
|
|
if "$ref" in items:
|
|
ref = items["$ref"].replace("#/components/schemas/", "")
|
|
refs.append("List[" + ref + "]")
|
|
elif "type" in items:
|
|
if items["type"] == "string":
|
|
refs.append("List[str]")
|
|
else:
|
|
raise Exception("Unknown array type", items)
|
|
else:
|
|
raise Exception("Unknown array type", items)
|
|
elif json["type"] == "string":
|
|
refs.append("str")
|
|
elif (
|
|
json["type"] == "object" and "additionalProperties" in json
|
|
):
|
|
refs.append("dict")
|
|
else:
|
|
print(json)
|
|
raise Exception("Unknown type ", json["type"])
|
|
else:
|
|
refs.append("dict")
|
|
elif content_type == "*/*":
|
|
s = content[content_type]["schema"]
|
|
if s == {}:
|
|
# We don't care it's an empty body.
|
|
continue
|
|
else:
|
|
# Throw an error for an unsupported content type.
|
|
logging.error("content: ", content)
|
|
raise Exception("Unsupported content type: ", content_type)
|
|
else:
|
|
# Throw an error for an unsupported content type.
|
|
logging.error("content: ", content)
|
|
raise Exception("Unsupported content type: ", content_type)
|
|
elif "$ref" in response:
|
|
schema_name = response["$ref"].replace("#/components/responses/", "")
|
|
schema = data["components"]["responses"][schema_name]
|
|
if "content" in schema:
|
|
content = schema["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
ref = json["$ref"].replace("#/components/schemas/", "")
|
|
if snake_to_title(ref) not in refs:
|
|
refs.append(snake_to_title(ref))
|
|
|
|
return refs
|
|
|
|
|
|
def getParameterRefs(endpoint: dict) -> List[str]:
|
|
refs = []
|
|
|
|
if "parameters" in endpoint:
|
|
parameters = endpoint["parameters"]
|
|
for parameter in parameters:
|
|
parameter["name"]
|
|
if "$ref" in parameter["schema"]:
|
|
parameter_type = parameter["schema"]["$ref"].replace(
|
|
"#/components/schemas/", ""
|
|
)
|
|
refs.append(parameter_type)
|
|
|
|
return refs
|
|
|
|
|
|
def getRequestBodyRefs(endpoint: dict) -> List[str]:
|
|
refs = []
|
|
|
|
if "requestBody" in endpoint:
|
|
requestBody = endpoint["requestBody"]
|
|
if "content" in requestBody:
|
|
content = requestBody["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
ref = json["$ref"].replace("#/components/schemas/", "")
|
|
refs.append(ref)
|
|
elif content_type == "application/octet-stream":
|
|
# do nothing we dont't care
|
|
continue
|
|
elif content_type == "application/x-www-form-urlencoded":
|
|
form = content[content_type]["schema"]
|
|
if "$ref" in form:
|
|
ref = form["$ref"].replace("#/components/schemas/", "")
|
|
refs.append(ref)
|
|
elif content_type == "multipart/form-data":
|
|
form = content[content_type]["schema"]
|
|
if "$ref" in form:
|
|
ref = form["$ref"].replace("#/components/schemas/", "")
|
|
refs.append(ref)
|
|
else:
|
|
# Throw an error for an unsupported content type.
|
|
logging.error("content: ", content)
|
|
raise Exception("Unsupported content type: ", content_type)
|
|
|
|
return refs
|
|
|
|
|
|
def getRequestBodyTypeSchema(
|
|
endpoint: dict, data: dict
|
|
) -> Tuple[Optional[str], Optional[dict]]:
|
|
if "requestBody" in endpoint:
|
|
requestBody = endpoint["requestBody"]
|
|
if "content" in requestBody:
|
|
content = requestBody["content"]
|
|
for content_type in content:
|
|
if content_type == "application/json":
|
|
json = content[content_type]["schema"]
|
|
if "$ref" in json:
|
|
ref = json["$ref"].replace("#/components/schemas/", "")
|
|
type_schema = data["components"]["schemas"][ref]
|
|
return ref, type_schema
|
|
elif json != {}:
|
|
logging.error("not a ref: ", json)
|
|
raise Exception("not a ref")
|
|
elif content_type == "text/plain":
|
|
return "str", None
|
|
elif content_type == "application/octet-stream":
|
|
return "bytes", None
|
|
elif content_type == "application/x-www-form-urlencoded":
|
|
form = content[content_type]["schema"]
|
|
if "$ref" in form:
|
|
ref = form["$ref"].replace("#/components/schemas/", "")
|
|
type_schema = data["components"]["schemas"][ref]
|
|
return ref, type_schema
|
|
elif form != {}:
|
|
logging.error("not a ref: ", form)
|
|
raise Exception("not a ref")
|
|
elif content_type == "multipart/form-data":
|
|
form = content[content_type]["schema"]
|
|
if "$ref" in form:
|
|
ref = form["$ref"].replace("#/components/schemas/", "")
|
|
type_schema = data["components"]["schemas"][ref]
|
|
return ref, type_schema
|
|
elif form != {}:
|
|
type_schema = form
|
|
return None, type_schema
|
|
else:
|
|
logging.error("unsupported content type: ", content_type)
|
|
raise Exception("unsupported content type")
|
|
|
|
return None, None
|
|
|
|
|
|
def to_camel_case(s: str):
|
|
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "")
|
|
return "".join([s[0].lower(), s[1:]])
|
|
|
|
|
|
def clean_parameter_name(name: str):
|
|
return camel_to_snake(name).replace("from", "from_")
|
|
|
|
|
|
def rename_if_keyword(name: str):
|
|
"""Rename a name if it is also a Python keyword."""
|
|
KEYWORDS = ["global"] # there are more, but this is the only one we overlap now
|
|
if name in KEYWORDS:
|
|
return name + "_"
|
|
return name
|
|
|
|
|
|
def camel_to_snake(name: str):
|
|
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
|
|
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower().replace("-", "_")
|
|
|
|
|
|
def camel_to_screaming_snake(name: str):
|
|
name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
|
|
return (
|
|
re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
|
|
.replace(" ", "")
|
|
.upper()
|
|
.replace("-", "_")
|
|
.replace(":", "_")
|
|
)
|
|
|
|
|
|
# Change `file_conversion` to `FileConversion`
|
|
def snake_to_title(name: str):
|
|
if "_" in name or name.islower():
|
|
return "".join([word.title() for word in name.split("_")])
|
|
return name
|
|
|
|
|
|
def get_function_parameters(
|
|
endpoint: dict, request_body_type: Optional[str]
|
|
) -> List[str]:
|
|
params = []
|
|
if "parameters" in endpoint:
|
|
parameters = endpoint["parameters"]
|
|
for parameter in parameters:
|
|
parameter_name = parameter["name"]
|
|
if "type" in parameter["schema"]:
|
|
(
|
|
parameter["schema"]["type"]
|
|
.replace("string", "str")
|
|
.replace("integer", "int")
|
|
.replace("number", "float")
|
|
)
|
|
elif "$ref" in parameter["schema"]:
|
|
parameter["schema"]["$ref"].replace("#/components/schemas/", "")
|
|
else:
|
|
logging.error("parameter: ", parameter)
|
|
raise Exception("Unknown parameter type")
|
|
params.append(camel_to_snake(parameter_name))
|
|
if request_body_type:
|
|
params.append("body")
|
|
return params
|
|
|
|
|
|
def getOneOfRefType(schema: dict) -> str:
|
|
if (
|
|
"type" in schema["properties"]
|
|
and "enum" in schema["properties"]["type"]
|
|
and len(schema["properties"]["type"]["enum"]) == 1
|
|
):
|
|
t = schema["properties"]["type"]["enum"][0]
|
|
return t
|
|
|
|
raise Exception("Cannot get oneOf ref type for schema: ", schema)
|
|
|
|
|
|
def isNestedObjectAnyOf(schema: dict) -> bool:
|
|
if "anyOf" not in schema:
|
|
return False
|
|
|
|
is_nested_object = False
|
|
for any_of in schema["anyOf"]:
|
|
# Check if each are an object w 1 property in it.
|
|
if (
|
|
"type" in any_of
|
|
and any_of["type"] == "object"
|
|
and "properties" in any_of
|
|
and len(any_of["properties"]) == 1
|
|
):
|
|
for prop_name in any_of["properties"]:
|
|
nested_object = any_of["properties"][prop_name]
|
|
if "type" in nested_object and nested_object["type"] == "object":
|
|
is_nested_object = True
|
|
else:
|
|
is_nested_object = False
|
|
break
|
|
elif (
|
|
"type" in any_of
|
|
and any_of["type"] == "string"
|
|
and "enum" in any_of
|
|
and len(any_of["enum"]) == 1
|
|
):
|
|
is_nested_object = True
|
|
else:
|
|
is_nested_object = False
|
|
break
|
|
|
|
return is_nested_object
|
|
|
|
|
|
def isNestedObjectOneOf(schema: dict) -> bool:
|
|
if "oneOf" not in schema:
|
|
return False
|
|
|
|
is_nested_object = False
|
|
for one_of in schema["oneOf"]:
|
|
# Check if each are an object w 1 property in it.
|
|
if (
|
|
"type" in one_of
|
|
and one_of["type"] == "object"
|
|
and "properties" in one_of
|
|
and len(one_of["properties"]) == 1
|
|
):
|
|
for prop_name in one_of["properties"]:
|
|
nested_object = one_of["properties"][prop_name]
|
|
if "type" in nested_object and nested_object["type"] == "object":
|
|
is_nested_object = True
|
|
else:
|
|
is_nested_object = False
|
|
break
|
|
elif (
|
|
"type" in one_of
|
|
and one_of["type"] == "string"
|
|
and "enum" in one_of
|
|
and len(one_of["enum"]) == 1
|
|
):
|
|
is_nested_object = True
|
|
else:
|
|
is_nested_object = False
|
|
break
|
|
|
|
return is_nested_object
|
|
|
|
|
|
def getTagAnyOf(schema: dict) -> Optional[str]:
|
|
tag = None
|
|
for any_of in schema["anyOf"]:
|
|
has_tag = False
|
|
# Check if each are an object w 1 property in it.
|
|
if "type" in any_of and any_of["type"] == "object" and "properties" in any_of:
|
|
for prop_name in any_of["properties"]:
|
|
prop = any_of["properties"][prop_name]
|
|
if (
|
|
"type" in prop
|
|
and prop["type"] == "string"
|
|
and "enum" in prop
|
|
and len(prop["enum"]) == 1
|
|
):
|
|
if tag is not None and tag != prop_name:
|
|
has_tag = False
|
|
break
|
|
else:
|
|
has_tag = True
|
|
tag = prop_name
|
|
|
|
if has_tag is False:
|
|
tag = None
|
|
break
|
|
|
|
return tag
|
|
|
|
|
|
def getTagOneOf(schema: dict) -> Optional[str]:
|
|
tag = None
|
|
for one_of in schema["oneOf"]:
|
|
has_tag = False
|
|
# Check if each are an object w 1 property in it.
|
|
if one_of["type"] == "object" and "properties" in one_of:
|
|
for prop_name in one_of["properties"]:
|
|
prop = one_of["properties"][prop_name]
|
|
if (
|
|
"type" in prop
|
|
and prop["type"] == "string"
|
|
and "enum" in prop
|
|
and len(prop["enum"]) == 1
|
|
):
|
|
if tag is not None and tag != prop_name:
|
|
has_tag = False
|
|
break
|
|
else:
|
|
has_tag = True
|
|
tag = prop_name
|
|
|
|
if has_tag is False:
|
|
tag = None
|
|
break
|
|
|
|
return tag
|
|
|
|
|
|
def getContentOneOf(schema: dict, tag: Optional[str]) -> Optional[str]:
|
|
if tag is None:
|
|
return None
|
|
content = None
|
|
for one_of in schema["oneOf"]:
|
|
has_content = False
|
|
# Check if each are an object w 1 property in it.
|
|
if one_of["type"] == "object" and "properties" in one_of:
|
|
if len(one_of["properties"]) != 2:
|
|
return None
|
|
|
|
for prop_name in one_of["properties"]:
|
|
if prop_name == tag:
|
|
continue
|
|
if content is not None and content != prop_name:
|
|
has_content = False
|
|
break
|
|
else:
|
|
has_content = True
|
|
content = prop_name
|
|
|
|
if has_content is False:
|
|
content = None
|
|
break
|
|
|
|
return content
|
|
|
|
|
|
def isEnumWithDocsOneOf(schema: dict) -> bool:
|
|
if "oneOf" not in schema:
|
|
return False
|
|
|
|
is_enum_with_docs = False
|
|
for one_of in schema["oneOf"]:
|
|
if one_of["type"] == "string" and "enum" in one_of and len(one_of["enum"]) == 1:
|
|
is_enum_with_docs = True
|
|
else:
|
|
is_enum_with_docs = False
|
|
break
|
|
|
|
return is_enum_with_docs
|
|
|
|
|
|
def isTypedObjectOneOf(schema: dict) -> bool:
|
|
if "oneOf" not in schema:
|
|
return False
|
|
|
|
is_typed_object = False
|
|
for one_of in schema["oneOf"]:
|
|
if (
|
|
"type" in one_of["properties"]
|
|
and "enum" in one_of["properties"]["type"]
|
|
and len(one_of["properties"]["type"]["enum"]) == 1
|
|
):
|
|
is_typed_object = True
|
|
else:
|
|
is_typed_object = False
|
|
break
|
|
|
|
return is_typed_object
|
|
|
|
|
|
def hasNoContentResponse(endpoint: dict) -> bool:
|
|
responses = endpoint["responses"]
|
|
for response_code in responses:
|
|
if (
|
|
response_code == "default"
|
|
or response_code == "204"
|
|
or response_code == "302"
|
|
):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def getFunctionResultType(endpoint: dict, endpoint_refs: List[str]) -> str:
|
|
result = ", ".join(endpoint_refs)
|
|
if len(endpoint_refs) > 1:
|
|
result = "Optional[Union[" + result + "]]"
|
|
|
|
if hasNoContentResponse(endpoint):
|
|
result = "Optional[" + result + "]"
|
|
|
|
return result
|
|
|
|
|
|
def getDetailedFunctionResultType(endpoint: dict, endpoint_refs: List[str]) -> str:
|
|
return "Response[" + getFunctionResultType(endpoint, endpoint_refs) + "]"
|
|
|
|
|
|
def getTypeName(schema: dict) -> str:
|
|
if "type" in schema:
|
|
if schema["type"] == "string":
|
|
if "format" in schema:
|
|
if (
|
|
schema["format"] == "date-time"
|
|
or schema["format"] == "partial-date-time"
|
|
):
|
|
return "datetime.datetime"
|
|
elif schema["format"] == "byte":
|
|
return "Base64Data"
|
|
elif schema["format"] == "uuid":
|
|
return "str"
|
|
elif schema["format"] == "url":
|
|
return "AnyUrl"
|
|
elif schema["format"] == "phone":
|
|
return "str"
|
|
return "str"
|
|
elif schema["type"] == "number":
|
|
return "float"
|
|
elif schema["type"] == "boolean":
|
|
return "bool"
|
|
elif schema["type"] == "integer":
|
|
return "int"
|
|
elif schema["type"] == "array":
|
|
if "items" in schema:
|
|
item_type = getTypeName(schema["items"])
|
|
if "format" in schema["items"] and schema["items"]["format"] == "uint8":
|
|
return "bytes"
|
|
else:
|
|
return "List[" + item_type + "]"
|
|
elif "additionalProperties" in schema and schema["type"] == "object":
|
|
item_type = getTypeName(schema["additionalProperties"])
|
|
return "Dict[str, " + item_type + "]"
|
|
elif "$ref" in schema:
|
|
return schema["$ref"].replace("#/components/schemas/", "")
|
|
elif "allOf" in schema and len(schema["allOf"]) == 1:
|
|
return getTypeName(schema["allOf"][0])
|
|
elif "description" in schema:
|
|
return "Any"
|
|
|
|
logging.error("schema: ", [schema])
|
|
raise Exception("Unknown schema type")
|
|
|
|
|
|
letters: List[str] = []
|
|
|
|
|
|
# generate a random letter combination in the range A - Z
|
|
# do not use O or I.
|
|
# make sure we do not use a letter we have already used.
|
|
def randletter() -> str:
|
|
letter1 = chr(random.randint(ord("A"), ord("Z")))
|
|
letter2 = chr(random.randint(ord("A"), ord("Z")))
|
|
letter3 = chr(random.randint(ord("A"), ord("Z")))
|
|
letter = letter1 + letter2 + letter3
|
|
while letter in letters:
|
|
return randletter()
|
|
letters.append(letter)
|
|
return letter
|
|
|
|
|
|
if __name__ == "__main__":
|
|
exit_code = main()
|
|
exit(exit_code)
|