feat: resume interrupted dataset generation runs (sync + async engine)#526
przemekboruta wants to merge 15 commits into NVIDIA-NeMo:main
Conversation
- ArtifactStorage gains a `resume: bool = False` field
- resolved_dataset_name skips timestamp logic when resume=True, returning the existing dataset folder name as-is
- Raises ArtifactStorageError on resume=True when the target folder is absent or empty (no data to resume from)
- New clear_partial_results() removes in-flight partial results left over from an interrupted run

Fixes NVIDIA-NeMo#525
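The storage behavior described above can be sketched as follows. This is an illustrative stand-in, not the real `ArtifactStorage`: the class shape, the `FileNotFoundError` (the real code raises `ArtifactStorageError`), and the `partial-results` folder name are assumptions for the sketch.

```python
import shutil
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ArtifactStorageSketch:
    """Illustrative stand-in for ArtifactStorage's resume behavior."""

    artifact_path: Path
    dataset_name: str
    resume: bool = False

    def resolved_dataset_name(self) -> str:
        # On resume, reuse the existing folder name verbatim instead of
        # deriving a fresh timestamped one; fail fast when there is no data.
        if self.resume:
            folder = self.artifact_path / self.dataset_name
            if not folder.exists() or not any(folder.iterdir()):
                raise FileNotFoundError(f"nothing to resume in {folder}")
            return self.dataset_name
        # Stand-in for the normal timestamping path.
        return f"{self.dataset_name}-20240101-000000"

    def clear_partial_results(self) -> None:
        # Idempotent: only removes the partial-results folder if present.
        partial = self.artifact_path / self.dataset_name / "partial-results"
        if partial.exists():
            shutil.rmtree(partial)
```

The guard on `exists()` is what makes `clear_partial_results()` safe to call unconditionally at the start of a resumed run.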
DatasetBatchManager.start() now accepts:
- start_batch: int = 0 — first batch index to process
- initial_actual_num_records: int = 0 — records already on disk

Both default to 0 so all existing call sites are unaffected.

Fixes NVIDIA-NeMo#525
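A minimal sketch of the resume-aware `start()`, assuming a simplified internal layout (the real class carries more state). The key point, also noted in the file-level review below, is that the two new arguments are applied after the zero-init, so they override it on resume while the defaults leave existing call sites unchanged.

```python
class BatchManagerSketch:
    """Minimal sketch of DatasetBatchManager.start() with resume params."""

    def __init__(self, num_records_list: list[int]) -> None:
        self._num_records_list = num_records_list  # planned size per batch
        self._current_batch_number = 0
        self._actual_num_records = 0

    def _reset(self) -> None:
        self._current_batch_number = 0
        self._actual_num_records = 0

    def start(self, start_batch: int = 0, initial_actual_num_records: int = 0) -> None:
        # Applied after _reset(), so on resume these override the zero-init.
        self._reset()
        self._current_batch_number = start_batch
        self._actual_num_records = initial_actual_num_records

    @property
    def num_records_batch(self) -> int:
        # Valid for any resume start point within the planned batch list.
        return self._num_records_list[self._current_batch_number]
```

Calling `start()` with no arguments reproduces the pre-change behavior exactly.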
- build() gains a resume: bool = False parameter
- _load_resume_state() reads metadata.json and validates that num_records and buffer_size match the original run
- _build_with_resume() skips completed batches, clears in-flight partial results, and continues from the first incomplete batch
- Raises DatasetGenerationError with clear messages for:
  - missing metadata.json (interrupted before first batch completes)
  - num_records mismatch
  - buffer_size mismatch
  - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported)
- Logs a warning and returns early when dataset is already complete

Fixes NVIDIA-NeMo#525
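The validation step can be sketched like this, under the assumption that `metadata.json` stores `num_records` and `buffer_size` at its top level; the function name mirrors `_load_resume_state` but the `RuntimeError` stands in for the real `DatasetGenerationError`.

```python
import json
from pathlib import Path


def load_resume_state(dataset_dir: Path, num_records: int, buffer_size: int) -> dict:
    """Sketch of the metadata validation performed before a resumed run."""
    meta_path = dataset_dir / "metadata.json"
    if not meta_path.exists():
        # The builder treats this as an interruption before any batch
        # completed (later commits fall back to a fresh run instead).
        raise RuntimeError("cannot resume: metadata.json missing")
    state = json.loads(meta_path.read_text())
    if state["num_records"] != num_records:
        raise RuntimeError(
            f"num_records mismatch: {state['num_records']} != {num_records}"
        )
    if state["buffer_size"] != buffer_size:
        raise RuntimeError(
            f"buffer_size mismatch: {state['buffer_size']} != {buffer_size}"
        )
    return state
```

Failing fast on either mismatch prevents a resumed run from silently mixing row groups produced under different batching parameters.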
- create() gains resume: bool = False
- _create_resource_provider() passes resume to ArtifactStorage
- builder.build() receives the resume flag

Fixes NVIDIA-NeMo#525
Covers:
- ArtifactStorage.resolved_dataset_name with resume=True
- ArtifactStorage.clear_partial_results()
- DatasetBatchManager.start() with start_batch and initial_actual_num_records
- DatasetBuilder.build(resume=True): missing metadata, num_records mismatch, buffer_size mismatch, already-complete detection

Fixes NVIDIA-NeMo#525
Greptile Summary

This PR adds resume support for interrupted dataset generation runs. The two concerns flagged in the previous review are addressed.
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic — adds _load_resume_state, _build_with_resume, _find_completed_row_group_ids, and extends _build_async. Batch-number synchronisation between the loop index and batch_manager._current_batch_number is correct; generated flag properly gates post-generation processors. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds resume field and clear_partial_results(); resolved_dataset_name correctly short-circuits timestamping on resume and raises when folder is absent. clear_partial_results() is idempotent (guards on exists()). No issues found. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | start() gains start_batch and initial_actual_num_records; both are applied after reset() so they correctly override its zero-init. The num_records_batch property indexes _num_records_list[_current_batch_number] which is valid for all resume start points. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | Adds initial_actual_num_records and initial_total_num_batches to seed counters for async resume. checkpoint_row_group correctly increments both counters only for non-empty row groups, keeping write_metadata consistent. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Threads resume through _create_resource_provider → ArtifactStorage and into builder.build(). Clean pass-through with no logic of its own. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds 18 resume tests covering sync/async paths, crash-window simulation, already-complete detection, and processor bypass. The filesystem-truth test (test_initial_actual_num_records_from_filesystem_in_crash_window) provides good coverage of the stale-metadata scenario. |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | Covers resolved_dataset_name branching under resume=True/False and both clear_partial_results() cases. Tests are well-isolated. |
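The counter seeding described for row_group_buffer.py can be sketched as below. This is a simplified stand-in for `RowGroupBufferManager`: only the two counters are modeled, and `checkpoint_row_group` takes a record count directly rather than a row group object.

```python
class RowGroupBufferSketch:
    """Sketch of counter seeding for async resume."""

    def __init__(
        self,
        initial_actual_num_records: int = 0,
        initial_total_num_batches: int = 0,
    ) -> None:
        # On resume these seed the counters with what is already on disk,
        # so metadata written later reflects the whole dataset.
        self.actual_num_records = initial_actual_num_records
        self.total_num_batches = initial_total_num_batches

    def checkpoint_row_group(self, num_records_in_group: int) -> None:
        # Both counters advance only for non-empty row groups, which keeps
        # the subsequently written metadata internally consistent.
        if num_records_in_group > 0:
            self.actual_num_records += num_records_in_group
            self.total_num_batches += 1
```

Seeding both counters together is what keeps a resumed async run's incremental metadata in step with the row groups already on disk.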
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant DataDesigner
    participant DatasetBuilder
    participant ArtifactStorage
    participant BatchManager
    participant RowGroupBuffer
    User->>DataDesigner: create(config, num_records, resume=True)
    DataDesigner->>ArtifactStorage: new(artifact_path, resume=True)
    Note over ArtifactStorage: resolved_dataset_name returns existing folder
    DataDesigner->>DatasetBuilder: build(num_records, resume=True)
    alt metadata.json missing
        DatasetBuilder->>ArtifactStorage: clear_partial_results()
        Note over DatasetBuilder: resume = False, full restart
    else ASYNC ENGINE
        DatasetBuilder->>DatasetBuilder: _load_resume_state() [validate only]
        DatasetBuilder->>DatasetBuilder: _find_completed_row_group_ids() [filesystem scan]
        DatasetBuilder->>ArtifactStorage: clear_partial_results()
        alt all row groups done
            DatasetBuilder-->>DataDesigner: generated=False (return early)
        else resume remaining
            DatasetBuilder->>RowGroupBuffer: new(initial_actual_num_records, initial_total_num_batches)
            loop each remaining row group
                DatasetBuilder->>RowGroupBuffer: checkpoint_row_group(rg_id)
                DatasetBuilder->>RowGroupBuffer: write_metadata() [incremental]
            end
            DatasetBuilder->>RowGroupBuffer: write_metadata() [final]
        end
    else SYNC ENGINE with resume
        DatasetBuilder->>DatasetBuilder: _load_resume_state() [validate + read metadata]
        DatasetBuilder->>BatchManager: start(start_batch=N, initial_actual_num_records=X)
        alt all batches done
            DatasetBuilder-->>DataDesigner: generated=False (return early)
        else resume remaining
            DatasetBuilder->>ArtifactStorage: clear_partial_results()
            loop batch N to total-1
                DatasetBuilder->>BatchManager: _run_batch(current_batch_number=idx)
                BatchManager->>ArtifactStorage: write then move_partial_to_final
                BatchManager->>ArtifactStorage: write_metadata [per-batch]
            end
            DatasetBuilder->>BatchManager: finish()
        end
    end
    alt generated=True
        DatasetBuilder->>DatasetBuilder: run_after_generation(processors)
    end
    DataDesigner-->>User: DatasetCreationResults
```
Reviews (9): Last reviewed commit: "fix(builder): address agent review findi..."
…INE=1)

- Add _find_completed_row_group_ids() to scan parquet-files/ for already-written row groups by parsing batch_*.parquet filenames
- _build_async() now accepts resume=True: loads metadata, finds completed row groups, clears partial results, and logs progress; returns early if all row groups are done
- _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and initial_total_num_batches so the scheduler only processes remaining row groups and RowGroupBufferManager starts from the correct counts
- RowGroupBufferManager.__init__ gains initial_actual_num_records and initial_total_num_batches params to seed the counters on resume
- finalize_row_group closure now writes incremental metadata after each checkpoint so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1
- Add tests for all new paths
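The filesystem scan can be sketched as follows. The `split("_", 1)[1]` parsing mirrors what the review flags as fragile (the reviewer suggests a strict regex instead); the directory layout and filename pattern come from the commit message, while the `isdigit()` guard is an assumption about how non-numeric names are filtered.

```python
from pathlib import Path


def find_completed_row_group_ids(parquet_dir: Path) -> set[int]:
    """Sketch: discover completed row groups from batch_<id>.parquet files."""
    ids: set[int] = set()
    for f in parquet_dir.glob("batch_*.parquet"):
        # e.g. "batch_3" -> "3"; non-numeric suffixes are skipped.
        suffix = f.stem.split("_", 1)[1]
        if suffix.isdigit():
            ids.add(int(suffix))
    return ids
```

Treating the filenames on disk as ground truth is what lets the async path resume correctly even when metadata.json lags behind the last written row group.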
…set already complete

_build_with_resume and _build_async now return False when the dataset is already complete (early-return path), True otherwise. build() skips _processor_runner.run_after_generation() on False, preventing processors from calling shutil.rmtree and rewriting an already-finalized dataset. Fixes the issue raised in review: greptile P1 comment on PR NVIDIA-NeMo#526.
…sync resume

Metadata can lag by one row group if a crash occurs between move_partial_result_to_final_file_path and write_metadata. Using len(completed_ids) from the filesystem scan instead of state.num_completed_batches ensures the final metadata reflects the actual number of parquet files present, not the potentially stale metadata count.
…efore first batch)

When a run is interrupted before any row group or batch completes, metadata.json is never written. Previously resume=True would raise DatasetGenerationError in this case. Now build() detects the missing file, logs an info message, clears any leftover partial results and falls back to a clean fresh run. This is the common scenario for small datasets (fewer records than buffer_size) where all records fit in a single row group.
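The fallback decision can be sketched as a small pure function. The function name and the string return values are illustrative; the real builder logs, clears partial results, and then proceeds with a normal build.

```python
from pathlib import Path


def plan_resume(dataset_dir: Path, resume: bool) -> str:
    """Sketch of the fallback: missing metadata.json means the run died
    before any batch completed, so resume degrades to a fresh start."""
    if resume and not (dataset_dir / "metadata.json").exists():
        # The real builder calls clear_partial_results() here and logs info.
        return "fresh-restart"
    return "resume" if resume else "fresh"
```

This keeps resume=True safe to pass unconditionally: a run with nothing on disk simply starts over instead of erroring out.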
…ync resume

In the crash window (row group written to disk but write_metadata crashed before updating the file), both initial_total_num_batches and initial_actual_num_records now use the filesystem-discovered completed_ids as source of truth. Previously initial_actual_num_records was read from potentially stale metadata, causing actual_num_records in the final metadata to be undercounted by one row group. Also adds a test covering the partial-resume crash-window scenario.
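The filesystem-as-truth record count matches the formula quoted in the code review below; as that review notes, it assumes each completed row group holds its full share of records (buffer_size, except a possibly smaller final group). A standalone sketch:

```python
def records_from_completed_ids(
    completed_ids: set[int], num_records: int, buffer_size: int
) -> int:
    """Count records already on disk from the completed row-group ids.

    Assumes row group rg_id covers records [rg_id*buffer_size,
    min((rg_id+1)*buffer_size, num_records)).
    """
    return sum(
        min(buffer_size, num_records - rg_id * buffer_size)
        for rg_id in completed_ids
    )
```

For example, with num_records=25 and buffer_size=10, completed row groups {0, 1, 2} account for 10 + 10 + 5 = 25 records.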
Code Review: PR #526 — Resume interrupted dataset generation runs (sync + async engine)

Summary

This PR adds a resume: bool = False flag so interrupted dataset generation runs can be picked up where they left off, for both the sync and async engines. Scope: ~860 additions, ~16 deletions across 10 files (including a plan doc and comprehensive tests). The feature is well-designed: it leverages existing storage and batch-management machinery rather than introducing a parallel code path.

Findings

High Severity

(H1) The return value of _load_resume_state() is discarded in _build_async without explanation; the async path derives its ground-truth state from the filesystem, not from metadata, so the call serves validation only.

Medium Severity

(M1) The filename parsing in _find_completed_row_group_ids relies on split("_", 1)[1], which is fragile against unexpected filename shapes.

(M2) sum(min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids)

This formula assumes each row group was written with exactly its full share of records (buffer_size, except a possibly smaller final group).

(M3)

Low Severity

(L1) Plan/implementation divergence: async engine support

(L2) The _ResumeState.buffer_size field is set but never read.

(L3) Incremental metadata writes add I/O overhead to async engine

(L4) Test file has mid-file imports

(L5) No validation of

Positive Observations
Verdict

Approve with suggestions. The implementation is solid, well-tested, and handles edge cases thoughtfully. The high-severity finding (H1) is a readability/maintainability concern rather than a correctness bug — the discarded return value works because the async path derives its ground-truth state from the filesystem scan rather than from the metadata.
H1: Annotate _load_resume_state() call in _build_async with a comment
clarifying that the return value is intentionally discarded — the async
path derives ground-truth state from the filesystem, not from metadata.
M1: Replace fragile split("_", 1)[1] filename parsing in
_find_completed_row_group_ids with re.fullmatch(r"batch_(\d+)", stem),
making it immune to unexpected filename shapes.
L2: Remove unused _ResumeState.buffer_size field — the field was set
but never read; _build_with_resume uses the buffer_size parameter directly.
L4: Move mid-file imports (json, Path, ArtifactStorage) used by resume
tests to the top of test_dataset_builder.py and drop the underscore aliases.
L1: Update plans/525/resume-interrupted-runs.md to reflect that async
engine resume is fully implemented (not deferred) and that missing
metadata triggers a fresh restart instead of raising DatasetGenerationError.
Summary
Closes #525
Adds `resume: bool = False` to `DataDesigner.create()` and `DatasetBuilder.build()`. When `resume=True`, generation picks up from where the interrupted run left off — for both the sync and async engines.

Changes
- `ArtifactStorage`: `resume: bool = False` field; `resolved_dataset_name` skips timestamp logic on resume; new `clear_partial_results()`
- `DatasetBatchManager.start()`: `start_batch` and `initial_actual_num_records` params (default 0, no breakage)
- `DatasetBuilder.build()`: `resume` param; `_load_resume_state()` reads and validates `metadata.json`; `_build_with_resume()` skips completed batches (sync); `_build_async()` skips completed row groups (async)
- `RowGroupBufferManager.__init__()`: `initial_actual_num_records` and `initial_total_num_batches` params to seed counters on resume
- `DatasetBuilder._find_completed_row_group_ids()`: scans `parquet-files/` for `batch_*.parquet` to determine which async row groups are already done
- `finalize_row_group` closure: writes `metadata.json` after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted
- `DataDesigner.create()`: gains `resume`, passes it through to `ArtifactStorage` and `builder.build()`

Validation and error cases
- Missing `metadata.json` → `DatasetGenerationError` (interrupted before any batch completed)
- `num_records` mismatch → `DatasetGenerationError`
- `buffer_size` mismatch → `DatasetGenerationError`

Test plan
- test_resolved_dataset_name_resume_uses_existing_folder
- test_resolved_dataset_name_resume_raises_when_no_existing_folder
- test_resolved_dataset_name_resume_raises_when_folder_is_empty
- test_clear_partial_results_removes_partial_folder
- test_clear_partial_results_is_noop_when_no_partial_folder
- test_start_with_start_batch
- test_start_with_initial_actual_num_records
- test_start_with_start_batch_and_initial_actual_num_records
- test_start_default_values_unchanged
- test_build_resume_raises_without_metadata
- test_build_resume_raises_on_num_records_mismatch
- test_build_resume_raises_on_buffer_size_mismatch
- test_build_resume_logs_warning_when_already_complete
- test_find_completed_row_group_ids_empty_dir
- test_find_completed_row_group_ids_with_files
- test_find_completed_row_group_ids_ignores_non_batch_files
- test_build_async_resume_logs_warning_when_already_complete
- test_build_async_resume_raises_without_metadata
- test_initial_actual_num_records
- test_initial_total_num_batches_reflected_in_metadata