Use per-child temp dirs for local update data generation #252
wjxiz1992 wants to merge 5 commits into NVIDIA:dev from …
Conversation
Pull request overview
This PR fixes a data generation bug (#221) where running dsdgen with --update and parallel workers in local mode fails because all children try to write identically-named delete files to the same output directory. The fix isolates each child's output into its own temp directory, merges results afterward (deduplicating delete tables), and adds a pre-flight safety check for existing update data.
Changes:
- Introduces three new helper functions (`_check_existing_update_data`, `_merge_update_data_local`, `_generate_update_data_local`) to handle local update data generation with per-child temp directories
- Delegates the `--update` path in `generate_data_local()` to the new orchestrator function, removing the previous incomplete inline handling of the update case
- Restricts the existing base-data post-generation file-moving loop strictly to `source_table_names` (removing maintenance table handling that is now done separately)
```python
for p in procs:
    p.wait()
    if p.returncode != 0:
        print("dsdgen failed with return code {}".format(p.returncode))
        raise Exception("dsdgen failed")
```
There was a problem hiding this comment.
When one dsdgen process returns a non-zero exit code, the code raises an exception inside the for p in procs loop without waiting for or terminating the remaining still-running child processes. The finally block then immediately calls shutil.rmtree(temp_base), which may fail or produce unpredictable results if some dsdgen processes are still actively writing to that directory.
To fix this, before raising the exception (or as part of the finally block), iterate over the remaining processes and call p.terminate() (or p.kill()) followed by p.wait() to ensure all child processes have stopped before deleting the temp directory.
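A minimal sketch of that shutdown sequence, reusing the `procs` and `temp_base` names from the snippet above (illustrative only, and assuming `os` and `shutil` are already imported; not the PR's exact code):

```python
# Sketch: stop any still-running dsdgen children, reap them all,
# then it is safe to delete the shared temp tree.
for p in procs:
    if p.poll() is None:  # still running
        p.terminate()
    p.wait()              # always reap to avoid zombies
if os.path.exists(temp_base):
    shutil.rmtree(temp_base)
```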
Fixed: the finally block now terminates all still-running child processes via p.terminate() and reaps all children via p.wait() before deleting the temp directory.
Greptile Summary

This PR redesigns local update data generation to use per-child temp directories, eliminating the need for dsdgen's `-force` flag.

Key changes: …

One minor issue was found: the comment in …

Confidence Score: 4/5
Important Files Changed
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[generate_data_local] -->|args.update is set| B{data_dir non-empty?}
B -->|Yes| C[Print warning: update data alongside existing content]
B -->|No| D[_generate_update_data_local]
C --> D
D --> E{Maintenance table dirs exist?}
E -->|Yes and no --overwrite_output| F[Raise Exception: use --overwrite_output]
E -->|No, or --overwrite_output set| G{Stale _temp_update exists?}
G -->|Yes| H[Print warning + shutil.rmtree stale temp]
G -->|No| I[Create per-child temp dirs]
H --> I
I --> J[Launch parallel dsdgen procs one per child]
J --> K[Wait for all procs sequentially]
K --> L{Any returncode != 0?}
L -->|Yes| M[Raise Exception: dsdgen failed]
L -->|No| N[_merge_update_data_local]
M --> O[finally: terminate+wait remaining procs\nshutil.rmtree temp_base]
N --> O
O --> P[du summary]
N --> Q{For each maintenance table}
Q -->|delete / inventory_delete| R{target file exists and not overwrite?}
R -->|Yes| S[Skip: print already present]
R -->|No| T[shutil.move from child_range_start]
Q -->|Regular table| U[shutil.move each child's file]
A -->|args.update not set| V[Original base-data path unchanged]
```

Last reviewed commit: ee83044
```python
    for p in procs:
        p.wait()
        if p.returncode != 0:
            print("dsdgen failed with return code {}".format(p.returncode))
            raise Exception("dsdgen failed")
    _merge_update_data_local(temp_base, data_dir, range_start, range_end,
                             args.parallel, args.update)
finally:
    if os.path.exists(temp_base):
        shutil.rmtree(temp_base)
```
Orphaned child processes on dsdgen failure
When one dsdgen process fails and the exception is raised at line 218, the remaining processes in procs that haven't been waited on yet are still running. The finally block then calls shutil.rmtree(temp_base), deleting the per-child directories underneath those still-running processes. On Linux, processes with already-open file handles can continue writing to deleted inodes, but any subsequent file creations will fail silently, producing incomplete data and wasting CPU/memory until the orphaned processes eventually terminate on their own.
The fix is to terminate remaining processes inside the finally block before removing the temp tree:
```python
finally:
    for p in procs:
        if p.poll() is None:
            p.terminate()
    if os.path.exists(temp_base):
        shutil.rmtree(temp_base)
```
Fixed: finally block terminates and waits on all child processes before cleanup.
nds/nds_gen_data.py (Outdated)
```python
if table in delete_tables:
    # All children produce identical delete files; keep only one copy.
    # Skip if already present (e.g. from a previous --range invocation).
    target_file = os.path.join(target_dir, f'{table}_{update}.dat')
    if not os.path.exists(target_file):
        src = os.path.join(temp_base, f'child_{range_start}',
                           f'{table}_{update}.dat')
        if os.path.exists(src):
            shutil.move(src, target_file)
```
Delete file skip ignores --overwrite_output
The if not os.path.exists(target_file) guard silently preserves the delete table file even when --overwrite_output is passed. For the same update number this is harmless (content is deterministic), but the behavior differs from the regular maintenance tables, which are always replaced on re-run. A user doing a full re-generation with --overwrite_output would reasonably expect all files to be refreshed. Consider adding a comment explaining the intentional skip, and/or honoring an explicit overwrite when --overwrite_output is set:
```python
if table in delete_tables:
    # All children produce identical delete files; keep only one copy.
    # Skip if already present (e.g. from a previous --range invocation)
    # unless the caller explicitly requested an overwrite.
    target_file = os.path.join(target_dir, f'{table}_{update}.dat')
    if os.path.exists(target_file) and not overwrite:
        continue
    src = os.path.join(temp_base, f'child_{range_start}',
                       f'{table}_{update}.dat')
    if os.path.exists(src):
        shutil.move(src, target_file)
```

This would require threading an `overwrite` boolean through from `_generate_update_data_local`.
Fixed: overwrite_output is now threaded through to the merge function and honored for delete tables.
Force-pushed from 453e716 to 8f80295
nds/nds_gen_data.py (Outdated)
```python
    if os.path.exists(src):
        shutil.move(src, target_file)
else:
    for i in range(range_start, range_end + 1):
        filename = f'{table}_{i}_{parallel}.dat'
        src = os.path.join(temp_base, f'child_{i}', filename)
        if os.path.exists(src):
            shutil.move(src, os.path.join(target_dir, filename))
```
Silent drop of expected merge sources
Both the delete-table path (line 165) and the regular-table path (line 171) silently skip a file when the source doesn't exist in the child temp directory. In the normal flow every expected file should be present; a missing file most likely indicates that dsdgen exited cleanly but produced no output for that table (e.g., empty scale factor for that partition). Without any diagnostic output the caller has no way to distinguish "table truly not generated at this scale" from "merge bug / disk-full left a partial child directory".
Consider adding a warning so incomplete output is surfaced immediately:
```python
    if os.path.exists(src):
        shutil.move(src, target_file)
else:
    for i in range(range_start, range_end + 1):
        filename = f'{table}_{i}_{parallel}.dat'
        src = os.path.join(temp_base, f'child_{i}', filename)
        if os.path.exists(src):
            shutil.move(src, os.path.join(target_dir, filename))
        else:
            print(f"Warning: expected source not found, skipping: {src}")
```
The same applies to the delete-table branch at line 165 — a print before the early return would help diagnose issues:
```python
if os.path.exists(src):
    shutil.move(src, target_file)
else:
    print(f"Warning: expected delete source not found, skipping: {src}")
```
Fixed: both delete-table and regular-table paths now print a warning when the expected source file is not found.
```python
temp_base = os.path.join(data_dir, '_temp_update')
if os.path.exists(temp_base):
    shutil.rmtree(temp_base)
```
No diagnostic when stale temp directory is silently removed
If a previous run crashed after creating _temp_update/, the leftover directory is quietly deleted here. A user debugging a data consistency problem would have no indication that orphaned temp data was discarded. Adding a log line makes this behaviour explicit without changing any logic:
```python
temp_base = os.path.join(data_dir, '_temp_update')
if os.path.exists(temp_base):
    print(f"Warning: removing stale temp directory from a previous run: {temp_base}")
    shutil.rmtree(temp_base)
```
Fixed: a warning is now printed before removing a stale _temp_update directory.
Force-pushed from 8f80295 to 37275f3
```python
def _check_existing_update_data(data_dir):
    """Check if data_dir already contains maintenance/update table data.

    Returns:
        list: maintenance table names that have existing data in data_dir.
    """
    existing = []
    for table_name in maintenance_table_names:
        table_dir = os.path.join(data_dir, table_name)
        if os.path.isdir(table_dir) and os.listdir(table_dir):
            existing.append(table_name)
    return existing
```
Pre-flight check blocks valid multi-update-set accumulation
`_check_existing_update_data` flags any non-empty maintenance table directory as a blocker, regardless of which update number was already generated. At first glance this seems to block a valid workflow where multiple update sets (e.g., update 1, then update 2) accumulate in the same output directory, since delete file names never collide (`delete_1.dat` vs `delete_2.dat`). However, regular table filenames do collide across update numbers, because the name encodes only the child index and parallel count, not the update number.
Looking at the regular table filename pattern on line 172, `f'{table}_{i}_{parallel}.dat'` does collide between update runs: child 1 of update 1 and child 1 of update 2 both produce `s_catalog_order_1_4.dat`. So the current pre-flight check is actually correct in blocking the second run without `--overwrite_output`. However, it's worth adding a comment explaining this reasoning so it isn't seen as overly conservative:
```python
def _check_existing_update_data(data_dir):
    """Check if data_dir already contains maintenance/update table data.

    Note: regular maintenance table filenames encode only child index and parallel
    count (e.g. s_catalog_order_1_4.dat), not the update number, so files from
    different update runs would collide. Any existing maintenance data therefore
    requires --overwrite_output to proceed.

    Returns:
        list: maintenance table names that have existing data in data_dir.
    """
```
Fixed: docstring now explains that regular maintenance table filenames don't encode the update number, so files from different update runs collide.
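To make the collision concrete, a tiny illustration (hypothetical values, not code from the PR):

```python
# The regular maintenance filename encodes only the child index and parallel
# count, so the same name is produced no matter which update number is running.
table, child, parallel = 's_catalog_order', 1, 4
filename = f'{table}_{child}_{parallel}.dat'
assert filename == 's_catalog_order_1_4.dat'  # identical for --update 1 and --update 2
```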
```python
for p in procs:
    p.wait()
    if p.returncode != 0:
        print("dsdgen failed with return code {}".format(p.returncode))
        raise Exception("dsdgen failed")
```
Early exception leaves later processes un-waited before finally
The p.wait() loop checks processes in order. If process at index k exits with a non-zero code, the exception is raised before p.wait() is called for processes k+1, k+2, .... The finally block correctly terminates those remaining processes, but there is a window where a process that finishes with a non-zero exit code between the exception raise and the finally block's p.poll() check could be missed by p.terminate() (because p.poll() returns its exit code, not None). Those processes are effectively orphaned without p.wait() being called on them, leaving zombie processes until the Python interpreter exits.
Consider waiting on all started processes in the finally block regardless of whether they needed termination:
```python
finally:
    for p in procs:
        if p.poll() is None:
            p.terminate()
        p.wait()  # reap all children to avoid zombies
    if os.path.exists(temp_base):
        shutil.rmtree(temp_base)
```
Fixed: p.wait() is now called unconditionally for all processes in the finally block, not just terminated ones.
Avoid silent data overwrites when generating update data with dsdgen by using isolated per-child temp directories instead of relying on dsdgen's -force flag. Delete tables are deduplicated during merge (only the first child's copy is kept), consistent with the existing HDFS approach. A pre-flight check rejects runs against directories that already contain update data unless --overwrite_output is set.

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Force-pushed from 37275f3 to c0155c7
```python
_merge_update_data_local(temp_base, data_dir, range_start, range_end,
                         args.parallel, args.update, args.overwrite_output)
```
Partial merge leaves data_dir in inconsistent state on failure
_merge_update_data_local is called inside the try block, and the finally block cleans up temp_base. However, if the merge itself partially succeeds (some shutil.move calls complete) and then throws (e.g., disk-full, permission error), the already-moved files remain in data_dir while temp_base is deleted — leaving data_dir in a partially-merged, unrecoverable state without re-running with --overwrite_output.
Since there is no rollback for the partial merge, consider adding a warning comment before the merge call to make this behaviour explicit, or move the merge outside the try block and handle its failures separately:
```python
    _merge_update_data_local(temp_base, data_dir, range_start, range_end,
                             args.parallel, args.update, args.overwrite_output)
    # Note: if merge partially succeeded before raising, data_dir may be incomplete;
    # re-run with --overwrite_output to recover.
finally:
```
Fixed: added a comment before the merge call noting that partial failure leaves data_dir incomplete and requires --overwrite_output to recover.
```python
if args.update:
    _generate_update_data_local(args, data_dir, range_start, range_end, tool_path)
    return
```
Update path skips overwrite_output check for pre-existing base data
When args.update is truthy the function immediately delegates to _generate_update_data_local and returns, bypassing the existing get_dir_size guard that protects base data. This means that if data_dir is already populated with base (source) data, a user accidentally running --update against it will silently proceed and start writing maintenance table directories alongside the base data without any warning.
_check_existing_update_data only looks at maintenance_table_names, so it provides no protection here. Consider adding a check that warns (or fails) when source table directories already exist in data_dir before generating update data:
```python
if args.update:
    if not args.overwrite_output and get_dir_size(data_dir) > 0:
        # Only warn; don't block — source and update data can coexist,
        # but the user should know they're writing into an existing directory.
        print("Warning: data_dir '{}' is non-empty; update data will be written alongside "
              "existing content.".format(data_dir))
    _generate_update_data_local(args, data_dir, range_start, range_end, tool_path)
    return
```
Fixed: a warning is now printed when --update targets a non-empty data_dir.
- Warn when --update targets a non-empty data_dir so users know update data will be written alongside existing content.
- Add comment noting that a partial merge failure leaves data_dir incomplete and requires --overwrite_output to recover.

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
nds/nds_gen_data.py (Outdated)
```python
        p.wait()
    if os.path.exists(temp_base):
        shutil.rmtree(temp_base)
subprocess.run(['du', '-h', '-d1', data_dir])
```
du silently swallowed on failure
subprocess.run is called without check=True, so a failure of the du command (e.g. permission error, missing binary, or path disappearing immediately after cleanup) is silently ignored. While this line is purely informational, the current pattern is inconsistent with most other subprocess.run calls in the file that do use check=True. At minimum, the return code should be checked, or a note added that failure here is intentional:
```python
subprocess.run(['du', '-h', '-d1', data_dir], check=False)  # best-effort display
```
Fixed: changed to check=False to make the best-effort intent explicit.
```python
        print("Warning: expected source not found, skipping: {}".format(src))


def _generate_update_data_local(args, data_dir, range_start, range_end, tool_path):
```
Unit tests not committed to the repository
The PR description lists 14 unit tests covering _check_existing_update_data, _merge_update_data_local, and _generate_update_data_local, but no test file appears in the diff or anywhere in the repository (a search for test*gen_data* returns no results). Without the test file being part of the repository, CI cannot automatically run or enforce them, and they are not available to future contributors.
Consider committing the test module (e.g. nds/test_nds_gen_data.py) as part of this PR to ensure regression coverage is preserved.
Acknowledged. The tests were used for local validation with a mock dsdgen. This project doesn't have an existing test framework or CI for Python unit tests yet, so committing them would add files without integration. Happy to add if maintainers want them included.
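For reference, a minimal sketch of one such test (pytest style; the import path and fixture usage are assumptions, since no test module exists in the repo yet):

```python
# Assumes the test module sits next to nds/nds_gen_data.py so these import.
from nds_gen_data import _check_existing_update_data, maintenance_table_names

def test_check_existing_update_data(tmp_path):
    data_dir = str(tmp_path)
    # Empty data_dir: no maintenance data reported.
    assert _check_existing_update_data(data_dir) == []
    # A non-empty maintenance table directory should be flagged.
    table = maintenance_table_names[0]
    table_dir = tmp_path / table
    table_dir.mkdir()
    (table_dir / f'{table}_1.dat').write_text('dummy row\n')
    assert _check_existing_update_data(data_dir) == [table]
```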
```python
if table in delete_tables:
    target_file = os.path.join(target_dir, f'{table}_{update}.dat')
    if os.path.exists(target_file) and not overwrite_output:
        print("Skipping {} (already present from a previous run)".format(target_file))
        continue
    src = os.path.join(temp_base, f'child_{range_start}',
                       f'{table}_{update}.dat')
    if os.path.exists(src):
        shutil.move(src, target_file)
```
Delete-table overwrite skips even when destination exists with wrong update number
The guard `if os.path.exists(target_file) and not overwrite_output: continue` uses `target_file = f'{table}_{update}.dat'`, which encodes the update number. This means if a user previously generated `delete_1.dat` and is now running `--update 2`, `target_file` is `delete_2.dat`; it doesn't exist yet, so no skip occurs, and the merge proceeds correctly.
However the logic becomes subtly wrong if `overwrite_output=True` is passed after a previous identical update run: the condition `not overwrite_output` is False, so the skip is bypassed and `shutil.move` replaces the file with identical content. This is harmless functionally, but the PR description states "delete/delete_1.dat kept from first range, not overwritten" for Test 4, which contradicts the actual code path when `--overwrite_output` is set. Consider adding a comment clarifying the intended semantics:
```python
if table in delete_tables:
    target_file = os.path.join(target_dir, f'{table}_{update}.dat')
    # All children produce identical delete content for a given update number.
    # Preserve any existing copy unless the caller explicitly requested overwrite.
    if os.path.exists(target_file) and not overwrite_output:
```
Fixed: added a comment clarifying the intended semantics — preserve existing copy unless caller explicitly requested overwrite.
- Add comment clarifying delete table dedup preserves existing copy unless --overwrite_output is explicitly set.
- Use check=False on du summary call to make best-effort intent explicit.

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Add 14 tests covering _check_existing_update_data, _merge_update_data_local, and _generate_update_data_local using a mock dsdgen script.

Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Signed-off-by: Allen Xu <allxu@nvidia.com>
Made-with: Cursor
Summary
Fixes #221 — `dsdgen` with `--update` requires `-force` to avoid file collisions (especially for delete tables), but blindly enabling `-force` can silently overwrite existing data if the user accidentally passes the wrong output path.

This PR redesigns local update data generation to use per-child temp directories, consistent with the existing HDFS approach in `generate_data_hdfs()`:

- Each child generates into its own temp directory, so `-force` is no longer needed
- Generation fails when update data already exists, unless `--overwrite_output` is explicitly set
- Temp directories are always cleaned up (`try/finally`)

A sketch of this flow follows the list.
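As a rough illustration of the per-child flow (a sketch only: the helper name, flag spelling, and error handling here are assumptions, not the PR's exact code):

```python
import os
import shutil
import subprocess

def _sketch_generate_update(tool_path, data_dir, scale, update,
                            range_start, range_end, parallel):
    temp_base = os.path.join(data_dir, '_temp_update')
    procs = []
    try:
        for i in range(range_start, range_end + 1):
            child_dir = os.path.join(temp_base, f'child_{i}')
            os.makedirs(child_dir)
            # Each child writes only into its own directory, so dsdgen's
            # -force flag is not needed to resolve file-name collisions.
            procs.append(subprocess.Popen(
                ['./dsdgen', '-scale', str(scale), '-update', str(update),
                 '-child', str(i), '-parallel', str(parallel),
                 '-dir', child_dir], cwd=tool_path))
        for p in procs:
            p.wait()
            if p.returncode != 0:
                raise Exception('dsdgen failed')
        # A merge step would then move per-child files into data_dir,
        # keeping a single copy of each (identical) delete file.
    finally:
        for p in procs:
            if p.poll() is None:
                p.terminate()
            p.wait()  # reap every child before removing the temp tree
        if os.path.exists(temp_base):
            shutil.rmtree(temp_base)
```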
Changes

`nds/nds_gen_data.py`: +103 / -8 lines

- `_check_existing_update_data()`: scans for existing maintenance table data
- `_merge_update_data_local()`: merges per-child temp dirs with delete table dedup
- `_generate_update_data_local()`: orchestrates temp-dir generation + merge + cleanup
- `generate_data_local()`: delegates to new function when `--update` is set

Test plan
Unit tests (14 tests)
Covers `_check_existing_update_data`, `_merge_update_data_local`, and `_generate_update_data_local` with mock dsdgen: `--overwrite_output`, no `-force` flag passed to dsdgen, incremental range preserves previous data.

CLI end-to-end tests (with mock dsdgen)
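These runs use a stand-in generator; a hypothetical minimal mock dsdgen could look like the following (flag handling and output names are assumptions modeled on the merge code quoted above, not the actual script used):

```python
#!/usr/bin/env python3
# Hypothetical mock dsdgen: accepts the flags the wrapper passes and emits
# placeholder update-mode files so the merge/dedup logic can be exercised
# without the real TPC-DS toolkit.
import argparse
import os

parser = argparse.ArgumentParser()
parser.add_argument('-dir')
parser.add_argument('-update')
parser.add_argument('-child')
parser.add_argument('-parallel')
args, _unknown = parser.parse_known_args()  # ignore flags we don't model

os.makedirs(args.dir, exist_ok=True)
# One per-child file for a regular maintenance table...
with open(os.path.join(args.dir,
                       f's_catalog_order_{args.child}_{args.parallel}.dat'), 'w') as f:
    f.write('1|row|\n')
# ...and an identical delete file from every child (the collision source).
with open(os.path.join(args.dir, f'delete_{args.update}.dat'), 'w') as f:
    f.write('1|2|\n')
```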
Test 1 — Generate update data to fresh directory:
Result: 4 files per regular maintenance table, 1 file per delete table, `_temp_update` dir cleaned up. ✅

Test 2 — Re-run same path without `--overwrite_output` (safety check):
Result: Fails with "Update data already exists in directory /tmp/nds_test for tables: [...]". ✅

Test 3 — Re-run with `--overwrite_output`:
Result: Succeeds, data regenerated. ✅

Test 4 — Incremental `--range` (two invocations):
Result: `s_catalog_order/` contains files from both ranges (`_1_4`, `_2_4`, `_3_4`, `_4_4`). `delete/delete_1.dat` kept from first range, not overwritten. ✅

Test 5 — Base data generation (no `--update`):
Result: Original behavior unchanged, all 25 source tables generated correctly. ✅
Made with Cursor