diff --git a/config/log4j.properties b/config/log4j.properties
new file mode 100644
index 0000000..2559484
--- /dev/null
+++ b/config/log4j.properties
@@ -0,0 +1,7 @@
+log4j.rootLogger=INFO, console
+log4j.logger.org.apache.hudi.hive=DEBUG
+log4j.logger.org.apache.hudi.hive.HiveSyncTool=DEBUG
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%c{1}] %m%n
diff --git a/config/spark-config-s3.properties b/config/spark-config-s3.properties
index 13298e8..1cffdd5 100644
--- a/config/spark-config-s3.properties
+++ b/config/spark-config-s3.properties
@@ -1,11 +1,7 @@
-spark.serializer=org.apache.spark.serializer.KryoSerializer
-spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
-spark.sql.hive.convertMetastoreParquet=false
-
-spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-
-# AWS S3 Configuration (replace placeholders with your actual values)
-spark.hadoop.fs.s3a.access.key=test
-spark.hadoop.fs.s3a.secret.key=test
-spark.hadoop.fs.s3a.endpoint=http://localhost:4566
+spark.hadoop.fs.s3a.access.key=admin
+spark.hadoop.fs.s3a.secret.key=password
+spark.hadoop.fs.s3a.endpoint=http://localhost:9000
 spark.hadoop.fs.s3a.path.style.access=true
+spark.hadoop.fs.s3a.connection.ssl.enabled=false
+spark.sql.catalogImplementation=hive
+spark.sql.hive.metastore.uris=thrift://localhost:9083
diff --git a/hudi-datalake/docker-compose.yml b/hudi-datalake/docker-compose.yml
index 6cfd4d2..8d79999 100644
--- a/hudi-datalake/docker-compose.yml
+++ b/hudi-datalake/docker-compose.yml
@@ -1,15 +1,6 @@
 version: '3.1'
 services:
-  localstack:
-    container_name: localstack
-    image: localstack/localstack:latest
-    environment:
-      - EDGE_PORT=4566
-      - SERVICES=lambda,s3
-    ports:
-      - '4566-4583:4566-4583'
-
   postgres:
     restart: always
     container_name: hudidb
@@ -41,9 +32,9 @@
     environment:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-      KAFKA_ADVERTISED_HOST_NAME: localhost
+      KAFKA_ADVERTISED_HOST_NAME: kafka
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_CREATE_TOPICS: "sampleTopic:1:1"
+      KAFKA_CREATE_TOPICS: "test1.v1.retail_transactions:1:1"
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

   schema-registry:
@@ -71,8 +62,7 @@
       VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
       CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
       CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
-    depends_on: [ kafka ]
+    depends_on: [kafka]
     restart: on-failure
     ports:
       - "8083:8083"
-
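The updated `config/spark-config-s3.properties` above points Spark at MinIO (`http://localhost:9000`, credentials `admin`/`password`) and at the Hive Metastore on `thrift://localhost:9083` instead of LocalStack. A quick way to confirm that the endpoint and credentials line up with the MinIO container defined further below, before submitting any Spark job, is a short check with `boto3` (a minimal sketch, assuming `boto3` is installed and the stack is running; it is not part of the repository):

```
import boto3

# Same endpoint and credentials as the spark.hadoop.fs.s3a.* entries above
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)

# The 'warehouse' bucket is created by the 'mc' helper container; listing buckets confirms access
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])
```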
diff --git a/hudi-trino-integration/README.md b/hudi-trino-integration/README.md
new file mode 100644
index 0000000..1455549
--- /dev/null
+++ b/hudi-trino-integration/README.md
@@ -0,0 +1,150 @@
+# hudi-trino-integeration-guide
+hudi-trino-integeration-guide
+
+![Screenshot 2024-05-09 at 11 47 08 AM](https://github.com/soumilshah1995/hudi-trino-integeration-guide/assets/39345855/1dd59542-1ff4-44ab-b254-ae7ce7d08945)
+
+# Ingest Code
+```
+try:
+    import os
+    import sys
+    import uuid
+    import pyspark
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import StructType, StructField, StringType
+    from faker import Faker
+    from datetime import datetime
+
+    print("Imports loaded")
+except Exception as e:
+    print("error", e)
+
+HUDI_VERSION = '1.0.0-beta1'
+SPARK_VERSION = '3.4'
+
+os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
+SUBMIT_ARGS = f"--packages org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
+os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
+os.environ['PYSPARK_PYTHON'] = sys.executable
+
+# Spark session
+spark = SparkSession.builder \
+    .config('spark.executor.memory', '4g') \
+    .config('spark.driver.memory', '4g') \
+    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
+    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
+    .config('className', 'org.apache.hudi') \
+    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
+    .getOrCreate()
+
+# S3A settings for the local MinIO container (admin/password, port 9000)
+spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:9000/")
+spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "admin")
+spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
+spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
+spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
+spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
+                                     "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+
+faker = Faker()
+
+def get_customer_data(total_customers=2):
+    customers_array = []
+    for i in range(0, total_customers):
+        customer_data = {
+            "customer_id": str(uuid.uuid4()),
+            "name": faker.name(),
+            "created_at": datetime.now().isoformat(),
+            "address": faker.address(),
+            "state": str(faker.state_abbr()),  # Adding state information (used as the partition path below)
+            "salary": str(faker.random_int(min=30000, max=100000))  # kept as a string to match the schema below
+        }
+        customers_array.append(customer_data)
+    return customers_array
+
+total_customers = 10000
+customer_data = get_customer_data(total_customers=total_customers)
+
+# Define schema (created_at and salary are stored as strings)
+schema = StructType([
+    StructField("customer_id", StringType(), nullable=True),
+    StructField("name", StringType(), nullable=True),
+    StructField("created_at", StringType(), nullable=True),
+    StructField("address", StringType(), nullable=True),
+    StructField("state", StringType(), nullable=True),
+    StructField("salary", StringType(), nullable=True)
+])
+
+# Create DataFrame
+spark_df_customers = spark.createDataFrame(data=customer_data, schema=schema)
+
+# Show DataFrame
+spark_df_customers.show(1, truncate=True)
+
+# Print DataFrame schema
+spark_df_customers.printSchema()
+
+spark_df = spark_df_customers
+database = "default"
+table_name = "customers_t1"
+
+path = f"s3a://warehouse/{database}/{table_name}"
+
+hudi_options = {
+    'hoodie.table.name': table_name,
+    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
+    'hoodie.datasource.write.table.name': table_name,
+    'hoodie.datasource.write.operation': 'upsert',
+    'hoodie.datasource.write.recordkey.field': 'customer_id',
+    'hoodie.datasource.write.precombine.field': 'created_at',
+    'hoodie.datasource.write.partitionpath.field': 'state',
+
+    'hoodie.enable.data.skipping': 'true',
+    'hoodie.metadata.enable': 'true',
+    'hoodie.metadata.index.column.stats.enable': 'true',
+
+    # Sync the table to the Hive Metastore so Trino can query it
+    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
+    'hoodie.datasource.hive_sync.metastore.uris': 'thrift://localhost:9083',
+    'hoodie.datasource.hive_sync.mode': 'hms',
+    'hoodie.datasource.hive_sync.enable': 'true',
+    'hoodie.datasource.hive_sync.database': 'default',
+    'hoodie.datasource.hive_sync.table': table_name,
+}
+
+print("\n")
+print(path)
+print("\n")
+
+spark_df.write.format("hudi"). \
+    options(**hudi_options). \
+    mode("append"). \
+    save(path)
+```
+
+```
+%load_ext sql
+%sql trino://admin@localhost:8080/default
+%sql USE hudi.default
+%sql select * from customers_t1 limit 2
+```
diff --git a/hudi-trino-integration/docker-compose.yml b/hudi-trino-integration/docker-compose.yml
new file mode 100644
index 0000000..9e49db7
--- /dev/null
+++ b/hudi-trino-integration/docker-compose.yml
@@ -0,0 +1,102 @@
+version: "3"
+
+services:
+  trino-coordinator:
+    image: 'trinodb/trino:400'
+    hostname: trino-coordinator
+    ports:
+      - '8080:8080'
+    volumes:
+      - ./trino/etc:/etc/trino
+
+  metastore_db:
+    image: postgres:11
+    hostname: metastore_db
+    ports:
+      - 5433:5432 # Change the host port to 5433
+    environment:
+      POSTGRES_USER: hive
+      POSTGRES_PASSWORD: hive
+      POSTGRES_DB: metastore
+    command: [ "postgres", "-c", "wal_level=logical" ]
+    healthcheck:
+      test: [ "CMD", "psql", "-U", "hive", "-c", "SELECT 1" ]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+    volumes:
+      - ./postgresscripts:/docker-entrypoint-initdb.d
+
+  hive-metastore:
+    hostname: hive-metastore
+    image: 'starburstdata/hive:3.1.2-e.18'
+    ports:
+      - '9083:9083' # Metastore Thrift
+    environment:
+      HIVE_METASTORE_DRIVER: org.postgresql.Driver
+      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore # Internal port remains 5432
+      HIVE_METASTORE_USER: hive
+      HIVE_METASTORE_PASSWORD: hive
+      HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
+      S3_ENDPOINT: http://minio:9000
+      S3_ACCESS_KEY: admin
+      S3_SECRET_KEY: password
+      S3_PATH_STYLE_ACCESS: "true"
+      REGION: ""
+      GOOGLE_CLOUD_KEY_FILE_PATH: ""
+      AZURE_ADL_CLIENT_ID: ""
+      AZURE_ADL_CREDENTIAL: ""
+      AZURE_ADL_REFRESH_URL: ""
+      AZURE_ABFS_STORAGE_ACCOUNT: ""
+      AZURE_ABFS_ACCESS_KEY: ""
+      AZURE_WASB_STORAGE_ACCOUNT: ""
+      AZURE_ABFS_OAUTH: ""
+      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
+      AZURE_ABFS_OAUTH_CLIENT_ID: ""
+      AZURE_ABFS_OAUTH_SECRET: ""
+      AZURE_ABFS_OAUTH_ENDPOINT: ""
+      AZURE_WASB_ACCESS_KEY: ""
+      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
+    depends_on:
+      - metastore_db
+    healthcheck:
+      test: bash -c "exec 6<> /dev/tcp/localhost/9083"
+
+  minio:
+    image: minio/minio
+    environment:
+      - MINIO_ROOT_USER=admin
+      - MINIO_ROOT_PASSWORD=password
+      - MINIO_DOMAIN=minio
+    networks:
+      default:
+        aliases:
+          - warehouse.minio
+    ports:
+      - 9001:9001
+      - 9000:9000
+    command: [ "server", "/data", "--console-address", ":9001" ]
+
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc rm -r --force minio/warehouse;
+      /usr/bin/mc mb minio/warehouse;
+      /usr/bin/mc policy set public minio/warehouse;
+      tail -f /dev/null
+      "
+
+volumes:
+  hive-metastore-postgresql:
+
+networks:
+  default:
+    name: hudi
diff --git a/hudi-trino-integration/postgresscripts/create_hive_table.sql b/hudi-trino-integration/postgresscripts/create_hive_table.sql
new file mode 100644
index 0000000..fa5c238
--- /dev/null
+++ b/hudi-trino-integration/postgresscripts/create_hive_table.sql
@@ -0,0 +1,2 @@
+-- Create a database called 'hive'
+CREATE DATABASE hive;
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/catalog/hive.properties b/hudi-trino-integration/trino/etc/catalog/hive.properties
new file mode 100644
index 0000000..abefa0f
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/catalog/hive.properties
@@ -0,0 +1,7 @@
+connector.name=hive
+hive.metastore.uri=thrift://hive-metastore:9083
+hive.s3.aws-access-key=admin
+hive.s3.aws-secret-key=password
+hive.s3.endpoint=http://minio:9000
+hive.s3.path-style-access=true
+hive.s3.ssl.enabled=false
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/catalog/hudi.properties b/hudi-trino-integration/trino/etc/catalog/hudi.properties
new file mode 100644
index 0000000..274e7d9
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/catalog/hudi.properties
@@ -0,0 +1,7 @@
+connector.name=hudi
+hive.metastore.uri=thrift://hive-metastore:9083
+hive.s3.aws-access-key=admin
+hive.s3.aws-secret-key=password
+hive.s3.endpoint=http://minio:9000
+hive.s3.path-style-access=true
+hive.s3.ssl.enabled=false
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/catalog/minio.properties b/hudi-trino-integration/trino/etc/catalog/minio.properties
new file mode 100644
index 0000000..4f21aa1
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/catalog/minio.properties
@@ -0,0 +1,2 @@
+connector.name=hudi
+hive.metastore.uri=thrift://hive-metastore:9083
diff --git a/hudi-trino-integration/trino/etc/config.properties b/hudi-trino-integration/trino/etc/config.properties
new file mode 100644
index 0000000..e3661dc
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/config.properties
@@ -0,0 +1,6 @@
+#single node install config
+coordinator=true
+node-scheduler.include-coordinator=true
+http-server.http.port=8080
+discovery-server.enabled=true
+discovery.uri=http://localhost:8080
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/jvm.config b/hudi-trino-integration/trino/etc/jvm.config
new file mode 100644
index 0000000..d8d2170
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/jvm.config
@@ -0,0 +1,11 @@
+-server
+-Xmx1G
+-XX:+UseG1GC
+-XX:G1HeapRegionSize=32M
+-XX:+ExplicitGCInvokesConcurrent
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:+UseGCOverheadLimit
+-XX:+ExitOnOutOfMemoryError
+-XX:ReservedCodeCacheSize=256M
+-Djdk.attach.allowAttachSelf=true
+-Djdk.nio.maxCachedBufferSize=2000000
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/log.properties b/hudi-trino-integration/trino/etc/log.properties
new file mode 100644
index 0000000..76d3955
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/log.properties
@@ -0,0 +1,2 @@
+# Enable verbose logging from Presto
+#io.trino=DEBUG
\ No newline at end of file
diff --git a/hudi-trino-integration/trino/etc/node.properties b/hudi-trino-integration/trino/etc/node.properties
new file mode 100644
index 0000000..72cb8a0
--- /dev/null
+++ b/hudi-trino-integration/trino/etc/node.properties
@@ -0,0 +1,3 @@
+node.environment=docker
+node.data-dir=/data/trino
+plugin.dir=/usr/lib/trino/plugin
\ No newline at end of file
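With `hive.properties` and `hudi.properties` both pointing at the same metastore, the `customers_t1` table written by the README's ingest code should become queryable through the `hudi` catalog once hive-sync has run. The `%sql` cells shown earlier can also be replaced by a headless check with the `trino` Python client (a minimal sketch, assuming `pip install trino` and the coordinator from the compose file above on `localhost:8080`):

```
from trino.dbapi import connect

# Connect to the single-node Trino coordinator; 'hive', 'hudi' and 'minio' come from trino/etc/catalog
conn = connect(host="localhost", port=8080, user="admin", catalog="hudi", schema="default")
cur = conn.cursor()

cur.execute("SHOW CATALOGS")
print(cur.fetchall())

cur.execute("SELECT customer_id, name, state, salary FROM customers_t1 LIMIT 2")
for row in cur.fetchall():
    print(row)
```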
diff --git a/readme.md b/readme.md
index 22e38c6..001c0c8 100644
--- a/readme.md
+++ b/readme.md
@@ -179,7 +179,7 @@ To get started with this project, follow these steps:
        --executor-memory 1g \
        utilities-jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
        --table-type COPY_ON_WRITE \
-       --target-base-path s3a://hudi-demo-bucket/hudidb/ \
+       --target-base-path s3a://warehouse/default/retail_transactions \
        --target-table retail_transactions \
        --source-ordering-field tran_date \
        --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
@@ -195,11 +195,23 @@
        --hoodie-conf hoodie.deltastreamer.source.kafka.topic=test1.v1.retail_transactions \
        --hoodie-conf auto.offset.reset=earliest \
        --hoodie-conf hoodie.datasource.write.recordkey.field=tran_id \
+       --hoodie-conf hoodie.database.name=default \
+       --hoodie-conf hoodie.table.name=retail_transactions \
+       --hoodie-conf hoodie.datasource.write.table.name=retail_transactions \
        --hoodie-conf hoodie.datasource.write.partitionpath.field=store_city \
-       --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
-       --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
-       --hoodie-conf hoodie.datasource.write.precombine.field=tran_date
-   ```
+       --hoodie-conf hoodie.datasource.write.precombine.field=tran_date \
+       --hoodie-conf hoodie.enable.data.skipping=true \
+       --hoodie-conf hoodie.metadata.enable=true \
+       --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
+       --hoodie-conf hoodie.datasource.hive_sync.enable=true \
+       --hoodie-conf hoodie.datasource.hive_sync.database=default \
+       --hoodie-conf hoodie.datasource.hive_sync.table=retail_transactions \
+       --hoodie-conf hoodie.datasource.hive_sync.username=hive \
+       --hoodie-conf hoodie.datasource.hive_sync.password=hive \
+       --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
+       --hoodie-conf hoodie.datasource.hive_sync.metastore.uris=thrift://localhost:9083 \
+       --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
+   ```
 8. Monitor the data flow and storage under the path mentioned in `--target-base-path`
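For step 8, one concrete way to monitor what DeltaStreamer writes under the new `--target-base-path` is to list the objects directly in MinIO (again a small sketch, assuming `boto3` and the MinIO endpoint and credentials used throughout this change):

```
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
)

# s3a://warehouse/default/retail_transactions -> bucket 'warehouse', prefix 'default/retail_transactions/'
response = s3.list_objects_v2(Bucket="warehouse", Prefix="default/retail_transactions/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```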