7 changes: 7 additions & 0 deletions config/log4j.properties
@@ -0,0 +1,7 @@
log4j.rootLogger=INFO, console
log4j.logger.org.apache.hudi.hive=DEBUG
log4j.logger.org.apache.hudi.hive.HiveSyncTool=DEBUG

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%c{1}] %m%n
16 changes: 6 additions & 10 deletions config/spark-config-s3.properties
@@ -1,11 +1,7 @@
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.hive.convertMetastoreParquet=false

spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension

# AWS S3 Configuration (replace placeholders with your actual values)
spark.hadoop.fs.s3a.access.key=test
spark.hadoop.fs.s3a.secret.key=test
spark.hadoop.fs.s3a.endpoint=http://localhost:4566
spark.hadoop.fs.s3a.access.key=admin
spark.hadoop.fs.s3a.secret.key=password
spark.hadoop.fs.s3a.endpoint=http://localhost:9000
spark.hadoop.fs.s3a.path.style.access=true
spark.hadoop.fs.s3a.connection.ssl.enabled=false
spark.sql.catalogImplementation=hive
spark.sql.hive.metastore.uris=thrift://localhost:9083
16 changes: 3 additions & 13 deletions hudi-datalake/docker-compose.yml
@@ -1,15 +1,6 @@
version: '3.1'

services:
localstack:
container_name: localstack
image: localstack/localstack:latest
environment:
- EDGE_PORT=4566
- SERVICES=lambda,s3
ports:
- '4566-4583:4566-4583'

postgres:
restart: always
container_name: hudidb
@@ -41,9 +32,9 @@ services:
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "sampleTopic:1:1"
KAFKA_CREATE_TOPICS: "test1.v1.retail_transactions:1:1"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

schema-registry:
@@ -71,8 +62,7 @@ services:
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
depends_on: [ kafka ]
depends_on: [kafka]
restart: on-failure
ports:
- "8083:8083"

150 changes: 150 additions & 0 deletions hudi-trino-integration/README.md
@@ -0,0 +1,150 @@
# Hudi + Trino Integration Guide

End-to-end example of ingesting data into an Apache Hudi table on MinIO with PySpark and querying it with Trino through the Hive Metastore.

![Screenshot 2024-05-09 at 11 47 08 AM](https://github.com/soumilshah1995/hudi-trino-integeration-guide/assets/39345855/1dd59542-1ff4-44ab-b254-ae7ce7d08945)


# Ingest Code
```python
try:
    import os
    import sys
    import uuid
    from datetime import datetime

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    from faker import Faker

    print("Imports loaded")
except Exception as e:
    print("error", e)

HUDI_VERSION = '1.0.0-beta1'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hadoop:hadoop-aws:3.3.2,org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session with Kryo serialization and the Hudi SQL extension
spark = SparkSession.builder \
    .config('spark.executor.memory', '4g') \
    .config('spark.driver.memory', '4g') \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:9000/")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "admin")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "password")
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

faker = Faker()

def get_customer_data(total_customers=2):
    """Generate fake customer records with Faker."""
    customers_array = []
    for _ in range(total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "created_at": datetime.now().isoformat(),
            "address": faker.address(),
            "state": faker.state_abbr(),
            "salary": str(faker.random_int(min=30000, max=100000))  # string to match the schema below
        }
        customers_array.append(customer_data)
    return customers_array

total_customers = 10000
customer_data = get_customer_data(total_customers=total_customers)


# Define schema; created_at and salary are kept as plain strings
schema = StructType([
    StructField("customer_id", StringType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("created_at", StringType(), nullable=True),
    StructField("address", StringType(), nullable=True),
    StructField("state", StringType(), nullable=True),
    StructField("salary", StringType(), nullable=True)
])

# Create DataFrame
spark_df_customers = spark.createDataFrame(data=customer_data, schema=schema)

# Show DataFrame
spark_df_customers.show(1, truncate=True)

# Print DataFrame schema
spark_df_customers.printSchema()
spark_df = spark_df_customers
database = "default"
table_name = "customers_t1"

path = f"s3a://warehouse/{database}/{table_name}"

hudi_options = {
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "customer_id",
    "hoodie.datasource.write.precombine.field": "created_at",
    "hoodie.datasource.write.partitionpath.field": "state",

    # Metadata table and column stats enable data skipping on reads
    "hoodie.enable.data.skipping": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",

    # Sync the table to the Hive Metastore so Trino can discover it
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://localhost:9083",
    "hoodie.datasource.hive_sync.database": database,
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}

print("\n")
print(path)
print("\n")

spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)

```
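To sanity-check the write, the table can be read straight back from MinIO with the Hudi Spark datasource. A minimal sketch, reusing the `spark` session and `path` variable from the script above:

```python
# Read the Hudi table back from MinIO and inspect a few records
hudi_df = spark.read.format("hudi").load(path)

hudi_df.select("customer_id", "name", "state", "salary").show(5, truncate=False)
print("record count:", hudi_df.count())
```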

# Query with Trino

The notebook snippet below uses the ipython-sql / JupySQL `%sql` magic with a Trino connection string to query the table that Hive sync registered in the metastore:

```
%load_ext sql
%sql trino://admin@localhost:8080/default
%sql USE hudi.default
%sql SELECT * FROM customers_t1 LIMIT 2
```
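
Outside a notebook, the same query can be issued from plain Python with the Trino client (`pip install trino`); the host, port, and credentials below are a sketch assuming the bundled docker-compose stack:

```python
# Query the Hudi table through Trino's hudi catalog
from trino.dbapi import connect

conn = connect(host="localhost", port=8080, user="admin", catalog="hudi", schema="default")
cur = conn.cursor()
cur.execute("SELECT customer_id, name, state FROM customers_t1 LIMIT 2")
for row in cur.fetchall():
    print(row)
```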
102 changes: 102 additions & 0 deletions hudi-trino-integration/docker-compose.yml
@@ -0,0 +1,102 @@
version: "3"

services:
trino-coordinator:
image: 'trinodb/trino:400'
hostname: trino-coordinator
ports:
- '8080:8080'
volumes:
- ./trino/etc:/etc/trino

metastore_db:
image: postgres:11
hostname: metastore_db
ports:
- 5433:5432 # Change the host port to 5433
environment:
POSTGRES_USER: hive
POSTGRES_PASSWORD: hive
POSTGRES_DB: metastore
command: [ "postgres", "-c", "wal_level=logical" ]
healthcheck:
test: [ "CMD", "psql", "-U", "hive", "-c", "SELECT 1" ]
interval: 10s
timeout: 5s
retries: 5
volumes:
- ./postgresscripts:/docker-entrypoint-initdb.d

hive-metastore:
hostname: hive-metastore
image: 'starburstdata/hive:3.1.2-e.18'
ports:
- '9083:9083' # Metastore Thrift
environment:
HIVE_METASTORE_DRIVER: org.postgresql.Driver
HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore # Internal port remains 5432
HIVE_METASTORE_USER: hive
HIVE_METASTORE_PASSWORD: hive
HIVE_METASTORE_WAREHOUSE_DIR: s3://warehouse/
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: admin
S3_SECRET_KEY: password
S3_PATH_STYLE_ACCESS: "true"
REGION: ""
GOOGLE_CLOUD_KEY_FILE_PATH: ""
AZURE_ADL_CLIENT_ID: ""
AZURE_ADL_CREDENTIAL: ""
AZURE_ADL_REFRESH_URL: ""
AZURE_ABFS_STORAGE_ACCOUNT: ""
AZURE_ABFS_ACCESS_KEY: ""
AZURE_WASB_STORAGE_ACCOUNT: ""
AZURE_ABFS_OAUTH: ""
AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
AZURE_ABFS_OAUTH_CLIENT_ID: ""
AZURE_ABFS_OAUTH_SECRET: ""
AZURE_ABFS_OAUTH_ENDPOINT: ""
AZURE_WASB_ACCESS_KEY: ""
HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
depends_on:
- metastore_db
healthcheck:
test: bash -c "exec 6<> /dev/tcp/localhost/9083"

minio:
image: minio/minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
default:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: [ "server", "/data", "--console-address", ":9001" ]

mc:
depends_on:
- minio
image: minio/mc
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"

volumes:
hive-metastore-postgresql:

networks:
default:
name: hudi
2 changes: 2 additions & 0 deletions hudi-trino-integration/postgresscripts/create_hive_table.sql
@@ -0,0 +1,2 @@
-- Create a database called 'hive'
CREATE DATABASE hive;
7 changes: 7 additions & 0 deletions hudi-trino-integration/trino/etc/catalog/hive.properties
@@ -0,0 +1,7 @@
connector.name=hive
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=admin
hive.s3.aws-secret-key=password
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
7 changes: 7 additions & 0 deletions hudi-trino-integration/trino/etc/catalog/hudi.properties
@@ -0,0 +1,7 @@
connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083
hive.s3.aws-access-key=admin
hive.s3.aws-secret-key=password
hive.s3.endpoint=http://minio:9000
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
2 changes: 2 additions & 0 deletions hudi-trino-integration/trino/etc/catalog/minio.properties
@@ -0,0 +1,2 @@
connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083
6 changes: 6 additions & 0 deletions hudi-trino-integration/trino/etc/config.properties
@@ -0,0 +1,6 @@
#single node install config
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://localhost:8080
11 changes: 11 additions & 0 deletions hudi-trino-integration/trino/etc/jvm.config
@@ -0,0 +1,11 @@
-server
-Xmx1G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=256M
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
2 changes: 2 additions & 0 deletions hudi-trino-integration/trino/etc/log.properties
@@ -0,0 +1,2 @@
# Enable verbose logging from Trino
#io.trino=DEBUG
3 changes: 3 additions & 0 deletions hudi-trino-integration/trino/etc/node.properties
@@ -0,0 +1,3 @@
node.environment=docker
node.data-dir=/data/trino
plugin.dir=/usr/lib/trino/plugin