
feat: resume interrupted dataset generation runs (sync + async engine)#526

Open
przemekboruta wants to merge 15 commits into NVIDIA-NeMo:main from przemekboruta:main

Conversation

@przemekboruta
Contributor

@przemekboruta przemekboruta commented Apr 13, 2026

Summary

Closes #525

Adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(). When resume=True, generation picks up from where the interrupted run left off — for both the sync and async engines.

dd = DataDesigner(...)
dd.add_column(...)

# First run — interrupted mid-way
results = dd.create(config_builder, num_records=10_000)

# After restart — picks up from the last completed batch/row-group
results = dd.create(config_builder, num_records=10_000, resume=True)

Changes

| Layer | Change |
| --- | --- |
| ArtifactStorage | New resume: bool = False field; resolved_dataset_name skips timestamp logic on resume; new clear_partial_results() |
| DatasetBatchManager.start() | New start_batch and initial_actual_num_records params (default 0, no breakage) |
| DatasetBuilder.build() | New resume param; _load_resume_state() reads and validates metadata.json; _build_with_resume() skips completed batches (sync); _build_async() skips completed row groups (async) |
| RowGroupBufferManager.__init__() | New initial_actual_num_records and initial_total_num_batches params to seed counters on resume |
| DatasetBuilder._find_completed_row_group_ids() | New helper that scans parquet-files/ for batch_*.parquet to determine which async row groups are already done |
| finalize_row_group closure | Now writes incremental metadata.json after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted |
| DataDesigner.create() | Exposes resume, passes it through to ArtifactStorage and builder.build() |
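The row-group discovery helper named in the table could look roughly like this — a minimal sketch using the regex form suggested later in review (the helper name comes from the PR description; the exact signature may differ):

```python
import re
from pathlib import Path


def find_completed_row_group_ids(parquet_dir: Path) -> set[int]:
    """Scan for batch_*.parquet files and return the row-group IDs already on disk."""
    ids: set[int] = set()
    for p in parquet_dir.glob("batch_*.parquet"):
        # Only accept stems of the exact shape batch_<digits>
        m = re.fullmatch(r"batch_(\d+)", p.stem)
        if m:
            ids.add(int(m.group(1)))
    return ids
```

A filesystem scan like this is what lets the async path treat the parquet files, rather than possibly stale metadata, as the source of truth.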

Validation and error cases

  • Missing metadata.json → DatasetGenerationError (interrupted before any batch completed)
  • num_records mismatch → DatasetGenerationError
  • buffer_size mismatch → DatasetGenerationError
  • Dataset already complete → warning logged, returns existing path (both engines)

Test plan

  • test_resolved_dataset_name_resume_uses_existing_folder
  • test_resolved_dataset_name_resume_raises_when_no_existing_folder
  • test_resolved_dataset_name_resume_raises_when_folder_is_empty
  • test_clear_partial_results_removes_partial_folder
  • test_clear_partial_results_is_noop_when_no_partial_folder
  • test_start_with_start_batch
  • test_start_with_initial_actual_num_records
  • test_start_with_start_batch_and_initial_actual_num_records
  • test_start_default_values_unchanged
  • test_build_resume_raises_without_metadata
  • test_build_resume_raises_on_num_records_mismatch
  • test_build_resume_raises_on_buffer_size_mismatch
  • test_build_resume_logs_warning_when_already_complete
  • test_find_completed_row_group_ids_empty_dir
  • test_find_completed_row_group_ids_with_files
  • test_find_completed_row_group_ids_ignores_non_batch_files
  • test_build_async_resume_logs_warning_when_already_complete
  • test_build_async_resume_raises_without_metadata
  • test_initial_actual_num_records
  • test_initial_total_num_batches_reflected_in_metadata

- ArtifactStorage gains a `resume: bool = False` field
- resolved_dataset_name skips timestamp logic when resume=True,
  returning the existing dataset folder name as-is
- Raises ArtifactStorageError on resume=True when the target folder
  is absent or empty (no data to resume from)
- New clear_partial_results() removes in-flight partial results
  left over from an interrupted run

Fixes NVIDIA-NeMo#525
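An idempotent clear_partial_results() along the lines this commit describes might look like the sketch below (the partial-results folder name is an assumption, not taken from the PR):

```python
import shutil
from pathlib import Path


def clear_partial_results(dataset_dir: Path) -> None:
    """Remove in-flight partial results left by an interrupted run. No-op if absent."""
    partial_dir = dataset_dir / "partial-results"  # assumed folder name
    if partial_dir.exists():
        shutil.rmtree(partial_dir)
```

Guarding on exists() is what makes the call safe to issue unconditionally on every resume.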
DatasetBatchManager.start() now accepts:
- start_batch: int = 0  — first batch index to process
- initial_actual_num_records: int = 0  — records already on disk

Both default to 0 so all existing call sites are unaffected.

Fixes NVIDIA-NeMo#525
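The counter seeding this commit describes can be sketched as follows — only the resume-relevant state is shown, and attribute names are assumptions based on the review comments:

```python
class DatasetBatchManager:
    """Sketch of start() seeding counters for resume; the real class does much more."""

    def __init__(self) -> None:
        self._current_batch_number = 0
        self._actual_num_records = 0

    def reset(self) -> None:
        # Zero out counters; the real reset() can also delete files when asked.
        self._current_batch_number = 0
        self._actual_num_records = 0

    def start(self, start_batch: int = 0, initial_actual_num_records: int = 0) -> None:
        # Defaults of 0 reproduce the pre-resume behavior exactly.
        self.reset()
        self._current_batch_number = start_batch
        self._actual_num_records = initial_actual_num_records
```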
- build() gains a resume: bool = False parameter
- _load_resume_state() reads metadata.json and validates that
  num_records and buffer_size match the original run
- _build_with_resume() skips completed batches, clears in-flight
  partial results, and continues from the first incomplete batch
- Raises DatasetGenerationError with clear messages for:
  - missing metadata.json (interrupted before first batch completes)
  - num_records mismatch
  - buffer_size mismatch
  - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported)
- Logs a warning and returns early when dataset is already complete

Fixes NVIDIA-NeMo#525
- create() gains resume: bool = False
- _create_resource_provider() passes resume to ArtifactStorage
- builder.build() receives the resume flag

Fixes NVIDIA-NeMo#525
Covers:
- ArtifactStorage.resolved_dataset_name with resume=True
- ArtifactStorage.clear_partial_results()
- DatasetBatchManager.start() with start_batch and
  initial_actual_num_records
- DatasetBuilder.build(resume=True): missing metadata, num_records
  mismatch, buffer_size mismatch, already-complete detection

Fixes NVIDIA-NeMo#525
@przemekboruta przemekboruta requested a review from a team as a code owner April 13, 2026 11:15
@greptile-apps
Contributor

greptile-apps bot commented Apr 13, 2026

Greptile Summary

This PR adds resume: bool = False to DataDesigner.create() and DatasetBuilder.build(), allowing interrupted generation runs to continue from the last completed batch (sync) or row group (async) rather than regenerating from scratch. Both engines are covered: the sync path uses metadata.json as the source of truth while the async path additionally scans the filesystem to handle the metadata-lag crash window.

The two concerns flagged in the previous review are addressed: the generated flag in build() now correctly gates _processor_runner.run_after_generation(), and the async path derives both initial_actual_num_records and initial_total_num_batches from the filesystem rather than from potentially stale metadata.

Confidence Score: 5/5

Safe to merge — all P0/P1 concerns from the prior review are resolved and no new correctness issues were found.

Both previously flagged issues are fixed: the generated flag correctly prevents processors from running on an already-complete dataset, and the async path derives initial_actual_num_records and initial_total_num_batches from the filesystem rather than stale metadata. Validation, crash-window handling, and test coverage are thorough. All remaining observations are P2 or lower.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic — adds _load_resume_state, _build_with_resume, _find_completed_row_group_ids, and extends _build_async. Batch-number synchronisation between the loop index and batch_manager._current_batch_number is correct; generated flag properly gates post-generation processors. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds resume field and clear_partial_results(); resolved_dataset_name correctly short-circuits timestamping on resume and raises when folder is absent. clear_partial_results() is idempotent (guards on exists()). No issues found. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | start() gains start_batch and initial_actual_num_records; both are applied after reset() so they correctly override its zero-init. The num_records_batch property indexes _num_records_list[_current_batch_number], which is valid for all resume start points. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds initial_actual_num_records and initial_total_num_batches to seed counters for async resume. checkpoint_row_group correctly increments both counters only for non-empty row groups, keeping write_metadata consistent. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Threads resume through _create_resource_provider → ArtifactStorage and into builder.build(). Clean pass-through with no logic of its own. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds 18 resume tests covering sync/async paths, crash-window simulation, already-complete detection, and processor bypass. The filesystem-truth test (test_initial_actual_num_records_from_filesystem_in_crash_window) provides good coverage of the stale-metadata scenario. |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | Covers resolved_dataset_name branching under resume=True/False and both clear_partial_results() cases. Tests are well-isolated. |

Sequence Diagram

sequenceDiagram
    participant User
    participant DataDesigner
    participant DatasetBuilder
    participant ArtifactStorage
    participant BatchManager
    participant RowGroupBuffer

    User->>DataDesigner: create(config, num_records, resume=True)
    DataDesigner->>ArtifactStorage: new(artifact_path, resume=True)
    Note over ArtifactStorage: resolved_dataset_name returns existing folder
    DataDesigner->>DatasetBuilder: build(num_records, resume=True)

    alt metadata.json missing
        DatasetBuilder->>ArtifactStorage: clear_partial_results()
        Note over DatasetBuilder: resume = False, full restart
    else ASYNC ENGINE
        DatasetBuilder->>DatasetBuilder: _load_resume_state() [validate only]
        DatasetBuilder->>DatasetBuilder: _find_completed_row_group_ids() [filesystem scan]
        DatasetBuilder->>ArtifactStorage: clear_partial_results()
        alt all row groups done
            DatasetBuilder-->>DataDesigner: generated=False (return early)
        else resume remaining
            DatasetBuilder->>RowGroupBuffer: new(initial_actual_num_records, initial_total_num_batches)
            loop each remaining row group
                DatasetBuilder->>RowGroupBuffer: checkpoint_row_group(rg_id)
                DatasetBuilder->>RowGroupBuffer: write_metadata() [incremental]
            end
            DatasetBuilder->>RowGroupBuffer: write_metadata() [final]
        end
    else SYNC ENGINE with resume
        DatasetBuilder->>DatasetBuilder: _load_resume_state() [validate + read metadata]
        DatasetBuilder->>BatchManager: start(start_batch=N, initial_actual_num_records=X)
        alt all batches done
            DatasetBuilder-->>DataDesigner: generated=False (return early)
        else resume remaining
            DatasetBuilder->>ArtifactStorage: clear_partial_results()
            loop batch N to total-1
                DatasetBuilder->>BatchManager: _run_batch(current_batch_number=idx)
                BatchManager->>ArtifactStorage: write then move_partial_to_final
                BatchManager->>ArtifactStorage: write_metadata [per-batch]
            end
            DatasetBuilder->>BatchManager: finish()
        end
    end

    alt generated=True
        DatasetBuilder->>DatasetBuilder: run_after_generation(processors)
    end
    DataDesigner-->>User: DatasetCreationResults


…INE=1)

- Add _find_completed_row_group_ids() to scan parquet-files/ for already-written
  row groups by parsing batch_*.parquet filenames
- _build_async() now accepts resume=True: loads metadata, finds completed row groups,
  clears partial results, and logs progress; returns early if all row groups are done
- _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and
  initial_total_num_batches so the scheduler only processes remaining row groups
  and RowGroupBufferManager starts from the correct counts
- RowGroupBufferManager.__init__ gains initial_actual_num_records and
  initial_total_num_batches params to seed the counters on resume
- finalize_row_group closure now writes incremental metadata after each checkpoint
  so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1
- Add tests for all new paths
@przemekboruta przemekboruta changed the title from "feat: resume interrupted dataset generation runs (sync engine)" to "feat: resume interrupted dataset generation runs (sync + async engine)" on Apr 13, 2026
…set already complete

_build_with_resume and _build_async now return False when the dataset is already
complete (early-return path), True otherwise. build() skips
_processor_runner.run_after_generation() on False, preventing processors from
calling shutil.rmtree and rewriting an already-finalized dataset.

Fixes the issue raised in review: greptile P1 comment on PR NVIDIA-NeMo#526.
…sync resume

Metadata can lag by one row group if a crash occurs between
move_partial_result_to_final_file_path and write_metadata. Using
len(completed_ids) from the filesystem scan instead of
state.num_completed_batches ensures the final metadata reflects the
actual number of parquet files present, not the potentially stale
metadata count.
@github-actions
Contributor

Issue #525 has been triaged. The linked issue check is being re-evaluated.

@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 13, 2026
…efore first batch)

When a run is interrupted before any row group or batch completes, metadata.json
is never written. Previously resume=True would raise DatasetGenerationError in
this case. Now build() detects the missing file, logs an info message, clears
any leftover partial results and falls back to a clean fresh run.

This is the common scenario for small datasets (fewer records than buffer_size)
where all records fit in a single row group.
…ync resume

In the crash window (row group written to disk but write_metadata crashed before
updating the file), both initial_total_num_batches and initial_actual_num_records
now use the filesystem-discovered completed_ids as source of truth.  Previously
initial_actual_num_records was read from potentially stale metadata, causing
actual_num_records in the final metadata to be undercounted by one row group.

Also adds a test covering the partial-resume crash-window scenario.
@andreatgretel andreatgretel added the agent-review Trigger agentic CI review label Apr 16, 2026
@github-actions
Contributor

Code Review: PR #526 — Resume interrupted dataset generation runs (sync + async engine)

Summary

This PR adds a resume: bool = False parameter to DataDesigner.create() and DatasetBuilder.build(), enabling users to resume interrupted dataset generation from the last completed batch (sync) or row group (async). The implementation touches 5 source files and 4 test files across the data-designer-engine and data-designer packages.

Scope: ~860 additions, ~16 deletions across 10 files (including a plan doc and comprehensive tests).

The feature is well-designed: it leverages existing metadata.json checkpoints, validates run-parameter compatibility, handles edge cases (already-complete, no-metadata, parameter mismatch), and correctly separates the sync and async resume paths. The plan diverged from implementation in a positive way — the async engine now supports resume (the plan initially deferred it).

Findings

High Severity

(H1) _load_resume_state return value discarded in async resume path
dataset_builder.py:411 — In _build_async, when resume=True, the call self._load_resume_state(num_records, buffer_size) is made for validation only — the returned _ResumeState is discarded. This is intentional (the async path derives state from the filesystem instead), but it's confusing. The validation-only intent should be made explicit, e.g. by extracting a _validate_resume_params() method or assigning to _ with a comment. As-is, a future maintainer might remove the "unused" call and break parameter validation for async resume.

Medium Severity

(M1) _find_completed_row_group_ids parses batch filenames with split("_", 1)[1]
dataset_builder.py:381 — The glob pattern is batch_*.parquet and the ID is extracted via p.stem.split("_", 1)[1]. This works for batch_00000: the split yields "00000" and int("00000") = 0. However, if a file like batch_00000_extra.parquet appeared (e.g., from a future format change), split("_", 1)[1] would yield "00000_extra" and int() would raise ValueError, which is caught. This is acceptable but fragile. Consider using a regex r"^batch_(\d+)$" on the stem for robustness.

(M2) initial_actual_num_records calculation assumes uniform batch sizes
dataset_builder.py:418-420 — The async resume path computes initial_actual_num_records as:

sum(min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids)

This formula assumes each row group was written with exactly min(buffer_size, remaining) rows, ignoring dropped rows. If the original run dropped rows within a row group (e.g., due to generation failures), the actual count would be lower. However, actual_num_records in the sync path also counts written records (not requested), and the metadata from write_metadata stores the true post-drop count. This means the filesystem-derived count may overestimate vs. what was actually written. The comment at line 414 acknowledges metadata may lag, but the formula's assumption about no drops could lead to inflated actual_num_records in the final metadata when some rows were dropped in completed groups.
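For concreteness, here is the quoted formula evaluated with small illustrative numbers (the values are made up; they are not from the PR):

```python
buffer_size = 100
num_records = 250
completed_ids = {0, 1}  # row groups already on disk

# The PR's formula as quoted in the review: assumes no rows were dropped,
# i.e. each completed row group holds exactly min(buffer_size, remaining) rows.
initial_actual_num_records = sum(
    min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids
)
print(initial_actual_num_records)  # -> 200 (100 + 100)
```

If the original run had dropped rows inside group 0 or 1, the true on-disk count would be below 200, which is exactly the overestimate M2 warns about.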

(M3) batch_manager.start() calls reset() which deletes files on resume path
dataset_batch_manager.py:177 — start() calls self.reset() which sets _current_batch_number = 0 and _actual_num_records = 0, then immediately overrides them. The reset(delete_files=False) call is harmless here (it doesn't delete files), but it does zero-out internal state that's immediately overwritten. While functionally correct, this coupling is subtle — if reset() ever gains side effects beyond zeroing counters, the resume path would break silently.

Low Severity

(L1) Plan/implementation divergence: async engine support
The plan document (plans/525/resume-interrupted-runs.md) states in the Design Decisions table: "Async engine: Raise DatasetGenerationError if DATA_DESIGNER_ASYNC_ENGINE=1 with resume=True" and in Trade-offs: "Resume support for async engine: deferred to a follow-up." The implementation fully supports async resume. The plan should be updated to reflect the actual implementation.

(L2) _ResumeState.buffer_size field is redundant
dataset_builder.py:91 — _ResumeState stores buffer_size but it's always set to the same buffer_size parameter that was already validated. The field is never read after construction in _build_with_resume — the method uses the buffer_size parameter directly. The field could be removed to avoid confusion.

(L3) Incremental metadata writes add I/O overhead to async engine
dataset_builder.py:443 — write_metadata is now called after every row group checkpoint in finalize_row_group. For large datasets with many small row groups, this adds per-row-group disk I/O. The trade-off (resumability vs. performance) is reasonable, but worth noting in documentation or the PR description. The final write_metadata call at line 478 is documented as redundant ("overwrites the last incremental write with identical content") — good.
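The incremental-write pattern under discussion can be sketched as a closure that persists counters after each checkpoint — a simplification in which the metadata layout and field names are assumptions:

```python
import json
from pathlib import Path


def make_finalize_row_group(metadata_path: Path):
    """Build a finalize callback that writes metadata after every checkpoint."""
    state = {"num_completed_batches": 0, "actual_num_records": 0}

    def finalize_row_group(num_records_written: int) -> None:
        state["num_completed_batches"] += 1
        state["actual_num_records"] += num_records_written
        # Incremental write: a crash after this point leaves resumable metadata.
        metadata_path.write_text(json.dumps(state))

    return finalize_row_group
```

The per-checkpoint write is the cost L3 refers to; the benefit is that metadata lags the parquet files by at most one row group.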

(L4) Test file has mid-file imports
test_dataset_builder.py:927-429 — The resume test section re-imports json, Path, and ArtifactStorage with underscore-prefixed aliases (_json, _Path, _ArtifactStorage) mid-file. While this works, it's unconventional and potentially confusing. Standard practice is to add imports at the top of the file.

(L5) No validation of start_batch or initial_actual_num_records bounds
dataset_batch_manager.py:165-166 — The new start_batch and initial_actual_num_records parameters have no validation (e.g., start_batch >= 0, start_batch <= num_batches, initial_actual_num_records >= 0). Since these are only called from internal resume code that validates upstream, this is acceptable — but defensive checks would prevent misuse if the method is called from new paths in the future.

Positive Observations

  • Comprehensive test coverage: 20+ new test cases covering validation errors, already-complete detection, async/sync paths, filesystem-vs-metadata crash window scenarios, and processor non-invocation on skip.
  • Clean separation of sync/async resume: The sync path uses _build_with_resume with DatasetBatchManager, while the async path extends _build_async with skip_row_groups and filesystem-based counters. No shared mutable state between the two paths.
  • Filesystem as source of truth for async: The decision to derive initial_actual_num_records from the filesystem rather than potentially-stale metadata (lines 414-420) handles the crash window correctly and is well-documented.
  • Graceful degradation for missing metadata: The build() method at line 188 handles the case where metadata.json is missing (interrupted before any batch completed) by logging and restarting fresh, rather than raising an error. This is a UX improvement over the plan's original "raise error" approach.
  • No breaking changes: All new parameters default to their pre-existing behavior (resume=False, start_batch=0, initial_actual_num_records=0).
  • Incremental metadata writes enable async resumability — a meaningful improvement over the plan's deferred-async-resume decision.

Verdict

Approve with suggestions. The implementation is solid, well-tested, and handles edge cases thoughtfully. The high-severity finding (H1) is a readability/maintainability concern rather than a correctness bug — the discarded return value works because _load_resume_state raises on validation failure. The medium-severity findings (M1-M3) are minor robustness concerns. None of these block merging, but H1 and M2 are worth addressing before or shortly after merge.

@github-actions github-actions bot removed the agent-review Trigger agentic CI review label Apr 16, 2026
H1: Annotate _load_resume_state() call in _build_async with a comment
clarifying that the return value is intentionally discarded — the async
path derives ground-truth state from the filesystem, not from metadata.

M1: Replace fragile split("_", 1)[1] filename parsing in
_find_completed_row_group_ids with re.fullmatch(r"batch_(\d+)", stem),
making it immune to unexpected filename shapes.

L2: Remove unused _ResumeState.buffer_size field — the field was set
but never read; _build_with_resume uses the buffer_size parameter directly.

L4: Move mid-file imports (json, Path, ArtifactStorage) used by resume
tests to the top of test_dataset_builder.py and drop the underscore aliases.

L1: Update plans/525/resume-interrupted-runs.md to reflect that async
engine resume is fully implemented (not deferred) and that missing
metadata triggers a fresh restart instead of raising DatasetGenerationError.
@greptile-apps
Contributor

greptile-apps bot commented Apr 17, 2026



Development

Successfully merging this pull request may close these issues.

feat: resume interrupted dataset generation runs

2 participants