Merged
Commits
95 commits
98d2f16
feat: including prefetch_file flag
ankitmaster08 Oct 7, 2025
88ef2a0
updated msc version
ankitmaster08 Oct 7, 2025
d58b26e
Update msc to allow huggingface
voegtlel Nov 3, 2025
267147e
Fix epath test for latest msc
voegtlel Nov 3, 2025
3c3bc65
Merge remote-tracking branch 'origin/develop' into feature/update_dep…
voegtlel Nov 3, 2025
35b6238
Merge branch 'feature/update_dependencies' into prefetch-file
voegtlel Nov 3, 2025
644db4f
Merge pull request #171 from ankitmaster08/prefetch-file
voegtlel Nov 3, 2025
04fdcb9
Remove unused dependency
voegtlel Nov 3, 2025
830bef3
Merge pull request #180 from NVIDIA/feature/update_dependencies
philipp-fischer Nov 3, 2025
185694d
Format
voegtlel Nov 3, 2025
0881163
Implement proper error handler to task encoder
voegtlel Nov 3, 2025
bc3386e
Next suggestion
voegtlel Nov 3, 2025
43cabd5
Fix test
voegtlel Nov 3, 2025
dffdb1a
Move exception handler to worker config. Implement better error loggi…
voegtlel Nov 4, 2025
89a4905
Update comments
voegtlel Nov 5, 2025
3fe67f5
Fix test
voegtlel Nov 5, 2025
274f51b
Add docs for error handling
voegtlel Nov 6, 2025
88f3fd4
Update docs from comments
voegtlel Nov 7, 2025
cbe8ae3
Merge pull request #181 from NVIDIA/feature/error_handling
philipp-fischer Nov 7, 2025
ccd9955
Re-export exception handlers
voegtlel Nov 18, 2025
877c31e
Fix legacy error handler
voegtlel Nov 18, 2025
c35d6e7
First draft for media metadata
philipp-fischer Nov 18, 2025
ded4fec
Fix test
philipp-fischer Nov 18, 2025
4e28ba6
Address review comments
philipp-fischer Nov 18, 2025
45e7d31
New prepare-media command. Introduce parallelism for filesystem prepare.
philipp-fischer Nov 18, 2025
25106f9
Ruff
philipp-fischer Nov 18, 2025
043227a
Fix bug with sqlite index
philipp-fischer Nov 18, 2025
5275e44
Fix progress
philipp-fischer Nov 18, 2025
2a65194
Update existing WDS with prepare-media command
philipp-fischer Nov 18, 2025
1fc5c7f
Typing fixes, and fix source for file store
voegtlel Nov 18, 2025
b220f9c
Small fix an col rename
philipp-fischer Nov 18, 2025
f3c52d5
Disallow duplicate sample keys from now on. Add helper to path tar fi…
philipp-fischer Nov 18, 2025
ca00133
Address review comments. Parallelized TarPatcher
philipp-fischer Nov 18, 2025
04f6bb1
Ruff
philipp-fischer Nov 18, 2025
0ae1310
Small typing and parallism updates
voegtlel Nov 18, 2025
938bfe1
Better pillow-adjusted image extensions
philipp-fischer Nov 18, 2025
e3b1fc4
TarPatcher num_workers and better CLI flags
philipp-fischer Nov 18, 2025
5b93f29
Super optimized tar patcher using numba
voegtlel Nov 18, 2025
08caa1f
Deps. Formatting. Bring num_workers back. Fix bug in `split_ustar_path`
philipp-fischer Nov 18, 2025
1d2077a
Finish numba implementation of tar_patcher
voegtlel Nov 18, 2025
ac9d775
Improve lustre speed of tar patcher
voegtlel Nov 18, 2025
4e444ec
Fix sample key only once for a sample, not per part
voegtlel Nov 18, 2025
c1aa086
Add constants
philipp-fischer Nov 18, 2025
8eef6a2
Clean output
voegtlel Nov 18, 2025
a546851
Make tar patcher optional
voegtlel Nov 18, 2025
6cb96ff
Update the sample key to reflect the actual key within the shard (i.e…
voegtlel Nov 18, 2025
9a47fc6
Fix imports of config constants
philipp-fischer Nov 18, 2025
2bffa2b
Add FileStore wrapper class. Fix `aux is None` case.
philipp-fischer Nov 18, 2025
c69b2b0
Infer statelessness from inner cooking method
philipp-fischer Nov 18, 2025
c9cc494
Ruff
philipp-fischer Nov 18, 2025
c6fe8fc
Fix keys in test
philipp-fischer Nov 18, 2025
a51184e
Fix tests
voegtlel Nov 18, 2025
57f15db
Raise if key not found for get_media_metadata
voegtlel Nov 18, 2025
8c7621e
Started with docs
philipp-fischer Nov 18, 2025
88ff5bd
Typo
philipp-fischer Nov 18, 2025
cfdb420
Merge pull request #182 from NVIDIA/feature/metadata_in_sqlite
voegtlel Nov 19, 2025
6c6e964
Add more docs for media metadata
philipp-fischer Nov 20, 2025
de741bc
Fix test import
philipp-fischer Nov 20, 2025
8e9fa47
Add prog data prep docs
philipp-fischer Nov 20, 2025
c8229c9
Ruff
philipp-fischer Nov 20, 2025
0f2521c
Address review comments
philipp-fischer Nov 20, 2025
7034c1f
Simplify CLI and remove `--media-metadata`
philipp-fischer Nov 20, 2025
d97b66b
Merge pull request #187 from NVIDIA/feature/metadata_in_sqlite_docs
philipp-fischer Nov 20, 2025
5344fd8
Improve fastseek speed and fix time handling. Add fastseek detail tes…
voegtlel Dec 8, 2025
3de346a
Fix boundary conditions
voegtlel Dec 8, 2025
aa3dc38
Remove dbg comments
voegtlel Dec 9, 2025
1f7fa34
Improve test speed
voegtlel Dec 9, 2025
fea5742
Optimize test
voegtlel Dec 9, 2025
cde61d3
Fix a rare deadlock, that happens in __del__ of a cache lazy, when it…
voegtlel Dec 11, 2025
8b77616
Test fix
voegtlel Dec 12, 2025
96fc50c
Merge pull request #190 from NVIDIA/fix/cache_del_deadlock
voegtlel Dec 12, 2025
bfff755
Cleanup code for easier review
voegtlel Dec 12, 2025
4eebc7b
Improve fastseek implementation. Implement comments
voegtlel Dec 12, 2025
6b6f9a2
Fix end-of-video handling
voegtlel Dec 15, 2025
17e3aa0
Remove mpeg custom frame seeking
voegtlel Dec 15, 2025
2f00a49
Fix aggregator pool on mac not using fork.
voegtlel Dec 16, 2025
51bdc85
Implement prepare non-interactive
voegtlel Dec 16, 2025
8a0b723
Ensure we use forking for dataloader (even on mac)
voegtlel Dec 16, 2025
ba6a019
Upgrade GitHub Actions for Node 24 compatibility
salmanmkc Dec 16, 2025
af4c1a9
Upgrade GitHub Actions to latest versions
salmanmkc Dec 16, 2025
3f397e0
Small changes / fixes
voegtlel Dec 16, 2025
e0390ea
Small rename
voegtlel Dec 16, 2025
2331a1c
Fix pypa/gh-action-pypi-publish to use SHA pinning
salmanmkc Dec 17, 2025
aa681fd
Merge pull request #189 from NVIDIA/feature/improve_fastseek_speed
voegtlel Dec 18, 2025
cd690a8
Add audio_num_samples (it's slow, but needed)
voegtlel Dec 12, 2025
812d7b1
use version tag
salmanmkc Dec 18, 2025
048de7f
Merge branch 'develop' into upgrade-github-actions-node24-general
salmanmkc Dec 18, 2025
2036ebd
Merge pull request #196 from salmanmkc/upgrade-github-actions-node24-…
philipp-fischer Jan 2, 2026
f56ac13
Merge pull request #195 from salmanmkc/upgrade-github-actions-node24
philipp-fischer Jan 2, 2026
add40d1
Remove the context delay
voegtlel Jan 2, 2026
351477d
Improve test for crude preparation, add dataset yaml name
voegtlel Jan 2, 2026
d14eab0
Merge pull request #191 from NVIDIA/feature/improve_metadata
philipp-fischer Jan 2, 2026
0dff391
Merge pull request #192 from NVIDIA/fix/aggregator_pool_mac
philipp-fischer Jan 2, 2026
06d99a0
Removing shard prints, fixing gc (encountered bug with django, fixing…
voegtlel Jan 16, 2026
d5ddf11
Merge pull request #198 from NVIDIA/feature/remove_prints_fix_gc
voegtlel Jan 16, 2026
4 changes: 2 additions & 2 deletions .github/workflows/documentation.yml
@@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
- uses: actions/checkout@v4
+ uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v5
@@ -38,7 +38,7 @@ jobs:
just docs

- name: Upload artifact
- uses: actions/upload-pages-artifact@v3
+ uses: actions/upload-pages-artifact@v4
with:
path: docs/build

4 changes: 2 additions & 2 deletions .github/workflows/license_headers.yml
@@ -15,9 +15,9 @@ jobs:

steps:
- name: Checkout Repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v6
- name: Set up Python
- uses: actions/setup-python@v5
+ uses: actions/setup-python@v6
with:
python-version: 3.9
- name: Install dependencies
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
@@ -15,7 +15,7 @@ jobs:
id-token: write # This permission is mandatory for trusted publishing
steps:
- name: Checkout code
- uses: actions/checkout@v4
+ uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v5
@@ -32,4 +32,4 @@ jobs:
just build

- name: Publish package
- uses: pypa/gh-action-pypi-publish@release/v1
+ uses: pypa/gh-action-pypi-publish@release/v1.13 # release/v1.13
2 changes: 1 addition & 1 deletion .github/workflows/ruff.yml
@@ -15,7 +15,7 @@ jobs:

steps:
- name: Checkout code
- uses: actions/checkout@v4
+ uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v5
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -15,7 +15,7 @@ jobs:

steps:
- name: Checkout code
- uses: actions/checkout@v4
+ uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v5
57 changes: 57 additions & 0 deletions docs/source/advanced/crude_datasets.md
@@ -74,6 +74,34 @@ All it does is to forward the key, restore key and flavors from the dict to the

In a real use case you will want to do a lot more here; we recommend keeping the cook methods in separate files and importing them where you define your TaskEncoder.

### Using Media Metadata in Monolithic Datasets
If you prepared your dataset with media metadata, you can access it in the cooker using the {py:meth}`get_media_metadata <megatron.energon.FileStore.get_media_metadata>` method of the {py:class}`FileStore <megatron.energon.FileStore>`. This is faster than decoding the media files to infer metadata such as video duration or image size.
Here we look at the use case for primary datasets, but for [auxiliary datasets](aux-data) this is even more beneficial.

#### Metadata from the Primary Dataset

```python
from megatron.energon import FileStore

# ...

@stateless
@cooker(need_primary=True)
def cook_media_metadata(sample: dict, primary: FileStore) -> TextSample:
"""This cooker loads the media metadata for the current sample
and returns a text sample with the metadata description."""

# The `.` prefix is used to access the media metadata of the primary dataset,
# referring to a part of the current sample.
primary_media_metadata = primary.get_media_metadata(".wav")

return TextSample(
**basic_sample_keys(sample),
text=f"This audio has duration {primary_media_metadata.audio_duration} seconds",
)
```


(aux-data)=
## Auxiliary Data for Polylithic Datasets

@@ -165,6 +193,35 @@ def cook_text(sample: dict, primary: FileStore, foo_bar_source: FileStore) -> Te

You can then retrieve files by their names from the primary dataset.


### Using Media Metadata in Polylithic Datasets
If you prepared your auxiliary dataset with media metadata, you can access it in the cooker using the {py:meth}`get_media_metadata <megatron.energon.FileStore.get_media_metadata>` method of the {py:class}`FileStore <megatron.energon.FileStore>`.
This is much faster than reading the media files themselves to infer metadata such as video duration or image size.
Especially if you are working with Lazy objects, you can defer loading the media files entirely until you actually need them, for example in {py:meth}`postencode_sample(self, sample: T_sample) -> T_encoded_sample <megatron.energon.TaskEncoder.postencode_sample>` when using packing.

```python
from megatron.energon import FileStore

# ...

def cook_media_metadata(sample: dict, foo_bar_source: FileStore) -> TextSample:
# Use the image filename from the primary sample to get the media metadata from the auxiliary dataset
media_metadata = foo_bar_source.get_media_metadata(sample['image'])

return TextSample(
**basic_sample_keys(sample),
text=f"This image has size {media_metadata.width}x{media_metadata.height} and format {media_metadata.format}",
)

```

The path of the media file within the auxiliary dataset is what needs to be passed to the `get_media_metadata` method, for example `0001.jpg`, or `images/0001.jpg` if the file is inside a subfolder of the WebDataset or filesystem dataset.

The return value will be either {py:class}`AVMetadata <megatron.energon.media.AVMetadata>` or {py:class}`ImageMetadata <megatron.energon.media.ImageMetadata>`.
Click on them to see the fields and their types.
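Since the return type depends on the media kind, a cooker handling mixed media may need to branch on it. The following self-contained sketch uses local dataclass stand-ins whose field names mirror the examples above (`width`, `height`, `format`, `audio_duration`); the real metadata classes may carry additional fields:

```python
from dataclasses import dataclass


@dataclass
class ImageMetadata:
    # Local stand-in for energon's ImageMetadata; fields mirror the doc examples.
    width: int
    height: int
    format: str


@dataclass
class AVMetadata:
    # Local stand-in for energon's AVMetadata.
    audio_duration: float


def describe(md) -> str:
    """Branch on the metadata type, as a cooker handling mixed media might."""
    if isinstance(md, ImageMetadata):
        return f"image {md.width}x{md.height} ({md.format})"
    return f"audio/video, {md.audio_duration:.1f} s"
```

In a real cooker, `md` would be the value returned by `get_media_metadata`, and the resulting string could feed directly into a `TextSample`.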


(cache-pools)=
## Cache Pools

70 changes: 70 additions & 0 deletions docs/source/advanced/data_prep_api.md
@@ -0,0 +1,70 @@
<!--- Copyright (c) 2025, NVIDIA CORPORATION.
SPDX-License-Identifier: BSD-3-Clause -->

# Programmatic Data Preparation

Below, we provide an example function that shows how to prepare a dataset programmatically.

```python
import shutil
from pathlib import Path

import click

from megatron.energon.flavors import BaseWebdatasetFactory

def prepare_one_dataset(path: Path, num_workers: int, template_dir: Path):

if (path / ".nv-meta" / "dataset.yaml").exists():
print(f"Dataset {path} already prepared. Skipping.")
return

# Fixed settings
tar_index_only = False
split_parts_ratio = [("train", 1), ("val", 0), ("test", 0)]
split_parts_patterns = None

# Get all tar files
all_tars = list(path.glob("**/*.tar")) + list(path.glob("**/*.tgz"))
all_tars = [str(p.relative_to(path)) for p in sorted(all_tars)]

if len(all_tars) == 0:
print("Did not find any tar files. Exiting.")
return

print(f"Found {len(all_tars)} tar files in total. The first and last ones are:")
print(f"- {all_tars[0]}")
print(f"- {all_tars[-1]}")

def progress_fn(els, length=None):
with click.progressbar(
els,
label="Indexing shards",
show_pos=True,
length=length,
) as bar:
for el in bar:
yield el

found_types, duplicates = BaseWebdatasetFactory.prepare_dataset(
path,
all_tars,
split_parts_ratio=split_parts_ratio,
split_parts_patterns=split_parts_patterns,
progress_fn=progress_fn,
tar_index_only=tar_index_only,
shuffle_seed=None,
workers=num_workers,
)

# Copy sample loader and dataset.yaml templates
for file in template_dir.glob("*"):
shutil.copy(file, path / ".nv-meta" / file.name)
```

Example usage:

First, create a template directory with the `dataset.yaml` file, and optionally the `sample_loader.py` file.
Let's call it `template_dir`.

Then, run the script:

```python
if __name__ == "__main__":
prepare_one_dataset(Path("/path/to/dataset"), 16, Path("/path/to/template_dir"))
```
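The `split_parts_ratio` argument above assigns shards to named splits by relative weight. As a rough, self-contained sketch of how such a ratio-based partition works (an illustration only, not energon's actual implementation, which additionally supports shuffling and pattern-based splits):

```python
from typing import Dict, List, Sequence, Tuple


def split_by_ratio(
    shards: Sequence[str], ratios: Sequence[Tuple[str, float]]
) -> Dict[str, List[str]]:
    """Partition a sorted shard list into named splits by relative weight."""
    total = sum(weight for _, weight in ratios)
    result: Dict[str, List[str]] = {}
    start = 0
    cumulative = 0.0
    for name, weight in ratios:
        cumulative += weight
        # Each split ends at its cumulative fraction of the shard list.
        end = round(len(shards) * cumulative / total)
        result[name] = list(shards[start:end])
        start = end
    return result


# With ratios [("train", 1), ("val", 0), ("test", 0)], all shards go to "train".
splits = split_by_ratio(
    [f"shard_{i:04d}.tar" for i in range(10)],
    [("train", 1), ("val", 0), ("test", 0)],
)
```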
159 changes: 159 additions & 0 deletions docs/source/advanced/error_handling.md
@@ -0,0 +1,159 @@
<!--- Copyright (c) 2025, NVIDIA CORPORATION.
SPDX-License-Identifier: BSD-3-Clause -->

# Error Handling

Energon provides configurable error handling to gracefully manage exceptions during data loading and processing.
By default, errors in task encoder code are logged and the sample is skipped, allowing training to continue.
Some system exceptions (clearly programming errors) always cause immediate failure.

By default, if more than 100 samples in a row raise an error, processing will also fail. For the {py:class}`~megatron.energon.task_encoder.base.TaskEncoder` user functions, this is configurable via the `failure_tolerance` parameter of the `@stateless` decorator, e.g. `@stateless(failure_tolerance=100)`.

## Error Handler Configuration

Error handlers are configured via {py:class}`WorkerConfig <megatron.energon.WorkerConfig>`:

```python
from megatron.energon import WorkerConfig, log_exception, reraise_exception

worker_config = WorkerConfig(
rank=0,
world_size=1,
num_workers=4,
global_error_handler=log_exception, # Default: logs and skips failed samples
restore_error_handler=reraise_exception, # Default: fails fast on restore
)
```

**`global_error_handler`:** Handles exceptions during normal iteration (training/validation).
Default is {py:func}`log_exception <megatron.energon.log_exception>` which logs details and skips the sample.

**`restore_error_handler`:** Handles exceptions during checkpoint restoration.
Default is {py:func}`reraise_exception <megatron.energon.reraise_exception>` which fails fast to ensure checkpoint integrity.

## System Exceptions

Programming errors and critical system issues always cause immediate failure and are never handled by error handlers:
`SystemError`, `SyntaxError`, `ImportError`, `StopIteration`, `StopAsyncIteration`, `MemoryError`, `RecursionError`, `ReferenceError`, `NameError`, `UnboundLocalError`, and {py:exc}`FatalSampleError <megatron.energon.FatalSampleError>`.
{py:exc}`FatalSampleError <megatron.energon.FatalSampleError>` is raised automatically when consecutive failure tolerance is exceeded or when a system exception occurs during sample processing.
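Conceptually, the loader checks each raised exception against this list before consulting the configured handler. A minimal self-contained sketch of that dispatch (the names here are illustrative, not energon's internals):

```python
# Illustrative sketch only; energon's internal check may differ, and its
# FatalSampleError is also treated as a system exception.
SYSTEM_EXCEPTIONS = (
    SystemError,
    SyntaxError,
    ImportError,
    StopIteration,
    StopAsyncIteration,
    MemoryError,
    RecursionError,
    ReferenceError,
    NameError,
    UnboundLocalError,
)


def should_reraise(exc: BaseException) -> bool:
    """True if the exception must bypass any configured error handler."""
    return isinstance(exc, SYSTEM_EXCEPTIONS)
```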

## Built-in Error Handlers

### `log_exception`

Logs detailed error information and continues:
- Exception traceback
- Source information (dataset path, shard, index)
- Sample details in readable format

```python
from megatron.energon import log_exception

worker_config = WorkerConfig(
rank=0,
world_size=1,
num_workers=4,
global_error_handler=log_exception,
)
```

### `reraise_exception`

Immediately reraises the exception to halt iteration:

```python
from megatron.energon import reraise_exception

worker_config = WorkerConfig(
rank=0,
world_size=1,
num_workers=4,
global_error_handler=reraise_exception, # Fail on any error
)
```

### Custom Error Handlers

Implement custom error handlers with this signature:

```python
from typing import Any

# `log_to_monitoring` and `CriticalError` below are placeholders for your own code.
def my_error_handler(
exception: Exception,
sample: Any,
sources: list[SourceInfo] | None
) -> None:
# Log to your monitoring system
log_to_monitoring(exception, sample)

# Optionally reraise for critical errors
if isinstance(exception, CriticalError):
raise exception
```

```python
worker_config = WorkerConfig(
rank=0,
world_size=1,
num_workers=4,
global_error_handler=my_error_handler,
)
```

## Failure Tolerance for Task Encoder Functions

By default, if more than 100 samples in a row raise an error, processing will fail with a {py:exc}`FatalSampleError <megatron.energon.FatalSampleError>`.

For {py:class}`TaskEncoder <megatron.energon.TaskEncoder>` methods, configure this via the `@stateless` decorator:

```python
from megatron.energon import DefaultTaskEncoder, stateless

class MyTaskEncoder(DefaultTaskEncoder):
@stateless(failure_tolerance=50)
def encode_sample(self, sample):
# Process sample - tolerates up to 50 consecutive failures
if sample.is_corrupted():
raise ValueError("Corrupted sample")
return sample

@stateless(restore_seeds=True, failure_tolerance=200)
def pack_selected_samples(self, samples):
# Packing with higher tolerance and deterministic randomness
return pack_samples(samples)
```

Set `failure_tolerance=0` to disable tolerance checking for a specific function.

```{admonition} Note
:class: important
Tolerance limits count *consecutive* failures. A single successful sample resets the counter.
```
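The consecutive-counting behavior can be sketched as a small wrapper: the counter increments on each failure, resets on any success, and escalates once the tolerance is exceeded. This is a self-contained illustration with a local stand-in exception, not energon's actual implementation:

```python
class FatalError(Exception):
    """Local stand-in for energon's FatalSampleError."""


def process_with_tolerance(samples, process, failure_tolerance=100):
    """Yield process(sample), skipping failures; fail hard after more than
    `failure_tolerance` consecutive failures. A success resets the counter."""
    consecutive = 0
    for sample in samples:
        try:
            result = process(sample)
        except Exception as exc:
            consecutive += 1
            # failure_tolerance=0 disables the check entirely.
            if failure_tolerance and consecutive > failure_tolerance:
                raise FatalError("too many consecutive failures") from exc
            continue  # skip the failed sample
        consecutive = 0  # any success resets the counter
        yield result
```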

## Skip or Fail Explicitly

Raise {py:exc}`SkipSample <megatron.energon.SkipSample>` to explicitly skip a sample without logging an error:

```python
from megatron.energon import SkipSample

def process_sample(sample):
try:
...
except MySpecificError:
raise SkipSample()
return sample
```

Raise {py:exc}`FatalSampleError <megatron.energon.FatalSampleError>` to cause immediate failure, bypassing the error handler:

```python
from megatron.energon import FatalSampleError

def process_sample(sample):
try:
...
except MyFatalError as e:
raise FatalSampleError.from_sample(sample, "Critical corruption detected") from e
return sample
```
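Putting both together, the loader's control flow around these two exceptions looks roughly like the following self-contained sketch (with local stand-in classes; energon's actual pipeline is more involved):

```python
class SkipSample(Exception):
    """Local stand-in for energon's SkipSample: skip silently."""


class FatalSampleError(Exception):
    """Local stand-in: always re-raised, never passed to the handler."""


def iterate(samples, process, error_handler):
    """Sketch of the skip/handle control flow around sample processing."""
    for sample in samples:
        try:
            yield process(sample)
        except SkipSample:
            continue  # intentional skip, no logging
        except FatalSampleError:
            raise  # bypasses the error handler
        except Exception as exc:
            error_handler(exc, sample)  # e.g. log and skip
```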
10 changes: 10 additions & 0 deletions docs/source/api/modules_data.md
@@ -24,3 +24,13 @@ SPDX-License-Identifier: BSD-3-Clause -->
:undoc-members:
:show-inheritance:
```


# megatron.energon.media

```{eval-rst}
.. automodule:: megatron.energon.media
:members:
:undoc-members:
:show-inheritance:
```