3 changes: 2 additions & 1 deletion .github/workflows/java.yml
@@ -109,8 +109,9 @@ jobs:
SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}

if [[ "${SPARK_VERSION}" == "3.5"* ]] || [[ "${SPARK_VERSION}" == "4."* ]]; then
-  pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona pyarrow
+  pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona pyarrow geoarrow-pyarrow sedonadb
export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
(cd python; pip install --force-reinstall --no-deps -e .)
fi

mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} -Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
2 changes: 1 addition & 1 deletion .github/workflows/pyflink.yml
@@ -70,7 +70,7 @@ jobs:
run: |
cd python
uv add apache-flink==1.20.1
-  uv sync
+  # uv sync --extra flink
[Comment from Member Author]: to remove

- name: Run PyFlink tests
run: |
wget -q https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.8.0-33.1-rc1/geotools-wrapper-1.8.0-33.1-rc1.jar
2 changes: 1 addition & 1 deletion .github/workflows/python-extension.yml
@@ -51,7 +51,7 @@ jobs:
strategy:
matrix:
os: ['ubuntu-latest', 'windows-latest', 'macos-15']
-  python: ['3.11', '3.10', '3.9', '3.8']
+  python: ['3.11', '3.10', '3.9']
[Comment from Member Author]: I had trouble integrating with Python 3.8; it has been a year since it reached EOL. What would you think about removing it, and perhaps starting to support Python 3.12 and 3.13?

runs-on: ${{ matrix.os }}
defaults:
run:
26 changes: 12 additions & 14 deletions .github/workflows/python.yml
@@ -86,10 +86,6 @@ jobs:
java: '11'
python: '3.9'
- spark: '3.5.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.11'
@@ -101,15 +97,6 @@
scala: '2.12.8'
java: '11'
python: '3.9'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
- spark: '3.4.0'
scala: '2.12.8'
java: '11'
python: '3.8'
shapely: '1'

steps:
- uses: actions/checkout@v6
@@ -181,7 +168,18 @@ jobs:
run: |
cd python
export SPARK_HOME=$(uv run python -c "import site; print(site.getsitepackages()[0]+'/pyspark')")
-  uv run pytest -v tests
+  uv run pytest -m "not vectorized" -v tests
- name: Run vectorized udf tests
if: ${{ matrix.spark < '4.0.0' }}
run: |
cd python
export SPARK_HOME=$(uv run python -c "import site; print(site.getsitepackages()[0]+'/pyspark')")
uv pip install --force-reinstall --no-deps -e .
uv remove apache-flink --optional flink
uv add "pyarrow>=16.0.0"
uv add "shapely>=2.0.0"
uv add sedonadb geopandas geoarrow-pyarrow
uv run pytest -m vectorized tests
- name: Run basic tests without rasterio
run: |
cd python
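The `-m "not vectorized"` / `-m vectorized` split above relies on a registered pytest marker; a minimal registration sketch (the file location and description text are assumptions, not taken from this PR):

```ini
# Hypothetical pytest.ini fragment registering the marker selected by the
# workflow's -m expressions; pytest warns on unregistered markers.
[pytest]
markers =
    vectorized: tests exercising the Arrow-based vectorized UDF path
```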
1 change: 1 addition & 0 deletions pom.xml
@@ -631,6 +631,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<!-- <version>3.12.0</version>-->
[Comment from Member Author]: to remove

<version>2.10.4</version>
<executions>
<execution>
4 changes: 2 additions & 2 deletions python/pyproject.toml
@@ -16,7 +16,7 @@
# under the License.

[build-system]
-requires = ["setuptools>=69", "wheel"]
+requires = ["setuptools", "wheel", "numpy"]
build-backend = "setuptools.build_meta"

[project]
@@ -26,7 +26,7 @@ description = "Apache Sedona is a cluster computing system for processing large-
readme = "README.md"
license = { text = "Apache-2.0" }
authors = [ { name = "Apache Sedona", email = "dev@sedona.apache.org" } ]
-requires-python = ">=3.8"
+requires-python = ">=3.9"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
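With `requires-python` raised to `>=3.9`, the per-version trove classifiers could be aligned to match; an illustrative fragment (version list assumed, not part of this diff):

```toml
# Illustrative only: explicit per-version classifiers consistent with
# requires-python = ">=3.9"
classifiers = [
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
]
```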
87 changes: 84 additions & 3 deletions python/sedona/spark/sql/functions.py
@@ -21,12 +21,20 @@

import pandas as pd

from sedona.spark.sql.types import GeometryType
from sedona.spark.utils import geometry_serde
from pyspark.sql.udf import UserDefinedFunction
from pyspark.sql.types import DataType
from shapely.geometry.base import BaseGeometry
from pyspark.sql.udf import UserDefinedFunction
from sedona.spark.sql.types import GeometryType
from pyspark.sql.types import (
DataType,
FloatType,
DoubleType,
IntegerType,
StringType,
ByteType,
)

from sedona.spark.utils.geometry_serde import sedona_db_speedup_enabled

SEDONA_SCALAR_EVAL_TYPE = 5200
SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
@@ -142,3 +150,76 @@ def serialize_to_geometry_if_geom(data, return_type: DataType):
return geometry_serde.serialize(data)

return data


def infer_pa_type(spark_type: DataType):
import pyarrow as pa
import geoarrow.pyarrow as ga

if isinstance(spark_type, GeometryType):
return ga.wkb()
elif isinstance(spark_type, FloatType):
return pa.float32()
elif isinstance(spark_type, DoubleType):
return pa.float64()
elif isinstance(spark_type, IntegerType):
return pa.int32()
elif isinstance(spark_type, StringType):
return pa.string()
else:
raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_type(spark_type: DataType):
from sedonadb import udf as sedona_udf_module

if isinstance(spark_type, GeometryType):
return sedona_udf_module.GEOMETRY
elif (
isinstance(spark_type, FloatType)
or isinstance(spark_type, DoubleType)
or isinstance(spark_type, IntegerType)
):
return sedona_udf_module.NUMERIC
elif isinstance(spark_type, StringType):
return sedona_udf_module.STRING
elif isinstance(spark_type, ByteType):
return sedona_udf_module.BINARY
else:
raise NotImplementedError(f"Type {spark_type} is not supported yet.")


def infer_input_types(spark_types: list[DataType]):
pa_types = []
for spark_type in spark_types:
pa_type = infer_input_type(spark_type)
pa_types.append(pa_type)

return pa_types


def sedona_db_vectorized_udf(
return_type: DataType,
input_types: list[DataType],
):
from sedonadb import udf as sedona_udf_module

eval_type = 6200
if sedona_db_speedup_enabled:
eval_type = 6201

def apply_fn(fn):
out_type = infer_pa_type(return_type)
input_types_sedona_db = infer_input_types(input_types)

@sedona_udf_module.arrow_udf(out_type, input_types=input_types_sedona_db)
def shapely_udf(*args, **kwargs):
return fn(*args, **kwargs)

udf = UserDefinedFunction(
lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type
)

return udf

return apply_fn
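`sedona_db_vectorized_udf` above is a decorator factory: the outer call captures the return and input types, and the inner `apply_fn` wraps the user function once sedonadb is imported. A dependency-free sketch of the same shape (all names here are illustrative, not the real sedonadb API):

```python
def vectorized_udf(return_type, input_types):
    """Capture type information, then wrap the decorated function."""
    def apply_fn(fn):
        def wrapper(*args, **kwargs):
            # The real implementation hands fn to sedonadb's arrow_udf here;
            # this sketch just forwards the call unchanged.
            return fn(*args, **kwargs)
        # Expose the captured signature, mirroring how the real decorator
        # threads return_type into UserDefinedFunction.
        wrapper.return_type = return_type
        wrapper.input_types = input_types
        return wrapper
    return apply_fn

@vectorized_udf("double", ["geometry", "double"])
def buffered_area(geom, distance):
    return distance * 2.0  # placeholder body

print(buffered_area.return_type)  # -> double
print(buffered_area(None, 3.0))   # -> 6.0
```

Deferring the heavy imports (`pyarrow`, `sedonadb`) into `apply_fn`, as the real code does, keeps module import cheap when the vectorized path is unused.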
38 changes: 37 additions & 1 deletion python/sedona/spark/utils/geometry_serde.py
@@ -24,7 +24,7 @@
from shapely.geometry.base import BaseGeometry

speedup_enabled = False

sedona_db_speedup_enabled = False

# Use geomserde_speedup when available, otherwise fallback to general pure
# python implementation.
@@ -62,7 +62,14 @@ def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
return None
return geomserde_speedup.deserialize(buf)

def to_sedona(arr):
return geomserde_speedup.to_sedona_func(arr)

def from_sedona(arr):
return geomserde_speedup.from_sedona_func(arr)

speedup_enabled = True
sedona_db_speedup_enabled = True

elif shapely.__version__.startswith("1."):
# Shapely 1.x uses ctypes.CDLL to load geos_c library. We can obtain the
@@ -123,14 +130,43 @@ def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
ob.__dict__["_is_empty"] = False
return ob, bytes_read

warn(
"The optimized sedonadb vectorized functions are only available for shapely 2.x; using fallback implementation."
)

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

speedup_enabled = True

else:

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

# fallback to our general pure python implementation
from .geometry_serde_general import deserialize, serialize


except Exception as e:
warn(
f"Cannot load geomserde_speedup, fallback to general python implementation. Reason: {e}"
)

warn(
f"Cannot load optimized version of sedonadb vectorized function, using fallback implementation. Reason: {e}"
)

def to_sedona(arr):
return shapely.to_wkb(arr)

def from_sedona(arr):
return shapely.from_wkb(arr)

from .geometry_serde_general import deserialize, serialize
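The fallback `to_sedona`/`from_sedona` definitions above exchange geometries as WKB via `shapely.to_wkb`/`shapely.from_wkb`. For reference, WKB for a 2-D point is 21 bytes: a byte-order flag, a uint32 geometry-type code, then x and y as IEEE-754 doubles. A stdlib-only sketch:

```python
import struct

# Build WKB for POINT (1.0 2.0): little-endian flag (1), geometry type 1
# (Point), then the two coordinates as doubles.
wkb_point = struct.pack("<BIdd", 1, 1, 1.0, 2.0)

byte_order, geom_type, x, y = struct.unpack("<BIdd", wkb_point)
print(len(wkb_point), geom_type, x, y)  # -> 21 1 1.0 2.0
```

This is why the pure-Python fallback works across shapely versions: WKB is a stable, library-neutral interchange format, at the cost of a serialize/deserialize round trip per batch.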
16 changes: 16 additions & 0 deletions python/sedona/spark/worker/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.