Skip to content

Add support for Slurm arrays#2059

Open
sarahyurick wants to merge 18 commits into
NVIDIA-NeMo:mainfrom
sarahyurick:slurm_array
Open

Add support for Slurm arrays#2059
sarahyurick wants to merge 18 commits into
NVIDIA-NeMo:mainfrom
sarahyurick:slurm_array

Conversation

@sarahyurick

@sarahyurick sarahyurick commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

TODO:

  • Add Slurm array parameters to FilePartitioningStage
  • Propagate Slurm array parameters through JsonlReader, ParquetReader, etc.
  • Add retry support
  • Add FailedTask support
  • Add a tutorial
  • Add nemo-curator-slurm-cli (not planned for this PR)
  • Address case when SLURM_ARRAY_TASK_COUNT > cluster limit
  • Add tests

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
@copy-pr-bot

copy-pr-bot Bot commented Jun 9, 2026

Copy link
Copy Markdown

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

sarahyurick and others added 4 commits June 9, 2026 14:54
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Comment thread nemo_curator/stages/text/deduplication/semantic.py Outdated
@sarahyurick sarahyurick marked this pull request as ready for review June 11, 2026 17:31
@sarahyurick sarahyurick requested review from a team, abhinavg4 and suiyoubi as code owners June 11, 2026 17:31
sarahyurick and others added 6 commits June 11, 2026 12:53
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
# Guarantee every emitted task has a task_id (derived id, or uuid fallback).
results = self._post_process_task_ids(tasks, results)

self._record_failed_tasks([r for r in results if isinstance(r, FailedTask)])

@sarahyurick sarahyurick Jun 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with @abhinavg4 . For now the PR keeps track of FailedTask instances by looking for a user-set FAILED_TASKS_DIR_ENV_VAR = "NEMO_CURATOR_FAILED_TASKS_DIR" and writing a JSON file per failed task in the specified directory.

I did the environment variable and write approach because it seems more reliable than trying to handle a global Python variable, etc. And the reason it is an environment variable is so that BaseStageAdapter does not have to propagate an additional parameter for every single stage (which I think would involve having to update the executors as well?). Open to other suggestions.

@praateekmahajan praateekmahajan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a super quick look, here are some general thoughts

  1. Instead of adding the same 3/4 fields to every "source" stage, can we have a base class and inherit that?
  2. Alternatively (or maybe in addition), pipeline.build iirc now dynamically sets the first stage as is_source_stage=True, so can we just rely on those? If we do then inside backends/base.py we can say "if this is a source stage AND slurm is enabled then just use task_id as my key and decide which shard it belongs to"... this is something @abhinavg4 and I had discussed, this reduces the number of changes needed across curator code base, and also generalizes, since source_stage have task_id which is likely assigned using get_determenistic_task_id which is a hash(metadat['source_files'])

@sarahyurick

Copy link
Copy Markdown
Contributor Author

Took a super quick look, here are some general thoughts

  1. Instead of adding the same 3/4 fields to every "source" stage, can we have a base class and inherit that?
  2. Alternatively (or maybe in addition), pipeline.build iirc now dynamically sets the first stage as is_source_stage=True, so can we just rely on those? If we do then inside backends/base.py we can say "if this is a source stage AND slurm is enabled then just use task_id as my key and decide which shard it belongs to"... this is something @abhinavg4 and I had discussed, this reduces the number of changes needed across curator code base, and also generalizes, since source_stage have task_id which is likely assigned using get_determenistic_task_id which is a hash(metadat['source_files'])

For 1, sure.

For 2, we could but it makes this PR dependent on the resumability PR, which is what we were trying to avoid I thought... also, I guess it is not immediately obvious to me how it can work for source stages that are not a FilePartitioningStage. I get the general idea I guess but I am not convinced that it could always work, it sounds to me like how it would probably have to work is convert all unselected tasks to NoneTask maybe?

sarahyurick and others added 4 commits June 16, 2026 13:33
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants