diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 000000000000..d3c52fd4733e --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,85 @@ +version: 2.1 + +orbs: + aws-ecr: circleci/aws-ecr@8.2.1 + +jobs: + test: + docker: + - image: cimg/openjdk:24.0.2 + steps: + - checkout + - run: + name: test with coverage + command: | + cd plugin/trino-af/ + ../../mvnw clean test -Djacoco.skip=false -Djava.vendor="Eclipse Adoptium" + - store_test_results: + path: plugin/trino-af/target/surefire-reports + - store_artifacts: + path: plugin/trino-af/target/site/jacoco + destination: coverage-report + - run: + name: verify coverage summary + command: | + cd plugin/trino-af/ + if [ -f target/site/jacoco/index.html ]; then + echo "Coverage report generated successfully" + echo "View coverage report in CircleCI artifacts" + else + echo "Coverage report generation failed" + exit 1 + fi + + build-and-push: + docker: + - image: cimg/openjdk:24.0.2 + steps: + - checkout + - run: + name: build + command: | + cd plugin/trino-af/ + ../../mvnw clean package -Djava.vendor="Eclipse Adoptium" + - run: + name: set environment variables + command: | + echo 'export REGISTRY_ACCOUNT_ID="307946643675"' >> "$BASH_ENV" + source "$BASH_ENV" + + SHORT_COMMIT_SHA=${CIRCLE_SHA1:0:7} + echo "export SHORT_COMMIT_SHA='${SHORT_COMMIT_SHA}'" >> $BASH_ENV + + TAG=${CIRCLE_BRANCH//\//-}-${SHORT_COMMIT_SHA} + echo "export TAG='${TAG}'" >> $BASH_ENV + + DOCKER_HOST_BLOCK=$(curl -fsSL http://npm-yarn-blocklist.security.appf.io | tr ' ' '\n' | sed 's/^/--add-host=/' | sed 's/$/:127.0.0.1/' | tr '\n' ' ' | sed 's/ $//') + echo "export DOCKER_HOST_BLOCK='${DOCKER_HOST_BLOCK}'" >> $BASH_ENV + + echo "ACCESS KEY ID: ${DATA_API_MACHINE_USER_AWS_ACCESS_KEY_ID}" + - setup_remote_docker: + docker_layer_caching: true + - aws-ecr/build-and-push-image: + registry-id: REGISTRY_ACCOUNT_ID + aws-access-key-id: DATA_API_MACHINE_USER_AWS_ACCESS_KEY_ID + aws-secret-access-key: DATA_API_MACHINE_USER_AWS_SECRET_ACCESS_KEY + region: us-west-2 + create-repo: false + repo: data-api/trino-af + build-path: plugin/trino-af/ + path: plugin/trino-af/ + tag: ${TAG} + platform: linux/amd64,linux/arm64 + skip-when-tags-exist: true + extra-build-args: ${DOCKER_HOST_BLOCK} + + +workflows: + version: 2 + test-build-push: + jobs: + - test + - build-and-push: + requires: + - test + context: data-api-pipeline-trigger-group diff --git a/plugin/trino-af/Dockerfile b/plugin/trino-af/Dockerfile new file mode 100644 index 000000000000..0f15e3b068f0 --- /dev/null +++ b/plugin/trino-af/Dockerfile @@ -0,0 +1,4 @@ +FROM trinodb/trino:476 + +# Copy the trino-af plugin files +COPY --chown=trino:trino target/trino-af-476/ /usr/lib/trino/plugin/trino-af diff --git a/plugin/trino-af/README.md b/plugin/trino-af/README.md new file mode 100644 index 000000000000..7f30b00727aa --- /dev/null +++ b/plugin/trino-af/README.md @@ -0,0 +1,112 @@ +# Guide for trino-af Plugin +The `trino-af` plugin implements the `af_find_datasource_by_guid` function which receives a vhost guid and then calls [the registry app](https://github.com/appfolio/registry_app) to get the slice the vhost is active on. + +Vhost databases with the same guid can exist on multiple slices. The function can be used in Trino's row filter to make sure Trino only return rows belong to the slice a vhost is active on. + +# Development + +## Install dependencies + +```commandline +brew install trino jenv +``` + +## Install Java + +Trino works with Temurin JDK 24. +```commandline +brew install temurin +jenv add /Library/Java/JavaVirtualMachines/temurin-24.jdk/Contents/Home +``` + +## Create a branch + +### Pull all tags + +May need to run the following if new trino verions are released. + +```commandline +git remote add upstream https://github.com/trinodb/trino.git +git fetch upstream --tags +git push origin --tags +``` + +### Check out the Branch + +```commandline +git checkout 476-af +``` + +All development work happen in this branch. + +## Test + +Run automated tests: + +```commandline +cd plugin/trino-af/ +../../mvnw test +``` + +Start a test server for manual testing: + +```commandline +cd plugin/trino-af +../../mvnw compile test-compile exec:java +``` + +Access the test server: + +```commandline +trino --server http://localhost:8080 --execute "select af.af.af_find_datasource_by_guid('test-guid-1')" +``` + +### Code Coverage + +JaCoCo code coverage report is generated on CircleCI. + +To manually generate coverage reports: + +```commandline +cd plugin/trino-af/ +mvn clean test -Djacoco.skip=false -Djava.vendor="Eclipse Adoptium" +``` + +After running, view the coverage report: + +```commandline +open target/site/jacoco/index.html +``` + +The coverage report shows: +- **Instruction Coverage**: Percentage of bytecode instructions executed +- **Branch Coverage**: Percentage of conditional branches tested +- **Line Coverage**: Percentage of source code lines executed +- **Method Coverage**: Percentage of methods called +- **Class Coverage**: Percentage of classes instantiated + +## Build + +The plugin is built on CircleCI. + +To manually build the plugin: + +```commandline +cd plugin/trino-af/ +../../mvnw package +``` + +`target/trino-af-476.zip` is the plugin file + +## Push + +CircleCI pushes a Trino container image with the plugin installed to ECR in the data-api-prod account. + +To manually push an image: + +```commandline +awsume data-api-prod +./build-and-push.sh +``` + +The output should have the image tag which can be used in Kubernetes Trino setup. diff --git a/plugin/trino-af/build-and-push.sh b/plugin/trino-af/build-and-push.sh new file mode 100755 index 000000000000..8f5981b3a08c --- /dev/null +++ b/plugin/trino-af/build-and-push.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +TRINO_BASE_VERSION=476 +TRINO_AF_REPO=data-api/trino-af + +echo "Step 1: Building Maven package..." +../../mvnw clean package + +echo "Step 2: Finding ECR repository for trino-af..." +ECR_REPO_URI=$(aws ecr describe-repositories --repository-names $TRINO_AF_REPO --query 'repositories[0].repositoryUri' --output text) +if [ "$ECR_REPO_URI" == "None" ] || [ -z "$ECR_REPO_URI" ]; then + echo "Error: ECR repository 'trino-af' not found" + exit 1 +fi +echo "Found ECR repository: $ECR_REPO_URI" + +echo "Step 3: Finding latest version tag..." +LATEST_VERSION=$(aws ecr list-images --repository-name $TRINO_AF_REPO --query 'imageIds[?imageTag!=`null`].imageTag' --output text | tr '\t' '\n' | grep -E '\-[0-9]+$' | sort -V | tail -1) +if [ -z "$LATEST_VERSION" ]; then + NEW_VERSION="$TRINO_BASE_VERSION-1" +else + # Extract the number after the dash and increment it + VERSION_NUMBER=$(echo "$LATEST_VERSION" | grep -oE '[0-9]+$') + NEW_VERSION="$TRINO_BASE_VERSION-$((VERSION_NUMBER + 1))" +fi +echo "Latest version: ${LATEST_VERSION:-none}, New version: $NEW_VERSION" + +echo "Step 4: Authenticating with ECR..." +aws ecr get-login-password | docker login --username AWS --password-stdin $ECR_REPO_URI + +echo "Step 5: Building multi-architecture Docker image..." +docker buildx create --use --name multiarch-builder 2>/dev/null || docker buildx use multiarch-builder +docker buildx build --platform linux/arm64,linux/amd64 -t $ECR_REPO_URI:$NEW_VERSION --push . + +echo "Successfully built and pushed $ECR_REPO_URI:$NEW_VERSION" diff --git a/plugin/trino-af/pom.xml b/plugin/trino-af/pom.xml new file mode 100644 index 000000000000..44ac4289d6c9 --- /dev/null +++ b/plugin/trino-af/pom.xml @@ -0,0 +1,210 @@ + + + 4.0.0 + + io.trino + trino-root + 476 + ../../pom.xml + + + trino-af + trino-plugin + ${project.artifactId} + Trino - AppFolio plugin + + + true + false + + + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + io.airlift + bootstrap + + + + io.airlift + configuration + + + + io.airlift + http-client + + + + io.airlift + json + + + + io.trino + trino-cache + + + + jakarta.validation + jakarta.validation-api + + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + io.airlift + slice + provided + + + + io.opentelemetry + opentelemetry-api + provided + + + + io.opentelemetry + opentelemetry-context + provided + + + + io.trino + trino-spi + provided + + + + + io.airlift + log + runtime + + + + io.airlift + node + runtime + + + + + io.airlift + configuration-testing + test + + + + io.airlift + http-server + test + + + + io.trino + trino-main + test + + + + io.trino + trino-main + test-jar + test + + + + io.trino + trino-testing + test + + + + jakarta.servlet + jakarta.servlet-api + test + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + io.trino.plugin.af.AfQueryRunner + test + + + + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.13 + + ${jacoco.skip} + + + + prepare-agent + + prepare-agent + + + + report + + report + + test + + ${project.build.directory}/site/jacoco + + + + + + + + diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConfig.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConfig.java new file mode 100644 index 000000000000..e2570fabd24f --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConfig.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigSecuritySensitive; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; + +import java.net.URI; + +public class AfConfig +{ + private URI registryEndpoint; + private String registryUsername; + private String registryPassword; + private int registryCacheTtl; + private int registryCacheMaxEntries; + + @NotNull + public URI getRegistryEndpoint() + { + return registryEndpoint; + } + + @Config("af.registry.endpoint") + public AfConfig setRegistryEndpoint(URI registryEndpoint) + { + this.registryEndpoint = registryEndpoint; + return this; + } + + @NotBlank + public String getRegistryUsername() + { + return registryUsername; + } + + @Config("af.registry.username") + public AfConfig setRegistryUsername(String registryUsername) + { + this.registryUsername = registryUsername; + return this; + } + + @NotBlank + public String getRegistryPassword() + { + return registryPassword; + } + + @Config("af.registry.password") + @ConfigSecuritySensitive + public AfConfig setRegistryPassword(String registryPassword) + { + this.registryPassword = registryPassword; + return this; + } + + @NotNull + public int getRegistryCacheTtl() + { + return registryCacheTtl; + } + + @Config("af.registry.cache-ttl") + public AfConfig setRegistryCacheTtl(int registryCacheTtl) + { + this.registryCacheTtl = registryCacheTtl; + return this; + } + + @NotNull + public int getRegistryCacheMaxEntries() + { + return registryCacheMaxEntries; + } + + @Config("af.registry.cache-max-entries") + public AfConfig setRegistryCacheMaxEntries(int registryCacheMaxEntries) + { + this.registryCacheMaxEntries = registryCacheMaxEntries; + return this; + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnector.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnector.java new file mode 100644 index 000000000000..c7fcdbc6c401 --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnector.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.inject.Inject; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.transaction.IsolationLevel; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class AfConnector + implements Connector +{ + private final ConnectorMetadata metadata; + private final FunctionProvider functionProvider; + + @Inject + public AfConnector(ConnectorMetadata metadata, FunctionProvider functionProvider) + { + this.metadata = requireNonNull(metadata, "metadata is null"); + this.functionProvider = requireNonNull(functionProvider, "functionProvider is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) + { + return AfTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public Optional getFunctionProvider() + { + return Optional.of(functionProvider); + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnectorFactory.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnectorFactory.java new file mode 100644 index 000000000000..9d8474067cbb --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfConnectorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.opentelemetry.api.trace.Tracer; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.connector.ConnectorFactory; + +import java.util.Map; + +public class AfConnectorFactory + implements ConnectorFactory +{ + @Override + public String getName() + { + return "af"; + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) + { + Bootstrap app = new Bootstrap( + new AfModule(), + binder -> { + binder.bind(Tracer.class).toInstance(context.getTracer()); + binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); + }); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + return injector.getInstance(Connector.class); + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfMetadata.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfMetadata.java new file mode 100644 index 000000000000..95764a9154a5 --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfMetadata.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.function.BoundSignature; +import io.trino.spi.function.FunctionDependencyDeclaration; +import io.trino.spi.function.FunctionId; +import io.trino.spi.function.FunctionMetadata; +import io.trino.spi.function.SchemaFunctionName; + +import java.util.Collection; +import java.util.List; + +import static io.trino.spi.type.TypeSignature.mapType; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class AfMetadata + implements ConnectorMetadata +{ + private static final String SCHEMA_NAME = "af"; + + private final List functions; + + @Inject + public AfMetadata(List functions) + { + this.functions = ImmutableList.copyOf(requireNonNull(functions, "functions is null")); + } + + @Override + public Collection listFunctions(ConnectorSession session, String schemaName) + { + return schemaName.equals(SCHEMA_NAME) ? functions : List.of(); + } + + @Override + public Collection getFunctions(ConnectorSession session, SchemaFunctionName name) + { + if (!name.getSchemaName().equals(SCHEMA_NAME)) { + return List.of(); + } + return functions.stream() + .filter(function -> function.getCanonicalName().equals(name.getFunctionName())) + .toList(); + } + + @Override + public FunctionMetadata getFunctionMetadata(ConnectorSession session, FunctionId functionId) + { + return functions.stream() + .filter(function -> function.getFunctionId().equals(functionId)) + .findFirst() + .orElseThrow(); + } + + @Override + public FunctionDependencyDeclaration getFunctionDependencies(ConnectorSession session, FunctionId functionId, BoundSignature boundSignature) + { + return FunctionDependencyDeclaration.builder() + .addType(mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature())) + .build(); + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfModule.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfModule.java new file mode 100644 index 000000000000..19bf74c218ca --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfModule.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.function.FunctionMetadata; +import io.trino.spi.function.FunctionProvider; + +import java.util.List; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.http.client.HttpClientBinder.httpClientBinder; + +public class AfModule + implements Module +{ + @Provides + public static List getFunctionMetadata(RegistryFunctions functions) + { + return functions.getFunctions(); + } + + @Override + public void configure(Binder binder) + { + configBinder(binder).bindConfig(AfConfig.class); + httpClientBinder(binder).bindHttpClient("af.registry", ForRegistryClient.class); + binder.bind(RegistryClient.class).in(Scopes.SINGLETON); + binder.bind(AfConnector.class).in(Scopes.SINGLETON); + binder.bind(AfMetadata.class).in(Scopes.SINGLETON); + binder.bind(RegistryFunctions.class).in(Scopes.SINGLETON); + binder.bind(Connector.class).to(AfConnector.class).in(Scopes.SINGLETON); + binder.bind(ConnectorMetadata.class).to(AfMetadata.class).in(Scopes.SINGLETON); + binder.bind(FunctionProvider.class).to(RegistryFunctions.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfPlugin.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfPlugin.java new file mode 100644 index 000000000000..6777b1c5cd06 --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableList; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ConnectorFactory; + +public class AfPlugin + implements Plugin +{ + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new AfConnectorFactory()); + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/AfTransactionHandle.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfTransactionHandle.java new file mode 100644 index 000000000000..ac99958cd840 --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/AfTransactionHandle.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import io.trino.spi.connector.ConnectorTransactionHandle; + +public enum AfTransactionHandle implements ConnectorTransactionHandle { + INSTANCE +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/ForRegistryClient.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/ForRegistryClient.java new file mode 100644 index 000000000000..b4dacb55ecbe --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/ForRegistryClient.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@BindingAnnotation +public @interface ForRegistryClient { +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryClient.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryClient.java new file mode 100644 index 000000000000..2017fb064312 --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryClient.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.net.HttpHeaders; +import com.google.inject.Inject; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.Request; +import io.airlift.json.JsonCodec; +import io.trino.cache.NonEvictableLoadingCache; + +import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; + +import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.json.JsonCodec.jsonCodec; +import static io.trino.cache.SafeCaches.buildNonEvictableCache; + +public class RegistryClient +{ + private static final JsonCodec VHOST_RESPONSE_CODEC = jsonCodec(VhostResponse.class); + private final URI endpoint; + private final String username; + private final String password; + private final HttpClient httpClient; + private final NonEvictableLoadingCache hostCache; + + @Inject + public RegistryClient(@ForRegistryClient HttpClient httpClient, AfConfig config) + { + this.httpClient = httpClient; + this.endpoint = config.getRegistryEndpoint(); + this.username = config.getRegistryUsername(); + this.password = config.getRegistryPassword(); + this.hostCache = buildNonEvictableCache( + CacheBuilder.newBuilder() + .maximumSize(config.getRegistryCacheMaxEntries()) + .expireAfterWrite(Duration.ofSeconds(config.getRegistryCacheTtl())), + CacheLoader.from(this::fetchHostByGuid)); + } + + private static String getBasicAuthHeader(String username, String password) + { + String token = username + ":" + password; + return "Basic " + Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8)); + } + + public String findHostByGuid(String guid) + { + try { + return hostCache.getUnchecked(guid); + } + catch (Exception e) { + throw new RuntimeException("Failed to fetch host for guid: " + guid, e); + } + } + + private String fetchHostByGuid(String guid) + { + String query = "?id=" + URLEncoder.encode(guid, StandardCharsets.UTF_8); + URI requestUri = endpoint.resolve("/vhosts" + query); + Request request = prepareGet() + .setUri(requestUri) + .setHeader(HttpHeaders.AUTHORIZATION, getBasicAuthHeader(username, password)) + .build(); + + VhostResponse response = httpClient.execute(request, createJsonResponseHandler(VHOST_RESPONSE_CODEC)); + return response.host(); + } + + public record VhostResponse(String guid, String domain, String host, String app) + { + } +} diff --git a/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryFunctions.java b/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryFunctions.java new file mode 100644 index 000000000000..d7cf83a0f3db --- /dev/null +++ b/plugin/trino-af/src/main/java/io/trino/plugin/af/RegistryFunctions.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.airlift.slice.Slice; +import io.trino.spi.function.BoundSignature; +import io.trino.spi.function.FunctionDependencies; +import io.trino.spi.function.FunctionId; +import io.trino.spi.function.FunctionMetadata; +import io.trino.spi.function.FunctionProvider; +import io.trino.spi.function.InvocationConvention; +import io.trino.spi.function.ScalarFunctionAdapter; +import io.trino.spi.function.ScalarFunctionImplementation; +import io.trino.spi.function.Signature; +import io.trino.spi.type.TypeSignature; + +import java.lang.invoke.MethodHandle; +import java.util.List; + +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL; +import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.invoke.MethodHandles.lookup; +import static java.lang.invoke.MethodType.methodType; +import static java.util.Collections.nCopies; +import static java.util.Objects.requireNonNull; + +public class RegistryFunctions + implements FunctionProvider +{ + private static final TypeSignature TEXT = VARCHAR.getTypeSignature(); + private static final List FUNCTIONS = ImmutableList.builder() + .add(function("af_find_datasource_by_guid") + .description("Find datasource by vhost guid") + .signature(signature(TEXT, TEXT)) + .build()) + .build(); + private static final MethodHandle AF_FIND_DATASOURCE_BY_GUID; + + static { + try { + AF_FIND_DATASOURCE_BY_GUID = lookup().findSpecial(RegistryFunctions.class, "afFindDatasourceByGuid", methodType(Slice.class, Slice.class), RegistryFunctions.class); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + private final RegistryClient client; + + @Inject + public RegistryFunctions(RegistryClient client) + { + this.client = requireNonNull(client, "client is null"); + } + + private static String getDataSourceFromHost(String host) + { + int idx = host.indexOf('.'); + if (idx < 0) { + return host; + } + return host.substring(0, idx); + } + + private static FunctionMetadata.Builder function(String name) + { + return FunctionMetadata.scalarBuilder(name) + .functionId(new FunctionId(name)); + } + + private static Signature signature(TypeSignature returnType, TypeSignature... argumentTypes) + { + return Signature.builder() + .returnType(returnType) + .argumentTypes(List.of(argumentTypes)) + .build(); + } + + public List getFunctions() + { + return FUNCTIONS; + } + + @Override + public ScalarFunctionImplementation getScalarFunctionImplementation( + FunctionId functionId, + BoundSignature boundSignature, + FunctionDependencies functionDependencies, + InvocationConvention invocationConvention) + { + String name = functionId.toString(); + MethodHandle handle = switch (name) { + case "af_find_datasource_by_guid" -> AF_FIND_DATASOURCE_BY_GUID; + default -> throw new IllegalArgumentException("Invalid function ID: " + functionId); + }; + handle = handle.bindTo(this); + + InvocationConvention actualConvention = new InvocationConvention( + nCopies(boundSignature.getArity(), NEVER_NULL), + FAIL_ON_NULL, + false, + false); + + handle = ScalarFunctionAdapter.adapt( + handle, + boundSignature.getReturnType(), + boundSignature.getArgumentTypes(), + actualConvention, + invocationConvention); + + return ScalarFunctionImplementation.builder() + .methodHandle(handle) + .build(); + } + + public Slice afFindDatasourceByGuid(Slice guid) + { + String host = client.findHostByGuid(guid.toStringUtf8()); + String datasource = getDataSourceFromHost(host); + return utf8Slice(datasource); + } +} diff --git a/plugin/trino-af/src/test/java/io/trino/plugin/af/AfQueryRunner.java b/plugin/trino-af/src/test/java/io/trino/plugin/af/AfQueryRunner.java new file mode 100644 index 000000000000..8cb8982239a6 --- /dev/null +++ b/plugin/trino-af/src/test/java/io/trino/plugin/af/AfQueryRunner.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; + +import java.net.URI; + +import static io.trino.testing.TestingSession.testSessionBuilder; + +public final class AfQueryRunner +{ + private AfQueryRunner() + { + } + + public static void main(String[] args) + throws Exception + { + FakeRegistryServer fakeRegistryServer = new FakeRegistryServer(); + URI registerServerUri = fakeRegistryServer.getBaseUri(); + + QueryRunner queryRunner = DistributedQueryRunner.builder(testSessionBuilder().build()) + .addCoordinatorProperty("http-server.http.port", "8080") + .addCoordinatorProperty("sql.path", "af.af") + .build(); + queryRunner.installPlugin(new AfPlugin()); + + queryRunner.createCatalog("af", "af", ImmutableMap.builder() + .put("af.registry.endpoint", registerServerUri.toString()) + .put("af.registry.username", "testuser") + .put("af.registry.password", "testpassword") + .put("af.registry.cache-ttl", "10") + .put("af.registry.cache-max-entries", "1000") + .buildOrThrow()); + Logger log = Logger.get(AfQueryRunner.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + log.info("Registry Server URI: %s", registerServerUri); + } +} diff --git a/plugin/trino-af/src/test/java/io/trino/plugin/af/FakeRegistryServer.java b/plugin/trino-af/src/test/java/io/trino/plugin/af/FakeRegistryServer.java new file mode 100644 index 000000000000..b38efb789562 --- /dev/null +++ b/plugin/trino-af/src/test/java/io/trino/plugin/af/FakeRegistryServer.java @@ -0,0 +1,161 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.http.server.testing.TestingHttpServer; +import io.airlift.http.server.testing.TestingHttpServerModule; +import io.airlift.json.JsonCodec; +import io.airlift.node.testing.TestingNodeModule; +import jakarta.servlet.Servlet; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class FakeRegistryServer +{ + private final LifeCycleManager lifeCycleManager; + private final URI baseUri; + + public FakeRegistryServer() + { + Bootstrap app = new Bootstrap( + new TestingNodeModule(), + new TestingHttpServerModule(), + new RegistryServerModule()); + + Injector injector = app + .doNotInitializeLogging() + .initialize(); + + lifeCycleManager = injector.getInstance(LifeCycleManager.class); + baseUri = injector.getInstance(TestingHttpServer.class).getBaseUrl(); + } + + public void stop() + { + lifeCycleManager.stop(); + } + + public URI resolve(String s) + { + return baseUri.resolve(s); + } + + public URI getBaseUri() + { + return baseUri; + } + + private static class RegistryServerModule + implements Module + { + @Override + public void configure(Binder binder) + { + binder.bind(Servlet.class).toInstance(new RegistryHttpServlet()); + } + } + + private static class RegistryHttpServlet + extends HttpServlet + { + private static final String EXPECTED_USERNAME = "testuser"; + private static final String EXPECTED_PASSWORD = "testpassword"; + private final JsonCodec vhostResponseCodec = JsonCodec.jsonCodec(VhostResponse.class); + // Mock data for testing + private final Map mockData = createMockData(); + + private static Map createMockData() + { + Map data = new HashMap<>(); + data.put("test-guid-1", new VhostResponse("test-guid-1", "example.com", "slice1.poh1", "app1")); + data.put("test-guid-2", new VhostResponse("test-guid-2", "test.com", "slice2.por1", "app2")); + data.put("sample-guid", new VhostResponse("sample-guid", "sample.org", "host.sample.org", "sample-app")); + return data; + } + + private boolean isAuthorized(HttpServletRequest request) + { + String authHeader = request.getHeader("Authorization"); + if (authHeader == null || !authHeader.startsWith("Basic ")) { + return false; + } + + String base64Credentials = authHeader.substring("Basic ".length()); + String credentials = new String(Base64.getDecoder().decode(base64Credentials), StandardCharsets.UTF_8); + String[] parts = credentials.split(":", 2); + + if (parts.length != 2) { + return false; + } + + return EXPECTED_USERNAME.equals(parts[0]) && EXPECTED_PASSWORD.equals(parts[1]); + } + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws IOException + { + // Check basic authentication + if (!isAuthorized(request)) { + response.setStatus(HttpServletResponse.SC_UNAUTHORIZED); + response.setHeader("WWW-Authenticate", "Basic realm=\"Registry Server\""); + response.getWriter().write("{\"error\": \"Unauthorized\"}"); + return; + } + + String path = request.getPathInfo(); + + if ("/vhosts".equals(path)) { + String id = request.getParameter("id"); + if (id == null) { + response.setStatus(HttpServletResponse.SC_BAD_REQUEST); + response.getWriter().write("{\"error\": \"Missing id parameter\"}"); + return; + } + + VhostResponse vhostResponse = mockData.get(id); + if (vhostResponse == null) { + // Return a default response for unknown GUIDs + vhostResponse = new VhostResponse(id, "default.com", "defaultslice.defaultvdc", "default-app"); + } + + response.setContentType("application/json"); + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().write(vhostResponseCodec.toJson(vhostResponse)); + } + else { + response.setStatus(HttpServletResponse.SC_NOT_FOUND); + response.getWriter().write("{\"error\": \"Not found\"}"); + } + } + } + + // Local copy of VhostResponse record for JSON serialization + public record VhostResponse(String guid, String domain, String host, String app) + { + } +} diff --git a/plugin/trino-af/src/test/java/io/trino/plugin/af/TestAfConfig.java b/plugin/trino-af/src/test/java/io/trino/plugin/af/TestAfConfig.java new file mode 100644 index 000000000000..b38513d1b494 --- /dev/null +++ b/plugin/trino-af/src/test/java/io/trino/plugin/af/TestAfConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestAfConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(AfConfig.class) + .setRegistryEndpoint(null) + .setRegistryUsername(null) + .setRegistryPassword(null) + .setRegistryCacheTtl(0) + .setRegistryCacheMaxEntries(0)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("af.registry.endpoint", "http://localhost:8080/registry") + .put("af.registry.username", "testuser") + .put("af.registry.password", "testpassword") + .put("af.registry.cache-ttl", "10") + .put("af.registry.cache-max-entries", "1000") + .buildOrThrow(); + + AfConfig expected = new AfConfig() + .setRegistryEndpoint(URI.create("http://localhost:8080/registry")) + .setRegistryUsername("testuser") + .setRegistryPassword("testpassword") + .setRegistryCacheTtl(10) + .setRegistryCacheMaxEntries(1000); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-af/src/test/java/io/trino/plugin/af/TestRegistryFunctions.java b/plugin/trino-af/src/test/java/io/trino/plugin/af/TestRegistryFunctions.java new file mode 100644 index 000000000000..62cbf015e22f --- /dev/null +++ b/plugin/trino-af/src/test/java/io/trino/plugin/af/TestRegistryFunctions.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.af; + +import com.google.common.collect.ImmutableMap; +import io.trino.sql.SqlPath; +import io.trino.sql.query.QueryAssertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.net.URI; +import java.util.Optional; + +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) +public class TestRegistryFunctions +{ + QueryAssertions assertions; + FakeRegistryServer fakeRegistryServer; + + @BeforeAll + public void init() + { + fakeRegistryServer = new FakeRegistryServer(); + URI registerServerUri = fakeRegistryServer.getBaseUri(); + + assertions = new QueryAssertions(testSessionBuilder() + .setPath(SqlPath.buildPath("af.af", Optional.empty())) + .build()); + assertions.addPlugin(new AfPlugin()); + assertions.getQueryRunner().createCatalog("af", "af", ImmutableMap.builder() + .put("af.registry.endpoint", registerServerUri.toString()) + .put("af.registry.username", "testuser") + .put("af.registry.password", "testpassword") + .buildOrThrow()); + } + + @AfterAll + public void teardown() + { + assertions.close(); + fakeRegistryServer.stop(); + } + + @Test + public void testShowFunctions() + { + assertThat(assertions.query("SHOW FUNCTIONS LIKE 'af_%'")) + .skippingTypesCheck() + .matches(""" + VALUES + ('af_find_datasource_by_guid', 'varchar', 'varchar', 'scalar', true, 'Find datasource by vhost guid') + """); + } + + @Test + public void testGetDatasourceByGuid() + { + assertThat(assertions.function("af_find_datasource_by_guid", "'test-guid-1'")) + .hasType(VARCHAR) + .isEqualTo("slice1"); + + assertThat(assertions.function("af_find_datasource_by_guid", "'test-guid-2'")) + .hasType(VARCHAR) + .isEqualTo("slice2"); + } +} diff --git a/pom.xml b/pom.xml index 579d46b7649e..8ad5a5e684c3 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ lib/trino-parquet lib/trino-plugin-toolkit lib/trino-record-decoder + plugin/trino-af plugin/trino-ai-functions plugin/trino-base-jdbc plugin/trino-bigquery