diff --git a/compute-feasibility-advisor-proposal.md b/compute-feasibility-advisor-proposal.md new file mode 100644 index 00000000..7ebf70bd --- /dev/null +++ b/compute-feasibility-advisor-proposal.md @@ -0,0 +1,281 @@ +# Compute Feasibility Advisor for AutoIntent + +- **Date:** 2026-05-23 +- **Status:** Proposal (pre-implementation) +- **Audience:** AutoIntent maintainers / contributor picking up the task +- **Scope of this document:** technical specification — *what* the advisor estimates and the formulas it uses. Architectural and system-design choices (where the advisor lives in the codebase, how it integrates with the optimizer, the public API surface, file/module layout) are deliberately left to the implementer. + +## Problem + +AutoIntent's main strength is letting a user kick off a full search-space optimization with one call: + +```python +pipeline = Pipeline.from_preset("transformers-heavy") +pipeline.fit(dataset) +``` + +The cost of that convenience is that users — especially those running on a laptop, a single consumer GPU, or a free cloud instance — cannot tell ahead of time whether their hardware can carry the configuration they have just selected. + +Concrete failure cases we see today: + +- `transformers-heavy` fine-tunes `microsoft/deberta-v3-large` for up to 30 epochs across 40 HPO trials. That needs ~12–18 GB VRAM (full fine-tune, fp32) and many hours of wall time on a single GPU. A user with an 8 GB card finds out by OOM, often several minutes into a run. +- Swapping `intfloat/multilingual-e5-large-instruct` (2 GB) for `sentence-transformers/all-MiniLM-L6-v2` (90 MB) changes the resource bill by an order of magnitude — but nothing surfaces this difference up front. +- Disk is a silent failure mode: a search space referencing several large checkpoints can pull >10 GB into the HF cache before any training starts. + +The target audience for this feature is users with limited resources who pick a preset, hit `fit()`, and want to know within a second whether they should change something. + +## Proposed solution: pre-flight resource advisor + +Add a **pre-flight advisor** that, given a parsed search space and a dataset, estimates worst-case disk, RAM, VRAM, and wall-time requirements from public Hugging Face Hub metadata and a small set of formulas, then prints a clear summary with red/yellow/green warnings. By default it is **report-only and never blocks the run**; an opt-in **reduce-to-fit** mode additionally prunes the search space to fit detected hardware. + +### Scope + +The advisor analyses only the **local, model-bearing** modules whose footprint can be derived from HF Hub metadata. Everything else is either trivial or out of band. + + +| Module category | In scope? | Reason | +| -------------------------------------------------------------------------------- | --------- | -------------------------------------------------- | +| `SentenceTransformerEmbeddingConfig` | yes | local transformer, dominant cost on small machines | +| `VllmEmbeddingConfig` | yes | local transformer with extra engine overhead | +| `HFModelConfig`-based scorers (`bert`, `lora`, `ptuning`, `dnnc`, cross-encoder) | yes | the actual heavyweights | +| GCN scorer when configured with a transformer backbone | yes | inherits the backbone cost | +| `LinearScorer` (sklearn `LogisticRegression` / `LogisticRegressionCV`) | yes | dominant cost on presets with no transformer fine-tune; the CV path multiplies a single fit by ~30 | +| `CatBoostScorer` | yes | dominant cost on presets with no transformer fine-tune; high default `iterations` | +| `OpenaiEmbeddingConfig` | no | no local resources to estimate | +| `HashingVectorizerEmbeddingConfig` | no | trivial cost | +| `knn`, `mlknn`, generic `sklearn` classifiers via `SklearnScorer`, `description` | no | bounded so far below any in-scope module that they cannot plausibly be the bottleneck | +| `decision` and `regex` nodes | no | negligible | + + +Rationale: the user's real risk is whichever module is the actual bottleneck. On heavy presets that is a transformer fine-tune; on light presets it shifts to `linear` (CV-multiplied) or `catboost` (1000 default iterations × dataset shape). Modules left out of scope are ones whose cost is bounded so far below any in-scope module that they cannot plausibly be the reason a run fails. + +### Phases + +The advisor is one entry point, but internally splits work into three phases that share a single `PreflightReport` object. The split is internal organization — all three run at the same hook point (after `validate_modules`, before `_fit(context)`) and the user sees one summary. Separating them keeps each phase's inputs, formulas, and failure modes scoped: + +- **Resource phase.** Disk / RAM / VRAM / wall-time estimates and comparisons against detected hardware. Most of the formulas in this document live here. This is the only phase consumed by the reduce-to-fit pruner. +- **Data quality phase.** Findings derived from the dataset jointly with the active search space — token-length truncation, split readiness (auto-invokes the existing `check_split_readiness` utility rather than re-implementing it), partial intent descriptions paired with the `description` scorer, embedder/scorer dimension consistency. Reports red/yellow lines but never prunes the search space; the user fixes the dataset or the config. +- **Configuration sanity phase.** Joint checks across dataset + search-space + hardware that don't slot cleanly into the other two — e.g., `hpo_config.n_jobs > 1` × per-trial VRAM contention, CatBoost `task_type="GPU"` with no CUDA. Pydantic schema validation already runs upstream on `OptimizationConfig`; this phase only adds checks that need joint inspection. + +The advisor consumes `validate_modules`'s *post-filter* view of `self.nodes` — it does not duplicate that mutating filter. + +### Inputs + +- The parsed `OptimizationConfig` (search space, HPO config, embedder/transformer configs). +- The training `Dataset` (for `dataset_size` and an approximate token-length distribution). +- Detected local hardware: + - Total / available RAM via `psutil`. + - Free disk on the AutoIntent / HF cache directory via `shutil.disk_usage`. + - Accelerator detection, in priority order: + - **CUDA:** per-GPU VRAM and device name via `torch.cuda`. + - **MPS (Apple Silicon):** detected via `torch.backends.mps.is_available()`. Apple chips use unified memory, so there is no separate VRAM pool — the "VRAM budget" is a fraction of total system RAM. Default budget = 70 % of total RAM (matching the macOS `PYTORCH_MPS_HIGH_WATERMARK_RATIO` default) with the remainder reserved for the OS and other apps. The fraction is exposed as a knob. + - **CPU only:** when neither is available. + +### Output + +A structured estimate plus a human-readable summary printed to the logger. Example: + +``` +Compute feasibility check +───────────────────────── +Resource: + Available : 8 GB VRAM (NVIDIA RTX 3060), 32 GB RAM, 120 GB free disk + Disk : 5.2 GB to download, 1.1 GB already cached (3 unique checkpoints) + RAM : ~4 GB + VRAM : ~14 GB × 2 parallel trials (n_jobs=2) ⚠ exceeds available + Time : ~6 h (+~12 min for refit_after) (single-GPU, fp32, rough) + +Data: + Train tokens p95 : 612 (exceeds bert.max_length=512) ⚠ ~7% truncated + Split readiness : 2 classes have <3 samples — LogisticRegressionCV cv=3 will fail ✗ + +Config: + CatBoost task_type=GPU but no CUDA detected — will fall back to CPU ⚠ + +Drivers of cost: + scoring.bert microsoft/deberta-v3-large full fine-tune × 40 trials × 30 epochs → ~14 GB VRAM, ~5 h + embedder intfloat/multilingual-e5-large-instruct → ~2.2 GB VRAM + +Suggestions: + • Enable mixed precision (fp16/bf16) on the bert scorer + • Reduce batch_size from 64 to 16 or 32 + • Set hpo_config.n_jobs=1 — parallel trials are doubling VRAM demand + • Try preset `transformers-light` or `classic-medium` + +These numbers are heuristic upper bounds, not measurements. +``` + +Numbers are reported with honest precision (one significant figure for time, two for memory) and an explicit "estimate, not measurement" disclaimer. + +### Algorithm (proposal, allowed to adjust) + +1. **Collect candidates.** Walk the search space; collect every unique in-scope module. For transformer-bearing modules the identity is `(module_type, model_name, mode)` with `mode ∈ {inference, lora, full-finetune}`. For `linear` and `catboost` the identity is `(module_type, embedder_name, task_kind)` with `task_kind ∈ {multiclass, multilabel}` — the routing through `LogisticRegressionCV` vs `MultiOutputClassifier`, and CatBoost's per-class trees, both depend on it. Also collect the HPO knobs that drive cost: `n_trials` plus per-module knobs — transformer (`epochs`, `batch_size`, `max_length`, `dtype` ∈ {fp16, bf16, fp32}), `linear` (`cv`, `max_iter`), `catboost` (`iterations`, `depth`, `task_type`, `features_type`). +2. **Resolve checkpoints.** For each unique `model_name`, query HF Hub for safetensors metadata to read parameter count and weight dtype. Fall back to file-size aggregation if safetensors metadata is missing. Fall back to a "unknown — heuristic only" tag with low-confidence labelling if HF Hub is offline or the repo is private. `LinearScorer` and `CatBoostScorer` have no checkpoint of their own; they reuse the embedder resolved by this step in their formulas (their cost is parameterised by `embedder_dim`, not parameter count). +3. **Apply formulas.** All values are honest upper bounds; convergence and early stopping often terminate well below them. + - **Disk** = sum over unique downloadable checkpoints of total file size, plus a small fixed overhead per checkpoint for tokenizers and config. `LinearScorer` and `CatBoostScorer` contribute zero (they consume embedder output that is already accounted for upstream). + - **RAM per module:** + - Transformer modules (any mode): `params × dtype_bytes + dataset_tokens × 4 bytes`, treated as a loose upper bound for tokenized buffers. + - `LinearScorer`: `8 × n_samples × embedder_dim` (float64 data matrix — the dominant term) `+ 8 × n_classes × embedder_dim` (coefficients) `+ ~10 × 8 × embedder_dim` (L-BFGS history). + - `CatBoostScorer`: `4 × n_samples × n_features` (data, float32 internally) `+ 4 × n_features × n_bins` (histograms; default `n_bins = 254`) `+ iterations × 2^depth × ~32 bytes` (tree storage). For `features_type ∈ {embedding, both}`, `n_features = embedder_dim`. For `features_type = text`, `n_features` is the BoW vocab discovered at fit; bound with a coarse default (e.g. 50 000) and tag the estimate low-confidence. + - For `linear` and `catboost`, `embedder_dim` is taken from the largest embedder in the same node group — same worst-case stance as the rest of the estimate. + - **VRAM per module:** + - Inference embedder: `params × dtype_bytes × ~1.3` (small constant for activations). + - Full fine-tune (`bert`, GCN backbone, soft-prompt `ptuning`): `params × dtype_bytes × (1 + 1 + 2)` for weights + grads + Adam state, halved when fp16/bf16 mixed precision is configured. + - LoRA: inference VRAM + a small adapter constant. + - Reranker (cross-encoder, `dnnc`): inference VRAM × small factor for the reranking pass. + - `LinearScorer`: N/A (sklearn is CPU-only). + - `CatBoostScorer`: 0 by default; if `task_type="GPU"` is configured, the RAM formula above lives on device instead. + - **Time per module:** + - Transformer modules: `n_trials × epochs × (dataset_size / batch_size) × per_step_seconds(params, max_length, device_class)`, where `per_step_seconds` is a small static lookup keyed on coarse device class (`cpu`, `low-gpu`, `mid-gpu`, `high-gpu`, `apple-silicon`) auto-detected from `torch.cuda.get_device_name` or `platform`/`torch.backends.mps`. + - `LinearScorer`: `n_trials × C_cpu × n_samples × embedder_dim × max_iter × cv_multiplier × class_multiplier`, where: + - `C_cpu ≈ 1e-8 s` per `(sample × feature × iteration)` on a single modern CPU core. + - `cv_multiplier = Cs × cv + 1 ≈ 31` for the multiclass path (`LogisticRegressionCV` with default `Cs = 10`, repo default `cv = 3`, plus one final refit). `cv_multiplier = 1` for the multilabel path (no inner CV). + - `class_multiplier = n_classes` for the multilabel path (`MultiOutputClassifier` fits one binary LogReg per class); `class_multiplier = 1` otherwise. + - `CatBoostScorer`: `n_trials × iterations × C_device × n_samples × n_features × depth × class_multiplier`, where: + - `C_device ≈ 1e-9 s` on CPU, ~5–20× faster on GPU. Resolve `C_device` via the same `device_class` lookup as the transformer time formula. + - `class_multiplier = n_classes` for both the multiclass `MultiClass` loss (per-class trees per iteration) and the multilabel routing (one CatBoost per class). + - Early stopping is not modelled; `iterations` is treated as the upper bound. + - Total time = sum across modules. MPS time numbers are coarser than CUDA's (one tier for now); we accept that. +4. **Compare to detected hardware.** Per-dimension status is green / yellow / red against a configurable headroom (defaults: **red** if estimate > 100 % of available, **yellow** if > 70 %). On MPS, "VRAM" and "RAM" estimates draw from the same physical pool; we compare *the larger of the two* against the unified-memory budget rather than each independently. +5. **Render summary.** Log at INFO. If any dimension is red, emit at WARNING so it shows in non-logging contexts. + +#### Resource-phase refinements + +These adjust the formulas above for situations that look fine in single-trial isolation but blow up in practice: + +- **Cold-vs-warm HF cache (Tier 1).** Before reporting disk, probe each unique `model_name` against the local HF cache via `huggingface_hub.try_to_load_from_cache` / `scan_cache_dir`, keyed off `HF_HOME`. Split the disk line into `to_download` vs `already_cached`. Treat a repo as cached only if the weight shard (`model.safetensors` or equivalent) is present — not just config/tokenizer files. Without this, a repeated run on the same machine alarms the user about gigabytes they already have. +- **Concurrent-trial × per-trial VRAM (Tier 1).** Multiply the per-trial VRAM estimate by `hpo_config.n_jobs` when `n_jobs > 1` and the active accelerator is GPU. Same for the `dump_modules=True` path on disk: each trial writes module weights to the dump dir, so multiply per-module dump-disk by `n_trials`. vLLM is process-isolated and its contention model differs; note this in the disclaimer. +- **`refit_after=True` time delta (Tier 2).** When `Pipeline.fit(refit_after=True)`, add one full-data training pass per node to the time estimate. Small term but easy to forget; users running close to their time budget care about it. +- **HF Hub reachability probe (Tier 2).** One up-front `HfApi().whoami()` (or unauthenticated `HEAD` to `huggingface.co`) at the start of the phase. On failure, consistently downgrade *all* model entries to the "unknown — heuristic only" path instead of timing out per-model 10× on a 10-model search space. +- **CatBoost `task_type="GPU"` sanity (Tier 2).** When CatBoost is in the search space with `task_type="GPU"` but `torch.cuda.is_available()` is false, tag yellow — CatBoost silently falls back to CPU and the user otherwise sees CPU speeds with no warning. + +### Data quality phase + +The resource phase predicts whether the run *fits*. The data quality phase predicts whether the run *produces a meaningful result*. Both are caught at the same hook point because both have the same failure mode from the user's perspective: hours of compute followed by a cryptic error or a silently degraded model. + +- **Token-length truncation (Tier 1).** Sample ~1000 utterances from the train split, tokenize against each unique transformer's tokenizer, compute `p95_tokens` and `% truncated` against the module's `max_length`. Yellow when >1% truncated; red when >10%. Reuse the tokenizer the resource phase already loaded for parameter-count resolution — don't double-fetch. The existing pipeline silently truncates (sentence-transformers and the HF Trainer both default to `truncation=True`); there is no warning anywhere today. +- **Auto-invoke `check_split_readiness` (Tier 1).** Call the existing utility at `context/data_handler/_readiness_util.py:44–109` with the active `data_config` and surface its `SplitReadinessResult` — it already returns `underpopulated_classes`, `ready`, and a `reason` string, but is not called anywhere from `Pipeline.fit()` today. When `LinearScorer` with CV is in the search space and any class has `n < cv`, name the module by name in the red line ("`LogisticRegressionCV` cv=3 will fail: classes [X, Y] have <3 samples") rather than emitting a generic split-readiness message. +- **Partial intent descriptions × `description` scorer (Tier 1).** The dataset constructor already warns once at import when *some* but not all intents have descriptions (`_dataset/_dataset.py:199–207`). The advisor escalates this to red when the `description` scorer is also present in the active search space — otherwise the run will produce NaN embeddings for the missing intents. Action message: "fill in N missing descriptions", not "drop the scorer". +- **Embedder ↔ scorer dimension consistency (Tier 2).** For `LinearScorer` / `CatBoostScorer` with `features_type="both"`, verify the embedder reachable from the same node group exposes a stable, expected dimension. Cross-node walk; surface as yellow when the resolved dimension cannot be confirmed pre-flight. + +### Configuration sanity phase + +Pydantic schema validation on `OptimizationConfig` runs upstream at config-load time; this phase only adds checks that require *joint* inspection of dataset + search-space + hardware. With Tier 1 + Tier 2 in scope today, this phase holds two items: + +- The `n_jobs × VRAM` callout, surfaced jointly with the resource phase (single line in the rendered output). +- The CatBoost `task_type="GPU"` without CUDA check, same. + +Both could live entirely in the resource phase; they get their own phase because future additions — joint scorer↔decision shape checks, OOS-support mismatches detected up front rather than at module instantiation, embedder-dimension mismatches — slot here naturally. Keep the phase scaffold even if it is currently thin. + +### Failure modes + +- **HF Hub offline or private repo:** fall back to "unknown model — name-pattern heuristic only", explicit low-confidence label, never raise. +- **No accelerator (no CUDA and no MPS):** report VRAM as N/A and mark GPU-only modules as "requires GPU" without estimating a (misleading) CPU wall time. +- **MPS configured but a module is incompatible:** vLLM in particular does not run on MPS. Flag the module as "unsupported on MPS" rather than estimating; do not raise. +- **MPS with CPU fallback ops:** some PyTorch ops fall back to CPU on MPS, inflating system-RAM usage and wall time beyond the heuristic. Note this in the disclaimer; we don't try to model it. +- **vLLM configured but not installed:** still estimate (the VRAM accounting is similar), note that the engine itself has additional overhead not captured. +- **Estimate wildly wrong vs. reality:** always-on disclaimer in the printed summary that these are heuristic upper bounds. + +### Reduce-to-fit mode + +The feasibility check has two modes sharing the same estimation pipeline: + +- **Report mode (default).** Print the summary, return the structured estimate, let the run proceed regardless of severity. +- **Reduce-to-fit mode (opt-in).** Additionally prune the search space to fit detected hardware before the run starts. Same estimates, same comparisons — just one extra step that produces a reduced search space. + +Reduce-to-fit consumes only the **resource phase** output. Data-quality and config-sanity findings are reported but never trigger pruning — they require user action (fix the dataset, change a config flag), not search-space narrowing. + +Using the same per-module estimates, the pruner applies three least-destructive steps in order: + +1. **Filter discrete-choice hyperparameters.** For lists of cost-driving values (model name, batch size, training epochs, CatBoost `iterations` / `depth`, sklearn `cv`), keep only entries whose worst-case estimate fits. +2. **Cap continuous ranges.** For `{low, high}` ranges of cost-driving parameters, lower the upper bound to the largest fitting value. Ranges of non-cost parameters (learning rate, decision thresholds) are not touched. +3. **Drop module variants.** If a module entry has any required hyperparameter with no satisfiable value left, drop that module entry from its node's search space. + +Guard rails: + +- If pruning would leave any node's search space empty, the pruner **raises**. We don't silently produce a non-runnable pipeline, and we don't quietly fall back to report-only — failing loudly is the right contract for a mode whose whole purpose is to make the run feasible. The error message points the user toward a lighter preset. +- Time is not used as a filter — only memory and disk are. Time is still reported. +- Headroom thresholds are intentionally generous to avoid over-pruning and are configurable. + +Alongside the standard estimate, the caller receives a structured description of what was filtered, capped, and dropped, plus the resulting search space and its recomputed (now green) estimate. + +**Drawbacks worth surfacing.** + +- **Silent narrowing of intent.** A search space deliberately written to include heavy/light variants for comparison gets halved. The mode is opt-in for this reason. +- **Over-pruning when our formulas overestimate.** A 30 %-high estimate on a borderline configuration throws away a run that would have succeeded. Generous headroom defaults mitigate; the knob is exposed. +- **Hard failure when nothing fits.** Raising is intentional — silent degradation to report-only would defeat the mode's purpose — but it is a sharper edge than report mode has. +- **Pre-trial only.** The rewrite happens before any HPO trial starts. This is fine because the search space is treated as immutable across a study, but worth calling out so nobody tries to make this dynamic later. + +### CLI surface + +The advisor is also exposed as a console script (`autointent-advisor`) so users can answer "what will this cost?" and "what should I run?" without writing Python. Two subcommands: + +- **`autointent-advisor inspect `.** Resolves the preset (or a user-supplied `OptimizationConfig`), detects local hardware, runs the same three-phase advisor that `Pipeline.fit()` runs, and prints the same report. Accepts `--dataset` for a real dataset, or `--n-samples / --n-classes / --avg-tokens` placeholders when the dataset is not yet built — so the script is useful before any training data exists. `--json` emits the structured `PreflightReport` for scripting. +- **`autointent-advisor recommend [--n-samples ... | --dataset ...] [--budget-time 12h] [--budget-vram-gb 8]`.** Detects local hardware (with manual overrides applied), iterates over the bundled presets in `_presets/`, and tags each as `feasible` / `feasible-with-reduce` / `infeasible`. Ranks feasible presets by quality tier (`heavy > medium > light`) then estimated wall-time; picks the top one as the recommendation. For the heaviest infeasible preset, surfaces the single most-impactful knob change that would make it fit (e.g., "`transformers-heavy` would fit if `batch_size` ≤ 16 and `dtype=fp16`"), reusing the reduce-to-fit pruner's per-knob delta info. + +**Constraints (both subcommands).** No model downloads — only HF Hub metadata endpoints (`HfApi().model_info`); never `from_pretrained`. Offline-safe — on Hub unreachability, fall back to the same "heuristic only" path and mark the report low-confidence; do not raise. Hardware-detection failures (broken CUDA install where `torch.cuda.mem_get_info()` raises) fall back to CPU detection and tag the report rather than crashing. + +## Alternatives considered and rejected + +### B. Smoke-test calibration + +Run each unique module for one mini-batch / one step before the real fit, measure peak RAM and VRAM with `psutil`, `tracemalloc`, and `torch.cuda.max_memory_allocated`, time the step, and extrapolate to the full search space. + +Rejected because: + +- It **downloads weights just to estimate** — the disk-headroom check we wanted to provide is defeated by the act of performing it. +- It can **OOM while predicting OOM**, exactly on the constrained hardware that is the target audience. +- It adds **seconds to minutes** of wall time before `fit()` does anything, surprising users. +- It needs per-module "tiny run" hooks; not every scorer has a clean "stop after one step" path. +- For OpenAI- or vLLM-served embedders, a smoke test costs real money or starts the engine. +- Still not accurate due to CUDA and CPU cache, memory heating and so on. + +### C. Curated benchmark table + +Ship a JSON in the package with measured VRAM and per-step time for the bundled-preset checkpoints, broken out by hardware class (cpu / mid-gpu / high-gpu) and mode (inference / lora / full-finetune). Fall back to heuristics for unknown checkpoints. + +Rejected because: + +- **Maintenance burden:** every new model added to a preset would need entries across the hardware × precision × mode matrix. +- Numbers **go stale** when `transformers` updates change defaults (attention impl, dtype, gradient checkpointing). +- It still needs the chosen-solution heuristics as a long-tail fallback — so it adds work on top of Option A without replacing it. +- **Confident-but-wrong is worse than honest-but-fuzzy.** A table that says "4 GB on 4090" when the user OOMs at 4.5 GB damages trust more than a clearly-labelled range would. + +### D. Layered (A by default, opt-in B, embedded table from C, local actuals cache) + +Combine all three: ship A as the fast path, allow `calibrate=True` to trigger B for heavy modules only, embed a small table from C for the bundled-preset checkpoints, and write actuals from every real run to a local cache that feeds back into future estimates. + +Rejected because: + +- **Implementation surface multiplies:** two estimation code paths to keep consistent, a cache schema with versioning and eviction, two failure modes to document. +- **Discoverability:** users may not learn about `calibrate=True` and the realized value compresses back to roughly Option A anyway. +- The team's bandwidth doesn't justify the marginal accuracy gain over A for the target audience. + +## Comparison + + +| Dimension | A (chosen) | B (smoke-test) | C (benchmark table) | D (layered) | +| -------------------------------- | ------------------------------ | ---------------------- | ---------------------------------- | ------------------------------------- | +| Wall time at pre-flight | < 1 s | seconds–minutes | < 1 s | < 1 s default, s–min when calibrating | +| Accuracy on common checkpoints | medium | high | high | high | +| Accuracy on custom checkpoints | medium | high | medium (fallback) | medium–high | +| Time-estimate quality | low–medium | high | high | high | +| Disk pre-download required | no | yes | no | only when calibrating | +| Risk of OOM during the check | none | real | none | only when calibrating | +| Network usage | 1 cached call per unique model | none beyond normal fit | none | combination | +| Implementation effort | small | large | medium + ongoing benchmark refresh | large + cache infra | +| Ongoing maintenance | low (formulas only) | low | high | high | +| Friendly to offline / air-gapped | with fallback | yes | yes | partial | + + +The chosen solution accepts a real accuracy gap on time and a moderate accuracy gap on VRAM in exchange for the only profile that fits the target audience's constraints: zero added wall time, zero added downloads, zero added failure modes, and a small one-time implementation cost. + +## Out of scope (possible follow-ups) + +- Live resource observability during `fit()` (peak RAM / VRAM per trial, abort on overrun). +- A learned calibration cache from real runs to refine estimates over time. +- **Determinism / `cudnn.deterministic` check.** Belongs in seed-setting code (`set_seed` utility, `Pipeline.__init__`), not in a feasibility advisor — reproducibility is not a hardware-budget question. +- **OpenAI / Generator token-cost ($) estimation.** Real value, but pricing tables age badly, the `StructuredOutputCache` hit rate is unknowable upfront, and the API-paying audience overlaps poorly with this advisor's stated audience (resource-constrained local users). Push to a separate `cost_estimator` tool. +- **Predictive CO₂ / emissions.** `_callbacks/emissions_tracker.py` already does this retrospectively, accurately. A predictive version multiplies our (loose) time estimate by a regional kWh/CO₂ factor — two sources of imprecision compounded. The retrospective number is the trustworthy one. +- **vLLM startup compile time.** Minutes of overhead before any work, but vLLM is unsupported on MPS, isn't the dominant cost on CUDA once running, and modelling it needs a startup-time lookup table. Note once in the disclaimer; do not model. + diff --git a/pyproject.toml b/pyproject.toml index 202a8cb3..b47993fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -141,6 +141,7 @@ Documentation = "https://deeppavlov.github.io/AutoIntent/" [project.scripts] "basic-aug" = "autointent.generation.utterances.basic.cli:main" "evolution-aug" = "autointent.generation.utterances.evolution.cli:main" +"autointent-advisor" = "autointent._advisor._cli:main" [build-system] requires = ["uv_build>=0.8.7,<0.9.0"] diff --git a/src/autointent/_advisor/__init__.py b/src/autointent/_advisor/__init__.py new file mode 100644 index 00000000..5f29b028 --- /dev/null +++ b/src/autointent/_advisor/__init__.py @@ -0,0 +1,23 @@ +"""Pre-flight compute feasibility advisor. + +Exposes a small surface used by both ``Pipeline.fit()`` (future integration) and +the ``autointent-advisor`` CLI script. See ``compute-feasibility-advisor-proposal.md`` +at the repo root for the design document. +""" + +from __future__ import annotations + +from ._hardware import HardwareProfile, detect_hardware +from ._report import DatasetStats, Finding, PreflightReport, ResourceEstimate, Severity +from ._estimates import run_preflight + +__all__ = [ + "DatasetStats", + "Finding", + "HardwareProfile", + "PreflightReport", + "ResourceEstimate", + "Severity", + "detect_hardware", + "run_preflight", +] diff --git a/src/autointent/_advisor/_cli.py b/src/autointent/_advisor/_cli.py new file mode 100644 index 00000000..4e7eae00 --- /dev/null +++ b/src/autointent/_advisor/_cli.py @@ -0,0 +1,243 @@ +"""Console-script entry point for the pre-flight advisor. + +Two subcommands: + +* ``inspect`` — show what a given preset / config will cost on this machine. +* ``recommend`` — pick the best-fitting bundled preset for this machine. + +Both subcommands accept either a real ``--dataset`` (path to load with +``Dataset.from_*`` constructors) or ``--n-samples / --n-classes / --avg-tokens`` +placeholders so the script is useful before the user has built a dataset. +""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path +from typing import Any + +import yaml + +from ._estimates import run_preflight +from ._hardware import detect_hardware +from ._render import render_json, render_recommendation, render_text +from ._report import DatasetStats, PreflightReport + +logger = logging.getLogger("autointent.advisor") + +BUNDLED_PRESETS = [ + "transformers-heavy", + "transformers-light", + "transformers-no-hpo", + "nn-heavy", + "nn-medium", + "classic-heavy", + "classic-medium", + "classic-light", + "zero-shot-encoders", + "zero-shot-llm", +] + +# rough quality tiering used by `recommend` +_QUALITY_TIER = { + "transformers-heavy": 5, + "nn-heavy": 4, + "transformers-light": 4, + "nn-medium": 3, + "classic-heavy": 3, + "transformers-no-hpo": 3, + "classic-medium": 2, + "classic-light": 1, + "zero-shot-encoders": 2, + "zero-shot-llm": 4, +} + + +def _load_config(target: str) -> tuple[dict[str, Any], str]: + """Return (config_dict, friendly_name) for either a preset or a path.""" + path = Path(target) + if path.is_file(): + with path.open(encoding="utf-8") as f: + return yaml.safe_load(f), path.stem + # treat as a bundled preset name + from autointent.utils import load_preset + + return load_preset(target), target # type: ignore[arg-type] + + +def _stats_from_args(args: argparse.Namespace) -> DatasetStats: + if args.dataset: + return _stats_from_dataset(args.dataset, multilabel=args.task == "multilabel") + return DatasetStats.placeholder( + n_samples=args.n_samples, + n_classes=args.n_classes, + avg_tokens=args.avg_tokens, + multilabel=args.task == "multilabel", + ) + + +def _stats_from_dataset(path: str, *, multilabel: bool) -> DatasetStats: + """Best-effort: load a dataset from disk via the existing Dataset constructor.""" + try: + from autointent import Dataset + except ImportError: + logger.warning("autointent.Dataset unavailable; falling back to placeholders.") + return DatasetStats.placeholder(multilabel=multilabel) + + try: + ds = Dataset.from_json(path) if path.endswith(".json") else Dataset.from_hub(path) + except Exception as e: # noqa: BLE001 + logger.warning("Failed to load dataset %s: %s", path, e) + return DatasetStats.placeholder(multilabel=multilabel) + + train = ds.get("train") or next(iter(ds.values()), None) + if train is None: + return DatasetStats.placeholder(multilabel=multilabel) + + utt_col = getattr(ds, "utterance_feature", "utterance") + sample = train[:1000] if len(train) > 1000 else train[:] + lengths = [len(str(s).split()) for s in sample.get(utt_col, [])] + avg_tokens = int(sum(lengths) / max(1, len(lengths))) if lengths else 32 + p95 = sorted(lengths)[int(len(lengths) * 0.95)] if lengths else avg_tokens * 2 + + return DatasetStats( + n_samples=len(train), + n_classes=getattr(ds, "n_classes", 0) or 0, + avg_tokens=avg_tokens, + p95_tokens=p95, + multilabel=getattr(ds, "multilabel", multilabel), + has_descriptions=getattr(ds, "has_descriptions", None), + source=f"dataset:{path}", + ) + + +def _add_common_dataset_args(p: argparse.ArgumentParser) -> None: + p.add_argument("--dataset", help="Path or hub id of a dataset; overrides placeholders.") + p.add_argument("--n-samples", type=int, default=1_000, help="Placeholder training set size.") + p.add_argument("--n-classes", type=int, default=10, help="Placeholder class count.") + p.add_argument("--avg-tokens", type=int, default=32, help="Placeholder average token length.") + p.add_argument( + "--task", + choices=("multiclass", "multilabel"), + default="multiclass", + help="Placeholder task type when --dataset isn't given.", + ) + + +def cmd_inspect(args: argparse.Namespace) -> int: + config, name = _load_config(args.target) + hardware = detect_hardware( + vram_budget_gb=args.budget_vram_gb, + ) + stats = _stats_from_args(args) + report = run_preflight(config, stats, hardware, preset_name=name) + if args.json: + sys.stdout.write(render_json(report)) + sys.stdout.write("\n") + else: + sys.stdout.write(render_text(report)) + sys.stdout.write("\n") + return 0 if report.is_feasible else 1 + + +def cmd_recommend(args: argparse.Namespace) -> int: + hardware = detect_hardware(vram_budget_gb=args.budget_vram_gb) + stats = _stats_from_args(args) + + results: list[tuple[str, PreflightReport]] = [] + from autointent.utils import load_preset + + for preset in BUNDLED_PRESETS: + try: + cfg = load_preset(preset) # type: ignore[arg-type] + except Exception as e: # noqa: BLE001 + logger.debug("Skipping preset %s: %s", preset, e) + continue + report = run_preflight(cfg, stats, hardware, preset_name=preset) + if args.budget_time_h is not None and report.resource.time_hours > args.budget_time_h: + report.add( + "resource", + report.worst_severity if report.worst_severity.value == "red" else report.worst_severity, # noqa: PLW0125 - explicit + f"Estimated time {report.resource.time_hours:.1f} h exceeds budget {args.budget_time_h} h.", + ) + results.append((preset, report)) + + feasible = [(name, r) for name, r in results if r.is_feasible] + feasible.sort( + key=lambda pair: (-_QUALITY_TIER.get(pair[0], 0), pair[1].resource.time_hours, pair[0]) + ) + chosen = feasible[0][0] if feasible else None + + if args.json: + import json + + out = { + "chosen": chosen, + "results": [ + {"preset": name, "report": r.to_dict()} for name, r in results + ], + } + sys.stdout.write(json.dumps(out, indent=2, default=str)) + sys.stdout.write("\n") + else: + sys.stdout.write(render_recommendation(results, chosen)) + sys.stdout.write("\n") + if chosen: + sys.stdout.write("\n") + sys.stdout.write(render_text(dict(results)[chosen])) + sys.stdout.write("\n") + return 0 if chosen else 1 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="autointent-advisor", + description="Pre-flight feasibility advisor for AutoIntent search-space optimization.", + ) + parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging.") + + sub = parser.add_subparsers(dest="cmd", required=True) + + p_inspect = sub.add_parser( + "inspect", + help="Inspect a preset or OptimizationConfig and print a feasibility report.", + ) + p_inspect.add_argument("target", help="Preset name (e.g. transformers-light) or path to a YAML config.") + p_inspect.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_inspect.add_argument( + "--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget." + ) + _add_common_dataset_args(p_inspect) + p_inspect.set_defaults(func=cmd_inspect) + + p_rec = sub.add_parser( + "recommend", + help="Detect hardware and recommend the best-fitting bundled preset.", + ) + p_rec.add_argument("--json", action="store_true", help="Emit a structured JSON report.") + p_rec.add_argument( + "--budget-vram-gb", type=float, default=None, help="Override detected VRAM budget." + ) + p_rec.add_argument( + "--budget-time-h", type=float, default=None, help="Optional wall-time ceiling in hours." + ) + _add_common_dataset_args(p_rec) + p_rec.set_defaults(func=cmd_recommend) + + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.WARNING, + format="%(levelname)s %(name)s: %(message)s", + ) + return args.func(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/autointent/_advisor/_estimates.py b/src/autointent/_advisor/_estimates.py new file mode 100644 index 00000000..f60f940a --- /dev/null +++ b/src/autointent/_advisor/_estimates.py @@ -0,0 +1,382 @@ +"""Resource-phase estimation: walk the search space and aggregate cost. + +Implements an honest worst-case for the modules the proposal lists as +in-scope. Formulas are intentionally coarse — the advisor's contract is +"heuristic upper bound, not measurement". Time and VRAM are the noisiest; +treat them as ballparks, not budgets. +""" + +from __future__ import annotations + +import logging +from typing import Any, Iterable + +from ._hardware import HardwareProfile +from ._hub import ModelMeta, hub_reachable, resolve_model +from ._report import DatasetStats, PreflightReport, ResourceEstimate, Severity + +logger = logging.getLogger(__name__) + +# yellow / red thresholds as fraction of available budget +_YELLOW = 0.7 +_RED = 1.0 + +# rough per-step seconds, keyed on device class. Scaled by params_millions / 100. +_PER_STEP_BASELINE_S = { + "cpu": 0.5, + "low-gpu": 0.04, + "mid-gpu": 0.02, + "high-gpu": 0.01, + "apple-silicon": 0.08, +} + +TRANSFORMER_SCORER_MODULES = {"bert", "lora", "ptuning", "dnnc"} + + +def _extract_model_names(module_entry: dict[str, Any]) -> list[str]: + """Pull model name(s) from a search-space module entry.""" + candidates: list[str] = [] + cfg = module_entry.get("classification_model_config") + if isinstance(cfg, list): + for c in cfg: + if isinstance(c, dict) and c.get("model_name"): + candidates.append(c["model_name"]) + elif isinstance(cfg, dict) and cfg.get("model_name"): + candidates.append(cfg["model_name"]) + embedder_cfg = module_entry.get("embedder_config") + if isinstance(embedder_cfg, list): + for c in embedder_cfg: + if isinstance(c, dict) and c.get("model_name"): + candidates.append(c["model_name"]) + elif isinstance(embedder_cfg, dict) and embedder_cfg.get("model_name"): + candidates.append(embedder_cfg["model_name"]) + return candidates + + +def _max_int(value: Any, default: int) -> int: + if value is None: + return default + if isinstance(value, list) and value: + return max(int(x) for x in value) + if isinstance(value, dict): + return int(value.get("high", default)) + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _walk_modules(search_space: list[dict[str, Any]]) -> Iterable[tuple[str, dict[str, Any]]]: + """Yield (node_type, module_entry) pairs.""" + for node in search_space or []: + node_type = node.get("node_type", "?") + for entry in node.get("search_space", []) or []: + yield node_type, entry + + +def _vram_for_transformer(meta: ModelMeta, mode: str, mixed_precision: bool) -> float: + """VRAM in GB for one trial of a transformer-based module. + + Conservative AMP accounting (the proposal flags the prior naive halving + as too generous; keep optimizer state at fp32 even in AMP). + """ + weights_gb = meta.weights_gb + if mode == "inference": + return weights_gb * 1.3 + if mode == "lora": + return weights_gb * 1.3 + 0.5 + if mode == "reranker": + return weights_gb * 1.5 + # full fine-tune (bert, ptuning, gcn-with-backbone) + if mixed_precision: + # fp16 weights+grads + fp32 master+adam moments + return (weights_gb * 0.5) * 2 + weights_gb * 1 + weights_gb * 2 + return weights_gb * (1 + 1 + 2) + + +def _ram_for_module(meta: ModelMeta, stats: DatasetStats) -> float: + """RAM in GB. Loose upper bound.""" + return meta.weights_gb + (stats.n_samples * stats.avg_tokens * 4) / (1024**3) + + +def _time_for_transformer( + *, + meta: ModelMeta, + n_trials: int, + epochs: int, + batch_size: int, + n_samples: int, + device_class: str, +) -> float: + per_step = _PER_STEP_BASELINE_S[device_class] * (meta.params_millions / 100.0) + steps = max(1, (n_samples // max(1, batch_size))) * epochs + return (n_trials * steps * per_step) / 3600.0 + + +def _classify_severity(estimate: float, budget: float) -> Severity: + if budget <= 0: + return Severity.YELLOW + ratio = estimate / budget + if ratio >= _RED: + return Severity.RED + if ratio >= _YELLOW: + return Severity.YELLOW + return Severity.GREEN + + +def _resource_phase( # noqa: PLR0912 - kept linear for clarity + config: dict[str, Any], + stats: DatasetStats, + hardware: HardwareProfile, + report: PreflightReport, +) -> None: + hpo = config.get("hpo_config") or {} + n_trials = int(hpo.get("n_trials", 1)) + n_jobs = int(hpo.get("n_jobs", 1)) + refit_after = bool(config.get("refit_after", False)) + dump_modules = bool(config.get("dump_modules", False)) + + if not hub_reachable(): + report.low_confidence = True + report.notes.append("HF Hub unreachable — all model sizes are name-pattern heuristics.") + + seen_models: dict[str, ModelMeta] = {} + estimate = ResourceEstimate(parallel_factor=max(1, n_jobs)) + + embedder_cfg = config.get("embedder_config") or {} + global_embedder = embedder_cfg.get("model_name") if isinstance(embedder_cfg, dict) else None + if global_embedder: + seen_models[global_embedder] = resolve_model(global_embedder) + + for node_type, entry in _walk_modules(config.get("search_space") or []): + module = entry.get("module_name", "?") + model_names = _extract_model_names(entry) + if not model_names and global_embedder and module in {"linear", "catboost", "knn", "mlknn"}: + model_names = [global_embedder] + + for name in model_names: + meta = seen_models.setdefault(name, resolve_model(name)) + + mixed_precision = entry.get("dtype") in {"fp16", "bf16"} + if module == "bert": + mode = "full-finetune" + elif module == "lora": + mode = "lora" + elif module == "dnnc": + mode = "reranker" + elif module == "ptuning": + mode = "full-finetune" + else: + mode = "inference" + + batch_size = _max_int(entry.get("batch_size"), 32) + epochs = _max_int(entry.get("num_train_epochs"), 1 if mode == "inference" else 10) + + vram = _vram_for_transformer(meta, mode, mixed_precision) + ram = _ram_for_module(meta, stats) + + time_h = 0.0 + if mode != "inference": + time_h = _time_for_transformer( + meta=meta, + n_trials=n_trials, + epochs=epochs, + batch_size=batch_size, + n_samples=stats.n_samples, + device_class=hardware.device_class, + ) + if refit_after and mode != "inference": + time_h *= 1 + 1.0 / max(1, n_trials) + + estimate.vram_gb = max(estimate.vram_gb, vram) + estimate.ram_gb = max(estimate.ram_gb, ram) + estimate.time_hours += time_h + estimate.drivers.append( + { + "node_type": node_type, + "module": module, + "model": name, + "mode": mode, + "vram_gb": round(vram, 2), + "ram_gb": round(ram, 2), + "time_hours": round(time_h, 2), + "confidence": meta.confidence, + } + ) + + for meta in seen_models.values(): + if meta.cached_locally: + estimate.disk_cached_gb += meta.disk_gb + else: + estimate.disk_download_gb += meta.disk_gb + + if dump_modules: + weights_total = sum(m.weights_gb for m in seen_models.values()) + estimate.disk_dump_gb = weights_total * n_trials + + if n_jobs > 1 and hardware.accelerator in {"cuda", "mps"}: + effective_vram = estimate.vram_gb * n_jobs + else: + effective_vram = estimate.vram_gb + + report.resource = estimate + + # render findings + vram_sev = _classify_severity(effective_vram, hardware.vram_gb) + if hardware.accelerator == "cpu" and effective_vram > 0: + report.add( + "resource", + Severity.YELLOW, + f"No GPU detected; transformer modules will be very slow (worst case ~{estimate.time_hours:.1f} h).", + metric="vram", + ) + else: + msg = f"VRAM ~{effective_vram:.1f} GB" + if n_jobs > 1: + msg += f" (= per-trial {estimate.vram_gb:.1f} GB × {n_jobs} parallel trials)" + msg += f" vs available {hardware.vram_gb:.1f} GB" + report.add("resource", vram_sev, msg, metric="vram") + + ram_sev = _classify_severity(estimate.ram_gb, hardware.ram_gb) + report.add( + "resource", + ram_sev, + f"RAM ~{estimate.ram_gb:.1f} GB vs available {hardware.ram_gb:.1f} GB", + metric="ram", + ) + + disk_total = estimate.disk_download_gb + estimate.disk_dump_gb + disk_sev = _classify_severity(disk_total, hardware.free_disk_gb) + disk_msg = f"Disk ~{estimate.disk_download_gb:.1f} GB to download" + if estimate.disk_cached_gb > 0: + disk_msg += f", {estimate.disk_cached_gb:.1f} GB already cached" + if estimate.disk_dump_gb > 0: + disk_msg += f", +{estimate.disk_dump_gb:.1f} GB during training (dump_modules=True)" + disk_msg += f" vs {hardware.free_disk_gb:.0f} GB free" + report.add("resource", disk_sev, disk_msg, metric="disk") + + if estimate.time_hours > 0: + time_msg = f"Time ~{estimate.time_hours:.1f} h (worst case, no HPO pruning)" + report.add("resource", Severity.GREEN, time_msg, metric="time") + + +def _config_phase( + config: dict[str, Any], + hardware: HardwareProfile, + report: PreflightReport, +) -> None: + hpo = config.get("hpo_config") or {} + n_jobs = int(hpo.get("n_jobs", 1)) + + if n_jobs > 1 and hardware.accelerator in {"cuda", "mps"}: + report.add( + "config", + Severity.YELLOW, + f"hpo_config.n_jobs={n_jobs} on a single GPU multiplies VRAM demand by {n_jobs}×.", + ) + + uses_catboost_gpu = False + for _, entry in _walk_modules(config.get("search_space") or []): + if entry.get("module_name") == "catboost" and entry.get("task_type") == "GPU": + uses_catboost_gpu = True + break + if uses_catboost_gpu and hardware.accelerator != "cuda": + report.add( + "config", + Severity.YELLOW, + "CatBoost task_type=GPU configured but no CUDA detected — will fall back to CPU.", + ) + + +def _data_phase( + config: dict[str, Any], + stats: DatasetStats, + report: PreflightReport, +) -> None: + # token-length truncation (heuristic — we use stats.p95_tokens vs configured max_length) + p95 = stats.p95_tokens or int(stats.avg_tokens * 2.5) + for _, entry in _walk_modules(config.get("search_space") or []): + max_len_value = entry.get("max_length") + if max_len_value is None: + continue + max_len = _max_int(max_len_value, 512) + if p95 > max_len: + severity = Severity.RED if p95 > max_len * 1.5 else Severity.YELLOW + report.add( + "data", + severity, + f"Train tokens p95~{p95} exceeds {entry.get('module_name', '?')}.max_length={max_len}; expect silent truncation.", + ) + + # rare class × linear-CV + has_linear = any( + e.get("module_name") == "linear" for _, e in _walk_modules(config.get("search_space") or []) + ) + if has_linear and stats.rare_classes: + report.add( + "data", + Severity.RED, + ( + "LogisticRegressionCV (cv=3) will fail: classes " + f"{stats.rare_classes[:5]} have <3 samples." + ), + ) + + # partial descriptions × description scorer + has_description = any( + e.get("module_name") == "description" + for _, e in _walk_modules(config.get("search_space") or []) + ) + if has_description and stats.has_descriptions is False: + report.add( + "data", + Severity.RED, + "description scorer present but intent descriptions are missing — fill them in or drop the scorer.", + ) + + +def run_preflight( + config: dict[str, Any], + stats: DatasetStats, + hardware: HardwareProfile, + *, + preset_name: str | None = None, +) -> PreflightReport: + """Run all three phases and return one report. + + Args: + config: parsed preset / OptimizationConfig dict (top-level keys: + ``search_space``, ``hpo_config``, optional ``embedder_config``). + stats: dataset statistics (real or placeholder). + hardware: detected hardware profile. + preset_name: optional friendly name for the report header. + + Returns: + PreflightReport with findings across resource/data/config phases. + """ + report = PreflightReport( + preset_name=preset_name, + hardware={ + "accelerator": hardware.accelerator, + "device_name": hardware.device_name, + "vram_gb": round(hardware.vram_gb, 2), + "ram_gb": round(hardware.ram_gb, 2), + "free_disk_gb": round(hardware.free_disk_gb, 2), + "device_class": hardware.device_class, + }, + dataset={ + "n_samples": stats.n_samples, + "n_classes": stats.n_classes, + "avg_tokens": stats.avg_tokens, + "p95_tokens": stats.p95_tokens, + "multilabel": stats.multilabel, + "source": stats.source, + }, + ) + report.notes.extend(hardware.notes) + + _resource_phase(config, stats, hardware, report) + _data_phase(config, stats, report) + _config_phase(config, hardware, report) + + return report diff --git a/src/autointent/_advisor/_hardware.py b/src/autointent/_advisor/_hardware.py new file mode 100644 index 00000000..2bda6120 --- /dev/null +++ b/src/autointent/_advisor/_hardware.py @@ -0,0 +1,160 @@ +"""Local hardware detection. + +Probes CPU / RAM / disk and the highest-priority accelerator available +(CUDA → MPS → CPU). All probes are wrapped to fall back safely on a +broken install (e.g. CUDA driver mismatch) rather than crash the advisor. +""" + +from __future__ import annotations + +import logging +import os +import platform +import shutil +from dataclasses import dataclass, field +from typing import Literal + +logger = logging.getLogger(__name__) + +Accelerator = Literal["cuda", "mps", "cpu"] + +# matches macOS PYTORCH_MPS_HIGH_WATERMARK_RATIO default +MPS_DEFAULT_BUDGET_RATIO = 0.7 + + +@dataclass +class HardwareProfile: + accelerator: Accelerator + device_name: str + vram_gb: float + ram_gb: float + free_disk_gb: float + cpu_count: int + notes: list[str] = field(default_factory=list) + + @property + def device_class(self) -> str: + if self.accelerator == "cpu": + return "cpu" + if self.accelerator == "mps": + return "apple-silicon" + if self.vram_gb >= 24: + return "high-gpu" + if self.vram_gb >= 12: + return "mid-gpu" + return "low-gpu" + + +def _detect_ram_gb() -> float: + try: + import psutil + + return psutil.virtual_memory().total / (1024**3) + except ImportError: + logger.debug("psutil unavailable; RAM unknown") + return 0.0 + + +def _detect_free_disk_gb(path: str | None = None) -> float: + cache = path or os.environ.get("HF_HOME") or os.path.expanduser("~/.cache/huggingface") + probe_path = cache if os.path.exists(cache) else os.path.expanduser("~") + try: + usage = shutil.disk_usage(probe_path) + return usage.free / (1024**3) + except OSError as e: + logger.debug("disk usage probe failed at %s: %s", probe_path, e) + return 0.0 + + +def _detect_cuda() -> tuple[float, str] | None: + try: + import torch + + if not torch.cuda.is_available(): + return None + idx = 0 + try: + free, total = torch.cuda.mem_get_info(idx) + vram_gb = total / (1024**3) + except (RuntimeError, AttributeError) as e: + logger.debug("torch.cuda.mem_get_info failed: %s", e) + return None + name = torch.cuda.get_device_name(idx) + return vram_gb, name + except ImportError: + return None + except Exception as e: # noqa: BLE001 - protect the advisor from torch quirks + logger.debug("CUDA detection raised: %s", e) + return None + + +def _detect_mps(ram_gb: float, budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO) -> tuple[float, str] | None: + try: + import torch + + if not (hasattr(torch.backends, "mps") and torch.backends.mps.is_available()): + return None + # apple silicon: unified memory; budget is fraction of total RAM + return ram_gb * budget_ratio, f"Apple Silicon ({platform.machine()})" + except ImportError: + return None + except Exception as e: # noqa: BLE001 + logger.debug("MPS detection raised: %s", e) + return None + + +def detect_hardware( + *, + vram_budget_gb: float | None = None, + mps_budget_ratio: float = MPS_DEFAULT_BUDGET_RATIO, +) -> HardwareProfile: + """Detect the local hardware, with optional manual overrides. + + Args: + vram_budget_gb: when set, overrides the detected VRAM (use for + shared-GPU machines where part of the device is taken). + mps_budget_ratio: fraction of total RAM treated as the MPS + "VRAM" budget on Apple Silicon. + + Returns: + HardwareProfile reflecting current machine state. + """ + notes: list[str] = [] + ram_gb = _detect_ram_gb() + free_disk_gb = _detect_free_disk_gb() + cpu_count = os.cpu_count() or 1 + + cuda = _detect_cuda() + if cuda is not None: + vram_gb, device_name = cuda + accel: Accelerator = "cuda" + else: + mps = _detect_mps(ram_gb, mps_budget_ratio) + if mps is not None: + vram_gb, device_name = mps + accel = "mps" + notes.append( + f"MPS unified memory: VRAM budget = {mps_budget_ratio:.0%} of RAM." + ) + else: + vram_gb = 0.0 + device_name = platform.processor() or "cpu" + accel = "cpu" + + if vram_budget_gb is not None: + if vram_gb and vram_budget_gb > vram_gb: + notes.append( + f"Manual --budget-vram-gb={vram_budget_gb} exceeds detected {vram_gb:.1f} GB; using override." + ) + notes.append(f"Using manual VRAM budget: {vram_budget_gb} GB.") + vram_gb = vram_budget_gb + + return HardwareProfile( + accelerator=accel, + device_name=device_name, + vram_gb=vram_gb, + ram_gb=ram_gb, + free_disk_gb=free_disk_gb, + cpu_count=cpu_count, + notes=notes, + ) diff --git a/src/autointent/_advisor/_hub.py b/src/autointent/_advisor/_hub.py new file mode 100644 index 00000000..80ccb713 --- /dev/null +++ b/src/autointent/_advisor/_hub.py @@ -0,0 +1,183 @@ +"""HF Hub metadata lookups + warm-cache probe. + +Memoized per-process. Offline-safe: every probe falls back to a +heuristic value rather than raising. The advisor flips the report's +``low_confidence`` flag when a fallback is taken. +""" + +from __future__ import annotations + +import logging +import os +import re +from dataclasses import dataclass +from functools import lru_cache +from typing import Any + +logger = logging.getLogger(__name__) + +# Coarse heuristic estimates keyed on name fragments. Used only when HF Hub +# is unreachable and we can't get safetensors metadata. Values in millions. +_NAME_HEURISTICS = [ + (re.compile(r"(?i)(deberta|roberta|bert).*(xxlarge|huge)"), 1_500), + (re.compile(r"(?i)(deberta|roberta|bert).*xlarge"), 750), + (re.compile(r"(?i)(deberta|roberta|bert).*large"), 350), + (re.compile(r"(?i)e5.*large"), 560), + (re.compile(r"(?i)e5.*small"), 33), + (re.compile(r"(?i)mpnet"), 110), + (re.compile(r"(?i)minilm"), 33), + (re.compile(r"(?i)distil"), 66), + (re.compile(r"(?i)small"), 60), + (re.compile(r"(?i)base"), 110), + (re.compile(r"(?i)large"), 350), +] + + +@dataclass +class ModelMeta: + name: str + params_millions: float + weight_bytes_per_param: int + total_file_bytes: int + cached_locally: bool + confidence: str # "hub" | "heuristic" + + @property + def disk_gb(self) -> float: + return self.total_file_bytes / (1024**3) + + @property + def weights_gb(self) -> float: + return (self.params_millions * 1_000_000 * self.weight_bytes_per_param) / (1024**3) + + +@lru_cache(maxsize=1) +def hub_reachable(timeout_s: float = 2.0) -> bool: + """Single up-front probe. Memoized per process.""" + try: + from huggingface_hub import HfApi + + HfApi().list_models(limit=1) + except ImportError: + logger.debug("huggingface_hub not installed; assuming offline") + return False + except Exception as e: # noqa: BLE001 + logger.debug("HF Hub probe failed: %s", e) + return False + else: + return True + + +def _heuristic_params_millions(model_name: str) -> float: + for pattern, m in _NAME_HEURISTICS: + if pattern.search(model_name): + return float(m) + return 110.0 # generic BERT-base default + + +def _is_warm_cached(model_name: str) -> bool: + """True when the weight shard is present in the local HF cache.""" + try: + from huggingface_hub import scan_cache_dir, try_to_load_from_cache + except ImportError: + return False + + weight_files = ["model.safetensors", "pytorch_model.bin", "model.safetensors.index.json"] + for fname in weight_files: + path = try_to_load_from_cache(model_name, fname) + if path is not None and path is not False: + return True + + # sharded models won't match the single-file probe; fall back to a scan + try: + cache = scan_cache_dir() + except Exception as e: # noqa: BLE001 + logger.debug("scan_cache_dir failed: %s", e) + return False + return any(repo.repo_id == model_name for repo in cache.repos) + + +def _hub_metadata(model_name: str) -> ModelMeta | None: + try: + from huggingface_hub import HfApi + except ImportError: + return None + + try: + info = HfApi().model_info(model_name, files_metadata=True) + except Exception as e: # noqa: BLE001 + logger.debug("model_info(%s) failed: %s", model_name, e) + return None + + params_millions = 0.0 + weight_bytes_per_param = 4 + safetensors = getattr(info, "safetensors", None) + if safetensors is not None: + params_total = getattr(safetensors, "total", None) or sum( + getattr(safetensors, "parameters", {}).values() or [0] + ) + if params_total: + params_millions = params_total / 1_000_000 + params_map: dict[str, Any] = getattr(safetensors, "parameters", {}) or {} + if any("F16" in k or "BF16" in k for k in params_map): + weight_bytes_per_param = 2 + + total_file_bytes = 0 + for sibling in getattr(info, "siblings", []) or []: + size = getattr(sibling, "size", None) + if size: + total_file_bytes += int(size) + + if params_millions == 0: + params_millions = _heuristic_params_millions(model_name) + + if total_file_bytes == 0: + total_file_bytes = int(params_millions * 1_000_000 * weight_bytes_per_param) + + return ModelMeta( + name=model_name, + params_millions=params_millions, + weight_bytes_per_param=weight_bytes_per_param, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence="hub", + ) + + +def _heuristic_metadata(model_name: str) -> ModelMeta: + params_millions = _heuristic_params_millions(model_name) + weight_bytes_per_param = 4 + total_file_bytes = int(params_millions * 1_000_000 * weight_bytes_per_param) + return ModelMeta( + name=model_name, + params_millions=params_millions, + weight_bytes_per_param=weight_bytes_per_param, + total_file_bytes=total_file_bytes, + cached_locally=_is_warm_cached(model_name), + confidence="heuristic", + ) + + +@lru_cache(maxsize=64) +def resolve_model(model_name: str) -> ModelMeta: + """Resolve metadata for a single model name. Memoized per process. + + Always returns a value — never raises — so the advisor can keep going + on offline machines or for unknown checkpoints. + """ + if model_name.startswith("local:") or os.path.isabs(model_name): + return ModelMeta( + name=model_name, + params_millions=_heuristic_params_millions(model_name), + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=True, + confidence="heuristic", + ) + + if hub_reachable(): + meta = _hub_metadata(model_name) + if meta is not None: + return meta + + return _heuristic_metadata(model_name) diff --git a/src/autointent/_advisor/_render.py b/src/autointent/_advisor/_render.py new file mode 100644 index 00000000..52168aa7 --- /dev/null +++ b/src/autointent/_advisor/_render.py @@ -0,0 +1,104 @@ +"""Rendering for the pre-flight report. + +Text output is grouped by phase (Resource / Data / Config) plus a Drivers +section and the always-on disclaimer. JSON output dumps the structured +report straight through. +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ._report import PreflightReport + +_SEVERITY_TAG = {"green": "✓", "yellow": "⚠", "red": "✗"} +_PHASE_ORDER = ("resource", "data", "config") +_PHASE_LABEL = {"resource": "Resource", "data": "Data", "config": "Config"} + + +def render_text(report: "PreflightReport") -> str: + lines: list[str] = [] + title = "Compute feasibility check" + if report.preset_name: + title += f" — {report.preset_name}" + lines.append(title) + lines.append("─" * len(title)) + + hw = report.hardware + lines.append( + f"Hardware: {hw.get('accelerator', '?')} ({hw.get('device_name', '?')})," + f" {hw.get('vram_gb', 0):.1f} GB VRAM, {hw.get('ram_gb', 0):.0f} GB RAM," + f" {hw.get('free_disk_gb', 0):.0f} GB free disk" + ) + ds = report.dataset + lines.append( + f"Dataset: n_samples={ds.get('n_samples')}, n_classes={ds.get('n_classes')}," + f" avg_tokens={ds.get('avg_tokens')} ({ds.get('source')})" + ) + lines.append("") + + for phase in _PHASE_ORDER: + bucket = [f for f in report.findings if f.phase == phase] + if not bucket: + continue + lines.append(f"{_PHASE_LABEL[phase]}:") + for f in bucket: + tag = _SEVERITY_TAG.get(f.severity.value, "·") + lines.append(f" {tag} {f.message}") + lines.append("") + + if report.resource.drivers: + lines.append("Drivers of cost:") + for d in report.resource.drivers[:8]: + lines.append( + f" {d['node_type']}.{d['module']:<10} {d['model']:<48}" + f" {d['mode']:<14} VRAM ~{d['vram_gb']} GB, time ~{d['time_hours']} h" + f" [{d['confidence']}]" + ) + if len(report.resource.drivers) > 8: + lines.append(f" … and {len(report.resource.drivers) - 8} more") + lines.append("") + + if report.notes: + lines.append("Notes:") + for note in report.notes: + lines.append(f" • {note}") + lines.append("") + + summary = f"Verdict: {'feasible' if report.is_feasible else 'INFEASIBLE'} " + summary += f"(worst severity: {report.worst_severity.value})" + if report.low_confidence: + summary += " — low-confidence (heuristic fallback in use)" + lines.append(summary) + lines.append("Note: estimates are heuristic upper bounds, not measurements.") + return "\n".join(lines) + + +def render_json(report: "PreflightReport") -> str: + return json.dumps(report.to_dict(), indent=2, default=str) + + +def render_recommendation( + results: list[tuple[str, "PreflightReport"]], + chosen: str | None, +) -> str: + """Compact table for the ``recommend`` subcommand.""" + lines = ["", "Recommendation:"] + if chosen: + lines.append(f" → {chosen}") + else: + lines.append(" → none of the bundled presets fit your hardware as-is.") + lines.append("") + lines.append(f"{'Preset':<24} {'Status':<14} {'VRAM':<10} {'Time':<10} {'Worst':<8}") + lines.append("-" * 68) + for name, report in results: + verdict = "feasible" if report.is_feasible else "infeasible" + lines.append( + f"{name:<24} {verdict:<14} " + f"{report.resource.vram_gb:>4.1f} GB " + f"{report.resource.time_hours:>4.1f} h " + f"{report.worst_severity.value:<8}" + ) + return "\n".join(lines) diff --git a/src/autointent/_advisor/_report.py b/src/autointent/_advisor/_report.py new file mode 100644 index 00000000..0250482a --- /dev/null +++ b/src/autointent/_advisor/_report.py @@ -0,0 +1,113 @@ +"""Dataclasses for the pre-flight advisor's structured report.""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from enum import Enum +from typing import Any, Literal + + +class Severity(str, Enum): + GREEN = "green" + YELLOW = "yellow" + RED = "red" + + +Phase = Literal["resource", "data", "config"] + + +@dataclass(frozen=True) +class Finding: + """A single advisor finding rendered as one line in the summary.""" + + phase: Phase + severity: Severity + message: str + metric: str | None = None + + +@dataclass +class ResourceEstimate: + """Aggregated resource numbers across the search space.""" + + disk_download_gb: float = 0.0 + disk_cached_gb: float = 0.0 + disk_dump_gb: float = 0.0 + ram_gb: float = 0.0 + vram_gb: float = 0.0 + time_hours: float = 0.0 + parallel_factor: int = 1 + drivers: list[dict[str, Any]] = field(default_factory=list) + + @property + def total_disk_gb(self) -> float: + return self.disk_download_gb + self.disk_dump_gb + + +@dataclass +class DatasetStats: + """Minimal stats the advisor needs about the user's dataset. + + Built either from a real ``Dataset`` or from CLI placeholder flags. + """ + + n_samples: int + n_classes: int + avg_tokens: int + p95_tokens: int | None = None + multilabel: bool = False + has_descriptions: bool | None = None + rare_classes: list[str] = field(default_factory=list) + source: str = "placeholder" + + @classmethod + def placeholder( + cls, + n_samples: int = 1_000, + n_classes: int = 10, + avg_tokens: int = 32, + multilabel: bool = False, + ) -> "DatasetStats": + return cls( + n_samples=n_samples, + n_classes=n_classes, + avg_tokens=avg_tokens, + p95_tokens=int(avg_tokens * 2.5), + multilabel=multilabel, + ) + + +@dataclass +class PreflightReport: + """One report covering all three phases.""" + + findings: list[Finding] = field(default_factory=list) + resource: ResourceEstimate = field(default_factory=ResourceEstimate) + hardware: dict[str, Any] = field(default_factory=dict) + dataset: dict[str, Any] = field(default_factory=dict) + preset_name: str | None = None + low_confidence: bool = False + notes: list[str] = field(default_factory=list) + + def add(self, phase: Phase, severity: Severity, message: str, metric: str | None = None) -> None: + self.findings.append(Finding(phase=phase, severity=severity, message=message, metric=metric)) + + @property + def worst_severity(self) -> Severity: + order = {Severity.GREEN: 0, Severity.YELLOW: 1, Severity.RED: 2} + if not self.findings: + return Severity.GREEN + return max((f.severity for f in self.findings), key=lambda s: order[s]) + + @property + def is_feasible(self) -> bool: + return self.worst_severity != Severity.RED + + def to_dict(self) -> dict[str, Any]: + d = asdict(self) + d["findings"] = [ + {**asdict(f), "severity": f.severity.value} for f in self.findings + ] + d["worst_severity"] = self.worst_severity.value + d["is_feasible"] = self.is_feasible + return d diff --git a/tests/advisor/__init__.py b/tests/advisor/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/advisor/test_estimates_and_cli.py b/tests/advisor/test_estimates_and_cli.py new file mode 100644 index 00000000..18c2615a --- /dev/null +++ b/tests/advisor/test_estimates_and_cli.py @@ -0,0 +1,198 @@ +"""End-to-end smoke tests for the advisor. + +These run offline — HF Hub probes are monkeypatched to fail so the +advisor falls back to its name-pattern heuristics. Verifies that: + +* every bundled preset can be inspected without raising; +* the recommend subcommand picks something on a generous budget and + nothing on a hostile one; +* ``--json`` emits parseable JSON. +""" + +from __future__ import annotations + +import json +import sys + +import pytest + +from autointent._advisor import DatasetStats, HardwareProfile, run_preflight +from autointent._advisor._cli import BUNDLED_PRESETS, main +from autointent.utils import load_preset + + +@pytest.fixture(autouse=True) +def _force_offline(monkeypatch: pytest.MonkeyPatch) -> None: + """Pin the HF Hub probe to "offline" so tests don't hit the network.""" + from autointent._advisor import _estimates, _hub + + _hub.hub_reachable.cache_clear() + _hub.resolve_model.cache_clear() + offline = lambda *_a, **_kw: False # noqa: E731 + monkeypatch.setattr(_hub, "hub_reachable", offline) + monkeypatch.setattr(_estimates, "hub_reachable", offline) + + +def _profile(vram_gb: float = 16.0) -> HardwareProfile: + return HardwareProfile( + accelerator="cuda" if vram_gb > 0 else "cpu", + device_name="test-gpu" if vram_gb > 0 else "test-cpu", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +@pytest.mark.parametrize("preset", BUNDLED_PRESETS) +def test_every_preset_inspects_without_raising(preset: str) -> None: + cfg = load_preset(preset) # type: ignore[arg-type] + stats = DatasetStats.placeholder(n_samples=500, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0), preset_name=preset) + assert report.preset_name == preset + assert report.low_confidence is True # we forced offline + # always at least one resource-phase finding + assert any(f.phase == "resource" for f in report.findings) + + +def test_heavy_preset_is_infeasible_on_2gb_budget() -> None: + cfg = load_preset("transformers-heavy") # type: ignore[arg-type] + stats = DatasetStats.placeholder(n_samples=5000, n_classes=20, avg_tokens=40) + report = run_preflight(cfg, stats, _profile(vram_gb=2.0), preset_name="transformers-heavy") + assert not report.is_feasible, "deberta-v3-large should not fit in 2 GB" + + +def test_light_preset_is_feasible_on_8gb_budget() -> None: + cfg = load_preset("transformers-light") # type: ignore[arg-type] + stats = DatasetStats.placeholder(n_samples=1000, n_classes=10, avg_tokens=24) + report = run_preflight(cfg, stats, _profile(vram_gb=8.0), preset_name="transformers-light") + assert report.is_feasible + + +def test_n_jobs_doubles_vram_findings() -> None: + cfg = load_preset("transformers-light") # type: ignore[arg-type] + cfg = {**cfg, "hpo_config": {**(cfg.get("hpo_config") or {}), "n_jobs": 4}} + stats = DatasetStats.placeholder() + report = run_preflight(cfg, stats, _profile(vram_gb=4.0)) + assert any("parallel trials" in f.message for f in report.findings) + assert any(f.phase == "config" and "n_jobs" in f.message for f in report.findings) + + +def test_cli_inspect_json_is_parseable(capsys: pytest.CaptureFixture[str]) -> None: + rc = main( + [ + "inspect", + "transformers-light", + "--n-samples", + "500", + "--n-classes", + "5", + "--avg-tokens", + "20", + "--json", + "--budget-vram-gb", + "16", + ] + ) + captured = capsys.readouterr() + payload = json.loads(captured.out) + assert payload["preset_name"] == "transformers-light" + assert "findings" in payload + assert payload["worst_severity"] in {"green", "yellow", "red"} + # rc is 0 on feasible, 1 otherwise + assert rc in (0, 1) + + +def test_cli_inspect_text_runs(capsys: pytest.CaptureFixture[str]) -> None: + main( + [ + "inspect", + "transformers-light", + "--n-samples", + "200", + "--n-classes", + "5", + "--avg-tokens", + "15", + "--budget-vram-gb", + "16", + ] + ) + out = capsys.readouterr().out + assert "Compute feasibility check" in out + assert "Verdict:" in out + + +def test_cli_recommend_picks_a_preset_on_generous_hardware( + capsys: pytest.CaptureFixture[str], +) -> None: + rc = main( + [ + "recommend", + "--n-samples", + "1000", + "--n-classes", + "10", + "--avg-tokens", + "20", + "--budget-vram-gb", + "24", + ] + ) + out = capsys.readouterr().out + assert "Recommendation:" in out + assert rc == 0 + + +def test_partial_descriptions_with_description_scorer_flags_red() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "description"}, + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=24, + has_descriptions=False, + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any( + f.phase == "data" and "description" in f.message.lower() for f in report.findings + ) + + +def test_long_dataset_triggers_truncation_warning() -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"} + ], + "max_length": [128], + } + ], + } + ], + } + stats = DatasetStats( + n_samples=500, + n_classes=10, + avg_tokens=80, + p95_tokens=512, # well over 128 + ) + report = run_preflight(cfg, stats, _profile(vram_gb=16.0)) + assert any("truncation" in f.message.lower() for f in report.findings) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-v"])) diff --git a/tests/advisor/test_estimates_internals.py b/tests/advisor/test_estimates_internals.py new file mode 100644 index 00000000..0317ff27 --- /dev/null +++ b/tests/advisor/test_estimates_internals.py @@ -0,0 +1,319 @@ +"""Targeted tests for `_estimates` helpers + edge cases of `run_preflight`.""" + +from __future__ import annotations + +import pytest + +from autointent._advisor import _estimates, _hub +from autointent._advisor._estimates import ( + _classify_severity, + _extract_model_names, + _max_int, + _ram_for_module, + _vram_for_transformer, + run_preflight, +) +from autointent._advisor._hardware import HardwareProfile +from autointent._advisor._hub import ModelMeta +from autointent._advisor._report import DatasetStats, Severity + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.hub_reachable.cache_clear() + _hub.resolve_model.cache_clear() + offline = lambda *_a, **_kw: False # noqa: E731 + monkeypatch.setattr(_hub, "hub_reachable", offline) + monkeypatch.setattr(_estimates, "hub_reachable", offline) + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + + +def _profile(vram_gb: float = 16.0, accelerator: str = "cuda") -> HardwareProfile: + return HardwareProfile( + accelerator=accelerator, # type: ignore[arg-type] + device_name=f"test-{accelerator}", + vram_gb=vram_gb, + ram_gb=32.0, + free_disk_gb=200.0, + cpu_count=8, + ) + + +class TestMaxInt: + def test_none_returns_default(self) -> None: + assert _max_int(None, 7) == 7 + + def test_list_picks_max(self) -> None: + assert _max_int([1, 5, 3], 0) == 5 + + def test_range_dict_uses_high(self) -> None: + assert _max_int({"low": 1, "high": 9}, 0) == 9 + + def test_scalar_int_passes_through(self) -> None: + assert _max_int(42, 0) == 42 + + def test_garbage_returns_default(self) -> None: + assert _max_int("not-a-number", 11) == 11 + + +class TestExtractModelNames: + def test_classification_model_config_as_list(self) -> None: + entry = {"classification_model_config": [{"model_name": "foo/bar"}]} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_classification_model_config_as_dict(self) -> None: + entry = {"classification_model_config": {"model_name": "foo/bar"}} + assert _extract_model_names(entry) == ["foo/bar"] + + def test_embedder_config_picked_up(self) -> None: + entry = {"embedder_config": [{"model_name": "e/b"}]} + assert _extract_model_names(entry) == ["e/b"] + + def test_multiple_choices_all_returned(self) -> None: + entry = { + "classification_model_config": [ + {"model_name": "a/x"}, + {"model_name": "b/y"}, + ] + } + assert _extract_model_names(entry) == ["a/x", "b/y"] + + def test_empty_entry(self) -> None: + assert _extract_model_names({}) == [] + + +class TestClassifySeverity: + def test_below_yellow_is_green(self) -> None: + assert _classify_severity(estimate=1.0, budget=10.0) == Severity.GREEN + + def test_above_yellow_threshold(self) -> None: + assert _classify_severity(estimate=8.0, budget=10.0) == Severity.YELLOW + + def test_at_or_above_red_threshold(self) -> None: + assert _classify_severity(estimate=10.0, budget=10.0) == Severity.RED + assert _classify_severity(estimate=12.0, budget=10.0) == Severity.RED + + def test_zero_budget_returns_yellow(self) -> None: + assert _classify_severity(estimate=1.0, budget=0.0) == Severity.YELLOW + + +class TestVramForTransformer: + @pytest.fixture + def meta(self) -> ModelMeta: + return ModelMeta( + name="x", + params_millions=100.0, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + + def test_full_finetune_is_larger_than_lora_is_larger_than_inference( + self, meta: ModelMeta + ) -> None: + inference = _vram_for_transformer(meta, "inference", mixed_precision=False) + lora = _vram_for_transformer(meta, "lora", mixed_precision=False) + full = _vram_for_transformer(meta, "full-finetune", mixed_precision=False) + assert inference < lora < full + + def test_amp_does_not_naively_halve(self, meta: ModelMeta) -> None: + """The proposal calls out that AMP doesn't halve total VRAM — fp32 master + weights and Adam moments don't shrink. Weight-side accounting comes out + equal to fp32; the only savings (activations) aren't modeled by us.""" + full_fp32 = _vram_for_transformer(meta, "full-finetune", mixed_precision=False) + full_amp = _vram_for_transformer(meta, "full-finetune", mixed_precision=True) + assert full_amp / full_fp32 == pytest.approx(1.0) + assert full_amp / full_fp32 > 0.5 # explicit check vs the naive-halving formula + + def test_reranker_uses_inference_class(self, meta: ModelMeta) -> None: + inference = _vram_for_transformer(meta, "inference", mixed_precision=False) + reranker = _vram_for_transformer(meta, "reranker", mixed_precision=False) + assert reranker > inference + + +def test_ram_scales_with_dataset_size() -> None: + meta = ModelMeta( + name="x", + params_millions=100.0, + weight_bytes_per_param=4, + total_file_bytes=0, + cached_locally=False, + confidence="hub", + ) + small = _ram_for_module(meta, DatasetStats.placeholder(n_samples=100)) + big = _ram_for_module(meta, DatasetStats.placeholder(n_samples=10_000_000, avg_tokens=128)) + assert big > small + + +class TestRunPreflightFeatures: + def test_dump_modules_adds_disk_during_training(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"} + ], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 5}, + "dump_modules": True, + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.resource.disk_dump_gb > 0 + assert any("during training" in f.message for f in report.findings) + + def test_refit_after_increases_time(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [ + {"model_name": "microsoft/deberta-v3-small"} + ], + "num_train_epochs": [3], + "batch_size": [16], + } + ], + } + ], + "hpo_config": {"n_trials": 10}, + } + baseline = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + cfg_refit = {**cfg, "refit_after": True} + bumped = run_preflight(cfg_refit, DatasetStats.placeholder(), _profile()) + assert bumped.resource.time_hours > baseline.resource.time_hours + + def test_catboost_gpu_without_cuda_flags_config(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cpu")) + assert any( + f.phase == "config" and "CatBoost" in f.message for f in report.findings + ) + + def test_catboost_gpu_with_cuda_is_silent(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "catboost", "task_type": "GPU"}, + ], + } + ], + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile(accelerator="cuda")) + assert not any( + f.phase == "config" and "CatBoost" in f.message for f in report.findings + ) + + def test_offline_flips_low_confidence(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "classification_model_config": [{"model_name": "any/model"}], + } + ], + } + ] + } + report = run_preflight(cfg, DatasetStats.placeholder(), _profile()) + assert report.low_confidence is True + assert any("HF Hub unreachable" in n for n in report.notes) + + def test_rare_classes_with_linear_scorer_flag_red(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + {"module_name": "linear"}, + ], + } + ] + } + stats = DatasetStats( + n_samples=20, + n_classes=5, + avg_tokens=10, + rare_classes=["intent_a", "intent_b"], + ) + report = run_preflight(cfg, stats, _profile()) + assert any( + f.phase == "data" and "LogisticRegressionCV" in f.message and f.severity == Severity.RED + for f in report.findings + ) + + def test_truncation_red_when_p95_dominates_max_length(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [ + {"model_name": "some/model"} + ], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=400) + report = run_preflight(cfg, stats, _profile()) + red = [f for f in report.findings if f.phase == "data" and f.severity == Severity.RED] + assert red, "p95=400 > 1.5 * max_length=128 should be red" + + def test_truncation_yellow_when_p95_only_slightly_exceeds(self) -> None: + cfg = { + "search_space": [ + { + "node_type": "scoring", + "search_space": [ + { + "module_name": "bert", + "max_length": [128], + "classification_model_config": [ + {"model_name": "some/model"} + ], + } + ], + } + ] + } + stats = DatasetStats(n_samples=500, n_classes=5, avg_tokens=50, p95_tokens=140) + report = run_preflight(cfg, stats, _profile()) + yellows = [ + f + for f in report.findings + if f.phase == "data" + and f.severity == Severity.YELLOW + and "truncation" in f.message.lower() + ] + assert yellows diff --git a/tests/advisor/test_hardware_detection.py b/tests/advisor/test_hardware_detection.py new file mode 100644 index 00000000..d8131fb1 --- /dev/null +++ b/tests/advisor/test_hardware_detection.py @@ -0,0 +1,72 @@ +"""Hardware detection has to be safe on every machine — broken CUDA, no GPU, +no psutil. Verify the fallbacks work without raising. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from autointent._advisor._hardware import detect_hardware + + +def test_cpu_fallback_when_no_accelerator() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" + assert hw.vram_gb == 0.0 + assert hw.device_class == "cpu" + + +def test_cuda_branch_classifies_low_gpu() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(8.0, "NVIDIA RTX 3060"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "cuda" + assert hw.vram_gb == pytest.approx(8.0) + assert hw.device_class == "low-gpu" + + +def test_mps_budget_uses_ram_fraction() -> None: + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_ram_gb", return_value=32.0), + patch( + "autointent._advisor._hardware._detect_mps", + side_effect=lambda ram, ratio: (ram * ratio, "Apple Silicon (arm64)"), + ), + ): + hw = detect_hardware() + assert hw.accelerator == "mps" + assert hw.vram_gb == pytest.approx(32.0 * 0.7) + assert any("MPS unified memory" in n for n in hw.notes) + + +def test_vram_budget_override_applies() -> None: + with ( + patch( + "autointent._advisor._hardware._detect_cuda", + return_value=(24.0, "NVIDIA RTX 4090"), + ), + ): + hw = detect_hardware(vram_budget_gb=8.0) + assert hw.vram_gb == pytest.approx(8.0) + assert any("manual VRAM budget" in n for n in hw.notes) + + +def test_broken_cuda_returns_none_does_not_crash() -> None: + # _detect_cuda swallows torch quirks already; verify the wrapper holds. + with ( + patch("autointent._advisor._hardware._detect_cuda", return_value=None), + patch("autointent._advisor._hardware._detect_mps", return_value=None), + ): + hw = detect_hardware() + assert hw.accelerator == "cpu" diff --git a/tests/advisor/test_hub_heuristics.py b/tests/advisor/test_hub_heuristics.py new file mode 100644 index 00000000..54a03431 --- /dev/null +++ b/tests/advisor/test_hub_heuristics.py @@ -0,0 +1,81 @@ +"""Tests for the offline name-pattern heuristics in `_hub`. + +The advisor must produce a sensible estimate even when HF Hub is +unreachable, so these tests pin the public `hub_reachable` to False and +exercise the heuristic path directly. +""" + +from __future__ import annotations + +import pytest + +from autointent._advisor import _hub + + +@pytest.fixture(autouse=True) +def _offline(monkeypatch: pytest.MonkeyPatch) -> None: + _hub.hub_reachable.cache_clear() + _hub.resolve_model.cache_clear() + monkeypatch.setattr(_hub, "hub_reachable", lambda *_a, **_kw: False) + monkeypatch.setattr(_hub, "_is_warm_cached", lambda _name: False) + + +@pytest.mark.parametrize( + ("name", "expected_min_m", "expected_max_m"), + [ + ("microsoft/deberta-v3-large", 200, 500), + ("microsoft/deberta-v3-small", 30, 200), + ("sentence-transformers/all-MiniLM-L6-v2", 20, 80), + ("intfloat/multilingual-e5-large-instruct", 300, 700), + ("intfloat/e5-small", 20, 80), + ("distilbert-base-uncased", 40, 150), + ("bert-base-uncased", 70, 200), + ], +) +def test_name_heuristic_picks_reasonable_bucket( + name: str, expected_min_m: int, expected_max_m: int +) -> None: + meta = _hub.resolve_model(name) + assert meta.confidence == "heuristic" + assert expected_min_m <= meta.params_millions <= expected_max_m, ( + f"{name} got {meta.params_millions}M; expected [{expected_min_m}, {expected_max_m}]" + ) + + +def test_unknown_name_falls_back_to_bert_base() -> None: + meta = _hub.resolve_model("totally-made-up/no-such-model") + assert meta.confidence == "heuristic" + assert meta.params_millions == pytest.approx(110.0) + + +def test_weights_gb_matches_params_times_bytes() -> None: + meta = _hub.resolve_model("microsoft/deberta-v3-large") + expected_gb = meta.params_millions * 1_000_000 * meta.weight_bytes_per_param / (1024**3) + assert meta.weights_gb == pytest.approx(expected_gb) + + +def test_local_path_returns_zero_disk() -> None: + meta = _hub.resolve_model("/tmp/local/path/to/model") + assert meta.total_file_bytes == 0 + assert meta.cached_locally is True + + +def test_disk_gb_falls_back_to_param_size_when_siblings_unknown() -> None: + meta = _hub.resolve_model("intfloat/multilingual-e5-large-instruct") + assert meta.disk_gb > 0 + assert meta.disk_gb == pytest.approx(meta.weights_gb, rel=0.01) + + +def test_resolve_is_memoized() -> None: + a = _hub.resolve_model("microsoft/deberta-v3-large") + b = _hub.resolve_model("microsoft/deberta-v3-large") + assert a is b + + +def test_metadata_fallback_uses_heuristic_when_hub_unreachable() -> None: + """End-to-end: resolve_model must return a usable ModelMeta even when + the live Hub is unreachable (autouse fixture forces offline).""" + meta = _hub.resolve_model("microsoft/deberta-v3-large") + assert meta.confidence == "heuristic" + assert meta.params_millions > 0 + assert meta.disk_gb > 0 diff --git a/tests/advisor/test_render.py b/tests/advisor/test_render.py new file mode 100644 index 00000000..e82d7573 --- /dev/null +++ b/tests/advisor/test_render.py @@ -0,0 +1,151 @@ +"""Output rendering: text formatting and JSON serialization.""" + +from __future__ import annotations + +import json + +import pytest + +from autointent._advisor._render import render_json, render_recommendation, render_text +from autointent._advisor._report import ( + DatasetStats, + PreflightReport, + ResourceEstimate, + Severity, +) + + +def _populated_report() -> PreflightReport: + r = PreflightReport( + preset_name="example", + hardware={ + "accelerator": "cuda", + "device_name": "RTX 3060", + "vram_gb": 8.0, + "ram_gb": 32.0, + "free_disk_gb": 100.0, + "device_class": "low-gpu", + }, + dataset={"n_samples": 500, "n_classes": 10, "avg_tokens": 30, "source": "placeholder"}, + resource=ResourceEstimate( + disk_download_gb=2.5, + disk_cached_gb=0.5, + ram_gb=1.0, + vram_gb=4.0, + time_hours=1.2, + drivers=[ + { + "node_type": "scoring", + "module": "bert", + "model": "x/y", + "mode": "full-finetune", + "vram_gb": 4.0, + "ram_gb": 1.0, + "time_hours": 1.2, + "confidence": "hub", + } + ], + ), + notes=["MPS unified memory note"], + ) + r.add("resource", Severity.YELLOW, "VRAM ~6 GB vs available 8 GB") + r.add("data", Severity.RED, "rare classes blocked") + return r + + +class TestRenderText: + def test_contains_phase_blocks(self) -> None: + out = render_text(_populated_report()) + assert "Resource:" in out + assert "Data:" in out + # Config phase has no findings → block omitted + assert "Config:" not in out + + def test_includes_drivers_block(self) -> None: + out = render_text(_populated_report()) + assert "Drivers of cost:" in out + assert "x/y" in out + + def test_verdict_reflects_worst_severity(self) -> None: + out = render_text(_populated_report()) + assert "Verdict: INFEASIBLE" in out + assert "worst severity: red" in out + + def test_disclaimer_always_present(self) -> None: + out = render_text(_populated_report()) + assert "heuristic upper bounds" in out + + def test_low_confidence_tag_when_offline(self) -> None: + r = _populated_report() + r.low_confidence = True + out = render_text(r) + assert "low-confidence" in out + + def test_preset_name_in_title(self) -> None: + out = render_text(_populated_report()) + assert "Compute feasibility check — example" in out + + def test_empty_report_still_renders(self) -> None: + out = render_text(PreflightReport()) + assert "Compute feasibility check" in out + assert "Verdict: feasible" in out + + +class TestRenderJson: + def test_is_valid_json(self) -> None: + json.loads(render_json(_populated_report())) + + def test_findings_have_string_severity(self) -> None: + d = json.loads(render_json(_populated_report())) + for f in d["findings"]: + assert f["severity"] in {"green", "yellow", "red"} + + def test_worst_severity_and_feasibility_serialized(self) -> None: + d = json.loads(render_json(_populated_report())) + assert d["worst_severity"] == "red" + assert d["is_feasible"] is False + + def test_empty_report_serializes(self) -> None: + d = json.loads(render_json(PreflightReport())) + assert d["worst_severity"] == "green" + assert d["is_feasible"] is True + + +class TestRenderRecommendation: + def _two_reports(self) -> list[tuple[str, PreflightReport]]: + a = PreflightReport(preset_name="a", resource=ResourceEstimate(vram_gb=2.0, time_hours=0.5)) + a.add("resource", Severity.GREEN, "ok") + b = PreflightReport(preset_name="b", resource=ResourceEstimate(vram_gb=8.0, time_hours=4.0)) + b.add("resource", Severity.RED, "too big") + return [("a", a), ("b", b)] + + def test_lists_chosen_preset_when_present(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "→ a" in out + + def test_handles_no_chosen(self) -> None: + out = render_recommendation(self._two_reports(), chosen=None) + assert "none of the bundled presets" in out + + def test_includes_all_presets_in_table(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "a " in out # preset name + assert "b " in out + + def test_shows_status_per_preset(self) -> None: + out = render_recommendation(self._two_reports(), chosen="a") + assert "feasible" in out + assert "infeasible" in out + + +def test_dataset_stats_in_text_block() -> None: + stats = DatasetStats.placeholder(n_samples=777, n_classes=4) + r = PreflightReport(dataset={ + "n_samples": stats.n_samples, + "n_classes": stats.n_classes, + "avg_tokens": stats.avg_tokens, + "source": stats.source, + }) + out = render_text(r) + assert "777" in out + assert "n_classes=4" in out diff --git a/tests/advisor/test_report.py b/tests/advisor/test_report.py new file mode 100644 index 00000000..52f2e675 --- /dev/null +++ b/tests/advisor/test_report.py @@ -0,0 +1,85 @@ +"""Unit tests for the report dataclasses.""" + +from __future__ import annotations + +import pytest + +from autointent._advisor._report import ( + DatasetStats, + Finding, + PreflightReport, + ResourceEstimate, + Severity, +) + + +class TestSeverityOrdering: + def test_worst_severity_on_empty_report_is_green(self) -> None: + assert PreflightReport().worst_severity == Severity.GREEN + + def test_red_beats_yellow_beats_green(self) -> None: + r = PreflightReport() + r.add("resource", Severity.GREEN, "ok") + r.add("data", Severity.YELLOW, "warn") + assert r.worst_severity == Severity.YELLOW + r.add("config", Severity.RED, "fail") + assert r.worst_severity == Severity.RED + + def test_is_feasible_flips_on_any_red(self) -> None: + r = PreflightReport() + r.add("resource", Severity.YELLOW, "warn") + assert r.is_feasible is True + r.add("data", Severity.RED, "fail") + assert r.is_feasible is False + + +class TestDatasetStatsPlaceholder: + def test_defaults_populate_p95_above_avg(self) -> None: + stats = DatasetStats.placeholder() + assert stats.n_samples == 1_000 + assert stats.p95_tokens is not None + assert stats.p95_tokens > stats.avg_tokens + assert stats.source == "placeholder" + + def test_overrides_propagate(self) -> None: + stats = DatasetStats.placeholder(n_samples=42, n_classes=3, avg_tokens=80, multilabel=True) + assert stats.n_samples == 42 + assert stats.n_classes == 3 + assert stats.avg_tokens == 80 + assert stats.multilabel is True + + +class TestResourceEstimate: + def test_total_disk_sums_download_and_dump(self) -> None: + e = ResourceEstimate(disk_download_gb=2.5, disk_dump_gb=4.0) + assert e.total_disk_gb == pytest.approx(6.5) + + def test_total_disk_ignores_cached(self) -> None: + e = ResourceEstimate(disk_download_gb=1.0, disk_cached_gb=100.0, disk_dump_gb=0.5) + assert e.total_disk_gb == pytest.approx(1.5) + + +class TestToDictSerialization: + def test_findings_round_trip_severity_as_string(self) -> None: + r = PreflightReport() + r.add("resource", Severity.RED, "boom") + d = r.to_dict() + assert d["worst_severity"] == "red" + assert d["is_feasible"] is False + assert d["findings"] == [ + {"phase": "resource", "severity": "red", "message": "boom", "metric": None}, + ] + + def test_hardware_and_dataset_pass_through(self) -> None: + r = PreflightReport( + hardware={"accelerator": "cuda", "vram_gb": 8.0}, + dataset={"n_samples": 100, "n_classes": 5}, + ) + d = r.to_dict() + assert d["hardware"]["accelerator"] == "cuda" + assert d["dataset"]["n_samples"] == 100 + + def test_finding_is_frozen(self) -> None: + f = Finding(phase="resource", severity=Severity.GREEN, message="ok") + with pytest.raises(Exception): # noqa: PT011 - dataclass.FrozenInstanceError varies + f.message = "changed" # type: ignore[misc]