SEDONA-738 Add sedonadb worker #2593
Conversation
Working on proper benchmarking.
The items on the list are the planned extensions to the SedonaDB vectorized UDFs:
@jiayuasu @paleolimbot I think we can start reviewing the changes and the ideas I am proposing in this MR. What I observed is that, this way, a UDF can be even faster than native Sedona functions like ST_Buffer; ST_Area, for instance, is three times slower, so I guess it depends on the specific function. More importantly, the performance is better than the previous UDFs in Sedona. I would mark this functionality as experimental. I also haven't included a documentation update, as we might decide during the review that this MR needs adjustment.
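For context, here is a minimal sketch of a vectorized buffer UDF of the kind being compared above, assuming Shapely 2.x and the stock pandas UDF API; it is illustrative only, not the benchmark code from this PR.

```python
import pandas as pd
import shapely
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BinaryType

@pandas_udf(BinaryType())
def buffer_wkb(wkb: pd.Series) -> pd.Series:
    # One vectorized Shapely call per Arrow batch instead of a Python loop
    # per row; this is where the gap to row-wise UDFs comes from.
    geoms = shapely.from_wkb(wkb)
    return pd.Series(shapely.to_wkb(shapely.buffer(geoms, 1.0)))
```

Comparing `df.select(buffer_wkb("geom_wkb"))` against `ST_Buffer(geom, 1.0)` on the same data is the shape of the measurement described above.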
This piece of code works only with Spark 3.5, but I plan to extend it to Spark 4.0.
I would like to extend it to include table-defined user functions, which will allow us to operate on the entire SedonaDB dataframe; a rough sketch of that direction follows below.
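As an illustration, Spark 3.5 already ships a Python UDTF API for table-valued functions; wiring something similar to SedonaDB is the open idea here. The class below is a hypothetical example, not part of this PR.

```python
from pyspark.sql.functions import udtf

@udtf(returnType="id: int, wkt: string")
class PassThroughGeoms:
    # A trivial table function: it consumes rows and yields rows, which is
    # the shape a SedonaDB-backed table UDF would take.
    def eval(self, id: int, wkt: str):
        yield id, wkt
```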
cd python
uv add apache-flink==1.20.1
uv sync
# uv sync --extra flink
to remove
matrix:
  os: ['ubuntu-latest', 'windows-latest', 'macos-15']
- python: ['3.11', '3.10', '3.9', '3.8']
+ python: ['3.11', '3.10', '3.9']
I had trouble integrating with Python 3.8, and it has already been a year since it reached EOL. What would you think about removing it, and maybe starting to support Python 3.12 and 3.13?
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
This one is almost a copy-paste of what is in Apache Spark. The only difference is the imported worker function:
from sedona.spark.worker.worker import main as worker_main
I don't know whether there is a better approach, such as importing functions like manager instead?
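One possible middle ground, sketched below on the assumption that only the entry point differs from Spark's file: resolve the worker main from a configurable module name instead of duplicating the module. The function name and default are illustrative.

```python
import importlib

def load_worker_main(module_name: str = "sedona.spark.worker.worker"):
    # Import the configured worker module and hand back its main(), so the
    # surrounding daemon loop never needs to be copy-pasted.
    module = importlib.import_module(module_name)
    return module.main
```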
crs = self.geom_offsets[arg]
fields.append(
    f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}"
)  # nosec
Theoretical SQL injection, which is not causing any harm here.
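If we want to silence the finding anyway, coercing the CRS to an integer before interpolation closes even the theoretical hole. A small sketch reusing the names from the diff above:

```python
def geom_field_expr(arg: int, crs_value) -> str:
    # int() raises on anything that is not a plain EPSG code, so the
    # f-string below can only ever receive digits.
    crs = int(crs_value)
    return f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}"
```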
  return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
}

static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
If the Sedona speedup is available, instead of translating to WKB and then loading from WKB with Shapely, we can create Shapely objects directly to speed up vectorized UDFs.
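For comparison, the baseline path this avoids looks roughly like the sketch below (Shapely 2.x); the C function above would construct the Shapely objects directly, without the intermediate WKB buffers.

```python
import numpy as np
import shapely

def geoms_via_wkb(wkb_values) -> np.ndarray:
    # Baseline: every geometry is serialized to WKB in the worker and then
    # parsed again here, a full encode/decode round-trip per batch.
    return shapely.from_wkb(np.asarray(wkb_values, dtype=object))
```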
| "20", | ||
| ) | ||
| # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default | ||
| .config("spark.executor.memory", "10G") |
To remove; I forgot to remove it after testing.
from setuptools import setup
import numpy

setup(
This is needed to make the NumPy C API wrappers available to the extension build.
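Concretely, the wiring implied here is passing numpy.get_include() to the C extension build; a sketch with placeholder extension and source names:

```python
from setuptools import setup, Extension
import numpy

setup(
    ext_modules=[
        Extension(
            "sedona._geom_speedups",             # placeholder extension name
            sources=["src/geom_speedups.c"],     # placeholder source path
            include_dirs=[numpy.get_include()],  # exposes numpy/arrayobject.h
        )
    ]
)
```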
val sedonaArrowStrategy = Try(
  Class
-   .forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
+   .forName("org.apache.spark.sql.execution.python.SedonaArrowStrategy")
We need some private methods from Spark's execution.python package.
  case _ => None
}

schema
Infer the type of geometry fields by taking the first value.
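In Python terms the rule amounts to the sketch below: peek at the first non-null value of a column and treat the field as geometry if that value is one. Names are illustrative.

```python
from shapely.geometry.base import BaseGeometry

def is_geometry_field(values) -> bool:
    # The first non-null value stands in for the whole column.
    first = next((v for v in values if v is not None), None)
    return isinstance(first, BaseGeometry)
```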
| .config("sedona.join.autoBroadcastJoinThreshold", "-1") | ||
| .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions") | ||
| .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker") | ||
| .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon") |
This works because sedona.python.worker.daemon.enabled is false. We need to either remove this param from the test (by default sedona.spark.worker.daemon is used) or change it to sedona.spark.worker.daemon.
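A sketch of the corrected test setup, assuming the default daemon module name quoted above:

```python
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
    # Either drop the daemon override entirely (the default already points
    # at sedona.spark.worker.daemon), or make it explicit and correct:
    .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
)
```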
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-javadoc-plugin</artifactId>
  <!-- <version>3.12.0</version> -->
to remove

Did you read the Contributor Guide?
Is this PR related to a ticket?
Yes: [SEDONA-738] Add sedonadb worker.
What changes were proposed in this PR?
A Sedona vectorized UDF (Apache Arrow exchange) that utilizes SedonaDB. It supports:
How was this patch tested?
Unit tests.
Did this PR include necessary documentation updates?
TODO