Draft: ASR Open Source Datasets Processing Pipeline #2067

Open
sushmitha-deva-09 wants to merge 10 commits into NVIDIA-NeMo:main from sushmitha-deva-09:asr_dp

Conversation

@sushmitha-deva-09

Contributor

Description

Usage

# Add snippet demonstrating usage

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
sushmitha-deva-09 requested a review from a team as a code owner June 11, 2026 11:17
sushmitha-deva-09 requested review from meatybobby and removed request for a team June 11, 2026 11:17
@copy-pr-bot

copy-pr-bot Bot commented Jun 11, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Contributor

Greptile Summary

This PR introduces a new ASR open-source dataset processing pipeline under nemo_curator/stages/audio/asr/, covering HuggingFace Arrow dataset extraction, transcript normalization with language-specific resources (10+ Indic languages), streaming transcript quality statistics, and tarred dataset writing. A tutorial for the IndicVoices dataset and a modest fix to ManifestReader.decompose() to handle OmegaConf sequences are also included.

  • HuggingFaceASRDatasetHandler loads Arrow datasets, decodes audio via datasets.Audio, coerces to mono, resamples, and emits AudioTask objects; joblib threading is used for per-split parallel extraction.
  • TranscriptNormalizationStage / TranscriptStatsStage apply language resource files (alphabet, pretok rules, pnc chars) to normalize and quality-gate transcripts, writing a rolling JSON summary per utterance when output_summary_path is set.
  • SplitAwareManifestWriter / TarredAudioDatasetWriterStage route tasks to per-language/per-split JSONL manifests and wrap NeMo's tarred dataset converter for final packaging.
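
The quality gate described in the second bullet can be illustrated with a minimal sketch. It assumes an alphabet is simply a collection of allowed characters; `find_unknown_chars` and its parameters are hypothetical names for illustration and are not taken from the PR.

```python
from __future__ import annotations


def find_unknown_chars(text: str, alphabet: str, allowed_extra: str = " ") -> list[str]:
    """Return the distinct characters of `text` that are outside the alphabet.

    Hypothetical helper: the real stage also applies pretokenization rules and
    punctuation/capitalization characters, which are omitted here.
    """
    allowed = set(alphabet) | set(allowed_extra)
    unknown: list[str] = []
    for ch in text:
        # Keep first-seen order so the report is stable across runs.
        if ch not in allowed and ch not in unknown:
            unknown.append(ch)
    return unknown


def is_valid_transcript(text: str, alphabet: str) -> bool:
    """A transcript passes the gate when it is non-empty and has no unknown characters."""
    return bool(text.strip()) and not find_unknown_chars(text, alphabet)
```

A stage built this way could attach the returned list to the task (the sequence diagram below mentions an `unknown_chars` field) and drop or keep the utterance depending on a flag such as `drop_invalid`.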

Confidence Score: 4/5

The pipeline logic is sound for the primary HuggingFace handler path, but stats.py has open issues around per-utterance disk reads that could cause serious slowdowns on large multi-language datasets.

The rolling alphabet-file reads inside TranscriptStatsStage.process() (one summary() call per utterance, each triggering disk reads for every language/source bucket) remain unresolved and will significantly degrade throughput at scale. The new finding here is confined to the convert_audio contract gap, which is a lower-risk maintenance concern.

nemo_curator/stages/audio/asr/normalization/stats.py deserves attention before this PR is merged out of draft status.
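
One possible way to avoid the per-utterance disk reads is to cache each alphabet file in memory the first time it is loaded. The sketch below is only an illustration under that assumption; `load_alphabet` and `count_unknown` are hypothetical names, and the actual fix in stats.py may look different.

```python
from __future__ import annotations

import functools
from pathlib import Path


@functools.lru_cache(maxsize=None)
def load_alphabet(path: str) -> frozenset[str]:
    """Read an alphabet file once per path; repeated calls are served from memory."""
    text = Path(path).read_text(encoding="utf-8")
    # Whitespace in the file is treated as a separator, not as an alphabet character.
    return frozenset(ch for ch in text if not ch.isspace())


def count_unknown(text: str, alphabet_path: str) -> int:
    """Count characters outside the alphabet without touching the disk after the first call."""
    alphabet = load_alphabet(alphabet_path)
    return sum(1 for ch in text if not ch.isspace() and ch not in alphabet)
```

With this shape, a summary computed on every utterance costs a dictionary lookup per language bucket rather than a file read. Writing the rolling summary every N utterances instead of every utterance would be a complementary change.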

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/stages/audio/asr/normalization/stats.py | New streaming transcript stats stage; _metrics_snapshot calls summary() → _bucket_summary() → disk reads on every utterance (flagged previously), and _write_summary() has a re-open file handle leak (flagged previously). |
| nemo_curator/stages/audio/asr/datasets/base.py | New base class for ASR dataset handlers; convert_audio does not enforce mono despite target_channels=1, creating a contract gap for subclasses. |
| nemo_curator/stages/audio/asr/datasets/huggingface.py | New HuggingFace dataset handler; stats dict keyed from skip_reason strings is fragile but all current skip reasons are covered. |
| nemo_curator/stages/audio/asr/io/split_manifest_writer.py | New split-aware manifest writer; teardown() clears _handles but not _counts (flagged previously), which would skew logged entry counts on reuse. |
| nemo_curator/stages/audio/asr/normalization/transcript.py | New transcript normalization stage; normalizers are lazily cached per-language and resource files are loaded once per language. Logic is clean. |
| nemo_curator/stages/audio/asr/io/tarred_dataset_writer.py | New stage wrapping NeMo's tarred dataset converter; straightforward delegation with proper validation of manifest/target_dir length parity. |
| nemo_curator/stages/audio/asr/metadata.py | New typed ASRMetadata dataclass with clean to_dict/from_dict round-trip; extra fields are spread at serialization with core fields taking precedence. |
| nemo_curator/stages/audio/common.py | Minor change: adds _coerce_manifest_path() to convert OmegaConf sequences to plain Python lists before passing to FilePartitioningStage. |
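
The serialization behavior summarized for metadata.py (extra fields spread at serialization, core fields taking precedence) can be sketched as follows. The class name and field names here are assumptions chosen for illustration; the real ASRMetadata fields are not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field, fields
from typing import Any


@dataclass
class MetadataSketch:
    # Hypothetical core fields; the actual dataclass may define different ones.
    audio_filepath: str
    text: str
    duration: float
    extra: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        core = {f.name: getattr(self, f.name) for f in fields(self) if f.name != "extra"}
        # Extra keys are spread first so a core field wins on a name collision.
        return {**self.extra, **core}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MetadataSketch":
        core_names = {f.name for f in fields(cls)} - {"extra"}
        core = {k: v for k, v in data.items() if k in core_names}
        extra = {k: v for k, v in data.items() if k not in core_names}
        return cls(**core, extra=extra)
```

Spreading `extra` before the core fields means a stray key such as `text` inside `extra` cannot overwrite the typed value, which keeps the round-trip stable.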

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Driver
    participant HFHandler as HuggingFaceASRDatasetHandler
    participant Joblib as Joblib Threads
    participant NormStage as TranscriptNormalizationStage
    participant StatsStage as TranscriptStatsStage
    participant ManifestWriter as SplitAwareManifestWriter
    participant TarredWriter as TarredAudioDatasetWriterStage

    Driver->>HFHandler: process(_EmptyTask)
    loop For each lang x native_split
        HFHandler->>HFHandler: load_from_disk + cast_column(Audio)
        HFHandler->>Joblib: Parallel(load_and_process(i))
        Joblib->>HFHandler: coerce_audio mono 1D array
        Joblib->>HFHandler: convert_audio WAV/16kHz/PCM16
        Joblib-->>HFHandler: _RowResult(ASRMetadata)
    end
    HFHandler-->>Driver: list[AudioTask]
    Driver->>NormStage: process(AudioTask)
    NormStage->>NormStage: ResourceTranscriptNormalizer.normalize(text)
    NormStage-->>Driver: AudioTask + unknown_chars + transcript_error
    Driver->>StatsStage: process(AudioTask)
    StatsStage->>StatsStage: accumulate buckets (global + per-lang/source)
    StatsStage->>StatsStage: _write_summary() rolling JSON file
    StatsStage-->>Driver: AudioTask (or None if drop_invalid)
    Driver->>ManifestWriter: process(AudioTask)
    ManifestWriter->>ManifestWriter: route to lang/split.jsonl
    ManifestWriter-->>Driver: AudioTask
    Driver->>TarredWriter: process(_EmptyTask)
    TarredWriter->>TarredWriter: create_tar_datasets(manifest, target_dir)
    TarredWriter-->>Driver: []

Reviews (2): Last reviewed commit: "Use single handler for huggingface type ..."

Comment on lines +120 to +125
    def teardown(self) -> None:
        for (lang, split), handle in self._handles.items():
            handle.close()
            filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
            logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
        self._handles = {}

Contributor

P2 teardown() clears _handles but leaves _counts populated. If the stage is reused, _counts from the previous run accumulates into the next, making the logged entry counts wrong. Reset both dicts together.

Suggested change
-     def teardown(self) -> None:
-         for (lang, split), handle in self._handles.items():
-             handle.close()
-             filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
-             logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
-         self._handles = {}
+     def teardown(self) -> None:
+         for (lang, split), handle in self._handles.items():
+             handle.close()
+             filename = self.output_filename_pattern.format(lang=lang, split=split, split_type=split)
+             logger.info(f"[{self.name}] {lang}/{filename}: {self._counts.get((lang, split), 0)} entries")
+         self._handles = {}
+         self._counts = {}
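
The effect of resetting both dicts can be checked with a small stand-alone model of the writer. This is a simplified sketch, not the PR's SplitAwareManifestWriter: the class name, the `process` signature, and the fixed `{lang}/{split}.jsonl` layout are assumptions made for illustration.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, TextIO


class SplitWriterSketch:
    """Routes entries to <output_dir>/<lang>/<split>.jsonl and counts them per file."""

    def __init__(self, output_dir: str) -> None:
        self.output_dir = Path(output_dir)
        self._handles: dict[tuple[str, str], TextIO] = {}
        self._counts: dict[tuple[str, str], int] = {}

    def process(self, lang: str, split: str, entry: dict[str, Any]) -> None:
        key = (lang, split)
        if key not in self._handles:
            path = self.output_dir / lang / f"{split}.jsonl"
            path.parent.mkdir(parents=True, exist_ok=True)
            # Append mode so a reused stage does not truncate earlier output.
            self._handles[key] = path.open("a", encoding="utf-8")
        self._handles[key].write(json.dumps(entry, ensure_ascii=False) + "\n")
        self._counts[key] = self._counts.get(key, 0) + 1

    def teardown(self) -> dict[tuple[str, str], int]:
        for handle in self._handles.values():
            handle.close()
        report = dict(self._counts)
        # Reset both dicts together so a second run starts counting from zero.
        self._handles = {}
        self._counts = {}
        return report
```

If `self._counts = {}` were omitted, the second `teardown()` on a reused instance would report the sum of both runs, which is the skew described in the comment above.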
