Local setup#1

Closed
abhishek6s wants to merge 8324 commits into 6si-gdp-main from local-setup

Conversation

@abhishek6s

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

ueshin and others added 30 commits September 8, 2025 16:34
…nd withColumnsRenamed in Spark Connect

### What changes were proposed in this pull request?

Tries the eager analysis first for `withColumns` and `withColumnsRenamed` in Spark Connect.

### Why are the changes needed?

The planning for `withColumns` and `withColumnsRenamed` in Spark Connect was made lazy in apache#49386, but this can cause a performance issue: the current analyzer cannot resolve the plan quickly enough when these APIs are called many times, which happens relatively more often than with other APIs.
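As a toy illustration of why deeply chained lazy plans get expensive, the sketch below (plain Python, not Spark's analyzer; `Node`, `lazy_cost`, and `eager_cost` are invented names) models an analyzer whose rules re-walk the whole accumulated tree for each of n chained calls, versus one that resolves each call eagerly against an already-analyzed child:

```py
class Node:
    """One plan node produced by a chained call such as withColumns."""
    def __init__(self, child=None):
        self.child = child

def depth(plan):
    d = 0
    while plan is not None:
        d += 1
        plan = plan.child
    return d

def lazy_cost(n):
    # Lazy: analysis is deferred, so resolution rules end up revisiting
    # the full n-deep tree roughly once per chained call.
    plan, work = None, 0
    for _ in range(n):
        plan = Node(plan)
    for _ in range(n):
        work += depth(plan)
    return work  # O(n^2)

def eager_cost(n):
    # Eager: each call is analyzed against an already-resolved child,
    # so each step does constant work.
    work = 0
    for _ in range(n):
        work += 1
    return work  # O(n)

print(lazy_cost(100), eager_cost(100))  # 10000 100
```

The quadratic growth of the lazy variant mirrors the slowdown described above when `withColumns` is chained many times.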

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing tests should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52253 from ueshin/issues/SPARK-53505/eager_analysis.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ultCollationToStringType`

### What changes were proposed in this pull request?
Stop using `v2ColumnsToStructType` in `ApplyDefaultCollationToStringType`, because it performs expensive handling of default values and we only need to know each column's data type.

### Why are the changes needed?
Performance improvement.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52234 from ilicmarkodb/fix_apply_def.

Authored-by: ilicmarkodb <marko.ilic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…Suite` which are caused by `executorHolder` undefined

### What changes were proposed in this pull request?
This PR aims to fix flaky tests in `SparkConnectServiceSuite` which are caused by `executorHolder` [undefined](https://github.com/apache/spark/blob/ab9a63626018156b3e0f267f14409c30031692b7/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L908).
The conditions to reproduce this issue are:

(1) The operation finishes before its `executeHolder` is set in [MockSparkListener#onOtherEvent](https://github.com/apache/spark/blob/ab9a63626018156b3e0f267f14409c30031692b7/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L961).
(2) `executeHolder` is accessed through calling `verifyEvents.onComplete` after the operation finishes.

`SparkListenerConnectOperationStarted` is posted asynchronously with respect to the corresponding operation, so condition (1) can be met. After an operation finishes, `executeHolder` is [removed from a map](https://github.com/apache/spark/blob/af16aa8e11c223642f928b0b9893854a851d70bb/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala#L153), so once condition (1) is met, `executeHolder` is never set because `SparkConnectService.executionManager.getExecuteHolder` consistently returns `None`.

One example of the test affected by this issue is `SPARK-43923: commands send events - get_resources_command`.
You can easily reproduce this issue by inserting a sleep into `MockSparkListener#onOtherEvent` as follows.

```diff
   val executeKey =
     ExecuteKey(sessionHolder.userId, sessionHolder.sessionId, e.operationId)
+  Thread.sleep(1000)
   executeHolder = SparkConnectService.executionManager.getExecuteHolder(executeKey)
```

Then run the test:
```
$ build/sbt 'connect/testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite -- -z "get_resources_command"'
```
To resolve this issue, this PR proposes:

* Change `VerifyEvents#onCompleted` just to assert `executeHolder.eventsManager.getProducedRowCount == producedRowCount`
* Call `VerifyEvents#onCompleted` from `StreamObserver#onCompleted`
* Add `VerifyEvents#assertClosed` to check if the status is `Closed`

### Why are the changes needed?
For test stability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Inserted `Thread.sleep(1000)` as mentioned above and ran `SparkConnectServiceSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52264 from sarutak/SPARK-48163.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR simplifies `PipelineTest` by extending `QueryTest`, as many util functions are already defined there.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#52266 from cloud-fan/sdp_test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

The `catalogString` of a user-defined type is mistakenly truncated, which leads to catalog errors.

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New Unit Test

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52263 from yaooqinn/SPARK-53518.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…DTFs

### What changes were proposed in this pull request?

Support return type coercion for Arrow Python UDTFs by doing `arrow_cast` by default

### Why are the changes needed?

Consistent behavior across Arrow UDFs and Arrow UDTFs

### Does this PR introduce _any_ user-facing change?

No, Arrow UDTF is not a public API yet

### How was this patch tested?

New and existing UTs

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52140 from shujingyang-db/arrow-udtf-type-corerion.

Lead-authored-by: Shujing Yang <shujing.yang@databricks.com>
Co-authored-by: Shujing Yang <135740748+shujingyang-db@users.noreply.github.com>
Signed-off-by: Takuya Ueshin <ueshin@databricks.com>
…nst scalar under ANSI

### What changes were proposed in this pull request?
- Ensure `==` returns a nameless Series when comparing with another Series/Index, but preserves the name for scalar comparisons.
- Add test cases to compare with `np.nan`

### Why are the changes needed?
Part of https://issues.apache.org/jira/browse/SPARK-53389

### Does this PR introduce _any_ user-facing change?
No, the feature is not released yet.

For example,

Before
```py
>>> psdf['int'] == 'x'
0    False
1    False
dtype: bool
```

After
```py
>>> psdf['int'] == 'x'
0    False
1    False
Name: int, dtype: bool
```

which follows native pandas
```py
>>> pdf['int'] == 'x'
0    False
1    False
Name: int, dtype: bool
```

### How was this patch tested?
Unit tests

Commands below passed
```shell
SPARK_ANSI_SQL_MODE=true  ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_comparison_dtype_compatibility"
SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_comparison_dtype_compatibility"
SPARK_ANSI_SQL_MODE=true  ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_eq"
SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_eq"
SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_ne"
SPARK_ANSI_SQL_MODE=true  ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_ops NumOpsTests.test_ne"
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52224 from xinrong-meng/cmp_op_test.

Authored-by: Xinrong Meng <xinrong@apache.org>
Signed-off-by: Xinrong Meng <xinrong@apache.org>
### What changes were proposed in this pull request?

apache#49962 added a fallback for cases where broken (i.e., non-resolved) persisted default values already exist in catalogs. A broken value is something like `current_database`, `current_user`, or `current_timestamp`: these are non-deterministic and yield wrong results in EXISTS_DEFAULT, where the user expects the value that was resolved when they set the default.

This PR adds yet another fallback for broken default values, in this case ones containing nested function calls.
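The layered-fallback idea can be sketched in plain Python (hypothetical stand-ins: `fast_literal_parse` plays the role of `Literal.fromSQL`, `full_analyze` the full-analysis fallback; this is not Spark code):

```py
import ast

def fast_literal_parse(text):
    # Fast path: accepts only plain literals, raising for anything else
    # (a stand-in for Literal.fromSQL).
    return ast.literal_eval(text)

def full_analyze(text):
    # Fallback: run a full evaluation that can resolve function calls
    # (a stand-in for analyzing the stored default-value expression).
    return eval(text, {"__builtins__": {}},
                {"CONCAT": lambda *a: "".join(map(str, a))})

def resolve_existence_default(text):
    try:
        return fast_literal_parse(text)
    except (ValueError, SyntaxError):
        # Broken persisted defaults, e.g. nested function calls,
        # fall through to the slower but more capable path.
        return full_analyze(text)

print(resolve_existence_default("42"))                  # 42 (fast path)
print(resolve_existence_default("CONCAT(2025, '01')"))  # 202501 (fallback)
```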

### Why are the changes needed?

Take the case where the EXISTS_DEFAULT is :
```CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))```

the current code `Literal.fromSQL(defaultSQL)` will throw the exception before getting to the fallback:
```
Caused by: java.lang.AssertionError: assertion failed: function arguments must be resolved.
	at scala.Predef$.assert(Predef.scala:279)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.$anonfun$expressionBuilder$1(FunctionRegistry.scala:1278)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction(FunctionRegistry.scala:251)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction$(FunctionRegistry.scala:245)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:317)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:325)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:317)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:586)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:121)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:586)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:579)
	at scala.collection.immutable.List.map(List.scala:251)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:768)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:579)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:556)
	at org.apache.spark.sql.catalyst.expressions.Literal$.fromSQL(literals.scala:317)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyzeExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:393)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:529)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:524)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:524)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:594)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:592)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add unit test in StructTypeSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52274 from szehon-ho/more_default_value_fallback.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

This PR changes default value of `spark.sql.scripting.enabled` in order to enable SQL scripting by default.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

Yes, the default value of `spark.sql.scripting.enabled` is changed.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52272 from dusantism-db/enable-sql-scripting-by-default.

Authored-by: Dušan Tišma <dusan.tisma@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

Small refactor of the Star trait to make it compatible with the new single-pass Analyzer. Basically, remove `LogicalPlan` from the signature of core methods. This makes it possible to call them using `NameScope`.

### Why are the changes needed?

To eventually support all types of star expressions in the single pass analyzer

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v1.0.107

Closes apache#52268 from mikhailnik-db/refactor-star-trait.

Authored-by: Mikhail Nikoliukin <mikhail.nikoliukin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Disallow `%` between Decimal and float under ANSI

pandas:
```py
>>> pdf['decimal'] % 0.1
Traceback (most recent call last):
...
TypeError: unsupported operand type(s) for %: 'decimal.Decimal' and 'float'
```

pandas on spark before:
```py
>>> psdf['decimal'] % 0.1
0    0.1
1    0.1
2    0.1
Name: decimal, dtype: float64
```

pandas on spark after:
```py
>>> psdf['decimal'] % 0.1
Traceback (most recent call last):
...
TypeError: Modulo can not be applied to given types.
```

### Why are the changes needed?
Part of https://issues.apache.org/jira/browse/SPARK-53389

### Does this PR introduce _any_ user-facing change?
No, the feature is not released yet

### How was this patch tested?
Unit tests

Commands below passed:
```shell
SPARK_ANSI_SQL_MODE=true  ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_mul_div NumMulDivTests.test_mod"
SPARK_ANSI_SQL_MODE=false ./python/run-tests --python-executables=python3.11 --testnames "pyspark.pandas.tests.data_type_ops.test_num_mul_div NumMulDivTests.test_mod"
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52255 from xinrong-meng/mod.

Authored-by: Xinrong Meng <xinrong@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…ral.Map/Array optional

### What changes were proposed in this pull request?

This PR optimizes the `LiteralValueProtoConverter` to reduce redundant type information in Spark Connect protocol buffers. The key changes include:

1. **Optimized type inference for arrays and maps**: Modified the conversion logic to only include type information in the first element of arrays and the first key-value pair of maps, since subsequent elements can infer their types from the first element.

2. **Added `needDataType` parameter**: Introduced a new parameter to control when type information is necessary, allowing the converter to skip redundant type information.

3. **Updated protobuf documentation**: Enhanced comments in the protobuf definitions to clarify that only the first element needs to contain type information for inference.

4. **Improved test coverage**: Added new test cases for complex nested structures including tuples and maps with array values.

### Why are the changes needed?

The current implementation includes type information for every element in arrays and every key-value pair in maps, which is redundant and increases the size of protocol buffer messages. Since Spark Connect can infer types from the first element, including type information for subsequent elements is unnecessary and wastes bandwidth and processing time.
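The optimization can be sketched with a toy encoder in which only the first array element carries its data type and the decoder reuses it for the rest (the dict shapes here are invented, not the actual Spark Connect protobuf messages):

```py
def encode_array(values, data_type):
    # Only the first element includes type information
    # (i.e. needDataType is true only for index 0).
    out = []
    for i, v in enumerate(values):
        elem = {"value": v}
        if i == 0:
            elem["data_type"] = data_type
        out.append(elem)
    return out

def decode_array(encoded):
    # Subsequent elements infer their type from the first one.
    data_type = encoded[0]["data_type"]
    return [(e["value"], data_type) for e in encoded]

enc = encode_array([1, 2, 3], "integer")
print(sum("data_type" in e for e in enc))  # 1
print(decode_array(enc))  # [(1, 'integer'), (2, 'integer'), (3, 'integer')]
```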

### Does this PR introduce any user-facing change?

**No** - This PR does not introduce any user-facing changes.
The change is backward compatible and existing connect clients will continue to work unchanged.

### How was this patch tested?

`build/sbt "connect/testOnly *LiteralExpressionProtoConverterSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 1.4.5

Closes apache#51473 from heyihong/SPARK-52449.

Authored-by: Yihong He <heyihong.cn@gmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?
Currently, `ColumnarRow`'s `get` does not check `isNullAt`, but `UnsafeRow.get` does.
https://github.com/apache/spark/blob/b177b6515c8371fe0761b46d2fa45dd5e8465910/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java#L36

And in some cases it's assumed that the `InternalRow.get` is null safe, for example https://github.com/apache/spark/blob/5b2c4cf9ce886b69eeb5d2303d7582f6ecd763aa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala#L377

We hit this issue when extending Spark to work on columnar data.
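A minimal sketch of the null-safe accessor pattern: consult the null bitmap before decoding the value, as `UnsafeRow.get` does (toy classes in plain Python, not Spark's implementation):

```py
class ColumnarRow:
    def __init__(self, values, nulls):
        self._values = values
        self._nulls = nulls  # per-slot null bitmap

    def is_null_at(self, i):
        return self._nulls[i]

    def get(self, i):
        # The fix: return None for null slots instead of
        # decoding whatever bytes happen to be in the buffer.
        if self.is_null_at(i):
            return None
        return self._values[i]

row = ColumnarRow([1, 0, 3], [False, True, False])
print(row.get(1))  # None
print(row.get(0))  # 1
```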

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52175 from WangGuangxin/fix_columnarrow.

Authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Add tests for arrow udf with numpy output

### Why are the changes needed?
to improve test coverage

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52285 from zhengruifeng/test_numpy_arrow_agg.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…e` building

### What changes were proposed in this pull request?

This PR aims to add `libwebp-dev` to recover `spark-rm/Dockerfile` building.

### Why are the changes needed?

`Apache Spark` release docker image compilation has been broken for the last 7 days due to the SparkR package compilation.
- https://github.com/apache/spark/actions/workflows/release.yml
    - https://github.com/apache/spark/actions/runs/17425825244

```
apache#11 559.4 No package 'libwebpmux' found
...
apache#11 559.4 -------------------------- [ERROR MESSAGE] ---------------------------
apache#11 559.4 <stdin>:1:10: fatal error: ft2build.h: No such file or directory
apache#11 559.4 compilation terminated.
apache#11 559.4 --------------------------------------------------------------------
apache#11 559.4 ERROR: configuration failed for package 'ragg'
```

### Does this PR introduce _any_ user-facing change?

No, this is a fix for Apache Spark release tool.

### How was this patch tested?

Manually build.

```
$ cd dev/create-release/spark-rm
$ docker build .
```

**BEFORE**

```
...
Dockerfile:83
--------------------
  82 |     # See more in SPARK-39959, roxygen2 < 7.2.1
  83 | >>> RUN Rscript -e "install.packages(c('devtools', 'knitr', 'markdown',  \
  84 | >>>     'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow',  \
  85 | >>>     'ggplot2', 'mvtnorm', 'statmod', 'xml2'), repos='https://cloud.r-project.org/')" && \
  86 | >>>     Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='https://cloud.r-project.org')" && \
  87 | >>>     Rscript -e "devtools::install_version('lintr', version='2.0.1', repos='https://cloud.r-project.org')" && \
  88 | >>>     Rscript -e "devtools::install_version('pkgdown', version='2.0.1', repos='https://cloud.r-project.org')" && \
  89 | >>>     Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')"
  90 |
--------------------
ERROR: failed to build: failed to solve:
```

**AFTER**
```
...
 => [ 6/22] RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/'                                                             3.8s
 => [ 7/22] RUN Rscript -e "install.packages(c('devtools', 'knitr', 'markdown',      'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow',       892.2s
 => [ 8/22] RUN add-apt-repository ppa:pypy/ppa                                                                                                                15.3s
...
```

After merging this PR, we can validate via the daily release dry-run CI.

- https://github.com/apache/spark/actions/workflows/release.yml

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52290 from dongjoon-hyun/SPARK-53539.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR aims to remove `pypy3` from `spark-rm` image.

### Why are the changes needed?

- This will reduce the `spark-rm` build time during the daily `dry-run` CI and release process.
    - https://github.com/apache/spark/actions/workflows/release.yml

- `pypy3` was added in 4.0.0 via the following PR, and we didn't have `pypy3` before.
    - apache#46534

### Does this PR introduce _any_ user-facing change?

No. This is an update for Apache Spark release tool.

### How was this patch tested?

Manual review, because this is technically not used.

After merging this PR, we can check the daily release dry-run CI too.

- https://github.com/apache/spark/actions/workflows/release.yml

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52291 from dongjoon-hyun/SPARK-53540.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Upgrade Jetty from 11.0.25 to 11.0.26.

### Why are the changes needed?
To bring some bug fixes.
https://github.com/jetty/jetty.project/releases/tag/jetty-11.0.26

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52284 from zml1206/SPARK-53533.

Authored-by: zml1206 <zhuml1206@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…t API in Spark Connect in Scala

### What changes were proposed in this pull request?

As titled.

### Why are the changes needed?

This allows users to use direct passthrough partitioning API in connect mode

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52242 from shujingyang-db/direct-shuffle-partition-id-connect.

Authored-by: Shujing Yang <shujing.yang@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?

Change `ExpandExec#doExecute` to initialize the unsafe projections before using them.

### Why are the changes needed?

The unsafe projections might contain non-deterministic expressions, and non-deterministic expressions must be initialized before use.

For example, see the test added by this PR (which is essentially the test right above it, except with whole-stage codegen turned off). In the case where the "split-updates" property is set to "true", `RewriteUpdateTable` will create an `Expand` operator with a set of projections, one of which will contain a nondeterministic expression (the assignment value). `ExpandExec` fails to initialize the derived `UnsafeProjection`s before using them, resulting in a `NullPointerException`:
```
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 11) (10.0.0.101 executor driver):
java.lang.NullPointerException: Cannot invoke "java.util.Random.nextDouble()" because "<parameter1>.mutableStateArray_0[0]" is null
[info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
[info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
[info] 	at org.apache.spark.sql.execution.ExpandExec$$anon$1.next(ExpandExec.scala:75)
...
```
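The shape of the bug can be modeled in plain Python (toy classes, not Spark's `ExpandExec`): a non-deterministic expression holds mutable state that is only valid after `initialize` is called, so every projection must be initialized before its first evaluation:

```py
import random

class RandExpr:
    """Toy non-deterministic expression with init-before-use state."""
    def __init__(self):
        self._rng = None  # analogous to mutableStateArray_0[0] being null

    def initialize(self, partition_index):
        self._rng = random.Random(partition_index)

    def eval(self):
        # Raises AttributeError if initialize() was never called,
        # mirroring the NullPointerException above.
        return self._rng.random()

projections = [RandExpr(), RandExpr()]
# The fix in ExpandExec#doExecute amounts to this up-front loop:
for p in projections:
    p.initialize(0)
values = [p.eval() for p in projections]
```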

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#52292 from bersprockets/upd_del_oddity.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?

This PR computes RowBasedChecksum for ShuffleWriters, which is controlled under spark.shuffle.rowbased.checksum.enabled.

If enabled, Spark calculates a RowBasedChecksum value for each partition of each map output and returns the values from the executors to the driver. Unlike the existing shuffle checksum, RowBasedChecksum is independent of the input row order, and it is used to detect whether different task attempts of the same partition produce different output data (keys or values). If the output data has changed across retries, Spark will need to retry all tasks of the consumer stage to avoid correctness issues.

This PR contains only the RowBasedChecksum computation. In a follow-up PR, I plan to trigger the full stage retry when checksum mismatches are detected.

### Why are the changes needed?
Problem:

Spark's resilience features can cause an RDD to be partially recomputed, e.g. when an executor is lost due to downscaling, or due to a spot instance kill. When the output of a nondeterministic task is recomputed, Spark does not always recompute everything that depends on this task's output. In some cases, some subsequent computations are based on the output of one "attempt" of the task, while other subsequent computations are based on another "attempt".

This can be problematic when the producer stage is non-deterministic, in which case the second attempt of the same task can produce output that is very different from the first one. For example, if the stage uses round-robin partitioning, some of the output data could be placed in different partitions in different task attempts. This can lead to incorrect results unless we retry the whole consumer stage that depends on the retried non-deterministic stage. Below is an example.

Example:

Let’s say we have Stage 1 and Stage 2, where Stage 1 is the producer and Stage 2 is the consumer. Assume that the data produced by Task 2 were lost due to some reason while Stage 2 is executing. Further assume that at this point, Task 1 of Stage 2 has already gotten all its inputs and finishes, while Task 2 of Stage 2 fails with data fetch failures.

<img width="600" alt="example 1" src="https://github.com/user-attachments/assets/549d1d90-3a8c-43e3-a891-1a6c614e9f24" />

Task 2 of Stage 1 will be retried to reproduce the data, and after which Task 2 of Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produces the result which contains all 4 tuples {t1, t2, t3, t4} as shown in the example graph.

<img width="720" alt="example 2" src="https://github.com/user-attachments/assets/bebf03d5-f05e-46b6-8f78-bfad08999867" />

Now, let’s assume that Stage 1 is non-deterministic (e.g., it uses round-robin partitioning and the input data is not ordered), and Task 2 places tuple t3 in Partition 1 and tuple t4 in Partition 2 in its first attempt, but tuple t4 in Partition 1 and tuple t3 in Partition 2 in its second attempt. When Task 2 of Stage 2 is retried, instead of reading {t2, t4} as it should, it reads {t2, t3} as its input. The result generated by Stage 2 is {t1, t2, t3, t3}, which is incorrect.

<img width="720" alt="example 3" src="https://github.com/user-attachments/assets/730fac0f-dfc3-4392-a74f-ed3e0d11e665" />

The problem can be avoided if we retry all tasks of Stage 2. As all tasks then read consistent data, we produce the result correctly, regardless of how the retry of Task 2 in Stage 1 partitions the data.

<img width="720" alt="example 4" src="https://github.com/user-attachments/assets/a501a33e-97bb-4a01-954f-bc7d0f01f3e6" />

Proposal:

To avoid correctness issues produced by a non-deterministic stage with partial retry, we propose an approach that first tries to detect inconsistent data generated by different task attempts of a non-deterministic stage, for example, whether the data partitions generated by Task 2 in its first attempt are the same as those generated by its second attempt. We retry the entire consumer stages if inconsistent data is detected.
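The order-independence property can be sketched with a commutative combiner over per-row hashes, so two attempts that emit the same rows in different orders get the same checksum (illustrative only; the actual `UnsafeRowChecksum` algorithm may differ):

```py
import hashlib

MASK = (1 << 64) - 1

def row_hash(row: bytes) -> int:
    # Hash each serialized row independently.
    return int.from_bytes(hashlib.sha256(row).digest()[:8], "big")

def partition_checksum(rows) -> int:
    # Combine with addition mod 2^64: commutative, so the
    # checksum does not depend on the row order.
    total = 0
    for r in rows:
        total = (total + row_hash(r)) & MASK
    return total

a = partition_checksum([b"t2", b"t4"])  # first attempt
b = partition_checksum([b"t4", b"t2"])  # same rows, different order
c = partition_checksum([b"t2", b"t3"])  # different rows after retry
print(a == b, a == c)  # True False
```

Reordered-but-identical output matches (no spurious stage retry), while genuinely different output, as in the round-robin example above, is detected.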

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tested

Benchmark test:
tpcds (10gb): the overhead of checksum computation with UnsafeRowChecksum is 0.4%.
tpcds (3tb): the overhead of checksum computation with UnsafeRowChecksum is 0.72%.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#50230 from JiexingLi/shuffle-checksum.

Lead-authored-by: Tengfei Huang <tengfei.h@gmail.com>
Co-authored-by: Jiexing Li <jiexing.li@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
…StateInPySparkStateServer`

### What changes were proposed in this pull request?
This PR performs the following cleanup on the code related to `TransformWithStateInPySparkStateServer`:

- Removed the `private` function `sendIteratorForListState` from `TransformWithStateInPySparkStateServer`, as it is no longer used after SPARK-51891.
- Removed the function `sendIteratorAsArrowBatches` from `TransformWithStateInPySparkStateServer`, as it is no longer used after SPARK-52333.
- Removed the input parameters `timeZoneId`, `errorOnDuplicatedFieldNames`, `largeVarTypes`, and `arrowStreamWriterForTest` from the constructor of `TransformWithStateInPySparkStateServer`, as they are no longer used after the cleanup of `sendIteratorAsArrowBatches`.
- Removed the input parameter `timeZoneId` from the constructor of `TransformWithStateInPySparkPythonPreInitRunner`, as it was only used for constructing `TransformWithStateInPySparkStateServer`.

### Why are the changes needed?
Code cleanup.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52279 from LuciferYang/TransformWithStateInPySparkStateServer.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
### What changes were proposed in this pull request?

This PR aims to increase the K8s test version to 1.34

### Why are the changes needed?

To improve the test coverage because K8s 1.34.0 was released on 2025-08-27.
- https://kubernetes.io/blog/2025/08/27/kubernetes-v1-34-release/
- https://kubernetes.io/releases/#release-v1-34

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs and check the CI log.

```
...
* Downloading Kubernetes v1.34.0 preload ...
...
System Info:
...
  Kubelet Version:            v1.34.0
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52293 from dongjoon-hyun/SPARK-53541.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
### What changes were proposed in this pull request?

When `HadoopRDD.getInputFormat` fails to initialize the InputFormat, the thrown error message is not clear. This PR enhances the exception message to include the class name of InputFormat.

A typical case that triggers this issue is reading an Iceberg table without a properly configured Iceberg catalog.

https://stackoverflow.com/questions/72620351/getting-error-when-querying-iceberg-table-via-spark-thrift-server-using-beeline
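The improvement boils down to catching the reflective instantiation failure and re-throwing with the offending class name attached. A minimal sketch of the pattern in plain Python (names here are illustrative, not Spark's actual Scala code):

```python
import abc


def instantiate(cls):
    """Instantiate a class, wrapping failures with the class name for clarity."""
    try:
        return cls()
    except TypeError as e:
        # Re-raise naming the offending class, instead of a bare error
        raise RuntimeError(
            f"Failed to instantiate {cls.__module__}.{cls.__qualname__}"
        ) from e


class AbstractFormat(abc.ABC):
    """Stand-in for an InputFormat that cannot be constructed directly."""

    @abc.abstractmethod
    def read(self):
        ...


try:
    instantiate(AbstractFormat)  # abstract classes cannot be instantiated
except RuntimeError as e:
    print(e)
```

The caller still sees the original cause in the exception chain, but the top-level message now identifies which class failed to construct.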

### Why are the changes needed?

Improve the error message.

### Does this PR introduce _any_ user-facing change?

Yes, the user would know more info from the error message.

### How was this patch tested?

Before
```
spark-sql (default)> select * from i;
2025-09-09 05:52:13 ERROR SparkSQLDriver: Failed in [select * from i]
java.lang.RuntimeException: java.lang.InstantiationException
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:157)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:126)
	at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:219)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:233)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:301)
	...
Caused by: java.lang.InstantiationException
	at java.base/jdk.internal.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:155)
	... 67 more
```

After
```
spark-sql (default)> select * from i;
2025-09-09 06:23:48 ERROR SparkSQLDriver: Failed in [select * from i]
java.lang.RuntimeException: Failed to instantiate org.apache.hadoop.mapred.FileInputFormat
	at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:223)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:231)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:301)
	...
Caused by: java.lang.InstantiationException
	at java.base/jdk.internal.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:155)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:126)
	at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:218)
	... 65 more
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52282 from pan3793/SPARK-53531.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
…es attribute for compatibility with PyArrow

### What changes were proposed in this pull request?

This is a follow up for SPARK-53029. The `.names` attribute for PyArrow's `StructType` was added in PyArrow 18.0.0, but the minimum PyArrow version Spark requires is only 15.0.0, so the attribute cannot be assumed to exist.
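A version-agnostic way to get the field names is to iterate the fields rather than touch `.names`. The sketch below uses stand-in classes instead of importing pyarrow, so it is an illustration of the compatibility pattern, not the PR's exact fix:

```python
def struct_field_names(struct_type):
    """Return the field names of a struct type across PyArrow versions.

    pyarrow >= 18.0.0 exposes a `.names` attribute; iterating the fields
    works on all supported versions, so it is the safe choice.
    """
    return [field.name for field in struct_type]


# Stand-in objects mimicking pyarrow's StructType/Field (assumption: we
# avoid importing pyarrow so the sketch runs anywhere).
class FakeField:
    def __init__(self, name):
        self.name = name


class FakeStructType:
    def __init__(self, fields):
        self._fields = fields

    def __iter__(self):
        return iter(self._fields)


st = FakeStructType([FakeField("a"), FakeField("b")])
print(struct_field_names(st))  # ['a', 'b']
```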

### Why are the changes needed?

To fix compatibility issue.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52289 from allisonwang-db/SPARK-53426-fix.

Lead-authored-by: Allison Wang <allison.wang@databricks.com>
Co-authored-by: Allison Wang <allisonwang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ries'

### What changes were proposed in this pull request?

Rename the pipeline spec field from "definitions" to "libraries".
This field allows user to include pipeline source code files.

```diff
  name: libraries-test
- definitions:
+ libraries:
    - glob:
        include: transformations/**/*.py
    - glob:
        include: transformations/**/*.sql

```

### Why are the changes needed?

This opens up the possibility of adding other types of dependencies for a pipeline execution, such as Python wheels; `libraries` is a more general term.

### Does this PR introduce _any_ user-facing change?

Yes, but SDP has not been released yet.

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52294 from JiaqiWang18/rename-spec-field-libraries.

Authored-by: Jacky Wang <jacky.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…top of `Aggregate`

### What changes were proposed in this pull request?
Add more test coverage for `Window` on top of `Aggregate`. Fixed-point currently doesn't know how to resolve `Sort` on top of `Window` on top of `Aggregate` in some cases. This will be addressed by single-pass analyzer only.

### Why are the changes needed?
To improve test coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added golden file tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52296 from mihailotim-db/mihailo-timotic_data/window_aggregates_tests.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ly snapshotStartBatchId option)

### What changes were proposed in this pull request?

This PR enables StateDataSource support with state checkpoint v2 format for the `snapshotStartBatchId` and related options, completing the StateDataSource checkpoint v2 integration.

There are changes to the `replayStateFromSnapshot` method signature: it now takes `snapshotVersionStateStoreCkptId` and `endVersionStateStoreCkptId`. Both are needed because `snapshotVersionStateStoreCkptId` is used when getting the snapshot, while `endVersionStateStoreCkptId` is used for calculating the full lineage from the final version.

Before
```
def replayStateFromSnapshot(
      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
```

After
```
def replayStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long,
      readOnly: Boolean = false,
      snapshotVersionStateStoreCkptId: Option[String] = None,
      endVersionStateStoreCkptId: Option[String] = None): StateStore
```

This is the final PR in the series following:
  - apache#52047: Enable StateDataSource with state checkpoint v2 (only batchId option)
  - apache#52148: Enable StateDataSource with state checkpoint v2 (only readChangeFeed)

NOTE: To read checkpoint v2 state data sources it is required to have `"spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2`. It is possible to allow reading state data sources arbitrarily based on what is in the CommitLog by relaxing assertion checks but this is left as a future change.

### Why are the changes needed?

State checkpoint v2 (`"spark.sql.streaming.stateStore.checkpointFormatVersion"`) introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with checkpoint v1 format, making it incompatible with streaming queries using the newer checkpoint format. Only `batchId` was implemented in apache#52047 and only `readChangeFeed` was implemented in apache#52148.

### Does this PR introduce _any_ user-facing change?

Yes.

State Data Source will work when checkpoint v2 is used and the `snapshotStartBatchId` and related options are used.

### How was this patch tested?

In the previous PRs, test suites were added to parameterize the existing tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source reader with checkpoint v1 are now also parameterized with checkpoint v2 (including the Python tests).

`RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite` is added which uses the golden file approach similar to apache#46944 where `snapshotStartBatchId` is first added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52202 from dylanwong250/SPARK-53332.

Authored-by: Dylan Wong <dylan.wong@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
### What changes were proposed in this pull request?

This PR fixes a critical issue in the protobuf conversion of observed metrics in Spark Connect, specifically when dealing with complex data types like structs, arrays, and maps. The main changes include:
1. **Modified Observation class to store Row objects instead of Map[String, Any]**: Changed the internal promise type from `Promise[Map[String, Any]]` to `Promise[Row]` to preserve type information during protobuf serialization/deserialization.
2. **Enhanced protobuf conversion for complex types**:
   - Added proper handling for struct types by creating `GenericRowWithSchema` objects instead of tuples
   - Added support for map type conversion in `LiteralValueProtoConverter`
   - Improved data type inference with a new `getDataType` method that properly handles all literal types
3. **Fixed observed metrics**: Modified the observed metrics processing to include data type information in the protobuf conversion, ensuring that complex types are properly serialized and deserialized.
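The gist of change 1 is to carry the schema alongside the values rather than returning a bare map, so type information survives the serialization round trip. A toy illustration of the idea (not PySpark's actual `Row` class):

```python
class Row:
    """Toy Row carrying both values and field names (a minimal schema)."""

    def __init__(self, values, schema):
        self.values = tuple(values)
        self.schema = tuple(schema)

    def as_dict(self):
        # With the schema attached, nested rows stay addressable by name
        return dict(zip(self.schema, self.values))


# A bare dict of metric values loses the nested structure; a Row keeps
# the schema so nested results can be reconstructed after transport.
inner = Row([10, 9], schema=["rows", "maxid"])
outer = Row([inner], schema=["struct"])
print(outer.as_dict()["struct"].as_dict())  # {'rows': 10, 'maxid': 9}
```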

### Why are the changes needed?

The previous implementation had several issues:
1. **Data type loss**: Observed metrics were losing their original data types during Protobuf conversion, causing errors
2. **Struct handling problems**: The conversion logic didn't properly handle Row objects and struct types

### Does this PR introduce _any_ user-facing change?

Yes, this PR fixes a bug that was preventing users from successfully using observed metrics with complex data types (structs, arrays, maps) in Spark Connect. Users can now:
- Use `struct()` expressions in observed metrics and receive properly typed `GenericRowWithSchema` objects
- Use `array()` expressions in observed metrics and receive properly typed arrays
- Use `map()` expressions in observed metrics and receive properly typed maps

Previously, the code below would fail.
```scala
val observation = Observation("struct")
spark
  .range(10)
  .observe(observation, struct(count(lit(1)).as("rows"), max("id").as("maxid")).as("struct"))
  .collect()
observation.get
// Below is the error message:
"""
org.apache.spark.SparkUnsupportedOperationException: literal [10,9] not supported (yet).
org.apache.spark.sql.connect.common.LiteralValueProtoConverter$.toLiteralProtoBuilder(LiteralValueProtoConverter.scala:104)
org.apache.spark.sql.connect.common.LiteralValueProtoConverter$.toLiteralProto(LiteralValueProtoConverter.scala:203)
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution$.$anonfun$createObservedMetricsResponse$2(SparkConnectPlanExecution.scala:571)
org.apache.spark.sql.connect.execution.SparkConnectPlanExecution$.$anonfun$createObservedMetricsResponse$2$adapted(SparkConnectPlanExecution.scala:570)
"""
```

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite -- -z SPARK-53490"`
`build/sbt "connect/testOnly *LiteralExpressionProtoConverterSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 1.5.9

Closes apache#52236 from heyihong/SPARK-53490.

Authored-by: Yihong He <heyihong.cn@gmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?
Skip some tests if numpy not installed

### Why are the changes needed?
These tests depend on NumPy.

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52300 from zhengruifeng/test_skip_numpy.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?

This PR makes SupportsPushDownCatalystFilters a trait that extends ScanBuilder, and adds it to the pattern matching of PushDownUtils.

### Why are the changes needed?

Currently, the PushDownUtils has following matching cases:

```
object PushDownUtils {
  def pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
    : (Either[Seq[sources.Filter], Seq[Predicate]], Seq[Expression]) = {
    scanBuilder match {
      case r: SupportsPushDownFilters => ...  // public interface extends ScanBuilder
      case r: SupportsPushDownV2Filters => ... // public interface extends ScanBuilder
      case f: FileScanBuilder => ... // which extends with SupportsPushDownCatalystFilters
      case _ => (Left(Nil), filters)
    }
  }
}
```

As a result, when a new scan builder wants to support `SupportsPushDownCatalystFilters` (but doesn't want to extend `FileScanBuilder`), it will not be picked up by `PushDownUtils`. To better unify these filter pushdown interfaces, it would be better if the matching cases looked like the following:

```
scanBuilder match {
  case r: SupportsPushDownFilters => ... // public interface extends ScanBuilder
  case r: SupportsPushDownV2Filters => ... // public interface extends ScanBuilder
  case r: SupportsPushDownCatalystFilters => ... // trait extends ScanBuilder
  case _ => (Left(Nil), filters)
}
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52257 from yhuang-db/support_pushdown_catalyst_filter.

Authored-by: yhuang-db <itisyuchuan@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
dongjoon-hyun and others added 29 commits October 9, 2025 19:14
…driver.[host|port] from checkpoint'` test flakiness

### What changes were proposed in this pull request?

This PR aims to fix `CheckpointSuite` flakiness.

### Why are the changes needed?

Currently, master branch is flaky.
- https://github.com/apache/spark/actions/runs/18373591172/job/52342350296

```
[info] CheckpointSuite:
[info] - non-existent checkpoint dir (1 millisecond)
[info] - basic rdd checkpoints + dstream graph checkpoint recovery (6 seconds, 787 milliseconds)
[info] - recovery of conf through checkpoints (114 milliseconds)
[info] - get correct spark.driver.[host|port] from checkpoint *** FAILED *** (76 milliseconds)
[info]   "[10000]" did not equal "[9999]" (CheckpointSuite.scala:403)
[info]   Analysis:
[info]   "[10000]" -> "[9999]"
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.streaming.CheckpointSuite.$anonfun$new$20(CheckpointSuite.scala:403)
```

### Does this PR introduce _any_ user-facing change?

No because this is a test case change.

### How was this patch tested?

Pass the CIs.

I also manually repeat multiple times locally.
```
$ build/sbt "streaming/testOnly *.CheckpointSuite -- -z correct"
[info] CheckpointSuite:
[info] - get correct spark.driver.[host|port] from checkpoint (617 milliseconds)
[info] Run completed in 1 second, 262 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 20 s, completed Oct 9, 2025, 7:06:29 PM
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52565 from dongjoon-hyun/SPARK-53862.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR aims to upgrade `sbt-jupiter-interface` to `0.17.0`.

### Why are the changes needed?

To bring the latest improvements including JUnit 6 support.

- https://github.com/sbt/sbt-jupiter-interface/releases/tag/v0.17.0 (2025-10-09)
  - sbt/sbt-jupiter-interface#194

- https://github.com/sbt/sbt-jupiter-interface/releases/tag/v0.16.0 (2025-10-08)
  - sbt/sbt-jupiter-interface#186

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a test dependency.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52562 from dongjoon-hyun/SPARK-53860.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…erBatch`

### What changes were proposed in this pull request?
Add more tests for `maxBytesPerBatch`

### Why are the changes needed?
to make sure `maxBytesPerBatch` works

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52567 from zhengruifeng/test_bytes.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?
Retry `test_observe_with_map_type`

### Why are the changes needed?
it occasionally fails https://github.com/apache/spark/actions/runs/18387627760/job/52390087983

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52566 from zhengruifeng/retry_test_observe_with_map_type.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?

This PR aims to upgrade `JUnit` to 6.0.0 for Apache Spark 4.1.0.

### Why are the changes needed?

JUnit 6 is released finally.
- https://docs.junit.org/6.0.0/release-notes/ (2025-09-30)
  - Minimum required Java version is 17.
  - Single version number for Platform, Jupiter, and Vintage

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a test dependency.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52561 from dongjoon-hyun/SPARK-53859.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…builtin` if pyarrow is not installed

### What changes were proposed in this pull request?
Skip `udf` doctests without pyarrow

### Why are the changes needed?
to make python 3.14 scheduled workflow work

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
manually check without pyarrow

```
(spark_dev_313) ➜  spark git:(py_314_udf) pip uninstall pyarrow
Found existing installation: pyarrow 21.0.0
Uninstalling pyarrow-21.0.0:
  Would remove:
    /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/pyarrow-21.0.0.dist-info/*
    /Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/lib/python3.13/site-packages/pyarrow/*
Proceed (Y/n)? y
  Successfully uninstalled pyarrow-21.0.0

(spark_dev_313) ➜  spark git:(py_314_udf) ✗ python/run-tests -k --testnames 'pyspark.sql.functions.builtin'
Running PySpark tests. Output is in /Users/ruifeng.zheng/spark/python/unit-tests.log
Will test against the following Python executables: ['/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/bin/python3']
Will test the following Python tests: ['pyspark.sql.functions.builtin']
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/bin/python3 python_implementation is CPython
/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/bin/python3 version is: Python 3.13.5
Starting test(/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/bin/python3): pyspark.sql.functions.builtin (temp output: /Users/ruifeng.zheng/spark/python/target/cff4b76c-ff9c-4226-89dd-e1eabe4ebbad/Users_ruifeng.zheng_.dev_miniconda3_envs_spark_dev_313_bin_python3__pyspark.sql.functions.builtin__unaf2g6y.log)
Finished test(/Users/ruifeng.zheng/.dev/miniconda3/envs/spark_dev_313/bin/python3): pyspark.sql.functions.builtin (64s)
Tests passed in 64 seconds
```

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52569 from zhengruifeng/py_314_udf.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…section of `README.md`

### What changes were proposed in this pull request?

This PR aims to add `build_python_3.14.yml` to `Build Pipeline Status` section of `README.md`.

### Why are the changes needed?

To show the status of new `Python 3.14` GitHub Action job.

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Manual review.

- https://github.com/dongjoon-hyun/spark/tree/minor

<img width="491" height="118" alt="Screenshot 2025-10-09 at 21 39 54" src="https://github.com/user-attachments/assets/5de63e9f-9fb9-49b4-bb45-ee14780bb52e" />

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52568 from dongjoon-hyun/minor.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…-sql` and `pyspark-connect`

### What changes were proposed in this pull request?
Factor out streaming tests from `pyspark-sql` and `pyspark-connect`

### Why are the changes needed?
`pyspark-sql` and `pyspark-connect` are still prone to timeout, even after we increase `timeout-minutes` to `150`, see https://github.com/apache/spark/actions/runs/18389137953/workflow

<img width="1080" height="106" alt="image" src="https://github.com/user-attachments/assets/1a4ecabe-2a4c-4306-af71-fbaf4ee27b42" />

the streaming tests are large, move them to dedicated testing modules to speed up ci:
```
Starting test(python3.11): pyspark.sql.tests.pandas.test_pandas_transform_with_state (temp output: /Users/runner/work/spark/spark/python/target/92efa305-098c-4839-8bb4-d13c9b60a405/python3.11__pyspark.sql.tests.pandas.test_pandas_transform_with_state__o7ragjh2.log)
Finished test(python3.11): pyspark.sql.tests.pandas.test_pandas_transform_with_state (1509s) ... 2 tests were skipped
Starting test(python3.11): pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2 (temp output: /Users/runner/work/spark/spark/python/target/929d4ad3-5518-4011-85a9-b14355974ead/python3.11__pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2__m41avtp4.log)
Finished test(python3.11): pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2 (1537s) ... 2 tests were skipped
```

### Does this PR introduce _any_ user-facing change?
no, infra-only

### How was this patch tested?

after this change, in PR builder
<img width="1157" height="128" alt="image" src="https://github.com/user-attachments/assets/02b78417-7dd5-4cde-b835-ec53624a9307" />

<img width="536" height="99" alt="image" src="https://github.com/user-attachments/assets/643269ff-2ddb-4b4c-aaf6-f930681114e1" />

<img width="993" height="104" alt="image" src="https://github.com/user-attachments/assets/bdf9c3d2-2ed5-4952-8fba-675742b6b37d" />

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52564 from zhengruifeng/infra_ss_module.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR aims to remove `blacklist` alternative config names at Apache Spark 4.1.0 while keeping the revised deprecation warning messages.

### Why are the changes needed?

At Apache Spark 2.1.0, `blacklist` configurations were added.
- apache#15249

At Apache Spark 3.1.0, we renamed the blacklist features roughly from `*.blacklist.*` to `*.excludeOnFailure.*`.

- apache#29906

At Apache Spark 4.1.0, this PR aims to complete the removal process in the 4.x line. Note that these names were not removed in Spark 4.0 in order to reduce the migration hurdle from 3.x to 4.x at the configuration-name layer.

### Does this PR introduce _any_ user-facing change?

Yes, if someone still uses `*.blacklist.*` configurations. However, Apache Spark deprecated these configurations and has been warning about them since 3.1.0 (March 2, 2021), so this is highly unlikely.

### How was this patch tested?

Manual review.

**BEFORE**

```
$ git grep withAlternative | grep -i blacklist | wc -l
      13
```

**AFTER**

```
$ git grep withAlternative | grep -i blacklist | wc -l
       0
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52558 from dongjoon-hyun/SPARK-53856.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
### What changes were proposed in this pull request?

Add `transform()` API in Columns API, similar to `Dataset.transform()`:
```
def transform(f: Column => Column): Column
```

### Why are the changes needed?

We want to give users a way to chain their methods, such as
```
df.select($"fruit"
  .transform(addPrefix)
  .transform(uppercase)
)
```
This pattern is also easier for AI agents to learn and write.
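In plain terms, `transform` is just function application that returns the result, which is what enables the chaining above. A toy model in Python (a sketch, not PySpark's actual `Column` class; `add_prefix` and `uppercase` are hypothetical helpers):

```python
class Column:
    """Toy stand-in for a DataFrame column expression."""

    def __init__(self, expr: str):
        self.expr = expr

    def transform(self, f):
        # Apply a Column => Column function, enabling method chaining
        return f(self)


def add_prefix(c: Column) -> Column:
    return Column(f"concat('fruit_', {c.expr})")


def uppercase(c: Column) -> Column:
    return Column(f"upper({c.expr})")


result = Column("fruit").transform(add_prefix).transform(uppercase)
print(result.expr)  # upper(concat('fruit_', fruit))
```

Without `transform`, the same pipeline reads inside-out as `uppercase(add_prefix(col))`; the chained form keeps the steps in reading order.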

### Does this PR introduce _any_ user-facing change?

Yes. New API is introduced.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Tests generated by Copilot.

Closes apache#52537 from Yicong-Huang/feat/transform-in-column-api.

Lead-authored-by: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com>
Co-authored-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…conflict with real column

### What changes were proposed in this pull request?

Fix the analysis of default value expression to not include column names

### Why are the changes needed?
The following query:

```CREATE TABLE t (current_timestamp DEFAULT current_timestamp)```

fails with an exception:
```
[INVALID_DEFAULT_VALUE.NOT_CONSTANT] Failed to execute CREATE TABLE command because the destination column or variable `current_timestamp` has a DEFAULT value CURRENT_TIMESTAMP, which is not a constant expression whose equivalent value is known at query planning time. SQLSTATE: 42623;

```
This was introduced in apache#50631: there, the `CreateTable` child's `ResolvedIdentifier` started to have output, namely the CREATE TABLE columns. Thus the analyzer would resolve the default value against the other columns, causing the regression. Previously the `CreateTable` output was empty, so the resolver would fail to resolve against the columns and fall back to literal functions.

### Does this PR introduce _any_ user-facing change?
Yes, this fixes a regression introduced in Spark 4.0.

### How was this patch tested?
Add new unit test in DataSourceV2DataFrameSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52504 from szehon-ho/default_value_conflict.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Restore pyspark execution timeout to 2 hours

### Why are the changes needed?
after apache#52564, 2 hours should be enough for pyspark execution

### Does this PR introduce _any_ user-facing change?
No, infra-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52570 from zhengruifeng/restore_120.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
…ns` without pyarrow/pandas

### What changes were proposed in this pull request?
Skip doctest `pyspark.sql.pandas.functions` without pyarrow/pandas

### Why are the changes needed?
to resolve failure in https://github.com/apache/spark/actions/runs/18397357525/job/52419309837

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
manually check in my local

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#52572 from zhengruifeng/314_pandas_udf.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR aims to upgrade `commons-lang3` from 3.18.0 to 3.19.0.

### Why are the changes needed?

The new version brings some bug fixes, such as:

- https://issues.apache.org/jira/browse/LANG-1778
- https://issues.apache.org/jira/browse/LANG-1782
- https://issues.apache.org/jira/browse/LANG-1597

The full release notes as follows:

- https://commons.apache.org/proper/commons-lang/changes.html#a3.19.0

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52485 from LuciferYang/commons-lang3-3.19.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

Small refactoring, extracting `makeGeneratorOutput` to separate the object.

### Why are the changes needed?

In the future, it will be reused in the single-pass analyzer. It's a much clearer approach than having a direct dependency on the legacy rule in the new analyzer.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.0.13

Closes apache#52571 from mikhailnik-db/refactor-resolve-generate.

Authored-by: Mikhail Nikoliukin <mikhail.nikoliukin@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ase for composite/nested output schemas

### What changes were proposed in this pull request?

Add a new test case for python transform_with_state APIs to ensure that output schemas containing nested structs are properly handled.

The output schema tested is reflected in the test code and copied below:
```

        # Define the output schema with inner nested class schema
        inner_nested_class_schema = StructType(
            [
                StructField("intValue", IntegerType(), True),
                StructField("doubleValue", DoubleType(), True),
                StructField("arrayValue", ArrayType(StringType()), True),
                StructField("mapValue", MapType(StringType(), StringType()), True),
            ]
        )

        output_schema = StructType(
            [
                StructField("primitiveValue", StringType(), True),
                StructField("listOfPrimitive", ArrayType(StringType()), True),
                StructField("mapOfPrimitive", MapType(StringType(), StringType()), True),
                StructField("listOfComposite", ArrayType(inner_nested_class_schema), True),
                StructField(
                    "mapOfComposite", MapType(StringType(), inner_nested_class_schema), True
                ),
            ]
        )
```
### Why are the changes needed?

Missing test coverage: the existing test case for composite types, `test_transform_with_state_in_pandas_composite_type`, does check the state value schemas, but the actual output is only handled as `StringType` values.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

```
build/sbt -Phive -Phive-thriftserver -DskipTests package
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests'
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPySparkTests'
```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-5-20250929)

Closes apache#52536 from jiateoh/tws_python_composite_output.

Lead-authored-by: Jia Teoh <jiateoh@gmail.com>
Co-authored-by: Jason Teoh <jason.teoh@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
### What changes were proposed in this pull request?
Push Variant into DSv2 scan

### Why are the changes needed?
With this change, the DSv2 scan only needs to fetch the shredded columns required by the plan.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52522 from huaxingao/variant-v2-pushdown.

Authored-by: Huaxin Gao <huaxin.gao11@gmail.com>
Signed-off-by: Huaxin Gao <huaxin.gao11@gmail.com>
…t in Kafka sources object in progress json

### What changes were proposed in this pull request?
This PR fixes an issue where the `avgOffsetsBehindLatest` metric of the Kafka sources object in the streaming progress metrics JSON was displayed in scientific notation (e.g., 2.70941269E8). The fix uses safe Decimal casting so that values are displayed in a more human-readable format.

Before change:
```
{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.2831516931,
  "processedRowsPerSecond" : 71906.88058963642,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.2831516931,
    "processedRowsPerSecond" : 71906.88058963642,
    "metrics" : {
      "avgOffsetsBehindLatest" : "2.70941269E8",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}
```
After change:
```
{
  "id" : "19815c70-c0f1-4e89-8360-2ef444a35b76",
  "runId" : "6c243067-fff6-47ef-99d8-41db0e278949",
  "name" : "KafkaMetricsTest",
  "timestamp" : "2025-09-23T06:00:00.000Z",
  "batchId" : 1250,
  "batchDuration" : 111255,
  "numInputRows" : 800000,
  "inputRowsPerSecond" : 75291.3,
  "processedRowsPerSecond" : 71906.9,
  "durationMs" : {
    "addBatch" : 110481,
    "commitBatch" : 410,
    "commitOffsets" : 107,
    "getBatch" : 0,
    "latestOffset" : 2,
    "queryPlanning" : 179,
    "triggerExecution" : 111255,
    "walCommit" : 74
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[bigdata_omsTrade_cdc]]",
    "startOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18424809459
      }
    },
    "endOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18432809459
      }
    },
    "latestOffset" : {
      "bigdata_omsTrade_cdc" : {
        "0" : 18703750728
      }
    },
    "numInputRows" : 800000,
    "inputRowsPerSecond" : 75291.3,
    "processedRowsPerSecond" : 71906.9,
    "metrics" : {
      "avgOffsetsBehindLatest" : "270941269.0",
      "maxOffsetsBehindLatest" : "270941269",
      "minOffsetsBehindLatest" : "270941269"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[s3://<masked-storage/__unitystorage/schemas/75bc9f38-9af3-4af9-852b-7077489f93d3/tables/c963dc05-f685-4fe8-a744-500cb40ce28a]",
    "numOutputRows" : -1
  }
}
```

### Why are the changes needed?
The current formatting is not user-friendly: a reader can easily misread `2.70941269E8` as `2.7` instead of `270,941,269`, since the E is easy to miss. This fix improves the readability of the Spark Structured Streaming progress metrics JSON.
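The effect can be sketched in plain Python (illustrative only; the actual fix lives in the Scala metric-formatting code). Java's `Double.toString` renders large doubles in scientific notation, while routing the value through a decimal type yields the plain form:

```python
from decimal import Decimal

v = 270941269.0

# Roughly what Java's Double.toString produced before the fix:
sci = f"{v:.8E}"
# Plain rendering after casting the value through Decimal:
plain = str(Decimal(str(v)))

print(sci)    # 2.70941269E+08
print(plain)  # 270941269.0
```
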

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Run this Maven test:
```
./build/mvn -pl sql/core,sql/api \
-am test \
-DwildcardSuites=org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite \
-DwildcardTestName="SPARK-53690"
```
Results:
```
Run completed in 8 seconds, 917 milliseconds.
Total number of tests run: 13
Suites: completed 2, aborted 0
Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Spark Project Parent POM 4.1.0-SNAPSHOT:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  1.134 s]
[INFO] Spark Project Tags ................................. SUCCESS [  1.429 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  1.427 s]
[INFO] Spark Project Common Java Utils .................... SUCCESS [  1.689 s]
[INFO] Spark Project Common Utils ......................... SUCCESS [  3.001 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  4.117 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 52.545 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  5.946 s]
[INFO] Spark Project Variant .............................. SUCCESS [  0.956 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  3.161 s]
[INFO] Spark Project Connect Shims ........................ SUCCESS [  0.750 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  3.660 s]
[INFO] Spark Project Core ................................. SUCCESS [ 27.959 s]
[INFO] Spark Project SQL API .............................. SUCCESS [  1.985 s]
[INFO] Spark Project Catalyst ............................. SUCCESS [  6.554 s]
[INFO] Spark Project SQL .................................. SUCCESS [ 42.743 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:39 min
[INFO] Finished at: 2025-10-06T10:11:39+05:30
[INFO] ------------------------------------------------------------------------
```
### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52445 from jayantdb/SPARK-53690-fix-formatting.

Authored-by: jayant.sharma <jayant.sharma@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…support forward compatibility

### What changes were proposed in this pull request?

Adding a `google.protobuf.Any extension = 999;` field to the `PipelineCommand` and `SourceCodeLocation` protos to support forward compatibility by carrying additional pipeline command types and source code location fields that are not yet defined in this version of the proto.

During the planning stage, the Spark Server will resolve and dispatch the command / message to the correct handler.

### Why are the changes needed?

To support forward compatibility by carrying additional pipeline command types and dataset or flow fields that are not yet defined in this version of the proto. This is useful for platforms that want to extend pipeline commands with platform-specific capabilities.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Tests will be added for future work that uses this new field.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52514 from SCHJonathan/jonathan-chang_data/add-extension.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Sandy Ryza <sandy.ryza@databricks.com>
…egateFunction` in V2ExpressionSQBuilder

### What changes were proposed in this pull request?

apache#50143 introduced `visitAggregateFunction` with `inputs: Array[Expression]`, but that is not the only usage of `visitAggregateFunction`; for example, here
https://github.com/apache/spark/blob/6eb4d3c9d38f6849b0acfcffdbadce03c8f49ac6/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java#L134
the old API is still used, but it no longer checks whether the function is supported.

This means that if some dialect did not support one of `(MIN, MAX, COUNT, SUM, AVG)`, the function would not be blocked; as of now all dialects include them in `supportedFunctions`, so this is not a behavioral change.
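A minimal sketch of the unified behavior (hypothetical Python, not the actual Java `V2ExpressionSQLBuilder`): every call path should route through the same supported-function check before generating SQL:

```python
# Hypothetical supported-function set; real dialects declare their own.
SUPPORTED_FUNCTIONS = {"MIN", "MAX", "COUNT", "SUM", "AVG"}

def visit_aggregate_function(name, is_distinct, inputs):
    # Reject aggregates the dialect does not declare support for,
    # regardless of which API entry point was used.
    if name not in SUPPORTED_FUNCTIONS:
        raise ValueError(f"Aggregate function not supported: {name}")
    distinct = "DISTINCT " if is_distinct else ""
    return f"{name}({distinct}{', '.join(inputs)})"

print(visit_aggregate_function("SUM", True, ["price"]))  # SUM(DISTINCT price)
```
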

### Why are the changes needed?

To unify the API so that, if some dialect in the future does not support one of the `(MIN, MAX, COUNT, SUM, AVG)` aggregate functions, it is blocked.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.
### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52573 from alekjarmov/block-aggregates.

Lead-authored-by: alekjarmov <alek.jarmov@databricks.com>
Co-authored-by: Alek Jarmov <alekjarmov@Aleks-MacBook-Pro.local>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?

Add a new in-memory sink called `ContinuousMemorySink` to facilitate RTM testing. This sink differs from the existing `MemorySink` by sending output back to the driver immediately once it is generated, rather than only at the end of the batch as the current `MemorySink` does.

### Why are the changes needed?

To facilitate RTM testing

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a simple test. Many RTM-related tests will be added in future PRs.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#52550 from jerrypeng/SPARK-53847.

Authored-by: Jerry Peng <jerry.peng@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…es in TransformWithStateInPySparkStateServer

### What changes were proposed in this pull request?

Fix the TransformWithState state server's `parseProtoMessage` method to fully read the desired message using the correct [readFully DataInputStream API](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/DataInput.html#readFully(byte%5B%5D)) rather than `read` (InputStream/FilterInputStream), which reads only the currently available data and may not return the full message. [`readFully` (DataInputStream)](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/DataInput.html#readFully(byte%5B%5D)) keeps fetching until it fills the provided buffer.

In addition to the linked API above, this StackOverflow post also illustrates the difference between the two APIs: https://stackoverflow.com/a/25900095
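The difference can be demonstrated in Python (an illustrative analog, not the actual Scala server code): a single `read` on a stream that delivers data in small chunks returns a short result, while a `readFully`-style loop drains the stream until the buffer is full:

```python
import io

class ChunkedStream:
    """Wraps a byte buffer but returns at most 4 bytes per read,
    mimicking a socket that delivers data in small pieces."""
    def __init__(self, data):
        self._buf = io.BytesIO(data)

    def read(self, n):
        return self._buf.read(min(n, 4))

def read_fully(stream, n):
    # Loop like DataInputStream.readFully until n bytes are gathered.
    out = bytearray()
    while len(out) < n:
        chunk = stream.read(n - len(out))
        if not chunk:
            raise EOFError(f"expected {n} bytes, got {len(out)}")
        out.extend(chunk)
    return bytes(out)

payload = b"x" * 100
partial = ChunkedStream(payload).read(100)          # a single read: 4 bytes
complete = read_fully(ChunkedStream(payload), 100)  # the full 100 bytes
print(len(partial), len(complete))  # 4 100
```
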

### Why are the changes needed?

For large state values used in the TransformWithState API, `inputStream.read` is not guaranteed to read `messageLen` bytes of data, per the InputStream API. For large values, `read` can return prematurely, leaving `messageBytes` only partially filled and yielding an incorrect and likely unparseable proto message.

This is not a common scenario, as testing also indicated that the actual proto messages had to be somewhat large to consistently trigger this error. The test case I added uses 512KB strings in the state value updates.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test case using 512KB strings:

- Value state update
- List state update with 3 (different) values (note: list state provides a multi-value update API, so this message is even larger than the other two)
- Map state update with single key/value

```
build/sbt -Phive -Phive-thriftserver -DskipTests package
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPandasTests'
python/run-tests --testnames 'pyspark.sql.tests.pandas.test_pandas_transform_with_state TransformWithStateInPySparkTests'
```

The configured data size (512KB) triggers an incomplete read while still completing in a reasonable time (within 30s on my laptop). I separately tested a larger input size of 4MB, which took about 30 minutes and which I considered too expensive to include in the test.
Below are sample testing results from using `read` only (i.e., no fix) plus a check on message length vs. read bytes ([test code is included in this commit](apache@b68cfd7) but reverted later for the PR). The check is no longer required after the `readFully` fix, as that is handled within the provided API.
```
    TransformWithStateInPandasTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating map state value: TESTING: Failed to read message bytes: expected 524369 bytes, but only read 261312 bytes

    TransformWithStateInPySparkTests
        pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating value state: TESTING: Failed to read message bytes: expected 524336 bytes, but only read 392012 bytes

```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-sonnet-4-5-20250929)

Closes apache#52539 from jiateoh/tws_readFully_fix.

Lead-authored-by: Jason Teoh <jiateoh@gmail.com>
Co-authored-by: Jason Teoh <jason.teoh@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?

This PR aims to upgrade `Ammonite` to 3.0.3.

### Why are the changes needed?

Ammonite `3.0.3` adds Scala 2.13.17 support; this prepares for Scala 2.13.17.
- https://github.com/com-lihaoyi/Ammonite/releases/tag/3.0.3
  - https://repo1.maven.org/maven2/com/lihaoyi/ammonite_2.13.17/3.0.3/

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52577 from dongjoon-hyun/SPARK-53879.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

This PR aims to upgrade `Selenium` to 4.32.0.

### Why are the changes needed?

To bring in the updates. Note that 4.33 and above are incompatible for now.
- https://www.selenium.dev/blog/2025/selenium-4-32-released/
- https://www.selenium.dev/blog/2025/selenium-4-31-released/
- https://www.selenium.dev/blog/2025/selenium-4-30-released/
- https://www.selenium.dev/blog/2025/selenium-4-29-released/
- https://www.selenium.dev/blog/2025/selenium-4-28-released/
- https://www.selenium.dev/blog/2024/selenium-4-27-released/
- https://www.selenium.dev/blog/2024/selenium-4-26-released/
- https://www.selenium.dev/blog/2024/selenium-4-25-released/
- https://www.selenium.dev/blog/2024/selenium-4-24-released/
- https://www.selenium.dev/blog/2024/selenium-4-23-released/
- https://www.selenium.dev/blog/2024/selenium-4-22-released/

### Does this PR introduce _any_ user-facing change?

No, this is a test dependency change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52579 from dongjoon-hyun/SPARK-53881.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Upgrade to scala 2.13.17

### Why are the changes needed?
To bring in the latest bug fixes and improvements, such as JDK 25 support. Note that the Scala community announced two breaking changes resulting from the bug fixes.

> Breaking changes
> - Mix in the productPrefix hash statically in case class hashCode
> - Improve scala.util.Using suppression order

- https://github.com/scala/scala/releases/tag/v2.13.17
  - scala/scala#11046
  - scala/scala#10937
  - scala/scala#10927
    - scala/bug#13058
  - scala/scala#11023
    - scala/bug#13033
  - scala/scala#11000

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
local and github builds

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#52509 from vrozov/SPARK-53585.

Authored-by: Vlad Rozov <vrozov@amazon.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

Adds a new experimental/developer RPC `CloneSession` to the `SparkConnectService`.

  ✅ CLONED (from SparkSession)

  - SessionState - SQL configs, temp views, UDFs, catalog metadata
  - ArtifactManager - JARs, files, classes added to session
  - ManagedJobTags - Job group tags for tracking
  - SharedState (reference) - Metastore, global temp views
  - SparkContext (reference) - Core Spark engine

  ❌ NOT CLONED (SessionHolder - Spark Connect layer)

  - planCache - (Partially analyzed) Logical plans for query optimization
  - operationIds - Currently executing operations
  - errorIdToError - Recent errors for debugging
  - eventManager - Session lifecycle events
  - dataFrameCache - DataFrames for foreachBatch callbacks
  - mlCache - ML models and pipelines
  - listenerCache - Streaming query listeners
  - pipelineExecutions - Active pipeline contexts
  - dataflowGraphRegistry - Registered dataflow graphs
  - streamingForeachBatchRunnerCleanerCache - Python streaming workers
  - pythonAccumulator - Python metrics collection
  - Session timings - Start time, last access, custom timeout

The clone preserves all SQL/catalog state but creates a fresh runtime environment. An analogy is cloning a database schema/config but not the active connections, caches, or running jobs.
### Why are the changes needed?

Spark Connect introduced the concept of resource isolation (via `ArtifactManager`, which has been ported to classic Spark) and thus, jars/pyfiles/artifacts added to each session are isolated from other sessions.

A slight rough edge is that if a user wishes to fork the state of a session but maintain independence, the only possible way is to create a new session and reupload/reinit all base jars/artifacts/pyfiles, etc.

Support for cloning through the API helps address the rough edge while maintaining all the benefits of session resource isolation.
### Does this PR introduce _any_ user-facing change?

Yes

```python
  spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
  spark.conf.set("my.custom.config", "value")
  spark.addArtifact("/path/to/my.jar")
  spark.sql("CREATE TEMP VIEW my_view AS SELECT 1 AS id")

  # Clone the session
  cloned_spark = spark.cloneSession()

  # The cloned session has all the same state
  assert cloned_spark.conf.get("my.custom.config") == "value"
  assert cloned_spark.sql("SELECT * FROM my_view").collect() == [Row(id=1)]

  # But operations are isolated between sessions
  cloned_spark.sql("DROP VIEW my_view")  # Only affects cloned session
  spark.sql("SELECT * FROM my_view").collect()  # Original still works
```

### How was this patch tested?

New individual unit tests along with new test suites.

### Was this patch authored or co-authored using generative AI tooling?

Co-authored with assistance from Claude Code.

Closes apache#52200 from vicennial/cloneAPI.

Authored-by: vicennial <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
… check in V2ExpressionBuilder

### What changes were proposed in this pull request?

In apache#52573 I slightly refactored `visitAggregate`, but I used a direct reference check to compare arrays. This works because it is the same reference, but the check should be generalized to any empty array.

### Why are the changes needed?

If someone were to use a different reference in `CountStar` it would fail, which is not the best behavior, as any empty array should work.
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52587 from alekjarmov/improve-check.

Authored-by: alekjarmov <alek.jarmov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Currently, Spark has two bitmap aggregation functions, `bitmap_construct_agg` and `bitmap_or_agg`, for constructing a bitmap out of a set of integers and performing a union of two sets represented by bitmaps, respectively. However, an efficient intersect operation (bitwise AND) is missing.

### What changes were proposed in this pull request?
- **Implemented `bitmap_and_agg` expression**: New aggregation function that performs bitwise AND operations on binary column inputs.

### Design Decisions
- **Result on empty input is identity element for the operation**: Empty input groups return all-ones bitmaps (AND identity).
- **Missing bytes handling**: For AND operations, missing bytes in input are treated as zeros to maintain intersection semantics.
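
The missing-byte semantics can be sketched in plain Python (a hypothetical helper, not the Spark implementation): shorter bitmaps are zero-extended before the bytewise AND, so absent bits can never appear in the intersection:

```python
def bitmap_and(a: bytes, b: bytes) -> bytes:
    # Missing trailing bytes are treated as zeros, preserving
    # intersection semantics: a bit absent from either input
    # cannot be set in the result.
    n = max(len(a), len(b))
    a = a.ljust(n, b"\x00")
    b = b.ljust(n, b"\x00")
    return bytes(x & y for x, y in zip(a, b))

print(bitmap_and(b"\xff\x0f", b"\x0f").hex())  # 0f00
```
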

### How was this patch tested?
Added new test cases to cover `bitmap_and_agg` functionality:
- **`BitmapExpressionsQuerySuite`**: Added test cases for basic AND operations, edge cases, empty group handling, and integration with other bitmap functions.

### Does this PR introduce _any_ user-facing change?
No.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52586 from uros7251brick/add-bitmap-and-agg.

Authored-by: Uros Stojkovic <uros.stojkovic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@abhishek6s abhishek6s closed this Feb 16, 2026