Tiny, stdlib-only DAG runner for Python.
pip install slimflow

from slimflow import Task, Workflow, Runner

# fetch_data, clean, and write_to_db are your own callables.
extract = Task(name="extract", func=fetch_data)
transform = Task(name="transform", func=clean, depends_on=[extract])
load = Task(name="load", func=write_to_db, depends_on=[transform], max_retries=3)
workflow = Workflow("my-pipeline", [extract, transform, load])
results = Runner(threads=4).run(workflow)

Pass artifacts_dir= to Runner to write a run_results.json file (one per run) when each workflow finishes. Failed or skipped tasks also get a sidecar log file. An example of the file follows.
{
  "schema_version": 1,
  "run_id": "8c7a...",
  "workflow_name": "my-pipeline",
  "started_at": "2026-05-23T17:04:11.000000Z",
  "ended_at": "2026-05-23T17:09:32.000000Z",
  "duration_s": 321.4,
  "python_version": "3.12.4",
  "slimflow_version": "0.1.0",
  "caller_path": "main.py",
  "tasks": [
    {
      "name": "extract",
      "status": "success",
      "attempts": 1,
      "started_at": "2026-05-23T17:04:11.000000Z",
      "ended_at": "2026-05-23T17:07:55.000000Z",
      "duration_s": 224.0,
      "thread_name": "ThreadPoolExecutor-0_2",
      "depends_on": [],
      "func_module": "my_pipeline.tasks",
      "func_qualname": "fetch_data",
      "func_source_path": "my_pipeline/tasks.py",
      "func_source_line": 12,
      "error": null,
      "traceback": null,
      "log_path": null
    }
  ]
}

func_module, func_qualname, and func_source_path are best-effort. Plain functions, functools.partial objects, and functions decorated with @functools.wraps all resolve cleanly. Lambdas show as "<lambda>". Callable objects fall back to their class name. None of this affects execution; these fields only serve as tags for BI grouping.
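
To make "best-effort" concrete, here is a minimal stdlib sketch of how such metadata could be resolved. It is not slimflow's actual implementation: the helper name describe_callable is hypothetical, and only the returned keys are taken from the example above. It also reports whatever path inspect gives back (usually absolute), whereas the example above shows a relative one.

```python
import functools
import inspect


def describe_callable(func):
    """Best-effort metadata for a callable (hypothetical helper, not slimflow API)."""
    target = func
    # Peel functools.partial layers to reach the underlying callable.
    while isinstance(target, functools.partial):
        target = target.func
    # Follow the __wrapped__ chain left behind by functools.wraps.
    target = inspect.unwrap(target)
    # Instances of callable classes have no __qualname__; describe their class.
    if not hasattr(target, "__qualname__"):
        target = type(target)
    try:
        path = inspect.getsourcefile(target)
        line = inspect.getsourcelines(target)[1]
    except (TypeError, OSError):
        # Builtins, C extensions, and code without source on disk end up here.
        path, line = None, None
    return {
        "func_module": getattr(target, "__module__", None),
        "func_qualname": target.__qualname__,
        "func_source_path": path,
        "func_source_line": line,
    }


if __name__ == "__main__":
    # A lambda keeps a qualname ending in "<lambda>".
    print(describe_callable(lambda x: x)["func_qualname"])
    # A builtin has no Python source, so path and line come back as None.
    print(describe_callable(len))
```

Source lookup is wrapped in a try block because inspect raises TypeError for builtins and OSError when the source file cannot be read; in both cases the sketch records None instead of failing the run.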