Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 334 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,340 @@ def is_safe_to_delete(name: str) -> bool:
)


def create_custom_docker_source_definition(
name: str,
docker_repository: str,
docker_image_tag: str,
*,
workspace_id: str,
documentation_url: str | None = None,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Create a custom Docker source definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request_body = models.CreateDefinitionRequest(
name=name,
docker_repository=docker_repository,
docker_image_tag=docker_image_tag,
documentation_url=documentation_url,
)
request = api.CreateSourceDefinitionRequest(
workspace_id=workspace_id,
create_definition_request=request_body,
)
response = airbyte_instance.source_definitions.create_source_definition(request)
if response.definition_response is None:
raise AirbyteError(
message="Failed to create custom Docker source definition",
context={"name": name, "workspace_id": workspace_id},
)
return response.definition_response


def list_custom_docker_source_definitions(
workspace_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> list[models.DefinitionResponse]:
"""List all custom Docker source definitions in a workspace."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.ListSourceDefinitionsRequest(
workspace_id=workspace_id,
)
response = airbyte_instance.source_definitions.list_source_definitions(request)
if not status_ok(response.status_code) or response.definitions_response is None:
raise AirbyteError(
message="Failed to list custom Docker source definitions",
context={
"workspace_id": workspace_id,
"response": response,
},
)
return response.definitions_response.data


def get_custom_docker_source_definition(
workspace_id: str,
definition_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Get a specific custom Docker source definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.GetSourceDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
)
response = airbyte_instance.source_definitions.get_source_definition(request)
if not status_ok(response.status_code) or response.definition_response is None:
raise AirbyteError(
message="Failed to get custom Docker source definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
},
)
return response.definition_response


def update_custom_docker_source_definition(
workspace_id: str,
definition_id: str,
*,
name: str,
docker_image_tag: str,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Update a custom Docker source definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request_body = models.UpdateDefinitionRequest(
name=name,
docker_image_tag=docker_image_tag,
)
request = api.UpdateSourceDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
update_definition_request=request_body,
)
response = airbyte_instance.source_definitions.update_source_definition(request)
if not status_ok(response.status_code) or response.definition_response is None:
raise AirbyteError(
message="Failed to update custom Docker source definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
},
)
return response.definition_response


def delete_custom_docker_source_definition(
workspace_id: str,
definition_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> None:
"""Delete a custom Docker source definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.DeleteSourceDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
)
response = airbyte_instance.source_definitions.delete_source_definition(request)
if not status_ok(response.status_code):
raise AirbyteError(
message="Failed to delete custom Docker source definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
},
)


def create_custom_docker_destination_definition(
name: str,
docker_repository: str,
docker_image_tag: str,
*,
workspace_id: str,
documentation_url: str | None = None,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Create a custom Docker destination definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request_body = models.CreateDefinitionRequest(
name=name,
docker_repository=docker_repository,
docker_image_tag=docker_image_tag,
documentation_url=documentation_url,
)
request = api.CreateDestinationDefinitionRequest(
workspace_id=workspace_id,
create_definition_request=request_body,
)
response = airbyte_instance.destination_definitions.create_destination_definition(request)
if response.definition_response is None:
raise AirbyteError(
message="Failed to create custom Docker destination definition",
context={"name": name, "workspace_id": workspace_id},
)
return response.definition_response


def list_custom_docker_destination_definitions(
workspace_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> list[models.DefinitionResponse]:
"""List all custom Docker destination definitions in a workspace."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.ListDestinationDefinitionsRequest(
workspace_id=workspace_id,
)
response = airbyte_instance.destination_definitions.list_destination_definitions(request)
if not status_ok(response.status_code) or response.definitions_response is None:
raise AirbyteError(
message="Failed to list custom Docker destination definitions",
context={
"workspace_id": workspace_id,
"response": response,
},
)
return response.definitions_response.data


def get_custom_docker_destination_definition(
workspace_id: str,
definition_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Get a specific custom Docker destination definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.GetDestinationDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
)
response = airbyte_instance.destination_definitions.get_destination_definition(request)
if not status_ok(response.status_code) or response.definition_response is None:
raise AirbyteError(
message="Failed to get custom Docker destination definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
},
)
return response.definition_response


def update_custom_docker_destination_definition(
workspace_id: str,
definition_id: str,
*,
name: str,
docker_image_tag: str,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> models.DefinitionResponse:
"""Update a custom Docker destination definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request_body = models.UpdateDefinitionRequest(
name=name,
docker_image_tag=docker_image_tag,
)
request = api.UpdateDestinationDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
update_definition_request=request_body,
)
response = airbyte_instance.destination_definitions.update_destination_definition(request)
if not status_ok(response.status_code) or response.definition_response is None:
raise AirbyteError(
message="Failed to update custom Docker destination definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
"response": response,
},
)
return response.definition_response


def delete_custom_docker_destination_definition(
workspace_id: str,
definition_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> None:
"""Delete a custom Docker destination definition."""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

request = api.DeleteDestinationDefinitionRequest(
workspace_id=workspace_id,
definition_id=definition_id,
)
response = airbyte_instance.destination_definitions.delete_destination_definition(request)
if not status_ok(response.status_code):
raise AirbyteError(
message="Failed to delete custom Docker destination definition",
context={
"workspace_id": workspace_id,
"definition_id": definition_id,
},
)


def get_connector_builder_project_for_definition_id(
*,
workspace_id: str,
Expand Down
Loading