diff --git a/.asf.yaml b/.asf.yaml
index d9734f1cd804..8d43adeff39f 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -33,14 +33,16 @@ github:
squash: true
rebase: true
+ pull_requests:
+ # auto-delete head branches after being merged
+ del_branch_on_merge: true
+
protected_branches:
main:
required_pull_request_reviews:
required_approving_review_count: 1
required_linear_history: true
-
- del_branch_on_merge: true
features:
wiki: true
@@ -64,6 +66,7 @@ notifications:
commits: commits@iceberg.apache.org
issues: issues@iceberg.apache.org
pullrequests: issues@iceberg.apache.org
+ jobs: ci-jobs@iceberg.apache.org
jira_options: link label link label
publish:
diff --git a/.baseline/checkstyle/checkstyle-suppressions.xml b/.baseline/checkstyle/checkstyle-suppressions.xml
index 1f180e40a123..1e79b1a7aa4b 100644
--- a/.baseline/checkstyle/checkstyle-suppressions.xml
+++ b/.baseline/checkstyle/checkstyle-suppressions.xml
@@ -55,6 +55,8 @@
+
+
diff --git a/.baseline/scala/.scala212fmt.conf b/.baseline/scala/.scala212fmt.conf
new file mode 100644
index 000000000000..485a11d6ff17
--- /dev/null
+++ b/.baseline/scala/.scala212fmt.conf
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version = 3.9.7
+
+align = none
+align.openParenDefnSite = false
+align.openParenCallSite = false
+align.tokens = []
+importSelectors = "singleLine"
+optIn = {
+ configStyleArguments = false
+}
+danglingParentheses.preset = false
+docstrings.style = Asterisk
+docstrings.wrap = false
+maxColumn = 100
+runner.dialect = scala212
diff --git a/.baseline/scala/.scala213fmt.conf b/.baseline/scala/.scala213fmt.conf
new file mode 100644
index 000000000000..57c44a61c46e
--- /dev/null
+++ b/.baseline/scala/.scala213fmt.conf
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version = 3.9.7
+
+align = none
+align.openParenDefnSite = false
+align.openParenCallSite = false
+align.tokens = []
+importSelectors = "singleLine"
+optIn = {
+ configStyleArguments = false
+}
+danglingParentheses.preset = false
+docstrings.style = Asterisk
+docstrings.wrap = false
+maxColumn = 100
+runner.dialect = scala213
diff --git a/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml b/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml
index b383de4e766b..2e507128434f 100644
--- a/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml
@@ -34,7 +34,8 @@ body:
description: What Apache Iceberg version are you using?
multiple: false
options:
- - "1.10.0 (latest release)"
+ - "1.10.1 (latest release)"
+ - "1.10.0"
- "1.9.2"
- "1.9.1"
- "1.9.0"
diff --git a/.github/ISSUE_TEMPLATE/iceberg_question.yml b/.github/ISSUE_TEMPLATE/iceberg_question.yml
index a170e9acceb5..b5966dccc7ac 100644
--- a/.github/ISSUE_TEMPLATE/iceberg_question.yml
+++ b/.github/ISSUE_TEMPLATE/iceberg_question.yml
@@ -25,7 +25,7 @@ body:
- type: markdown
attributes:
value: |
- Feel free to ask your question on [Slack](https://join.slack.com/t/apache-iceberg/shared_invite/zt-2561tq9qr-UtISlHgsdY3Virs3Z2_btQ) as well.
+ Feel free to ask your question on [Slack](https://join.slack.com/t/apache-iceberg/shared_invite/zt-3kclosz6r-3heAW3d~_PHefmN2A_~cAg) as well.
Do **NOT** share any sensitive information like passwords, security tokens, private URLs etc.
- type: textarea
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 51a34758fe09..6acbbacda25d 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -24,9 +24,6 @@ updates:
schedule:
interval: "weekly"
day: "sunday"
- ignore:
- - dependency-name: "*"
- update-types: ["version-update:semver-major"]
- package-ecosystem: "gradle"
directory: "/"
schedule:
diff --git a/.github/workflows/api-binary-compatibility.yml b/.github/workflows/api-binary-compatibility.yml
index 60e89b2fd03c..3343ba4035cb 100644
--- a/.github/workflows/api-binary-compatibility.yml
+++ b/.github/workflows/api-binary-compatibility.yml
@@ -43,7 +43,7 @@ jobs:
revapi:
runs-on: ubuntu-24.04
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
with:
# fetch-depth of zero ensures that the tags are pulled in and we're not in a detached HEAD state
# revapi depends on the tags, specifically the tag from git describe, to find the relevant override
@@ -55,10 +55,17 @@ jobs:
with:
distribution: zulu
java-version: 17
+ - uses: actions/cache@v5
+ with:
+ path: |
+ ~/.gradle/caches
+ ~/.gradle/wrapper
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
+ restore-keys: ${{ runner.os }}-gradle-
- run: |
echo "Using the old version tag, as per git describe, of $(git describe)";
- run: ./gradlew revapi --rerun-tasks
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml
index e8da8d33bafc..65da3a445f10 100644
--- a/.github/workflows/delta-conversion-ci.yml
+++ b/.github/workflows/delta-conversion-ci.yml
@@ -61,6 +61,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -71,16 +72,16 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -89,7 +90,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.12 -DkafkaVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
@@ -100,16 +101,16 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -118,7 +119,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 -DkafkaVersions= -DflinkVersions= :iceberg-delta-lake:check -Pquick=true -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/docs-ci.yml b/.github/workflows/docs-ci.yml
index 67d054f8f576..48da4785dadb 100644
--- a/.github/workflows/docs-ci.yml
+++ b/.github/workflows/docs-ci.yml
@@ -32,8 +32,8 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest]
steps:
- - uses: actions/checkout@v4
- - uses: actions/setup-python@v5
+ - uses: actions/checkout@v6
+ - uses: actions/setup-python@v6
with:
python-version: 3.x
- name: Build Iceberg documentation
diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml
index 365744ef906f..479cafd2fe18 100644
--- a/.github/workflows/flink-ci.yml
+++ b/.github/workflows/flink-ci.yml
@@ -61,6 +61,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -73,17 +74,17 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
flink: ['1.20', '2.0', '2.1']
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -92,7 +93,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions= -DkafkaVersions= -DflinkVersions=${{ matrix.flink }} :iceberg-flink:iceberg-flink-${{ matrix.flink }}:check :iceberg-flink:iceberg-flink-runtime-${{ matrix.flink }}:check -Pquick=true -x javadoc -DtestParallelism=auto
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/hive-ci.yml b/.github/workflows/hive-ci.yml
index a01697664cc9..7044c30d0f95 100644
--- a/.github/workflows/hive-ci.yml
+++ b/.github/workflows/hive-ci.yml
@@ -62,6 +62,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -72,16 +73,16 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -90,7 +91,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew -DsparkVersions= -DflinkVersions= -DkafkaVersions= -Pquick=true :iceberg-mr:check -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml
index 6e25a1745f0c..0e48f3bb18ee 100644
--- a/.github/workflows/java-ci.yml
+++ b/.github/workflows/java-ci.yml
@@ -57,6 +57,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -67,16 +68,16 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -85,7 +86,7 @@ jobs:
restore-keys: ${{ runner.os }}-gradle-
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
- run: ./gradlew check -DsparkVersions= -DflinkVersions= -DkafkaVersions= -Pquick=true -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
@@ -96,9 +97,9 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
@@ -109,9 +110,9 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
diff --git a/.github/workflows/jmh-benchmarks.yml b/.github/workflows/jmh-benchmarks.yml
index 3018f24b4925..878b1d3e20a8 100644
--- a/.github/workflows/jmh-benchmarks.yml
+++ b/.github/workflows/jmh-benchmarks.yml
@@ -42,7 +42,7 @@ jobs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
foundlabel: ${{ steps.set-matrix.outputs.foundlabel }}
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
@@ -75,7 +75,7 @@ jobs:
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
@@ -83,7 +83,7 @@ jobs:
with:
distribution: zulu
java-version: 17
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -95,7 +95,7 @@ jobs:
- name: Run Benchmark
run: ./gradlew :iceberg-spark:${{ github.event.inputs.spark_version }}:jmh -PjmhIncludeRegex=${{ matrix.benchmark }} -PjmhOutputPath=benchmark/${{ matrix.benchmark }}.txt
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: ${{ always() }}
with:
name: benchmark-results
diff --git a/.github/workflows/kafka-connect-ci.yml b/.github/workflows/kafka-connect-ci.yml
index 72a55dfd991b..8074958f39b5 100644
--- a/.github/workflows/kafka-connect-ci.yml
+++ b/.github/workflows/kafka-connect-ci.yml
@@ -61,6 +61,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -72,16 +73,16 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
+ jvm: [17, 21]
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -96,7 +97,7 @@ jobs:
:iceberg-kafka-connect:iceberg-kafka-connect:check \
:iceberg-kafka-connect:iceberg-kafka-connect-runtime:check \
-Pquick=true -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml
index a34b62c1014e..4d8a367df591 100644
--- a/.github/workflows/labeler.yml
+++ b/.github/workflows/labeler.yml
@@ -28,7 +28,7 @@ jobs:
triage:
runs-on: ubuntu-24.04
steps:
- - uses: actions/labeler@v5
+ - uses: actions/labeler@v6
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
sync-labels: true
diff --git a/.github/workflows/license-check.yml b/.github/workflows/license-check.yml
index ab55763c9aef..7640d40f10b2 100644
--- a/.github/workflows/license-check.yml
+++ b/.github/workflows/license-check.yml
@@ -24,6 +24,6 @@ jobs:
rat:
runs-on: ubuntu-24.04
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- run: |
dev/check-license
diff --git a/.github/workflows/open-api.yml b/.github/workflows/open-api.yml
index ab2109e1df6a..dedae02067e2 100644
--- a/.github/workflows/open-api.yml
+++ b/.github/workflows/open-api.yml
@@ -41,11 +41,10 @@ jobs:
runs-on: ubuntu-24.04
steps:
- - uses: actions/checkout@v4
- - uses: actions/setup-python@v5
- with:
- python-version: 3.9
- - name: Install
+ - uses: actions/checkout@v6
+ - name: Install uv
+ uses: astral-sh/setup-uv@v7
+ - name: Install dependencies
working-directory: ./open-api
run: make install
- name: Validate REST catalog spec
@@ -57,6 +56,3 @@ jobs:
- name: Check if code is up to date
working-directory: ./open-api
run: git diff --exit-code
- - name: Validate S3 REST Signer spec
- working-directory: ./aws/src/main/resources
- run: openapi-spec-validator s3-signer-open-api.yaml
diff --git a/.github/workflows/publish-iceberg-rest-fixture-docker.yml b/.github/workflows/publish-iceberg-rest-fixture-docker.yml
index 09609aa58f43..5d379615f5fe 100644
--- a/.github/workflows/publish-iceberg-rest-fixture-docker.yml
+++ b/.github/workflows/publish-iceberg-rest-fixture-docker.yml
@@ -37,11 +37,18 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: 21
+ - uses: actions/cache@v5
+ with:
+ path: |
+ ~/.gradle/caches
+ ~/.gradle/wrapper
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
+ restore-keys: ${{ runner.os }}-gradle-
- name: Build Iceberg Open API project
run: ./gradlew :iceberg-open-api:shadowJar
- name: Login to Docker Hub
diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml
index 3ad3d9273b5b..c8012b5d02f9 100644
--- a/.github/workflows/publish-snapshot.yml
+++ b/.github/workflows/publish-snapshot.yml
@@ -30,7 +30,7 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-24.04
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
with:
# we need to fetch all tags so that getProjectVersion() in build.gradle correctly determines the next SNAPSHOT version from the newest tag
fetch-depth: 0
@@ -38,6 +38,13 @@ jobs:
with:
distribution: zulu
java-version: 17
+ - uses: actions/cache@v5
+ with:
+ path: |
+ ~/.gradle/caches
+ ~/.gradle/wrapper
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
+ restore-keys: ${{ runner.os }}-gradle-
- run: |
./gradlew printVersion
./gradlew -DallModules publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
diff --git a/.github/workflows/recurring-jmh-benchmarks.yml b/.github/workflows/recurring-jmh-benchmarks.yml
index 521a09414f4c..3424d7f152dc 100644
--- a/.github/workflows/recurring-jmh-benchmarks.yml
+++ b/.github/workflows/recurring-jmh-benchmarks.yml
@@ -45,7 +45,7 @@ jobs:
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
with:
repository: ${{ github.event.inputs.repo }}
ref: ${{ github.event.inputs.ref }}
@@ -53,7 +53,7 @@ jobs:
with:
distribution: zulu
java-version: 17
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -65,7 +65,7 @@ jobs:
- name: Run Benchmark
run: ./gradlew :iceberg-spark:${{ matrix.spark_version }}:jmh -PjmhIncludeRegex=${{ matrix.benchmark }} -PjmhOutputPath=benchmark/${{ matrix.benchmark }}.txt -PjmhJsonOutputPath=benchmark/${{ matrix.benchmark }}.json
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: ${{ always() }}
with:
name: benchmark-results
diff --git a/.github/workflows/site-ci.yml b/.github/workflows/site-ci.yml
index b8661f990c96..7efbf94ba884 100644
--- a/.github/workflows/site-ci.yml
+++ b/.github/workflows/site-ci.yml
@@ -30,8 +30,8 @@ jobs:
deploy:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v4
- - uses: actions/setup-python@v5
+ - uses: actions/checkout@v6
+ - uses: actions/setup-python@v6
with:
python-version: 3.x
- name: Deploy Iceberg documentation
diff --git a/.github/workflows/spark-ci.yml b/.github/workflows/spark-ci.yml
index 66193515efb7..71428a100b6e 100644
--- a/.github/workflows/spark-ci.yml
+++ b/.github/workflows/spark-ci.yml
@@ -61,6 +61,7 @@ on:
- 'CONTRIBUTING.md'
- '**/LICENSE'
- '**/NOTICE'
+ - 'doap.rdf'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
@@ -71,27 +72,27 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
- jvm: [11, 17, 21]
- spark: ['3.4', '3.5', '4.0']
+ jvm: [17, 21]
+ spark: ['3.4', '3.5', '4.0', '4.1']
scala: ['2.12', '2.13']
exclude:
# Spark 3.5 is the first version not failing on Java 21 (https://issues.apache.org/jira/browse/SPARK-42369)
# Full Java 21 support is coming in Spark 4 (https://issues.apache.org/jira/browse/SPARK-43831)
- - jvm: 11
- spark: '4.0'
- jvm: 21
spark: '3.4'
- spark: '4.0'
scala: '2.12'
+ - spark: '4.1'
+ scala: '2.12'
env:
SPARK_LOCAL_IP: localhost
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v6
- uses: actions/setup-java@v5
with:
distribution: zulu
java-version: ${{ matrix.jvm }}
- - uses: actions/cache@v4
+ - uses: actions/cache@v5
with:
path: |
~/.gradle/caches
@@ -108,7 +109,7 @@ jobs:
:iceberg-spark:iceberg-spark-extensions-${{ matrix.spark }}_${{ matrix.scala }}:check \
:iceberg-spark:iceberg-spark-runtime-${{ matrix.spark }}_${{ matrix.scala }}:check \
-Pquick=true -x javadoc
- - uses: actions/upload-artifact@v4
+ - uses: actions/upload-artifact@v6
if: failure()
with:
name: test logs
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 6846d53f538b..9a7134d5465c 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -32,7 +32,7 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-24.04
steps:
- - uses: actions/stale@v9.1.0
+ - uses: actions/stale@v10.1.1
with:
# stale issues
stale-issue-label: 'stale'
diff --git a/.gitignore b/.gitignore
index f931c10e9407..bcac4d1610fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,8 @@ spark/v3.5/spark/benchmark/*
spark/v3.5/spark-extensions/benchmark/*
spark/v4.0/spark/benchmark/*
spark/v4.0/spark-extensions/benchmark/*
+spark/v4.1/spark/benchmark/*
+spark/v4.1/spark-extensions/benchmark/*
*/benchmark/*
__pycache__/
diff --git a/README.md b/README.md
index 582e108b76ea..ea967e077fb2 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,7 @@ Community discussions happen primarily on the [dev mailing list][dev-list] or on
### Building
-Iceberg is built using Gradle with Java 11, 17, or 21.
+Iceberg is built using Gradle with Java 17 or 21.
* To invoke a build and run tests: `./gradlew build`
* To skip tests: `./gradlew build -x test -x integrationTest`
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
index e388ee932589..cc44564ae6b7 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/TestOSSURI.java
@@ -50,10 +50,10 @@ public void testEncodedString() {
@Test
public void invalidBucket() {
- assertThatThrownBy(() -> new OSSURI("https://test_bucket/path/to/file"))
+ assertThatThrownBy(() -> new OSSURI("https://test#bucket/path/to/file"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
- OSS_RESOURCE_MANAGER.getFormattedString("BucketNameInvalid", "test_bucket"));
+ OSS_RESOURCE_MANAGER.getFormattedString("BucketNameInvalid", "test#bucket"));
}
@Test
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 02b3d241d893..f723e04d0c4e 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -25,11 +25,11 @@
import org.apache.iceberg.metrics.MetricsReporter;
/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
-class BatchScanAdapter implements BatchScan {
+public class BatchScanAdapter implements BatchScan {
private final TableScan scan;
- BatchScanAdapter(TableScan scan) {
+ public BatchScanAdapter(TableScan scan) {
this.scan = scan;
}
@@ -151,4 +151,9 @@ public long splitOpenFileCost() {
public BatchScan metricsReporter(MetricsReporter reporter) {
return new BatchScanAdapter(scan.metricsReporter(reporter));
}
+
+ @Override
+ public BatchScan minRowsRequested(long numRows) {
+ return new BatchScanAdapter(scan.minRowsRequested(numRows));
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
new file mode 100644
index 000000000000..c0c4c07b7e27
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface for partition statistics returned from a {@link PartitionStatisticsScan}. */
+public interface PartitionStatistics extends StructLike {
+
+ /* The positions of each statistics within the full schema of partition statistics. */
+ int PARTITION_POSITION = 0;
+ int SPEC_ID_POSITION = 1;
+ int DATA_RECORD_COUNT_POSITION = 2;
+ int DATA_FILE_COUNT_POSITION = 3;
+ int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
+ int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
+ int POSITION_DELETE_FILE_COUNT_POSITION = 6;
+ int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
+ int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
+ int TOTAL_RECORD_COUNT_POSITION = 9;
+ int LAST_UPDATED_AT_POSITION = 10;
+ int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
+ int DV_COUNT_POSITION = 12;
+
+ /** Returns the partition of these partition statistics */
+ StructLike partition();
+
+ /** Returns the spec ID of the partition of these partition statistics */
+ Integer specId();
+
+ /** Returns the number of data records in the partition */
+ Long dataRecordCount();
+
+ /** Returns the number of data files in the partition */
+ Integer dataFileCount();
+
+ /** Returns the total size of data files in bytes in the partition */
+ Long totalDataFileSizeInBytes();
+
+ /**
+ * Returns the number of positional delete records in the partition. As per the spec, this also
+ * includes the DV record count.
+ */
+ Long positionDeleteRecordCount();
+
+ /** Returns the number of positional delete files in the partition */
+ Integer positionDeleteFileCount();
+
+ /** Returns the number of equality delete records in the partition */
+ Long equalityDeleteRecordCount();
+
+ /** Returns the number of equality delete files in the partition */
+ Integer equalityDeleteFileCount();
+
+ /** Returns the total number of records in the partition */
+ Long totalRecords();
+
+ /** Returns the timestamp in milliseconds when the partition was last updated */
+ Long lastUpdatedAt();
+
+ /** Returns the ID of the snapshot that last updated this partition */
+ Long lastUpdatedSnapshotId();
+
+ /** Returns the number of delete vectors in the partition */
+ Integer dvCount();
+}
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
new file mode 100644
index 000000000000..18d8b2031821
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+/** API for configuring partition statistics scan. */
+public interface PartitionStatisticsScan {
+
+ /**
+ * Create a new scan from this scan's configuration that will use the given snapshot by ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return a new scan based on this with the given snapshot ID
+ * @throws IllegalArgumentException if the snapshot cannot be found
+ */
+ PartitionStatisticsScan useSnapshot(long snapshotId);
+
+ /**
+ * Create a new scan from the results of this, where partitions are filtered by the {@link
+ * Expression}.
+ *
+ * @param filter a filter expression
+ * @return a new scan based on this with results filtered by the expression
+ */
+ PartitionStatisticsScan filter(Expression filter);
+
+ /**
+ * Create a new scan from this with the schema as its projection.
+ *
+ * @param schema a projection schema
+ * @return a new scan based on this with the given projection
+ */
+ PartitionStatisticsScan project(Schema schema);
+
+ /**
+ * Scans a partition statistics file belonging to a particular snapshot
+ *
+ * @return an Iterable of partition statistics
+ */
+ CloseableIterable<PartitionStatistics> scan();
+}
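A minimal usage sketch of the new API (not part of the patch): it assumes a `Table` implementation that overrides `newPartitionStatisticsScan()` (added to `Table` later in this diff); the table and snapshot ID are placeholders.

```java
import java.io.IOException;
import org.apache.iceberg.PartitionStatistics;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class PartitionStatsExample {
  // Print the data file count per partition for one snapshot of a (hypothetical) table.
  static void printDataFileCounts(Table table, long snapshotId) throws IOException {
    try (CloseableIterable<PartitionStatistics> stats =
        table.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) {
      for (PartitionStatistics stat : stats) {
        System.out.printf("partition=%s dataFiles=%d%n", stat.partition(), stat.dataFileCount());
      }
    }
  }
}
```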
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 339bc75336ba..9785ce6603ad 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -195,4 +195,17 @@ default ThisT metricsReporter(MetricsReporter reporter) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement metricsReporter");
}
+
+ /**
+ * Create a new scan that returns files with at least the given number of rows. This is only a
+ * hint, and is entirely optional, so that scans do not have to return more rows than necessary.
+ * The scan may return fewer rows if it does not contain that many, or more than requested.
+ *
+ * @param numRows The minimum number of rows requested
+ * @return A new scan based on this with at least the given number of rows
+ */
+ default ThisT minRowsRequested(long numRows) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement minRowsRequested");
+ }
}
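For illustration only (not part of the patch), a caller could thread the hint through a regular table scan; `table` is a placeholder, and implementations that do not support the hint throw `UnsupportedOperationException` as in the default above.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;

class MinRowsHintExample {
  // Ask the scan (as a best-effort hint) to plan files covering at least one million rows.
  static TableScan scanWithRowHint(Table table) {
    return table.newScan().minRowsRequested(1_000_000L);
  }
}
```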
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
index 64b579a1a377..6cb486c65f5c 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg;
-import java.util.function.Function;
import javax.annotation.Nonnull;
/**
@@ -28,7 +27,7 @@
* table state.
*/
@FunctionalInterface
-public interface SnapshotAncestryValidator extends Function<Iterable<Snapshot>, Boolean> {
+public interface SnapshotAncestryValidator {
SnapshotAncestryValidator NON_VALIDATING = baseSnapshots -> true;
@@ -38,8 +37,7 @@ public interface SnapshotAncestryValidator extends Function<Iterable<Snapshot>,
* @param baseSnapshots ancestry of the base table metadata snapshots
* @return boolean for whether the update is valid
*/
- @Override
- Boolean apply(Iterable<Snapshot> baseSnapshots);
+ boolean validate(Iterable<Snapshot> baseSnapshots);
/**
* Validation message that will be included when throwing {@link
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 97ea9ba76526..3c0689e89288 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -83,6 +83,18 @@ default IncrementalChangelogScan newIncrementalChangelogScan() {
throw new UnsupportedOperationException("Incremental changelog scan is not supported");
}
+ /**
+ * Create a new {@link PartitionStatisticsScan} for this table.
+ *
+ * <p>Once a partition statistics scan is created, it can be refined to project columns and filter
+ * data.
+ *
+ * @return a partition statistics scan for this table
+ */
+ default PartitionStatisticsScan newPartitionStatisticsScan() {
+ throw new UnsupportedOperationException("Partition statistics scan is not supported");
+ }
+
/**
* Return the {@link Schema schema} for this table.
*
diff --git a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java
index 2ab335a4d6c6..b4b99017fa66 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/EncryptingFileIO.java
@@ -30,8 +30,10 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestListFile;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -46,7 +48,11 @@ public static EncryptingFileIO combine(FileIO io, EncryptionManager em) {
return combine(encryptingIO.io, em);
}
- return new EncryptingFileIO(io, em);
+ if (io instanceof SupportsPrefixOperations) {
+ return new WithSupportsPrefixOperations((SupportsPrefixOperations) io, em);
+ } else {
+ return new EncryptingFileIO(io, em);
+ }
}
private final FileIO io;
@@ -206,4 +212,25 @@ public EncryptionKeyMetadata copy() {
return new SimpleKeyMetadata(metadataBuffer.duplicate());
}
}
+
+ static class WithSupportsPrefixOperations extends EncryptingFileIO
+ implements SupportsPrefixOperations {
+
+ private final SupportsPrefixOperations prefixIo;
+
+ WithSupportsPrefixOperations(SupportsPrefixOperations io, EncryptionManager em) {
+ super(io, em);
+ this.prefixIo = io;
+ }
+
+ @Override
+ public Iterable<FileInfo> listPrefix(String prefix) {
+ return prefixIo.listPrefix(prefix);
+ }
+
+ @Override
+ public void deletePrefix(String prefix) {
+ prefixIo.deletePrefix(prefix);
+ }
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index d3dc00d914c7..9bb2b713439d 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -24,7 +24,6 @@
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Pattern;
@@ -69,7 +68,6 @@ public class ExpressionUtil {
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?([-+]\\d{2}:\\d{2}|Z)");
static final int LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD = 10;
- private static final int LONG_IN_PREDICATE_ABBREVIATION_MIN_GAIN = 5;
private ExpressionUtil() {}
@@ -502,19 +500,20 @@ public <T> String predicate(UnboundPredicate<T> pred) {
private static List<String> abbreviateValues(List<String> sanitizedValues) {
if (sanitizedValues.size() >= LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD) {
- Set<String> distinctValues = ImmutableSet.copyOf(sanitizedValues);
- if (distinctValues.size()
- <= sanitizedValues.size() - LONG_IN_PREDICATE_ABBREVIATION_MIN_GAIN) {
- List<String> abbreviatedList = Lists.newArrayListWithCapacity(distinctValues.size() + 1);
- abbreviatedList.addAll(distinctValues);
+ List<String> distinctValues = ImmutableSet.copyOf(sanitizedValues).asList();
+ int abbreviatedSize =
+ Math.min(distinctValues.size(), LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD);
+ List<String> abbreviatedList = Lists.newArrayListWithCapacity(abbreviatedSize + 1);
+ abbreviatedList.addAll(distinctValues.subList(0, abbreviatedSize));
+ if (abbreviatedSize < sanitizedValues.size()) {
abbreviatedList.add(
String.format(
Locale.ROOT,
"... (%d values hidden, %d in total)",
- sanitizedValues.size() - distinctValues.size(),
+ sanitizedValues.size() - abbreviatedSize,
sanitizedValues.size()));
- return abbreviatedList;
}
+ return abbreviatedList;
}
return sanitizedValues;
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index aa0441f49011..81cbbe785519 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -327,6 +327,14 @@ public <T> Boolean eq(Bound<T> term, Literal<T> lit) {
public <T> Boolean notEq(Bound<T> term, Literal<T> lit) {
// because the bounds are not necessarily a min or max value, this cannot be answered using
// them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+ // However, when min == max and the file has no nulls or NaN values, we can safely prune
+ // if that value equals the literal.
+ T value = uniqueValue(term);
+
+ if (value != null && lit.comparator().compare(value, lit.value()) == 0) {
+ return ROWS_CANNOT_MATCH;
+ }
+
return ROWS_MIGHT_MATCH;
}
@@ -381,6 +389,14 @@ public <T> Boolean in(Bound<T> term, Set<T> literalSet) {
public <T> Boolean notIn(Bound<T> term, Set<T> literalSet) {
// because the bounds are not necessarily a min or max value, this cannot be answered using
// them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+ // However, when min == max and the file has no nulls or NaN values, we can safely prune
+ // if that value is in the exclusion set.
+ T value = uniqueValue(term);
+
+ if (value != null && literalSet.contains(value)) {
+ return ROWS_CANNOT_MATCH;
+ }
+
return ROWS_MIGHT_MATCH;
}
@@ -490,6 +506,34 @@ private boolean containsNaNsOnly(Integer id) {
&& nanCounts.get(id).equals(valueCounts.get(id));
}
+ /**
+ * Returns the column's single value if all rows contain the same value. Defined as a column
+ * with no nulls, no NaNs, and lower bound equals upper bound. Returns null otherwise.
+ */
+ private <T> T uniqueValue(Bound<T> term) {
+ int id = term.ref().fieldId();
+ if (mayContainNull(id)) {
+ return null;
+ }
+
+ T lower = lowerBound(term);
+ T upper = upperBound(term);
+
+ if (lower == null || upper == null || NaNUtil.isNaN(lower) || NaNUtil.isNaN(upper)) {
+ return null;
+ }
+
+ if (nanCounts != null && nanCounts.containsKey(id) && nanCounts.get(id) != 0) {
+ return null;
+ }
+
+ if (!lower.equals(upper)) {
+ return null;
+ }
+
+ return lower;
+ }
+
private <T> T lowerBound(Bound<T> term) {
if (term instanceof BoundReference) {
return parseLowerBound((BoundReference<T>) term);
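A conceptual sketch (not Iceberg API) of the pruning rule the notEq/notIn changes rely on: a file may be skipped only when its metrics prove every row holds the same non-null, non-NaN value, and that value matches the literal (or is in the exclusion set).

```java
class NotEqPruningSketch {
  // True when column metrics prove a file cannot satisfy notEq(col, literal): the column has
  // no nulls, no NaNs, and its lower bound equals its upper bound, which equals the literal.
  static boolean canSkipFileForNotEq(
      Object lowerBound, Object upperBound, long nullCount, long nanCount, Object literal) {
    boolean singleValued =
        nullCount == 0 && nanCount == 0 && lowerBound != null && lowerBound.equals(upperBound);
    return singleValued && lowerBound.equals(literal);
  }
}
```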
diff --git a/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java
index dca11d5e4662..daec5216f0ef 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/VariantExpressionUtil.java
@@ -24,13 +24,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.VariantValue;
class VariantExpressionUtil {
- // TODO: Implement PhysicalType.TIME
- // TODO: Implement PhysicalType.TIMESTAMPNTZ_NANO and PhysicalType.TIMESTAMPTZ_NANO
- // TODO: Implement PhysicalType.UUID
private static final Map NO_CONVERSION_NEEDED =
ImmutableMap.builder()
.put(Types.IntegerType.get(), PhysicalType.INT32)
@@ -40,6 +38,10 @@ class VariantExpressionUtil {
.put(Types.DateType.get(), PhysicalType.DATE)
.put(Types.TimestampType.withoutZone(), PhysicalType.TIMESTAMPNTZ)
.put(Types.TimestampType.withZone(), PhysicalType.TIMESTAMPTZ)
+ .put(Types.TimestampNanoType.withoutZone(), PhysicalType.TIMESTAMPNTZ_NANOS)
+ .put(Types.TimestampNanoType.withZone(), PhysicalType.TIMESTAMPTZ_NANOS)
+ .put(Types.TimeType.get(), PhysicalType.TIME)
+ .put(Types.UUIDType.get(), PhysicalType.UUID)
.put(Types.StringType.get(), PhysicalType.STRING)
.put(Types.BinaryType.get(), PhysicalType.BINARY)
.put(Types.UnknownType.get(), PhysicalType.NULL)
@@ -47,7 +49,7 @@ class VariantExpressionUtil {
private VariantExpressionUtil() {}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "CyclomaticComplexity"})
static <T> T castTo(VariantValue value, Type type) {
if (value == null) {
return null;
@@ -111,6 +113,40 @@ static <T> T castTo(VariantValue value, Type type) {
}
break;
+ case TIMESTAMP:
+ if (value.type() == PhysicalType.TIMESTAMPTZ_NANOS
+ || value.type() == PhysicalType.TIMESTAMPNTZ_NANOS) {
+ return (T)
+ (Long) DateTimeUtil.nanosToMicros(((Number) value.asPrimitive().get()).longValue());
+ } else if (value.type() == PhysicalType.DATE) {
+ return (T)
+ (Long)
+ DateTimeUtil.microsFromTimestamp(
+ DateTimeUtil.dateFromDays(((Number) value.asPrimitive().get()).intValue())
+ .atStartOfDay());
+ }
+ break;
+ case TIMESTAMP_NANO:
+ if (value.type() == PhysicalType.TIMESTAMPTZ || value.type() == PhysicalType.TIMESTAMPNTZ) {
+ return (T)
+ (Long) DateTimeUtil.microsToNanos(((Number) value.asPrimitive().get()).longValue());
+ } else if (value.type() == PhysicalType.DATE) {
+ return (T)
+ (Long)
+ DateTimeUtil.nanosFromTimestamp(
+ DateTimeUtil.dateFromDays(((Number) value.asPrimitive().get()).intValue())
+ .atStartOfDay());
+ }
+ break;
+ case DATE:
+ if (value.type() == PhysicalType.TIMESTAMPTZ || value.type() == PhysicalType.TIMESTAMPNTZ) {
+ return (T)
+ (Integer) DateTimeUtil.microsToDays(((Number) value.asPrimitive().get()).longValue());
+ } else if (value.type() == PhysicalType.TIMESTAMPTZ_NANOS
+ || value.type() == PhysicalType.TIMESTAMPNTZ_NANOS) {
+ return (T)
+ (Integer) DateTimeUtil.nanosToDays(((Number) value.asPrimitive().get()).longValue());
+ }
}
return null;
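For illustration (not part of the patch), the new TIMESTAMP, TIMESTAMP_NANO, and DATE cases above lean on the `DateTimeUtil` helpers already used in the diff; a small sketch of the unit conversions they perform, with a placeholder microsecond value:

```java
import org.apache.iceberg.util.DateTimeUtil;

class VariantTimestampCastSketch {
  public static void main(String[] args) {
    long micros = 1_700_000_000_000_000L; // microseconds since the Unix epoch
    // round trip between micro- and nanosecond precision, as in the TIMESTAMP cases above
    long nanos = DateTimeUtil.microsToNanos(micros);
    System.out.println(DateTimeUtil.nanosToMicros(nanos) == micros); // true
    // DATE casts drop the time-of-day component
    System.out.println(DateTimeUtil.microsToDays(micros));
  }
}
```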
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Transforms.java b/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
index aacd4d430069..a3a6a3f6321d 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
@@ -29,7 +29,7 @@
* Factory methods for transforms.
*
*
Most users should create transforms using a {@link PartitionSpec#builderFor(Schema)} partition
- * spec builder}.
+ * spec builder.
*
* @see PartitionSpec#builderFor(Schema) The partition spec builder.
*/
diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
index cfdac0104c47..c13cbfd0cc28 100644
--- a/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceSet.java
@@ -18,183 +18,53 @@
*/
package org.apache.iceberg.util;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-public class CharSequenceSet implements Set<CharSequence>, Serializable {
+public class CharSequenceSet extends WrapperSet<CharSequence> {
private static final ThreadLocal<CharSequenceWrapper> WRAPPERS =
ThreadLocal.withInitial(() -> CharSequenceWrapper.wrap(null));
- public static CharSequenceSet of(Iterable<CharSequence> charSequences) {
- return new CharSequenceSet(charSequences);
- }
-
- public static CharSequenceSet empty() {
- return new CharSequenceSet(ImmutableList.of());
- }
-
- private final Set<CharSequenceWrapper> wrapperSet;
-
- private CharSequenceSet(Iterable<CharSequence> charSequences) {
- this.wrapperSet =
- Sets.newHashSet(Iterables.transform(charSequences, CharSequenceWrapper::wrap));
+ private CharSequenceSet() {
+ // needed for serialization
}
- @Override
- public int size() {
- return wrapperSet.size();
+ private CharSequenceSet(Iterable<? extends CharSequence> charSequences) {
+ super(
+ Iterables.transform(
+ charSequences,
+ obj -> {
+ Preconditions.checkNotNull(obj, "Invalid object: null");
+ return CharSequenceWrapper.wrap(obj);
+ }));
}
- @Override
- public boolean isEmpty() {
- return wrapperSet.isEmpty();
+ public static CharSequenceSet of(Iterable<? extends CharSequence> charSequences) {
+ return new CharSequenceSet(charSequences);
}
- @Override
- public boolean contains(Object obj) {
- if (obj instanceof CharSequence) {
- CharSequenceWrapper wrapper = WRAPPERS.get();
- boolean result = wrapperSet.contains(wrapper.set((CharSequence) obj));
- wrapper.set(null); // don't hold a reference to the value
- return result;
- }
- return false;
+ public static CharSequenceSet empty() {
+ return new CharSequenceSet();
}
@Override
- public Iterator<CharSequence> iterator() {
- return Iterators.transform(wrapperSet.iterator(), CharSequenceWrapper::get);
+ protected Wrapper<CharSequence> wrapper() {
+ return WRAPPERS.get();
}
@Override
- public Object[] toArray() {
- return Iterators.toArray(iterator(), CharSequence.class);
+ protected Wrapper<CharSequence> wrap(CharSequence file) {
+ return CharSequenceWrapper.wrap(file);
}
@Override
- @SuppressWarnings("unchecked")
- public <T> T[] toArray(T[] destArray) {
- int size = wrapperSet.size();
- if (destArray.length < size) {
- return (T[]) toArray();
- }
-
- Iterator<CharSequence> iter = iterator();
- int ind = 0;
- while (iter.hasNext()) {
- destArray[ind] = (T) iter.next();
- ind += 1;
- }
-
- if (destArray.length > size) {
- destArray[size] = null;
- }
-
- return destArray;
+ protected Class<CharSequence> elementClass() {
+ return CharSequence.class;
}
@Override
public boolean add(CharSequence charSequence) {
- return wrapperSet.add(CharSequenceWrapper.wrap(charSequence));
- }
-
- @Override
- public boolean remove(Object obj) {
- if (obj instanceof CharSequence) {
- CharSequenceWrapper wrapper = WRAPPERS.get();
- boolean result = wrapperSet.remove(wrapper.set((CharSequence) obj));
- wrapper.set(null); // don't hold a reference to the value
- return result;
- }
- return false;
- }
-
- @Override
- @SuppressWarnings("CollectionUndefinedEquality")
- public boolean containsAll(Collection<?> objects) {
- if (objects != null) {
- return Iterables.all(objects, this::contains);
- }
- return false;
- }
-
- @Override
- public boolean addAll(Collection<? extends CharSequence> charSequences) {
- if (charSequences != null) {
- return Iterables.addAll(
- wrapperSet, Iterables.transform(charSequences, CharSequenceWrapper::wrap));
- }
- return false;
- }
-
- @Override
- public boolean retainAll(Collection<?> objects) {
- if (objects != null) {
- Set<CharSequenceWrapper> toRetain =
- objects.stream()
- .filter(CharSequence.class::isInstance)
- .map(CharSequence.class::cast)
- .map(CharSequenceWrapper::wrap)
- .collect(Collectors.toSet());
-
- return Iterables.retainAll(wrapperSet, toRetain);
- }
-
- return false;
- }
-
- @Override
- @SuppressWarnings("CollectionUndefinedEquality")
- public boolean removeAll(Collection<?> objects) {
- if (objects != null) {
- return objects.stream().filter(this::remove).count() != 0;
- }
-
- return false;
- }
-
- @Override
- public void clear() {
- wrapperSet.clear();
- }
-
- @SuppressWarnings("CollectionUndefinedEquality")
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- } else if (!(other instanceof Set)) {
- return false;
- }
-
- Set<?> that = (Set<?>) other;
-
- if (size() != that.size()) {
- return false;
- }
-
- try {
- return containsAll(that);
- } catch (ClassCastException | NullPointerException unused) {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return wrapperSet.stream().mapToInt(CharSequenceWrapper::hashCode).sum();
- }
-
- @Override
- public String toString() {
- return Streams.stream(iterator()).collect(Collectors.joining("CharSequenceSet({", ", ", "})"));
+ // method is needed to not break API compatibility
+ return super.add(charSequence);
}
}
diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceWrapper.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceWrapper.java
index 854264c1ae21..59e8eb712dc6 100644
--- a/api/src/main/java/org/apache/iceberg/util/CharSequenceWrapper.java
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceWrapper.java
@@ -18,12 +18,11 @@
*/
package org.apache.iceberg.util;
-import java.io.Serializable;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.JavaHashes;
/** Wrapper class to adapt CharSequence for use in maps and sets. */
-public class CharSequenceWrapper implements CharSequence, Serializable {
+public class CharSequenceWrapper implements CharSequence, WrapperSet.Wrapper<CharSequence> {
public static CharSequenceWrapper wrap(CharSequence seq) {
return new CharSequenceWrapper(seq);
}
@@ -39,6 +38,7 @@ private CharSequenceWrapper(CharSequence wrapped) {
this.wrapped = wrapped;
}
+ @Override
public CharSequenceWrapper set(CharSequence newWrapped) {
this.wrapped = newWrapped;
this.hashCode = 0;
@@ -46,6 +46,7 @@ public CharSequenceWrapper set(CharSequence newWrapped) {
return this;
}
+ @Override
public CharSequence get() {
return wrapped;
}
diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index b72feec00b2c..3146a3763cce 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -20,10 +20,13 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.security.SecureRandom;
import java.util.UUID;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public class UUIDUtil {
+ private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+
private UUIDUtil() {}
public static UUID convert(byte[] buf) {
@@ -78,4 +81,32 @@ public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
buffer.putLong(8, value.getLeastSignificantBits());
return buffer;
}
+
+ /**
+ * Generate a RFC 9562 UUIDv7.
+ *
+ * <p>Layout: - 48-bit Unix epoch milliseconds - 4-bit version (0b0111) - 12-bit random (rand_a) -
+ * 2-bit variant (RFC 4122, 0b10) - 62-bit random (rand_b)
+ */
+ public static UUID generateUuidV7() {
+ long epochMs = System.currentTimeMillis();
+ Preconditions.checkState(
+ (epochMs >>> 48) == 0, "Invalid timestamp: does not fit within 48 bits: %s", epochMs);
+
+ // Draw 10 random bytes once: 2 bytes for rand_a (12 bits) and 8 bytes for rand_b (62 bits)
+ byte[] randomBytes = new byte[10];
+ SECURE_RANDOM.nextBytes(randomBytes);
+ ByteBuffer rb = ByteBuffer.wrap(randomBytes).order(ByteOrder.BIG_ENDIAN);
+ long randMSB = ((long) rb.getShort()) & 0x0FFFL; // 12 bits
+ long randLSB = rb.getLong() & 0x3FFFFFFFFFFFFFFFL; // 62 bits
+
+ long msb = (epochMs << 16); // place timestamp in the top 48 bits
+ msb |= 0x7000L; // version 7 (UUID bits 48..51)
+ msb |= randMSB; // low 12 bits of MSB
+
+ long lsb = 0x8000000000000000L; // RFC 4122 variant '10'
+ lsb |= randLSB;
+
+ return new UUID(msb, lsb);
+ }
}
diff --git a/api/src/test/java/org/apache/iceberg/encryption/TestEncryptingFileIO.java b/api/src/test/java/org/apache/iceberg/encryption/TestEncryptingFileIO.java
new file mode 100644
index 000000000000..944d89489c0a
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/encryption/TestEncryptingFileIO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.encryption;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.junit.jupiter.api.Test;
+
+public class TestEncryptingFileIO {
+
+ @Test
+ public void delegateEncryptingIOWithAndWithoutMixins() {
+ EncryptionManager em = mock(EncryptionManager.class);
+
+ FileIO fileIONoMixins = mock(FileIO.class);
+ assertThat(EncryptingFileIO.combine(fileIONoMixins, em))
+ .isInstanceOf(EncryptingFileIO.class)
+ .extracting(EncryptingFileIO::encryptionManager)
+ .isEqualTo(em);
+
+ FileIO fileIOWithMixins =
+ mock(FileIO.class, withSettings().extraInterfaces(SupportsPrefixOperations.class));
+ assertThat(EncryptingFileIO.combine(fileIOWithMixins, em))
+ .isInstanceOf(EncryptingFileIO.WithSupportsPrefixOperations.class)
+ .extracting(EncryptingFileIO::encryptionManager)
+ .isEqualTo(em);
+ }
+
+ @Test
+ public void prefixOperationsDelegation() {
+ EncryptionManager em = mock(EncryptionManager.class);
+ SupportsPrefixOperations delegate = mock(SupportsPrefixOperations.class);
+
+ EncryptingFileIO.WithSupportsPrefixOperations fileIO =
+ (EncryptingFileIO.WithSupportsPrefixOperations) EncryptingFileIO.combine(delegate, em);
+
+ String prefix = "prefix";
+ Iterable<FileInfo> fileInfos = mock(Iterable.class);
+ when(delegate.listPrefix(prefix)).thenReturn(fileInfos);
+ assertThat(fileIO.listPrefix(prefix)).isEqualTo(fileInfos);
+
+ fileIO.deletePrefix(prefix);
+ verify(delegate).deletePrefix(prefix);
+ }
+}
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
index ca08951b1f53..d9fe26eacc6b 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionUtil.java
@@ -115,6 +115,16 @@ public void testSanitizeLongIn() {
.as("Sanitized string should be abbreviated")
.isEqualTo("test IN ((2-digit-int), (3-digit-int), ... (8 values hidden, 10 in total))");
+ Object[] tooLongStringsList =
+ IntStream.range(0, ExpressionUtil.LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD + 5)
+ .mapToObj(i -> "string_" + i)
+ .toArray();
+
+ assertThat(ExpressionUtil.toSanitizedString(Expressions.in("test", tooLongStringsList)))
+ .as("Sanitized string should be abbreviated")
+ .isEqualTo(
+ "test IN ((hash-14128790), (hash-1056a62b), (hash-22fd6340), (hash-3f9d20e4), (hash-136200f0), (hash-25fc9033), (hash-681d31e2), (hash-6c1796d4), (hash-382d143e), (hash-272f4e5b), ... (5 values hidden, 15 in total))");
+
// The sanitization resulting in an expression tree does not abbreviate
List<String> expectedValues = Lists.newArrayList();
expectedValues.addAll(Collections.nCopies(5, "(2-digit-int)"));
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
index 068c862e2bda..6c4944e9cd3a 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -353,7 +353,7 @@ public void testIntegerLt() {
shouldRead =
ManifestEvaluator.forRowFilter(lessThan("id", INT_MAX_VALUE), SPEC, true).eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
@@ -395,7 +395,7 @@ public void testIntegerGt() {
shouldRead =
ManifestEvaluator.forRowFilter(greaterThan("id", INT_MAX_VALUE - 4), SPEC, true).eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
@@ -418,7 +418,7 @@ public void testIntegerGtEq() {
shouldRead =
ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", INT_MAX_VALUE - 4), SPEC, true)
.eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
index 2f4fbf395739..5f0ca2659fbf 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
@@ -474,7 +474,7 @@ public void testIntegerLt() {
assertThat(shouldRead).as("Should read: one possible id").isTrue();
shouldRead = new InclusiveMetricsEvaluator(SCHEMA, lessThan("id", INT_MAX_VALUE)).eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
@@ -513,7 +513,7 @@ public void testIntegerGt() {
shouldRead =
new InclusiveMetricsEvaluator(SCHEMA, greaterThan("id", INT_MAX_VALUE - 4)).eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
@@ -535,7 +535,7 @@ public void testIntegerGtEq() {
shouldRead =
new InclusiveMetricsEvaluator(SCHEMA, greaterThanOrEqual("id", INT_MAX_VALUE - 4))
.eval(FILE);
- assertThat(shouldRead).as("Should read: may possible ids").isTrue();
+ assertThat(shouldRead).as("Should read: many possible ids").isTrue();
}
@Test
@@ -970,4 +970,172 @@ public void testNotNullInNestedStruct() {
.as("Should not read: optional_address.optional_street2 is optional")
.isFalse();
}
+
+ @Test
+ public void testNotEqWithSingleValue() {
+ DataFile rangeOfValues =
+ new TestDataFile(
+ "range_of_values.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "aaa")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "zzz")));
+
+ boolean shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("required", "aaa")).eval(rangeOfValues);
+ assertThat(shouldRead)
+ .as("Should read: file has range of values, cannot exclude based on literal")
+ .isTrue();
+
+ DataFile singleValueFile =
+ new TestDataFile(
+ "single_value.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("required", "abc")).eval(singleValueFile);
+ assertThat(shouldRead)
+ .as("Should not read: file contains single value equal to literal")
+ .isFalse();
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("required", "def")).eval(singleValueFile);
+ assertThat(shouldRead)
+ .as("Should read: file contains single value not equal to literal")
+ .isTrue();
+
+ DataFile singleValueWithNulls =
+ new TestDataFile(
+ "single_value_nulls.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 2L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("required", "abc"))
+ .eval(singleValueWithNulls);
+ assertThat(shouldRead).as("Should read: file has nulls which match != predicate").isTrue();
+
+ DataFile singleValueWithNaN =
+ new TestDataFile(
+ "single_value_nan.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(9, 10L),
+ ImmutableMap.of(9, 0L),
+ ImmutableMap.of(9, 2L),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), 5.0F)),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), 5.0F)));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("no_nans", 5.0F)).eval(singleValueWithNaN);
+ assertThat(shouldRead).as("Should read: file has NaN values which match != predicate").isTrue();
+
+ DataFile singleValueNaNBounds =
+ new TestDataFile(
+ "single_value_nan_bounds.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(9, 10L),
+ ImmutableMap.of(9, 0L),
+ ImmutableMap.of(9, 0L),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), Float.NaN)),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), Float.NaN)));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notEqual("no_nans", 5.0F)).eval(singleValueNaNBounds);
+ assertThat(shouldRead).as("Should read: bounds are NaN").isTrue();
+ }
+
+ @Test
+ public void testNotInWithSingleValue() {
+ DataFile rangeOfValues =
+ new TestDataFile(
+ "range_of_values.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "aaa")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "zzz")));
+
+ boolean shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notIn("required", "aaa", "bbb")).eval(rangeOfValues);
+ assertThat(shouldRead)
+ .as("Should read: file has range of values, cannot exclude based on literal")
+ .isTrue();
+
+ DataFile singleValueFile =
+ new TestDataFile(
+ "single_value.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notIn("required", "abc", "def"))
+ .eval(singleValueFile);
+ assertThat(shouldRead)
+ .as("Should not read: file contains single value in exclusion list")
+ .isFalse();
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notIn("required", "def", "ghi"))
+ .eval(singleValueFile);
+ assertThat(shouldRead)
+ .as("Should read: file contains single value not in exclusion list")
+ .isTrue();
+
+ DataFile singleValueWithNulls =
+ new TestDataFile(
+ "single_value_nulls.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(3, 10L),
+ ImmutableMap.of(3, 2L),
+ ImmutableMap.of(3, 0L),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")),
+ ImmutableMap.of(3, toByteBuffer(StringType.get(), "abc")));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notIn("required", "abc", "def"))
+ .eval(singleValueWithNulls);
+ assertThat(shouldRead).as("Should read: file has nulls which match NOT IN predicate").isTrue();
+
+ DataFile singleValueWithNaN =
+ new TestDataFile(
+ "single_value_nan.avro",
+ Row.of(),
+ 10,
+ ImmutableMap.of(9, 10L),
+ ImmutableMap.of(9, 0L),
+ ImmutableMap.of(9, 2L),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), 5.0F)),
+ ImmutableMap.of(9, toByteBuffer(Types.FloatType.get(), 5.0F)));
+
+ shouldRead =
+ new InclusiveMetricsEvaluator(SCHEMA, notIn("no_nans", 5.0F, 10.0F))
+ .eval(singleValueWithNaN);
+ assertThat(shouldRead)
+ .as("Should read: file has NaN values which match NOT IN predicate")
+ .isTrue();
+ }
}
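Reviewer note: the new notEqual/notIn cases all exercise the same metrics rule — a file can only be skipped when its lower and upper bounds collapse to a single value that is in the exclusion set, and the column has neither nulls nor NaNs. A minimal sketch of that decision, using illustrative names rather than the InclusiveMetricsEvaluator internals:

import java.util.Set;

// Hedged sketch of the single-value pruning rule exercised by the tests above; this is
// not Iceberg's evaluator code, only the decision it encodes.
class NotEqPruningSketch {
  static <T> boolean shouldRead(T lower, T upper, long nullCount, long nanCount, Set<T> excluded) {
    boolean singleValue = lower != null && lower.equals(upper);
    boolean onlyExcludedValue = singleValue && excluded.contains(lower);
    // nulls and NaNs never equal any literal, so they always satisfy != / NOT IN
    return !onlyExcludedValue || nullCount > 0 || nanCount > 0;
  }
}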
diff --git a/api/src/test/java/org/apache/iceberg/util/TestCharSequenceSet.java b/api/src/test/java/org/apache/iceberg/util/TestCharSequenceSet.java
index 324742c07a2d..093d2a0c6b87 100644
--- a/api/src/test/java/org/apache/iceberg/util/TestCharSequenceSet.java
+++ b/api/src/test/java/org/apache/iceberg/util/TestCharSequenceSet.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.util;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.Test;
@@ -42,15 +44,115 @@ public void testSearchingInCharSequenceCollection() {
@Test
public void nullString() {
- assertThat(CharSequenceSet.of(Arrays.asList((String) null))).contains((String) null);
+ assertThatThrownBy(() -> CharSequenceSet.of(Arrays.asList((String) null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
assertThat(CharSequenceSet.empty()).doesNotContain((String) null);
}
+ @Test
+ public void emptySet() {
+ assertThat(CharSequenceSet.empty()).isEmpty();
+ assertThat(CharSequenceSet.empty()).doesNotContain("a", "b", "c");
+ }
+
+ @Test
+ public void insertionOrderIsMaintained() {
+ CharSequenceSet set = CharSequenceSet.empty();
+ set.addAll(ImmutableList.of("d", "a", "c"));
+ set.add("b");
+ set.add("d");
+
+ assertThat(set).hasSize(4).containsExactly("d", "a", "c", "b");
+ }
+
+ @Test
+ public void clear() {
+ CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("a", "b"));
+ set.clear();
+ assertThat(set).isEmpty();
+ }
+
+ @Test
+ public void addAll() {
+ CharSequenceSet empty = CharSequenceSet.empty();
+ assertThatThrownBy(() -> empty.add(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.addAll(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid collection: null");
+
+ assertThatThrownBy(() -> empty.addAll(Collections.singletonList(null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.addAll(Arrays.asList("a", null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ CharSequenceSet set = CharSequenceSet.empty();
+ set.addAll(ImmutableList.of("b", "a", "c", "a"));
+ assertThat(set).hasSize(3).containsExactly("b", "a", "c");
+ }
+
+ @Test
+ public void contains() {
+ CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("b", "a"));
+ assertThatThrownBy(() -> set.contains(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThat(set).hasSize(2).containsExactly("b", "a").doesNotContain("c").doesNotContain("d");
+
+ assertThatThrownBy(() -> CharSequenceSet.of(Arrays.asList("c", "b", null, "a")))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+ }
+
+ @Test
+ public void containsAll() {
+ CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("b", "a"));
+ assertThatThrownBy(() -> set.containsAll(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid collection: null");
+
+ assertThatThrownBy(() -> set.containsAll(Collections.singletonList(null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> set.containsAll(Arrays.asList("a", null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThat(set.containsAll(ImmutableList.of("a", "b"))).isTrue();
+ assertThat(set.containsAll(ImmutableList.of("b", "a", "c"))).isFalse();
+ assertThat(set.containsAll(ImmutableList.of("b"))).isTrue();
+ }
+
@Test
public void testRetainAll() {
+ CharSequenceSet empty = CharSequenceSet.empty();
+ assertThatThrownBy(() -> empty.retainAll(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid collection: null");
+
+ assertThatThrownBy(() -> empty.retainAll(Collections.singletonList(null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.retainAll(Arrays.asList("123", null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.retainAll(ImmutableList.of("456", "789", 123)))
+ .isInstanceOf(ClassCastException.class)
+ .hasMessage("Cannot cast java.lang.Integer to java.lang.CharSequence");
+
CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("123", "456"));
- assertThat(set.retainAll(ImmutableList.of("456", "789", 123)))
+ assertThat(set.retainAll(ImmutableList.of("456", "789", "555")))
.overridingErrorMessage("Set should be changed")
.isTrue();
@@ -61,24 +163,74 @@ public void testRetainAll() {
.overridingErrorMessage("Set should not be changed")
.isFalse();
- assertThat(set.retainAll(ImmutableList.of(123, 456)))
+ assertThat(set.retainAll(ImmutableList.of("555", "789")))
.overridingErrorMessage("Set should be changed")
.isTrue();
assertThat(set).isEmpty();
}
+ @Test
+ public void toArray() {
+ CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("b", "a"));
+ assertThat(set.toArray()).hasSize(2).containsExactly("b", "a");
+
+ CharSequence[] array = new CharSequence[1];
+ assertThat(set.toArray(array)).hasSize(2).containsExactly("b", "a");
+
+ array = new CharSequence[0];
+ assertThat(set.toArray(array)).hasSize(2).containsExactly("b", "a");
+
+ array = new CharSequence[5];
+ assertThat(set.toArray(array)).hasSize(5).containsExactly("b", "a", null, null, null);
+
+ array = new CharSequence[2];
+ assertThat(set.toArray(array)).hasSize(2).containsExactly("b", "a");
+ }
+
+ @Test
+ public void remove() {
+ CharSequenceSet set = CharSequenceSet.of(ImmutableSet.of("a", "b", "c"));
+ assertThatThrownBy(() -> set.remove(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ set.remove("a");
+ assertThat(set).containsExactly("b", "c");
+ set.remove("b");
+ assertThat(set).containsExactly("c");
+ set.remove("c");
+ assertThat(set).isEmpty();
+ }
+
@Test
public void testRemoveAll() {
+ CharSequenceSet empty = CharSequenceSet.empty();
+ assertThatThrownBy(() -> empty.removeAll(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid collection: null");
+
+ assertThatThrownBy(() -> empty.removeAll(Collections.singletonList(null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.removeAll(Arrays.asList("123", null)))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("Invalid object: null");
+
+ assertThatThrownBy(() -> empty.removeAll(ImmutableList.of("123", 456)))
+ .isInstanceOf(ClassCastException.class)
+ .hasMessage("Cannot cast java.lang.Integer to java.lang.CharSequence");
+
CharSequenceSet set = CharSequenceSet.of(ImmutableList.of("123", "456"));
- assertThat(set.removeAll(ImmutableList.of("456", "789", 123)))
+ assertThat(set.removeAll(ImmutableList.of("456", "789")))
.overridingErrorMessage("Set should be changed")
.isTrue();
assertThat(set).hasSize(1).contains("123");
set = CharSequenceSet.of(ImmutableList.of("123", "456"));
- assertThat(set.removeAll(ImmutableList.of(123, 456)))
+ assertThat(set.removeAll(ImmutableList.of("333", "789")))
.overridingErrorMessage("Set should not be changed")
.isFalse();
@@ -119,4 +271,17 @@ public void testEqualsAndHashCode() {
.isEqualTo(set3.hashCode())
.isEqualTo(set4.hashCode());
}
+
+ @Test
+ public void kryoSerialization() throws Exception {
+ CharSequenceSet charSequences = CharSequenceSet.of(ImmutableList.of("c", "b", "a"));
+ assertThat(TestHelpers.KryoHelpers.roundTripSerialize(charSequences)).isEqualTo(charSequences);
+ }
+
+ @Test
+ public void javaSerialization() throws Exception {
+ CharSequenceSet charSequences = CharSequenceSet.of(ImmutableList.of("c", "b", "a"));
+ CharSequenceSet deserialize = TestHelpers.deserialize(TestHelpers.serialize(charSequences));
+ assertThat(deserialize).isEqualTo(charSequences);
+ }
}
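Reviewer note: as the tests above document, CharSequenceSet preserves insertion order, ignores duplicates, rejects null elements, and round-trips through Java and Kryo serialization. A small usage sketch consistent with that behavior:

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.CharSequenceSet;

// Hedged usage sketch matching the behavior pinned down by the tests above.
class CharSequenceSetUsage {
  public static void main(String[] args) {
    CharSequenceSet paths = CharSequenceSet.empty();
    paths.addAll(ImmutableList.of("s3://bucket/a", "s3://bucket/b"));
    paths.add("s3://bucket/a"); // duplicate: size stays 2, order stays [a, b]
    System.out.println(paths.contains("s3://bucket/b")); // true
    // paths.add(null) would throw NullPointerException("Invalid object: null")
  }
}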
diff --git a/api/src/test/java/org/apache/iceberg/util/TestUUIDUtil.java b/api/src/test/java/org/apache/iceberg/util/TestUUIDUtil.java
new file mode 100644
index 000000000000..c5f85c2f20b3
--- /dev/null
+++ b/api/src/test/java/org/apache/iceberg/util/TestUUIDUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+public class TestUUIDUtil {
+
+ @Test
+ public void uuidV7HasVersionAndVariant() {
+ UUID uuid = UUIDUtil.generateUuidV7();
+ assertThat(uuid.version()).isEqualTo(7);
+ assertThat(uuid.variant()).isEqualTo(2);
+ }
+}
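Reviewer note on what the two assertions check: a version-7 UUID carries a 48-bit Unix millisecond timestamp in the most significant bits, a version nibble of 7, and the IETF variant bits 10 (which java.util.UUID#variant reports as 2). Below is a minimal sketch of that RFC 9562 layout; it is an assumption about the typical construction, not a copy of UUIDUtil.generateUuidV7:

import java.security.SecureRandom;
import java.util.UUID;

// Hedged sketch of a UUIDv7: 48-bit epoch millis, version nibble 7, variant bits 10.
class UuidV7Sketch {
  private static final SecureRandom RANDOM = new SecureRandom();

  static UUID generate() {
    long millis = System.currentTimeMillis();
    long msb = (millis << 16)                          // 48-bit timestamp in the high bits
        | 0x7000L                                      // version nibble = 7
        | (RANDOM.nextLong() & 0x0FFFL);               // 12 random bits
    long lsb = (RANDOM.nextLong() & 0x3FFFFFFFFFFFFFFFL)
        | 0x8000000000000000L;                         // variant bits 10, i.e. variant() == 2
    return new UUID(msb, lsb);
  }
}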
diff --git a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
index 95fe34b742ff..3445928d1551 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/ApacheHttpClientConfigurations.java
@@ -21,13 +21,16 @@
import java.net.URI;
import java.time.Duration;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
-class ApacheHttpClientConfigurations {
+class ApacheHttpClientConfigurations extends BaseHttpClientConfigurations {
private Long connectionTimeoutMs;
private Long socketTimeoutMs;
private Long acquisitionTimeoutMs;
@@ -41,10 +44,11 @@ class ApacheHttpClientConfigurations {
private ApacheHttpClientConfigurations() {}
- public void configureHttpClientBuilder(T awsClientBuilder) {
- ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
+ @Override
+ protected SdkHttpClient buildHttpClient() {
+ final ApacheHttpClient.Builder apacheHttpClientBuilder = ApacheHttpClient.builder();
configureApacheHttpClientBuilder(apacheHttpClientBuilder);
- awsClientBuilder.httpClientBuilder(apacheHttpClientBuilder);
+ return apacheHttpClientBuilder.build();
}
private void initialize(Map httpClientProperties) {
@@ -115,6 +119,31 @@ void configureApacheHttpClientBuilder(ApacheHttpClient.Builder apacheHttpClientB
}
}
+ /**
+ * Generate a cache key based on HTTP client configuration. This ensures clients with identical
+ * configurations share the same HTTP client instance.
+ */
+ @Override
+ protected String generateHttpClientCacheKey() {
+ Map keyComponents = Maps.newTreeMap();
+
+ keyComponents.put("type", "apache");
+ keyComponents.put("connectionTimeoutMs", connectionTimeoutMs);
+ keyComponents.put("socketTimeoutMs", socketTimeoutMs);
+ keyComponents.put("acquisitionTimeoutMs", acquisitionTimeoutMs);
+ keyComponents.put("connectionMaxIdleTimeMs", connectionMaxIdleTimeMs);
+ keyComponents.put("connectionTimeToLiveMs", connectionTimeToLiveMs);
+ keyComponents.put("expectContinueEnabled", expectContinueEnabled);
+ keyComponents.put("maxConnections", maxConnections);
+ keyComponents.put("tcpKeepAliveEnabled", tcpKeepAliveEnabled);
+ keyComponents.put("useIdleConnectionReaperEnabled", useIdleConnectionReaperEnabled);
+ keyComponents.put("proxyEndpoint", proxyEndpoint);
+
+ return keyComponents.entrySet().stream()
+ .map(entry -> entry.getKey() + "=" + Objects.toString(entry.getValue(), "null"))
+ .collect(Collectors.joining(",", "apache[", "]"));
+ }
+
public static ApacheHttpClientConfigurations create(Map properties) {
ApacheHttpClientConfigurations configurations = new ApacheHttpClientConfigurations();
configurations.initialize(properties);
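Reviewer note: because the key components go through a TreeMap, two factory instances created from identical properties always serialize to byte-for-byte identical keys and therefore resolve to the same cached client. A small self-contained illustration of the joining logic (the values are illustrative, not Iceberg defaults):

import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hedged illustration of the cache-key format built in generateHttpClientCacheKey().
class CacheKeySketch {
  public static void main(String[] args) {
    Map<String, Object> components = new TreeMap<>();
    components.put("type", "apache");
    components.put("maxConnections", null);
    components.put("socketTimeoutMs", 30000L);
    String key = components.entrySet().stream()
        .map(e -> e.getKey() + "=" + Objects.toString(e.getValue(), "null"))
        .collect(Collectors.joining(",", "apache[", "]"));
    System.out.println(key); // apache[maxConnections=null,socketTimeoutMs=30000,type=apache]
  }
}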
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
index cf73e80f44c1..056959df2301 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.Map;
+import java.util.function.Predicate;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.s3.VendedCredentialsProvider;
import org.apache.iceberg.common.DynClasses;
@@ -28,7 +29,6 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SerializableMap;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -97,7 +97,6 @@ public class AwsClientProperties implements Serializable {
private final String refreshCredentialsEndpoint;
private final boolean refreshCredentialsEnabled;
private final boolean legacyMd5pluginEnabled;
- private final Map allProperties;
public AwsClientProperties() {
this.clientRegion = null;
@@ -106,15 +105,18 @@ public AwsClientProperties() {
this.refreshCredentialsEndpoint = null;
this.refreshCredentialsEnabled = true;
this.legacyMd5pluginEnabled = false;
- this.allProperties = null;
}
public AwsClientProperties(Map properties) {
- this.allProperties = SerializableMap.copyOf(properties);
this.clientRegion = properties.get(CLIENT_REGION);
this.clientCredentialsProvider = properties.get(CLIENT_CREDENTIALS_PROVIDER);
+ // Retain all non-prefixed properties and override with prefixed properties
this.clientCredentialsProviderProperties =
- PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX);
+ PropertyUtil.mergeProperties(
+ PropertyUtil.filterProperties(
+ properties,
+ Predicate.not(property -> property.startsWith(CLIENT_CREDENTIAL_PROVIDER_PREFIX))),
+ PropertyUtil.propertiesWithPrefix(properties, CLIENT_CREDENTIAL_PROVIDER_PREFIX));
this.refreshCredentialsEndpoint =
RESTUtil.resolveEndpoint(
properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));
@@ -211,7 +213,6 @@ public void applyClientCredentialConfigurati
public AwsCredentialsProvider credentialsProvider(
String accessKeyId, String secretAccessKey, String sessionToken) {
if (refreshCredentialsEnabled && !Strings.isNullOrEmpty(refreshCredentialsEndpoint)) {
- clientCredentialsProviderProperties.putAll(allProperties);
clientCredentialsProviderProperties.put(
VendedCredentialsProvider.URI, refreshCredentialsEndpoint);
return credentialsProvider(VendedCredentialsProvider.class.getName());
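Reviewer note: the effect of the new merge is that the credentials provider sees every catalog property once, with any prefixed entry overriding its unprefixed counterpart after the prefix is stripped. A hedged illustration using the same PropertyUtil calls as the change above; the literal prefix stands in for CLIENT_CREDENTIAL_PROVIDER_PREFIX and the keys are illustrative:

import java.util.Map;
import java.util.function.Predicate;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

// Hedged sketch of the credentials-provider property merge.
class CredentialProviderPropertiesSketch {
  private static final String PREFIX = "client.credentials-provider."; // assumed prefix value

  static Map<String, String> merged(Map<String, String> properties) {
    return PropertyUtil.mergeProperties(
        PropertyUtil.filterProperties(properties, Predicate.not(key -> key.startsWith(PREFIX))),
        PropertyUtil.propertiesWithPrefix(properties, PREFIX));
  }

  public static void main(String[] args) {
    Map<String, String> merged =
        merged(ImmutableMap.of("uri", "http://localhost/v1", PREFIX + "custom-flag", "true"));
    System.out.println(merged); // contains uri=http://localhost/v1 and custom-flag=true
  }
}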
diff --git a/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
new file mode 100644
index 000000000000..d4301f04487e
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/BaseHttpClientConfigurations.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * Base class for HTTP client configurations that provides managed HTTP client lifecycle with
+ * reference counting.
+ *
+ *
+ * <p>This class encapsulates the interaction with {@link HttpClientCache} to ensure HTTP clients
+ * are properly shared and their lifecycle managed via reference counting. Subclasses are
+ * responsible for providing configuration-specific cache keys and building the appropriate HTTP
+ * client type (Apache, UrlConnection, etc.).
+ */
+abstract class BaseHttpClientConfigurations {
+
+ private static final HttpClientCache CACHE = HttpClientCache.instance();
+
+ /**
+ * Generate a unique cache key based on the HTTP client configuration. The cache key is used to
+ * determine whether HTTP clients can be shared across different factory instances.
+ *
+ *
+ * <p>Implementations should include all configuration parameters that affect HTTP client behavior
+ * (timeouts, connection settings, proxy configuration, etc.) to ensure clients are only shared
+ * when they have identical configurations.
+ *
+ * @return a unique string representing this HTTP client configuration
+ */
+ protected abstract String generateHttpClientCacheKey();
+
+ /**
+ * Build the actual HTTP client instance based on the configuration. This method is called only
+ * when a new HTTP client needs to be created (i.e., when no cached client exists for the given
+ * cache key).
+ *
+ * @return a configured {@link SdkHttpClient} instance
+ */
+ protected abstract SdkHttpClient buildHttpClient();
+
+ /**
+ * Configure the AWS client builder with a managed HTTP client.
+ *
+ *
+ * <p>This method obtains a managed HTTP client from the cache using the configuration-specific
+ * cache key. If a client with the same configuration already exists in the cache, it will be
+ * reused with an incremented reference count. Otherwise, a new client will be built and cached.
+ *
+ * @param awsClientBuilder the AWS client builder to configure
+ * @param <T> the type of AWS client builder
+ */
+ public void configureHttpClientBuilder(T awsClientBuilder) {
+ String cacheKey = generateHttpClientCacheKey();
+
+ SdkHttpClient managedHttpClient = CACHE.getOrCreateClient(cacheKey, this::buildHttpClient);
+
+ awsClientBuilder.httpClient(managedHttpClient);
+ }
+}
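Reviewer note: a minimal sketch of what a subclass has to supply under the contract above. The class below is illustrative only (the real subclasses are the Apache and UrlConnection configurations); it sits in org.apache.iceberg.aws because the base class is package-private:

package org.apache.iceberg.aws;

import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;

// Hedged, illustrative subclass: the cache key must capture every setting that changes
// client behavior, and buildHttpClient() runs only on a cache miss.
class FixedUrlConnectionConfigurations extends BaseHttpClientConfigurations {

  @Override
  protected String generateHttpClientCacheKey() {
    return "urlconnection[fixed-defaults]";
  }

  @Override
  protected SdkHttpClient buildHttpClient() {
    return UrlConnectionHttpClient.builder().build();
  }
}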
diff --git a/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
new file mode 100644
index 000000000000..79444a62aee1
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/HttpClientCache.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ExecutableHttpRequest;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+/**
+ * A cache that manages the lifecycle of shared HTTP clients for AWS SDK v2 using reference
+ * counting. Package-private - only accessed via {@link BaseHttpClientConfigurations}.
+ */
+final class HttpClientCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HttpClientCache.class);
+
+ private final ConcurrentMap clients = Maps.newConcurrentMap();
+ private static volatile HttpClientCache instance;
+
+ static HttpClientCache instance() {
+ if (instance == null) {
+ synchronized (HttpClientCache.class) {
+ if (instance == null) {
+ instance = new HttpClientCache();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Get or create a managed HTTP client for the given configuration. Each call increments the
+ * reference count for the client and returns a ref-counted wrapper.
+ *
+ * @param clientKey unique key identifying the client configuration
+ * @param clientFactory factory to create the HTTP client if not cached
+ * @return a ref-counted HTTP client wrapper
+ */
+ SdkHttpClient getOrCreateClient(String clientKey, Supplier clientFactory) {
+ ManagedHttpClient managedClient =
+ clients.computeIfAbsent(
+ clientKey,
+ key -> {
+ LOG.debug("Creating new managed HTTP client for key: {}", key);
+ SdkHttpClient httpClient = clientFactory.get();
+ return new ManagedHttpClient(httpClient, key);
+ });
+ // Return the cached ref-counted wrapper
+ return managedClient.acquire();
+ }
+
+ /**
+ * Release a reference to the HTTP client. When the reference count reaches zero, the client is
+ * closed and removed from the cache.
+ *
+ * @param clientKey the key identifying the client to release
+ */
+ void releaseClient(String clientKey) {
+ ManagedHttpClient managedClient = clients.get(clientKey);
+ if (null != managedClient && managedClient.release()) {
+ clients.remove(clientKey, managedClient);
+ }
+ }
+
+ @VisibleForTesting
+ Map clients() {
+ return Collections.unmodifiableMap(clients);
+ }
+
+ @VisibleForTesting
+ void clear() {
+ clients.values().forEach(ManagedHttpClient::close);
+ clients.clear();
+ }
+
+ /**
+ * Managed HTTP client wrapper that provides reference counting for lifecycle management. The HTTP
+ * client is closed when the reference count reaches zero.
+ */
+ static class ManagedHttpClient implements SdkHttpClient {
+ private final SdkHttpClient httpClient;
+ private final String clientKey;
+ private volatile int refCount = 0;
+ private boolean closed = false;
+
+ ManagedHttpClient(SdkHttpClient httpClient, String clientKey) {
+ this.httpClient = httpClient;
+ this.clientKey = clientKey;
+ LOG.debug("Created managed HTTP client: key={}", clientKey);
+ }
+
+ /**
+ * Acquire a reference to the HTTP client, incrementing the reference count.
+ *
+ * @return the ref-counted wrapper client
+ * @throws IllegalStateException if the client has already been closed
+ */
+ synchronized ManagedHttpClient acquire() {
+ if (closed) {
+ throw new IllegalStateException("Cannot acquire closed HTTP client: " + clientKey);
+ }
+ refCount++;
+ LOG.debug("Acquired HTTP client: key={}, refCount={}", clientKey, refCount);
+ return this;
+ }
+
+ /**
+ * Release a reference to the HTTP client, decrementing the reference count. If the count
+ * reaches zero, the client is closed.
+ *
+ * @return true if the client was closed, false otherwise
+ */
+ synchronized boolean release() {
+ if (closed) {
+ LOG.warn("Attempted to release already closed HTTP client: key={}", clientKey);
+ return false;
+ }
+
+ refCount--;
+ LOG.debug("Released HTTP client: key={}, refCount={}", clientKey, refCount);
+ if (refCount == 0) {
+ return closeHttpClient();
+ } else if (refCount < 0) {
+ LOG.warn(
+ "HTTP client reference count went negative key={}, refCount={}", clientKey, refCount);
+ refCount = 0;
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ SdkHttpClient httpClient() {
+ return httpClient;
+ }
+
+ /**
+ * Close the HTTP client if not already closed.
+ *
+ * @return true if the client was closed by this call, false if already closed
+ */
+ private boolean closeHttpClient() {
+ if (!closed) {
+ closed = true;
+ LOG.debug("Closing HTTP client: key={}", clientKey);
+ try {
+ httpClient.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close HTTP client: key={}", clientKey, e);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ int refCount() {
+ return refCount;
+ }
+
+ @VisibleForTesting
+ boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
+ return httpClient.prepareRequest(request);
+ }
+
+ @Override
+ public String clientName() {
+ return httpClient.clientName();
+ }
+
+ @Override
+ public void close() {
+ HttpClientCache.instance().releaseClient(clientKey);
+ }
+ }
+}
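Reviewer note: putting the pieces together, every getOrCreateClient call on the same key returns the same ManagedHttpClient with an incremented count, each close() on the wrapper releases one reference, and the wrapped SDK client is closed and evicted only when the last reference is released. A hedged usage sketch (the key string and Apache builder are illustrative; this would live in org.apache.iceberg.aws since the cache is package-private):

import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;

// Hedged lifecycle sketch: the factory runs once, both handles are the same wrapper.
class HttpClientCacheUsage {
  static void demo() {
    HttpClientCache cache = HttpClientCache.instance();
    SdkHttpClient first =
        cache.getOrCreateClient("apache[defaults]", () -> ApacheHttpClient.builder().build());
    SdkHttpClient second =
        cache.getOrCreateClient("apache[defaults]", () -> ApacheHttpClient.builder().build());

    first.close();  // refCount 2 -> 1, underlying Apache client stays open
    second.close(); // refCount 1 -> 0, underlying client is closed and removed from the cache
  }
}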
diff --git a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
index ff8dafcf0645..273baa674804 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/UrlConnectionHttpClientConfigurations.java
@@ -21,13 +21,16 @@
import java.net.URI;
import java.time.Duration;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
-import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.urlconnection.ProxyConfiguration;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-class UrlConnectionHttpClientConfigurations {
+class UrlConnectionHttpClientConfigurations extends BaseHttpClientConfigurations {
private Long httpClientUrlConnectionConnectionTimeoutMs;
private Long httpClientUrlConnectionSocketTimeoutMs;
@@ -35,11 +38,12 @@ class UrlConnectionHttpClientConfigurations {
private UrlConnectionHttpClientConfigurations() {}
- public void configureHttpClientBuilder(T awsClientBuilder) {
- UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
+ @Override
+ protected SdkHttpClient buildHttpClient() {
+ final UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder =
UrlConnectionHttpClient.builder();
configureUrlConnectionHttpClientBuilder(urlConnectionHttpClientBuilder);
- awsClientBuilder.httpClientBuilder(urlConnectionHttpClientBuilder);
+ return urlConnectionHttpClientBuilder.build();
}
private void initialize(Map httpClientProperties) {
@@ -71,6 +75,24 @@ void configureUrlConnectionHttpClientBuilder(
}
}
+ /**
+ * Generate a cache key based on HTTP client configuration. This ensures clients with identical
+ * configurations share the same HTTP client instance.
+ */
+ @Override
+ protected String generateHttpClientCacheKey() {
+ Map keyComponents = Maps.newTreeMap(); // TreeMap for consistent ordering
+
+ keyComponents.put("type", "urlconnection");
+ keyComponents.put("connectionTimeoutMs", httpClientUrlConnectionConnectionTimeoutMs);
+ keyComponents.put("socketTimeoutMs", httpClientUrlConnectionSocketTimeoutMs);
+ keyComponents.put("proxyEndpoint", proxyEndpoint);
+
+ return keyComponents.entrySet().stream()
+ .map(entry -> entry.getKey() + "=" + Objects.toString(entry.getValue(), "null"))
+ .collect(Collectors.joining(",", "urlconnection[", "]"));
+ }
+
public static UrlConnectionHttpClientConfigurations create(
Map httpClientProperties) {
UrlConnectionHttpClientConfigurations configurations =
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
index fc42bd789859..9afa28635c41 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/VendedCredentialsProvider.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
@@ -50,6 +51,7 @@ public class VendedCredentialsProvider implements AwsCredentialsProvider, SdkAut
private final CachedSupplier credentialCache;
private final String catalogEndpoint;
private final String credentialsEndpoint;
+ private final String planId;
private AuthManager authManager;
private AuthSession authSession;
@@ -65,6 +67,7 @@ private VendedCredentialsProvider(Map properties) {
.build();
this.catalogEndpoint = properties.get(CatalogProperties.URI);
this.credentialsEndpoint = properties.get(URI);
+ this.planId = properties.getOrDefault(RESTCatalogProperties.REST_SCAN_PLAN_ID, null);
}
@Override
@@ -103,7 +106,7 @@ private LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
credentialsEndpoint,
- null,
+ null != planId ? Map.of("planId", planId) : null,
LoadCredentialsResponse.class,
Map.of(),
ErrorHandlers.defaultErrorHandler());
diff --git a/aws/src/main/resources/s3-signer-open-api.yaml b/aws/src/main/resources/s3-signer-open-api.yaml
index 1e4dafa6c1b7..3d719c515b2a 100644
--- a/aws/src/main/resources/s3-signer-open-api.yaml
+++ b/aws/src/main/resources/s3-signer-open-api.yaml
@@ -88,9 +88,9 @@ paths:
5XX:
$ref: '../../../../open-api/rest-catalog-open-api.yaml#/components/responses/ServerErrorResponse'
- ##############################
- # Application Schema Objects #
- ##############################
+##############################
+# Application Schema Objects #
+##############################
components:
schemas:
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientProperties.java
index 8c177ab5274e..cac8d892badb 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestAwsClientProperties.java
@@ -180,7 +180,7 @@ public void refreshCredentialsEndpointWithOverridingOAuthToken() {
Map properties =
ImmutableMap.of(
CatalogProperties.URI,
- "http://localhost:1234/v1",
+ "http://localhost:1234/v1/catalog",
AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
"http://localhost:1234/v1/credentials",
OAuth2Properties.TOKEN,
@@ -189,17 +189,22 @@ public void refreshCredentialsEndpointWithOverridingOAuthToken() {
"specific-token");
AwsClientProperties awsClientProperties = new AwsClientProperties(properties);
- Map expectedProperties =
- ImmutableMap.builder()
- .putAll(properties)
- .put("credentials.uri", "http://localhost:1234/v1/credentials")
- .build();
-
AwsCredentialsProvider provider =
awsClientProperties.credentialsProvider("key", "secret", "token");
assertThat(provider).isInstanceOf(VendedCredentialsProvider.class);
VendedCredentialsProvider vendedCredentialsProvider = (VendedCredentialsProvider) provider;
- assertThat(vendedCredentialsProvider).extracting("properties").isEqualTo(expectedProperties);
+ assertThat(vendedCredentialsProvider)
+ .extracting("properties")
+ .isEqualTo(
+ ImmutableMap.of(
+ AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT,
+ "http://localhost:1234/v1/credentials",
+ "credentials.uri",
+ "http://localhost:1234/v1/credentials",
+ CatalogProperties.URI,
+ "http://localhost:1234/v1/catalog",
+ OAuth2Properties.TOKEN,
+ "specific-token"));
}
@Test
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
new file mode 100644
index 000000000000..9febf37cbd39
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientCache.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.http.SdkHttpClient;
+
+public class TestHttpClientCache {
+
+ @Mock private SdkHttpClient httpClient1;
+ @Mock private SdkHttpClient httpClient2;
+ @Mock private Supplier httpClientFactory1;
+ @Mock private Supplier httpClientFactory2;
+
+ private HttpClientCache cache;
+
+ @BeforeEach
+ public void before() {
+ MockitoAnnotations.openMocks(this);
+ cache = HttpClientCache.instance();
+ // Clean up any existing clients from previous tests
+ cache.clear();
+
+ when(httpClientFactory1.get()).thenReturn(httpClient1);
+ when(httpClientFactory2.get()).thenReturn(httpClient2);
+ }
+
+ @Test
+ public void singletonPattern() {
+ HttpClientCache instance1 = HttpClientCache.instance();
+ HttpClientCache instance2 = HttpClientCache.instance();
+
+ assertThat(instance1).isSameAs(instance2);
+ }
+
+ @Test
+ public void clientCaching() {
+ final String cacheKey = "test-key";
+
+ // First call should create client and increment ref count
+ SdkHttpClient client1 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+ verify(httpClientFactory1, times(1)).get();
+
+ // Second call with same key should return cached client and increment ref count again
+ SdkHttpClient client2 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+ verify(httpClientFactory1, times(1)).get(); // Factory should not be called again
+
+ assertThat(client1).isSameAs(client2);
+
+ // Verify reference count is 2
+ ManagedHttpClient managedClient = cache.clients().get(cacheKey);
+ assertThat(managedClient.refCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void differentKeysCreateDifferentClients() {
+ SdkHttpClient client1 = cache.getOrCreateClient("test-key-1", httpClientFactory1);
+ SdkHttpClient client2 = cache.getOrCreateClient("test-key-2", httpClientFactory2);
+
+ verify(httpClientFactory1, times(1)).get();
+ verify(httpClientFactory2, times(1)).get();
+
+ assertThat(client1).isNotSameAs(client2);
+ }
+
+ @Test
+ public void referenceCountingAndCleanup() throws Exception {
+ SdkHttpClient mockClient = mock(SdkHttpClient.class);
+ final String cacheKey = "test-key";
+
+ ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+ // Acquire twice
+ ManagedHttpClient client1 = managedClient.acquire();
+ ManagedHttpClient client2 = managedClient.acquire();
+
+ assertThat(client1).isSameAs(client2);
+ assertThat(managedClient.refCount()).isEqualTo(2);
+
+ // First release should not close
+ managedClient.release();
+ assertThat(managedClient.refCount()).isEqualTo(1);
+ assertThat(managedClient.isClosed()).isFalse();
+ verify(mockClient, times(0)).close();
+
+ // Second release should close
+ managedClient.release();
+ assertThat(managedClient.refCount()).isEqualTo(0);
+ assertThat(managedClient.isClosed()).isTrue();
+ verify(mockClient, times(1)).close();
+ }
+
+ @Test
+ public void acquireAfterCloseThrows() {
+ SdkHttpClient mockClient = mock(SdkHttpClient.class);
+ final String cacheKey = "test-key";
+
+ ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+ // Acquire and release to close
+ managedClient.acquire();
+ managedClient.release();
+
+ assertThat(managedClient.isClosed()).isTrue();
+
+ // Trying to acquire a closed client should throw
+ assertThatThrownBy(managedClient::acquire)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Cannot acquire closed HTTP client");
+ }
+
+ @Test
+ public void releaseRemovesFromRegistry() {
+ final String cacheKey = "test-key";
+
+ // Create client (refCount = 1)
+ SdkHttpClient client1 = cache.getOrCreateClient(cacheKey, httpClientFactory1);
+ assertThat(client1).isNotNull();
+
+ Map clients = cache.clients();
+ assertThat(clients).containsKey(cacheKey);
+
+ // Verify ref count is 1
+ assertThat(clients.get(cacheKey).refCount()).isEqualTo(1);
+
+ // Release (refCount = 0, should close and remove)
+ cache.releaseClient(cacheKey);
+
+ // Client should be removed from map after close
+ assertThat(clients).doesNotContainKey(cacheKey);
+ verify(httpClient1, times(1)).close();
+ }
+
+ @Test
+ public void concurrentAccess() throws InterruptedException {
+ final String cacheKey = "concurrent-test-key";
+ int threadCount = 10;
+ Thread[] threads = new Thread[threadCount];
+ SdkHttpClient[] results = new SdkHttpClient[threadCount];
+
+ // Create multiple threads that access the same cache key
+ for (int i = 0; i < threadCount; i++) {
+ final int index = i;
+ threads[i] =
+ new Thread(() -> results[index] = cache.getOrCreateClient(cacheKey, httpClientFactory1));
+ }
+
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Wait for all threads to complete
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ // Verify factory was called only once (proper caching under concurrency)
+ verify(httpClientFactory1, times(1)).get();
+
+ // Verify all threads got the same client instance
+ SdkHttpClient expectedClient = results[0];
+ for (int i = 1; i < threadCount; i++) {
+ assertThat(results[i]).isSameAs(expectedClient);
+ }
+
+ // Verify reference count equals number of threads
+ ManagedHttpClient managedClient = cache.clients().get(cacheKey);
+ assertThat(managedClient.refCount()).isEqualTo(threadCount);
+ }
+
+ @Test
+ public void registryClear() {
+ Map clients = cache.clients();
+
+ // Create some clients
+ cache.getOrCreateClient("key1", httpClientFactory1);
+ cache.getOrCreateClient("key2", httpClientFactory2);
+
+ // Verify clients were stored
+ assertThat(clients).hasSize(2);
+
+ // Clearing the cache should close the clients and clean up the map
+ cache.clear();
+
+ // Map should be empty after clear
+ assertThat(clients).isEmpty();
+
+ // Both clients should be closed
+ verify(httpClient1, times(1)).close();
+ verify(httpClient2, times(1)).close();
+ }
+
+ @Test
+ public void doubleReleaseDoesNotCauseNegativeRefCount() throws Exception {
+ SdkHttpClient mockClient = mock(SdkHttpClient.class);
+ final String cacheKey = "test-key";
+
+ ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+ // Acquire once
+ ManagedHttpClient client = managedClient.acquire();
+ assertThat(managedClient.refCount()).isEqualTo(1);
+
+ // First release should close the client (refCount goes to 0)
+ boolean closed = managedClient.release();
+ assertThat(closed).isTrue();
+ assertThat(managedClient.refCount()).isEqualTo(0);
+ assertThat(managedClient.isClosed()).isTrue();
+ verify(mockClient, times(1)).close();
+
+ // Second release on already closed client should be a no-op
+ // The closed flag prevents decrement, so refCount stays at 0
+ boolean closedAgain = managedClient.release();
+ assertThat(closedAgain).isFalse();
+ assertThat(managedClient.refCount()).isEqualTo(0); // Should still be 0, not negative
+ assertThat(managedClient.isClosed()).isTrue();
+ verify(mockClient, times(1)).close(); // Close should not be called again
+ }
+
+ @Test
+ public void multipleReleasesAfterClose() throws Exception {
+ SdkHttpClient mockClient = mock(SdkHttpClient.class);
+ final String cacheKey = "test-key";
+
+ ManagedHttpClient managedClient = new ManagedHttpClient(mockClient, cacheKey);
+
+ // Acquire once
+ managedClient.acquire();
+ assertThat(managedClient.refCount()).isEqualTo(1);
+
+ // Release to close
+ managedClient.release();
+ assertThat(managedClient.isClosed()).isTrue();
+ assertThat(managedClient.refCount()).isEqualTo(0);
+
+ // Try releasing multiple more times (simulating a bug in caller code)
+ for (int i = 0; i < 5; i++) {
+ boolean result = managedClient.release();
+ assertThat(result).isFalse(); // Should return false, not try to close again
+ assertThat(managedClient.refCount()).isEqualTo(0); // RefCount should never go negative
+ }
+
+ // Close should only have been called once
+ verify(mockClient, times(1)).close();
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
index b0602a074992..378e5e6ca94a 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestHttpClientProperties.java
@@ -20,8 +20,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
import java.util.Map;
+import org.apache.iceberg.aws.HttpClientCache.ManagedHttpClient;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -40,16 +43,21 @@ public void testUrlHttpClientConfiguration() {
properties.put(HttpClientProperties.CLIENT_TYPE, "urlconnection");
HttpClientProperties httpProperties = new HttpClientProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
- ArgumentCaptor httpClientBuilderCaptor =
- ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+ ArgumentCaptor httpClientCaptor = ArgumentCaptor.forClass(SdkHttpClient.class);
httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
- Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
- SdkHttpClient.Builder capturedHttpClientBuilder = httpClientBuilderCaptor.getValue();
+ Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+ SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
- assertThat(capturedHttpClientBuilder)
- .as("Should use url connection http client")
- .isInstanceOf(UrlConnectionHttpClient.Builder.class);
+ assertThat(capturedHttpClient)
+ .as("Should use managed SDK http client")
+ .isInstanceOf(ManagedHttpClient.class);
+
+ // Verify the underlying delegate is UrlConnectionHttpClient
+ ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+ assertThat(managedClient.httpClient())
+ .as("Underlying client should be UrlConnectionHttpClient")
+ .isInstanceOf(UrlConnectionHttpClient.class);
}
@Test
@@ -58,15 +66,21 @@ public void testApacheHttpClientConfiguration() {
properties.put(HttpClientProperties.CLIENT_TYPE, "apache");
HttpClientProperties httpProperties = new HttpClientProperties(properties);
S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
- ArgumentCaptor httpClientBuilderCaptor =
- ArgumentCaptor.forClass(SdkHttpClient.Builder.class);
+ ArgumentCaptor httpClientCaptor = ArgumentCaptor.forClass(SdkHttpClient.class);
httpProperties.applyHttpClientConfigurations(mockS3ClientBuilder);
- Mockito.verify(mockS3ClientBuilder).httpClientBuilder(httpClientBuilderCaptor.capture());
- SdkHttpClient.Builder capturedHttpClientBuilder = httpClientBuilderCaptor.getValue();
- assertThat(capturedHttpClientBuilder)
- .as("Should use apache http client")
- .isInstanceOf(ApacheHttpClient.Builder.class);
+ Mockito.verify(mockS3ClientBuilder).httpClient(httpClientCaptor.capture());
+ SdkHttpClient capturedHttpClient = httpClientCaptor.getValue();
+
+ assertThat(capturedHttpClient)
+ .as("Should use managed SDK http client")
+ .isInstanceOf(ManagedHttpClient.class);
+
+ // Verify the underlying delegate is ApacheHttpClient
+ ManagedHttpClient managedClient = (ManagedHttpClient) capturedHttpClient;
+ assertThat(managedClient.httpClient())
+ .as("Underlying client should be ApacheHttpClient")
+ .isInstanceOf(ApacheHttpClient.class);
}
@Test
@@ -80,4 +94,29 @@ public void testInvalidHttpClientType() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unrecognized HTTP client type test");
}
+
+ @Test
+ public void testApacheHttpClientConfiguredAsSharedResource() {
+ Map properties = Maps.newHashMap();
+ ApacheHttpClientConfigurations apacheConfig = ApacheHttpClientConfigurations.create(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+
+ apacheConfig.configureHttpClientBuilder(mockS3ClientBuilder);
+
+ // Verify that httpClient() is called with a managed client (as a shared resource)
+ verify(mockS3ClientBuilder).httpClient(any(ManagedHttpClient.class));
+ }
+
+ @Test
+ public void testUrlConnectionHttpClientConfiguredAsSharedResource() {
+ Map properties = Maps.newHashMap();
+ UrlConnectionHttpClientConfigurations urlConfig =
+ UrlConnectionHttpClientConfigurations.create(properties);
+ S3ClientBuilder mockS3ClientBuilder = Mockito.mock(S3ClientBuilder.class);
+
+ urlConfig.configureHttpClientBuilder(mockS3ClientBuilder);
+
+ // Verify that httpClient() is called with a managed client (as a shared resource)
+ verify(mockS3ClientBuilder).httpClient(any(ManagedHttpClient.class));
+ }
}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
index 8e2f99e0ccbe..d0287dc3080c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestVendedCredentialsProvider.java
@@ -26,10 +26,12 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
@@ -488,6 +490,54 @@ public void invalidTokenInProperties() {
mockServer.verify(mockRequest, VerificationTimes.once());
}
+ @Test
+ public void planIdQueryParamIsSent() {
+ String planId = "randomPlanId";
+ HttpRequest mockRequest =
+ request("/v1/credentials")
+ .withMethod(HttpMethod.GET.name())
+ .withQueryStringParameter("planId", planId);
+ Credential credential =
+ ImmutableCredential.builder()
+ .prefix("s3")
+ .config(
+ ImmutableMap.of(
+ S3FileIOProperties.ACCESS_KEY_ID,
+ "randomAccessKey",
+ S3FileIOProperties.SECRET_ACCESS_KEY,
+ "randomSecretAccessKey",
+ S3FileIOProperties.SESSION_TOKEN,
+ "sessionToken",
+ S3FileIOProperties.SESSION_TOKEN_EXPIRES_AT_MS,
+ Long.toString(Instant.now().plus(1, ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ LoadCredentialsResponse response =
+ ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build();
+
+ HttpResponse mockResponse =
+ response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ Map properties =
+ ImmutableMap.builder()
+ .putAll(PROPERTIES)
+ .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
+ .build();
+
+ try (VendedCredentialsProvider provider = VendedCredentialsProvider.create(properties)) {
+ AwsCredentials awsCredentials = provider.resolveCredentials();
+ verifyCredentials(awsCredentials, credential);
+
+ // resolving credentials multiple times should hit the credentials endpoint again and send the
+ // planId again
+ AwsCredentials refreshedCredentials = provider.resolveCredentials();
+ assertThat(refreshedCredentials).isNotSameAs(awsCredentials);
+ verifyCredentials(refreshedCredentials, credential);
+ }
+
+ mockServer.verify(mockRequest, VerificationTimes.exactly(2));
+ }
+
private void verifyCredentials(AwsCredentials awsCredentials, Credential credential) {
assertThat(awsCredentials).isInstanceOf(AwsSessionCredentials.class);
AwsSessionCredentials creds = (AwsSessionCredentials) awsCredentials;
diff --git a/azure-bundle/LICENSE b/azure-bundle/LICENSE
index 62069e51acb6..eae11b8f4023 100644
--- a/azure-bundle/LICENSE
+++ b/azure-bundle/LICENSE
@@ -243,6 +243,13 @@ License: The MIT License (MIT) - http://opensource.org/licenses/MIT
--------------------------------------------------------------------------------
+
+Group: com.azure Name: azure-security-keyvault-keys Version: 4.10.2
+Project URL: https://github.com/Azure/azure-sdk-for-java
+License: The MIT License (MIT) - http://opensource.org/licenses/MIT
+
+--------------------------------------------------------------------------------
+
Group: com.azure Name: azure-storage-internal-avro Version: 12.16.1
Project URL: https://github.com/Azure/azure-sdk-for-java
License: The MIT License (MIT) - http://opensource.org/licenses/MIT
diff --git a/azure-bundle/build.gradle b/azure-bundle/build.gradle
index 46d1b3daea5b..0bdc30fdaa7e 100644
--- a/azure-bundle/build.gradle
+++ b/azure-bundle/build.gradle
@@ -26,6 +26,7 @@ project(":iceberg-azure-bundle") {
dependencies {
implementation platform(libs.azuresdk.bom)
implementation "com.azure:azure-storage-file-datalake"
+ implementation "com.azure:azure-security-keyvault-keys"
implementation "com.azure:azure-identity"
}
diff --git a/azure/src/integration/java/org/apache/iceberg/azure/keymanagement/TestAzureKeyManagementClient.java b/azure/src/integration/java/org/apache/iceberg/azure/keymanagement/TestAzureKeyManagementClient.java
new file mode 100644
index 000000000000..32adcd46b702
--- /dev/null
+++ b/azure/src/integration/java/org/apache/iceberg/azure/keymanagement/TestAzureKeyManagementClient.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.azure.keymanagement;
+
+import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_URL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.security.keyvault.keys.KeyClient;
+import com.azure.security.keyvault.keys.KeyClientBuilder;
+import com.azure.security.keyvault.keys.models.KeyType;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
+
+@EnabledIfEnvironmentVariables({
+ @EnabledIfEnvironmentVariable(named = "AZURE_KEYVAULT_URL", matches = ".*")
+})
+public class TestAzureKeyManagementClient {
+ private static final String ICEBERG_TEST_KEY_NAME = "iceberg-test-key";
+
+ private static KeyClient keyClient;
+
+ private static KeyManagementClient azureKeyManagementClient;
+
+ @BeforeAll
+ public static void beforeClass() {
+ String keyVaultUri = System.getenv("AZURE_KEYVAULT_URL");
+ keyClient =
+ new KeyClientBuilder()
+ .vaultUrl(keyVaultUri)
+ .credential(new DefaultAzureCredentialBuilder().build())
+ .buildClient();
+ keyClient.createKey(ICEBERG_TEST_KEY_NAME, KeyType.RSA);
+ azureKeyManagementClient = new AzureKeyManagementClient();
+ azureKeyManagementClient.initialize(ImmutableMap.of(AZURE_KEYVAULT_URL, keyVaultUri));
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ if (keyClient != null) {
+ keyClient.beginDeleteKey(ICEBERG_TEST_KEY_NAME).waitForCompletion(Duration.ofMinutes(3));
+ keyClient.purgeDeletedKey(ICEBERG_TEST_KEY_NAME);
+ }
+ }
+
+ @Test
+ public void keyWrapping() {
+ ByteBuffer key = ByteBuffer.wrap("table-master-key".getBytes());
+
+ ByteBuffer encryptedKey = azureKeyManagementClient.wrapKey(key, ICEBERG_TEST_KEY_NAME);
+ ByteBuffer decryptedKey =
+ azureKeyManagementClient.unwrapKey(encryptedKey, ICEBERG_TEST_KEY_NAME);
+
+ assertThat(decryptedKey).isEqualTo(key);
+ }
+
+ @Test
+ public void keyGenerationNotSupported() {
+ assertThat(azureKeyManagementClient.supportsKeyGeneration()).isFalse();
+ }
+}
diff --git a/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java b/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java
index 38ac573b59fa..73e99e029221 100644
--- a/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java
+++ b/azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java
@@ -21,6 +21,7 @@
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
+import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;
import java.io.Serializable;
@@ -48,6 +49,9 @@ public class AzureProperties implements Serializable {
public static final String ADLS_SHARED_KEY_ACCOUNT_NAME = "adls.auth.shared-key.account.name";
public static final String ADLS_SHARED_KEY_ACCOUNT_KEY = "adls.auth.shared-key.account.key";
public static final String ADLS_TOKEN = "adls.token";
+ public static final String AZURE_KEYVAULT_URL = "azure.keyvault.url";
+ public static final String AZURE_KEYVAULT_KEY_WRAP_ALGORITHM =
+ "azure.keyvault.key-wrap-algorithm";
/**
* Configure the ADLS token credential provider used to get {@link TokenCredential}. A fully
@@ -91,6 +95,8 @@ public class AzureProperties implements Serializable {
private boolean adlsRefreshCredentialsEnabled;
private String token;
private Map<String, String> allProperties = Collections.emptyMap();
+ private String keyWrapAlgorithm;
+ private String keyVaultUrl;
public AzureProperties() {}
@@ -124,6 +130,14 @@ public AzureProperties(Map<String, String> properties) {
PropertyUtil.propertyAsBoolean(properties, ADLS_REFRESH_CREDENTIALS_ENABLED, true);
this.token = properties.get(ADLS_TOKEN);
this.allProperties = SerializableMap.copyOf(properties);
+ if (properties.containsKey(AZURE_KEYVAULT_URL)) {
+ this.keyVaultUrl = properties.get(AZURE_KEYVAULT_URL);
+ }
+
+ this.keyWrapAlgorithm =
+ properties.getOrDefault(
+ AzureProperties.AZURE_KEYVAULT_KEY_WRAP_ALGORITHM,
+ KeyWrapAlgorithm.RSA_OAEP_256.getValue());
}
public Optional<Integer> adlsReadBlockSize() {
@@ -189,4 +203,12 @@ public Mono<AccessToken> getToken(TokenRequestContext request) {
builder.endpoint("https://" + account);
}
}
+
+ public KeyWrapAlgorithm keyWrapAlgorithm() {
+ return KeyWrapAlgorithm.fromString(this.keyWrapAlgorithm);
+ }
+
+ public Optional<String> keyVaultUrl() {
+ return Optional.ofNullable(this.keyVaultUrl);
+ }
}
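
A minimal sketch of how the two new Key Vault properties behave (the vault URL value is an illustrative placeholder; the property keys, accessors, and the RSA_OAEP_256 default are the ones introduced above, and the imports match those already used in this diff):

// Sketch only: the vault URL below is a placeholder, not a value defined by this change.
Map<String, String> props =
    ImmutableMap.of(
        AzureProperties.AZURE_KEYVAULT_URL, "https://example-vault.vault.azure.net",
        AzureProperties.AZURE_KEYVAULT_KEY_WRAP_ALGORITHM, KeyWrapAlgorithm.RSA1_5.getValue());
AzureProperties azureProperties = new AzureProperties(props);
azureProperties.keyVaultUrl();      // Optional.of("https://example-vault.vault.azure.net")
azureProperties.keyWrapAlgorithm(); // RSA1_5 here; defaults to RSA_OAEP_256 when the property is unset
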
diff --git a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java
index 2ede484a732d..6660e765ecf8 100644
--- a/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java
+++ b/azure/src/main/java/org/apache/iceberg/azure/adlsv2/VendedAdlsCredentialProvider.java
@@ -38,6 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthManagers;
@@ -54,6 +55,7 @@ public class VendedAdlsCredentialProvider implements Serializable, AutoCloseable
private final SerializableMap<String, String> properties;
private final String credentialsEndpoint;
private final String catalogEndpoint;
+ private final String planId;
private transient volatile Map sasCredentialByAccount;
private transient volatile HTTPClient client;
private transient AuthManager authManager;
@@ -67,6 +69,7 @@ public VendedAdlsCredentialProvider(Map<String, String> properties) {
this.properties = SerializableMap.copyOf(properties);
this.credentialsEndpoint = properties.get(URI);
this.catalogEndpoint = properties.get(CatalogProperties.URI);
+ this.planId = properties.getOrDefault(RESTCatalogProperties.REST_SCAN_PLAN_ID, null);
}
Mono<String> credentialForAccount(String storageAccount) {
@@ -165,7 +168,7 @@ private LoadCredentialsResponse fetchCredentials() {
return httpClient()
.get(
credentialsEndpoint,
- null,
+ null != planId ? Map.of("planId", planId) : null,
LoadCredentialsResponse.class,
Map.of(),
ErrorHandlers.defaultErrorHandler());
diff --git a/azure/src/main/java/org/apache/iceberg/azure/keymanagement/AzureKeyManagementClient.java b/azure/src/main/java/org/apache/iceberg/azure/keymanagement/AzureKeyManagementClient.java
new file mode 100644
index 000000000000..4732d3d410c4
--- /dev/null
+++ b/azure/src/main/java/org/apache/iceberg/azure/keymanagement/AzureKeyManagementClient.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.azure.keymanagement;
+
+import com.azure.security.keyvault.keys.KeyClient;
+import com.azure.security.keyvault.keys.KeyClientBuilder;
+import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
+import com.azure.security.keyvault.keys.cryptography.models.UnwrapResult;
+import com.azure.security.keyvault.keys.cryptography.models.WrapResult;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.azure.AdlsTokenCredentialProviders;
+import org.apache.iceberg.azure.AzureProperties;
+import org.apache.iceberg.encryption.KeyManagementClient;
+import org.apache.iceberg.util.ByteBuffers;
+
+/** Azure key management client which connects to Azure Key Vault. */
+public class AzureKeyManagementClient implements KeyManagementClient {
+ private KeyClient keyClient;
+ private KeyWrapAlgorithm keyWrapAlgorithm;
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ AzureProperties azureProperties = new AzureProperties(properties);
+
+ this.keyWrapAlgorithm = azureProperties.keyWrapAlgorithm();
+ KeyClientBuilder keyClientBuilder = new KeyClientBuilder();
+ azureProperties.keyVaultUrl().ifPresent(keyClientBuilder::vaultUrl);
+ this.keyClient =
+ keyClientBuilder
+ .credential(AdlsTokenCredentialProviders.from(properties).credential())
+ .buildClient();
+ }
+
+ @Override
+ public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
+ WrapResult wrapResult =
+ keyClient
+ .getCryptographyClient(wrappingKeyId)
+ .wrapKey(keyWrapAlgorithm, ByteBuffers.toByteArray(key));
+ return ByteBuffer.wrap(wrapResult.getEncryptedKey());
+ }
+
+ @Override
+ public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
+ UnwrapResult unwrapResult =
+ keyClient
+ .getCryptographyClient(wrappingKeyId)
+ .unwrapKey(keyWrapAlgorithm, ByteBuffers.toByteArray(wrappedKey));
+ return ByteBuffer.wrap(unwrapResult.getKey());
+ }
+}
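
A minimal usage sketch of the new client, mirroring the integration test added in this change (the vault URL and wrapping key name are illustrative placeholders; the wrapping key must already exist in the vault):

// Sketch only: placeholder vault URL and key name.
KeyManagementClient kms = new AzureKeyManagementClient();
kms.initialize(
    ImmutableMap.of(AzureProperties.AZURE_KEYVAULT_URL, "https://example-vault.vault.azure.net"));

ByteBuffer masterKey = ByteBuffer.wrap("table-master-key".getBytes(StandardCharsets.UTF_8));
ByteBuffer wrapped = kms.wrapKey(masterKey, "example-wrapping-key");
ByteBuffer unwrapped = kms.unwrapKey(wrapped, "example-wrapping-key"); // equals masterKey

Key generation is not supported (supportsKeyGeneration() returns false), so callers supply the key material to wrap, as in the integration test above.
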
diff --git a/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java b/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java
index 514e7faad435..c301d4de4741 100644
--- a/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java
+++ b/azure/src/test/java/org/apache/iceberg/azure/TestAzureProperties.java
@@ -26,6 +26,8 @@
import static org.apache.iceberg.azure.AzureProperties.ADLS_SHARED_KEY_ACCOUNT_KEY;
import static org.apache.iceberg.azure.AzureProperties.ADLS_SHARED_KEY_ACCOUNT_NAME;
import static org.apache.iceberg.azure.AzureProperties.ADLS_WRITE_BLOCK_SIZE;
+import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_KEY_WRAP_ALGORITHM;
+import static org.apache.iceberg.azure.AzureProperties.AZURE_KEYVAULT_URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
@@ -39,6 +41,7 @@
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredential;
+import com.azure.security.keyvault.keys.cryptography.models.KeyWrapAlgorithm;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;
import java.io.IOException;
@@ -73,11 +76,15 @@ public void testSerializable(TestHelpers.RoundTripSerializer ro
.put(ADLS_SHARED_KEY_ACCOUNT_KEY, "secret")
.put(AzureProperties.ADLS_TOKEN_CREDENTIAL_PROVIDER, "provider")
.put(AzureProperties.ADLS_TOKEN_PROVIDER_PREFIX + "client-id", "clientId")
+ .put(AZURE_KEYVAULT_URL, "https://test-key-vault.vault.azure.net")
+ .put(AZURE_KEYVAULT_KEY_WRAP_ALGORITHM, KeyWrapAlgorithm.RSA1_5.getValue())
.build());
AzureProperties serdedProps = roundTripSerializer.apply(props);
assertThat(serdedProps.adlsReadBlockSize()).isEqualTo(props.adlsReadBlockSize());
assertThat(serdedProps.adlsWriteBlockSize()).isEqualTo(props.adlsWriteBlockSize());
+ assertThat(serdedProps.keyVaultUrl()).isEqualTo(props.keyVaultUrl());
+ assertThat(serdedProps.keyWrapAlgorithm()).isEqualTo(props.keyWrapAlgorithm());
}
@Test
diff --git a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestVendedAdlsCredentialProvider.java b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestVendedAdlsCredentialProvider.java
index 43958d39acd8..6af2e4de21f0 100644
--- a/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestVendedAdlsCredentialProvider.java
+++ b/azure/src/test/java/org/apache/iceberg/azure/adlsv2/TestVendedAdlsCredentialProvider.java
@@ -34,6 +34,7 @@
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
@@ -445,4 +446,47 @@ public void serializableTest(
mockServer.verify(mockRequest, VerificationTimes.exactly(2));
}
+
+ @Test
+ public void planIdQueryParamIsSent() {
+ String planId = "randomPlanId";
+ HttpRequest mockRequest =
+ request("/v1/credentials")
+ .withMethod(HttpMethod.GET.name())
+ .withQueryStringParameter("planId", planId);
+ Credential credential =
+ ImmutableCredential.builder()
+ .prefix(CREDENTIAL_PREFIX)
+ .config(
+ ImmutableMap.of(
+ ADLS_SAS_TOKEN_PREFIX + STORAGE_ACCOUNT,
+ "randomSasToken",
+ ADLS_SAS_TOKEN_EXPIRES_AT_MS_PREFIX + STORAGE_ACCOUNT,
+ Long.toString(Instant.now().minus(1, ChronoUnit.MINUTES).toEpochMilli())))
+ .build();
+ LoadCredentialsResponse response =
+ ImmutableLoadCredentialsResponse.builder().addCredentials(credential).build();
+ HttpResponse mockResponse =
+ response(LoadCredentialsResponseParser.toJson(response)).withStatusCode(200);
+ mockServer.when(mockRequest).respond(mockResponse);
+
+ Map<String, String> properties =
+ ImmutableMap.<String, String>builder()
+ .putAll(PROPERTIES)
+ .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
+ .build();
+ try (VendedAdlsCredentialProvider provider = new VendedAdlsCredentialProvider(properties)) {
+ String azureSasCredential = provider.credentialForAccount(STORAGE_ACCOUNT).block();
+ assertThat(azureSasCredential)
+ .isEqualTo(credential.config().get(ADLS_SAS_TOKEN_PREFIX + STORAGE_ACCOUNT));
+
+ // resolving credentials multiple times should hit the credentials endpoint again and send the
+ // planId again
+ String refreshedAzureSasCredential = provider.credentialForAccount(STORAGE_ACCOUNT).block();
+ assertThat(refreshedAzureSasCredential)
+ .isEqualTo(credential.config().get(ADLS_SAS_TOKEN_PREFIX + STORAGE_ACCOUNT));
+ }
+
+ mockServer.verify(mockRequest, VerificationTimes.exactly(2));
+ }
}
diff --git a/baseline.gradle b/baseline.gradle
index 151fa40a1a28..7884c1a65ae3 100644
--- a/baseline.gradle
+++ b/baseline.gradle
@@ -52,16 +52,35 @@ subprojects {
apply plugin: 'com.palantir.baseline-exact-dependencies'
apply plugin: 'com.diffplug.spotless'
+ String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
pluginManager.withPlugin('com.diffplug.spotless') {
spotless {
java {
target 'src/main/java/**/*.java', 'src/test/java/**/*.java', 'src/testFixtures/java/**/*.java', 'src/jmh/java/**/*.java', 'src/integration/java/**/*.java'
// 1.23.0 has an issue in formatting comments https://github.com/google/google-java-format/issues/1155
- // so we stick to 1.22.0 to produce consistent result for JDK 11/17/21
+ // so we stick to 1.22.0 to produce consistent results for JDK 17/21
googleJavaFormat("1.22.0")
removeUnusedImports()
licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt"
}
+
+ if (project.name.startsWith("iceberg-spark")) {
+ String scalafmtConfigFile = null
+ // Configure different scalafmt rules for the specific Scala version in use
+ if (scalaVersion?.startsWith("2.12")) {
+ scalafmtConfigFile = "$rootDir/.baseline/scala/.scala212fmt.conf"
+ } else if (scalaVersion?.startsWith("2.13")) {
+ scalafmtConfigFile = "$rootDir/.baseline/scala/.scala213fmt.conf"
+ }
+
+ if (scalafmtConfigFile != null) {
+ scala {
+ target 'src/**/*.scala'
+ scalafmt("3.9.7").configFile(scalafmtConfigFile)
+ licenseHeaderFile "$rootDir/.baseline/copyright/copyright-header-java.txt", "package"
+ }
+ }
+ }
}
}
@@ -167,7 +186,6 @@ subprojects {
}
pluginManager.withPlugin('scala') {
- String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
tasks.withType(ScalaCompile).configureEach { scalaCompile ->
if (scalaVersion?.startsWith("2.12")) {
scalaCompile.scalaCompileOptions.additionalParameters = [
diff --git a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryMetastoreCatalog.java b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryMetastoreCatalog.java
index 34ec7a62d5b5..dd01246cb01f 100644
--- a/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryMetastoreCatalog.java
+++ b/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryMetastoreCatalog.java
@@ -23,7 +23,6 @@
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions;
import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQueryOptions;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -58,14 +57,25 @@
public class BigQueryMetastoreCatalog extends BaseMetastoreCatalog
implements SupportsNamespaces, Configurable