Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ github:
main:
required_pull_request_reviews:
required_approving_review_count: 1
required_status_checks:
contexts:
- "Check License Header"
- "Use prettier to check formatting of documents"
- "Validate required_status_checks in .asf.yaml"
- "Spell Check with Typos"
# needs to be updated as part of the release process
# .asf.yaml doesn't support wildcard branch protection rules, only exact branch names
# https://github.com/apache/infrastructure-asfyaml?tab=readme-ov-file#branch-protection
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,15 @@
# if you encounter error, see instructions inside the script
run: ci/scripts/doc_prettier_check.sh

asf-yaml-check:
name: Validate required_status_checks in .asf.yaml
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- run: pip install pyyaml
- run: python3 ci/scripts/check_asf_yaml_status_checks.py

typos:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
name: Spell Check with Typos
runs-on: ubuntu-latest
steps:
Expand Down
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ liblzma = { version = "0.4.6", features = ["static"] }
log = "^0.4"
memchr = "2.8.0"
num-traits = { version = "0.2" }
object_store = { version = "0.13.1", default-features = false }
object_store = { version = "0.13.2", default-features = false }
parking_lot = "0.12"
parquet = { version = "58.1.0", default-features = false, features = [
"arrow",
Expand Down
145 changes: 145 additions & 0 deletions ci/scripts/check_asf_yaml_status_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Validate that every entry in .asf.yaml required_status_checks
matches an actual GitHub Actions job name, and that the workflow
is not filtered by paths/paths-ignore (which would prevent the
check from running on some PRs, blocking merges).

A typo or stale entry in required_status_checks will block all
merges for the project, so this check catches that early.
"""

import glob
import os
import sys

import yaml


def get_required_checks(asf_yaml_path):
    """Extract all required_status_checks contexts from .asf.yaml.

    Parameters
    ----------
    asf_yaml_path : str
        Path to the repository's .asf.yaml file.

    Returns
    -------
    dict
        Mapping of status-check context name -> list of branch names
        that require it.
    """
    with open(asf_yaml_path) as f:
        config = yaml.safe_load(f)

    checks = {}  # context -> list of branches requiring it
    # An empty file or missing keys parse as None; treat that as "no checks"
    # rather than crashing on a .get() chain.
    branches = ((config or {}).get("github") or {}).get("protected_branches") or {}
    for branch, settings in branches.items():
        # A branch entry with no body (a bare "main:" key) parses as None.
        rsc = (settings or {}).get("required_status_checks") or {}
        for ctx in rsc.get("contexts") or []:
            checks.setdefault(ctx, []).append(branch)

    return checks


def get_workflow_jobs(workflows_dir):
    """Collect all jobs with their metadata from GitHub Actions workflow files.

    Parameters
    ----------
    workflows_dir : str
        Path to the .github/workflows directory.

    Returns
    -------
    dict
        Mapping of job identifier (display name and/or job key) to a list of
        (workflow_file_basename, has_path_filters) tuples.
    """
    jobs = {}  # identifier -> [(workflow_file, has_path_filters)]
    # GitHub Actions accepts both .yml and .yaml workflow file extensions,
    # so scan for both; otherwise jobs in .yaml files would be reported as
    # missing even though they exist.
    workflow_files = sorted(
        glob.glob(os.path.join(workflows_dir, "*.yml"))
        + glob.glob(os.path.join(workflows_dir, "*.yaml"))
    )
    for workflow_file in workflow_files:
        with open(workflow_file) as f:
            workflow = yaml.safe_load(f)

        if not workflow or "jobs" not in workflow:
            continue

        # Check if the pull_request trigger has path filters.
        # PyYAML parses the bare key `on:` as boolean True, so look up both.
        on = workflow.get(True, workflow.get("on", {}))
        pr_trigger = on.get("pull_request", {}) if isinstance(on, dict) else {}
        has_path_filters = bool(
            isinstance(pr_trigger, dict)
            and (pr_trigger.get("paths") or pr_trigger.get("paths-ignore"))
        )

        basename = os.path.basename(workflow_file)
        for job_key, job_config in workflow.get("jobs", {}).items():
            if not isinstance(job_config, dict):
                continue
            # GitHub reports a job by its display `name` if set, else by its
            # key; register under both so either form in .asf.yaml matches.
            job_name = job_config.get("name", job_key)
            info = (basename, has_path_filters)
            jobs.setdefault(job_name, []).append(info)
            if job_key != job_name:
                jobs.setdefault(job_key, []).append(info)

    return jobs


def main():
    """Cross-check .asf.yaml required_status_checks against workflow jobs.

    Exits with status 1 (after printing a diagnostic) when a required check
    has no matching GitHub Actions job, or when every workflow providing it
    is path-filtered on pull_request.
    """
    script_path = os.path.abspath(__file__)
    repo_root = os.path.dirname(os.path.dirname(os.path.dirname(script_path)))
    asf_yaml = os.path.join(repo_root, ".asf.yaml")
    workflows_dir = os.path.join(repo_root, ".github", "workflows")

    required_checks = get_required_checks(asf_yaml)
    if not required_checks:
        print("No required_status_checks found in .asf.yaml — nothing to validate.")
        return

    jobs = get_workflow_jobs(workflows_dir)

    errors = []
    for ctx in sorted(required_checks):
        branches = ", ".join(sorted(required_checks[ctx]))
        providers = jobs.get(ctx)
        if providers is None:
            errors.append(
                f' - "{ctx}" (branch: {branches}): '
                f"not found in any GitHub Actions workflow"
            )
            continue

        # A check is safe as long as at least one workflow providing it runs
        # unconditionally (i.e. without paths/paths-ignore on pull_request).
        filtered = [wf for wf, flagged in providers if flagged]
        unfiltered = [wf for wf, flagged in providers if not flagged]
        if filtered and not unfiltered:
            wf_list = ", ".join(filtered)
            errors.append(
                f' - "{ctx}" (branch: {branches}): '
                f"workflow {wf_list} uses paths/paths-ignore filters on "
                f"pull_request, so this check won't run for some PRs "
                f"and will block merging"
            )

    if errors:
        print("ERROR: Problems found with required_status_checks in .asf.yaml:\n")
        print("\n".join(errors))
        print()
        print("Available job names across all workflows:")
        for name in sorted(jobs):
            print(f" - {name}")
        sys.exit(1)

    print(
        f"OK: All {len(required_checks)} required_status_checks "
        "match existing GitHub Actions jobs."
    )


# Entry-point guard: allows importing this module (e.g. for testing the
# helper functions) without triggering the validation side effects.
if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ mod tests {
let state = session_ctx.state();
let location = Path::from_filesystem_path(".")
.unwrap()
.child("invalid.parquet");
.join("invalid.parquet");

let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta {
location,
Expand Down
9 changes: 8 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,14 @@ impl DefaultPhysicalPlanner {
let new_sort = SortExec::new(ordering, physical_input).with_fetch(*fetch);
Arc::new(new_sort)
}
LogicalPlan::Subquery(_) => todo!(),
// The optimizer's decorrelation passes remove Subquery nodes
// for supported patterns. This error is hit for correlated
// patterns that the optimizer cannot (yet) decorrelate.
LogicalPlan::Subquery(_) => {
return not_impl_err!(
"Physical plan does not support undecorrelated Subquery"
);
}
LogicalPlan::SubqueryAlias(_) => children.one()?,
LogicalPlan::Limit(limit) => {
let input = children.one()?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ impl ObjectStore for MirroringObjectStore {
};

if parts.next().is_some() {
common_prefixes.insert(prefix.child(common_prefix));
common_prefixes.insert(prefix.clone().join(common_prefix));
} else {
let object = ObjectMeta {
location: k.clone(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ mod tests {
let root = root.to_string_lossy();

let url = ListingTableUrl::parse(root).unwrap();
let child = url.prefix.child("partition").child("file");
let child = url.prefix.clone().join("partition").join("file");

let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
assert_eq!(prefix, vec!["partition", "file"]);
Expand Down
7 changes: 4 additions & 3 deletions datafusion/datasource/src/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ fn generate_file_path(
if !single_file_output {
base_output_path
.prefix()
.child(format!("{write_id}_{part_idx}.{file_extension}"))
.clone()
.join(format!("{write_id}_{part_idx}.{file_extension}"))
} else {
base_output_path.prefix().to_owned()
}
Expand Down Expand Up @@ -588,8 +589,8 @@ fn compute_hive_style_file_path(
) -> Path {
let mut file_path = base_output_path.prefix().clone();
for j in 0..part_key.len() {
file_path = file_path.child(format!("{}={}", partition_by[j].0, part_key[j]));
file_path = file_path.join(format!("{}={}", partition_by[j].0, part_key[j]));
}

file_path.child(format!("{write_id}.{file_extension}"))
file_path.join(format!("{write_id}.{file_extension}"))
}
19 changes: 18 additions & 1 deletion datafusion/functions/src/datetime/current_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,24 @@ The `current_date()` return value is determined at query time and will return th
"#,
syntax_example = r#"current_date()
(optional) SET datafusion.execution.time_zone = '+00:00';
SELECT current_date();"#
SELECT current_date();"#,
sql_example = r#"```sql
> SELECT current_date();
+----------------+
| current_date() |
+----------------+
| 2024-12-23 |
+----------------+

-- The current date is based on the session time zone (UTC by default)
> SET datafusion.execution.time_zone = 'Asia/Tokyo';
> SELECT current_date();
+----------------+
| current_date() |
+----------------+
| 2024-12-24 |
+----------------+
```"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CurrentDateFunc {
Expand Down
19 changes: 18 additions & 1 deletion datafusion/functions/src/datetime/current_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,24 @@ The session time zone can be set using the statement 'SET datafusion.execution.t
"#,
syntax_example = r#"current_time()
(optional) SET datafusion.execution.time_zone = '+00:00';
SELECT current_time();"#
SELECT current_time();"#,
sql_example = r#"```sql
> SELECT current_time();
+--------------------+
| current_time() |
+--------------------+
| 06:30:00.123456789 |
+--------------------+

-- The current time is based on the session time zone (UTC by default)
> SET datafusion.execution.time_zone = 'Asia/Tokyo';
> SELECT current_time();
+--------------------+
| current_time() |
+--------------------+
| 15:30:00.123456789 |
+--------------------+
```"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CurrentTimeFunc {
Expand Down
16 changes: 15 additions & 1 deletion datafusion/functions/src/datetime/date_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,21 @@ use datafusion_macros::user_doc;
argument(
name = "expression",
description = "Time expression to operate on. Can be a constant, column, or function."
)
),
sql_example = r#"```sql
> SELECT date_part('year', '2024-05-01T00:00:00');
+-----------------------------------------------------+
| date_part(Utf8("year"),Utf8("2024-05-01T00:00:00")) |
+-----------------------------------------------------+
| 2024 |
+-----------------------------------------------------+
> SELECT extract(day FROM timestamp '2024-05-01T00:00:00');
+----------------------------------------------------+
| date_part(Utf8("DAY"),Utf8("2024-05-01T00:00:00")) |
+----------------------------------------------------+
| 1 |
+----------------------------------------------------+
```"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DatePartFunc {
Expand Down
16 changes: 15 additions & 1 deletion datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,21 @@ impl DateTruncGranularity {
argument(
name = "expression",
description = "Timestamp or time expression to operate on. Can be a constant, column, or function."
)
),
sql_example = r#"```sql
> SELECT date_trunc('month', '2024-05-15T10:30:00');
+-----------------------------------------------+
| date_trunc(Utf8("month"),Utf8("2024-05-15T10:30:00")) |
+-----------------------------------------------+
| 2024-05-01T00:00:00 |
+-----------------------------------------------+
> SELECT date_trunc('hour', '2024-05-15T10:30:00');
+----------------------------------------------+
| date_trunc(Utf8("hour"),Utf8("2024-05-15T10:30:00")) |
+----------------------------------------------+
| 2024-05-15T10:00:00 |
+----------------------------------------------+
```"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DateTruncFunc {
Expand Down
Loading
Loading