Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__pycache__/
.venv/
api_key.txt
data/
output/
Expand Down
153 changes: 153 additions & 0 deletions examples/scripts/run_analyzer_standalone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import os
import sqlite3
import sys
from typing import Any, Dict

from langgraph.checkpoint.sqlite import SqliteSaver
from omegaconf import OmegaConf


# This script lives two directories below the repository root
# (examples/scripts/), so walk up twice to locate the root.
_REPO_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "..")
)
if _REPO_ROOT not in sys.path:
    # Prepend rather than append so the checkout being run takes precedence
    # when the `loopai` import below is resolved.
    sys.path.insert(0, _REPO_ROOT)

from loopai.agents.Analyzer import (
ANALYZER_NODE_NAMES,
get_analyzer_checkpoint_state,
run_analyzer_standalone,
)


# Default SQLite checkpoint file used when --checkpoint-path is not supplied.
# It is a relative path, so it is resolved against the current working directory.
_DEFAULT_CHECKPOINT_PATH = "outputs/analyzer_checkpoints.sqlite"


def _load_state(config_path: str) -> Dict[str, Any]:
    """Read the initial Analyzer state from a JSON or YAML config file.

    Paths ending in ``.json`` are parsed with :mod:`json`; every other path is
    handed to ``OmegaConf.load``. If the loaded config has a
    ``default_states`` entry, that entry is returned as the state; otherwise
    the whole config is returned.

    Args:
        config_path: Path to the config file.

    Returns:
        The selected state. For the OmegaConf branch the config is converted
        to plain Python containers with ``resolve=True``.
    """
    if not config_path.endswith(".json"):
        loaded = OmegaConf.load(config_path)
        selected = loaded.default_states if "default_states" in loaded else loaded
        return OmegaConf.to_container(selected, resolve=True)

    with open(config_path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    # Fall back to the whole document when there is no default_states entry.
    return parsed.get("default_states", parsed)


def _redact_key_fields(value: Any) -> Any:
    """Return a copy of ``value`` with secret-looking fields masked.

    Dicts, lists and tuples are walked recursively. Any dict entry whose key
    (stringified and lower-cased) ends with ``_key`` has its value replaced by
    ``"***REDACTED***"``; this covers ``api_key`` and ``analyze_api_key``.
    The input object is never mutated.

    Args:
        value: Arbitrary, possibly nested, data.

    Returns:
        A redacted copy for dicts, lists and tuples (tuple subclasses come
        back as plain tuples); any other value is returned as-is.
    """
    if isinstance(value, dict):
        return {
            key: (
                "***REDACTED***"
                if str(key).lower().endswith("_key")
                else _redact_key_fields(child)
            )
            for key, child in value.items()
        }
    if isinstance(value, list):
        return [_redact_key_fields(item) for item in value]
    if isinstance(value, tuple):
        # json.dumps serialises tuples as arrays, so secrets nested inside a
        # tuple would otherwise be printed unredacted.
        return tuple(_redact_key_fields(item) for item in value)
    return value


def _build_sqlite_checkpointer(checkpoint_path: str) -> SqliteSaver:
    """Open (creating if needed) the SQLite checkpoint store at ``checkpoint_path``.

    The parent directory is created when the path has one. The connection is
    opened with ``check_same_thread=False`` and wrapped in a ``SqliteSaver``.
    If the saver exposes a ``setup`` attribute, it is called before returning.

    Args:
        checkpoint_path: File path of the SQLite checkpoint database.

    Returns:
        The ``SqliteSaver`` bound to the opened connection.
    """
    parent_dir = os.path.dirname(checkpoint_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    saver = SqliteSaver(sqlite3.connect(checkpoint_path, check_same_thread=False))

    # Looked up dynamically, so a saver without a setup attribute is tolerated.
    setup_hook = getattr(saver, "setup", None)
    if setup_hook is not None:
        setup_hook()
    return saver


def _parse_args() -> argparse.Namespace:
    """Parse the command-line options of the standalone Analyzer runner.

    ``--config-path`` is only mandatory for a fresh run. With ``--resume`` the
    caller does not load a config (the state comes from the checkpoint), so
    the option may be omitted there. Passing it anyway remains valid.

    Returns:
        The parsed namespace with ``config_path``, ``thread_id``,
        ``checkpoint_path``, ``resume``, ``from_node`` and ``print_result``.
        ``config_path`` may be ``None`` only when ``resume`` is true.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            missing ``--config-path`` on a non-resume run.
    """
    parser = argparse.ArgumentParser(
        description="Run AnalyzerAgent directly with LangGraph checkpoint resume support."
    )
    parser.add_argument(
        "--config-path",
        default=None,
        help=(
            "Path to a YAML/JSON state config. If it contains default_states, that mapping is used as state. "
            "Required unless --resume is given."
        ),
    )
    parser.add_argument(
        "--thread-id",
        default="analyzer-default",
        help="LangGraph checkpoint thread_id.",
    )
    parser.add_argument(
        "--checkpoint-path",
        default=_DEFAULT_CHECKPOINT_PATH,
        help="SQLite checkpoint file path for cross-process resume.",
    )
    parser.add_argument(
        "--resume",
        action="store_true",
        help="Resume with the same thread_id from the latest checkpoint.",
    )
    parser.add_argument(
        "--from-node",
        default=None,
        help=(
            "Resume from a specific Analyzer node using checkpoint history when available. "
            f"Available nodes: {', '.join(ANALYZER_NODE_NAMES)}. "
            "Legacy names like analyze_result_node are also accepted."
        ),
    )
    parser.add_argument(
        "--print-result",
        action="store_true",
        help="Print the final state/result as JSON.",
    )

    args = parser.parse_args()
    # A fresh run needs an initial state; a resumed run takes it from the checkpoint.
    if not args.resume and args.config_path is None:
        parser.error("--config-path is required unless --resume is given")
    return args


def main() -> None:
    """CLI entry point: run or resume the Analyzer and optionally print the result.

    A fresh run loads its initial state from ``--config-path``. A resumed run
    passes ``state=None`` and first checks that a checkpoint exists for the
    given thread id. Any ``RuntimeError`` from the checkpoint lookup or from
    the run itself is printed and turned into exit status 1.

    Raises:
        SystemExit: With code 1 when the checkpoint lookup or the run raises
            ``RuntimeError``.
    """
    args = _parse_args()
    checkpointer = _build_sqlite_checkpointer(args.checkpoint_path)

    if not args.resume:
        state = _load_state(args.config_path)
    else:
        # No initial state is supplied on resume.
        state = None
        try:
            snapshot = get_analyzer_checkpoint_state(
                args.thread_id,
                checkpointer=checkpointer,
                checkpoint_path=args.checkpoint_path,
            )
            current = snapshot.get("current") or snapshot.get("next_to") or "unknown"
            print(
                f"[AnalyzerStandalone] resume checkpoint found: "
                f"thread_id={args.thread_id}, "
                f"current={current}",
                flush=True,
            )
        except RuntimeError as exc:
            print(
                f"[AnalyzerStandalone] {exc}. "
                f"Please run once without --resume using thread_id={args.thread_id} "
                f"and checkpoint_path={args.checkpoint_path} first.",
                flush=True,
            )
            raise SystemExit(1)

    run_kwargs = dict(
        state=state,
        thread_id=args.thread_id,
        resume=args.resume,
        from_node=args.from_node,
        checkpointer=checkpointer,
        checkpoint_path=args.checkpoint_path,
    )
    try:
        result = run_analyzer_standalone(**run_kwargs)
    except RuntimeError as exc:
        print(f"[AnalyzerStandalone] {exc}", flush=True)
        raise SystemExit(1)

    if not args.print_result:
        return

    # Mask *_key fields before the result is written to stdout.
    redacted = _redact_key_fields(result)
    print(json.dumps(redacted, ensure_ascii=False, indent=2, default=str))


if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion loopai/agents/Analyzer/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .analyzer_agent import AnalyzerAgent
from .analyzer_agent import AnalyzerAgent
from .standalone import ANALYZER_NODE_NAMES, get_analyzer_checkpoint_state, run_analyzer_standalone
6 changes: 3 additions & 3 deletions loopai/agents/Analyzer/analyzer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def finish_node(state: LoopAIState, runtime: Runtime[RuntimeContext]):
return state
return finish_node

def init_graph(self, **kwargs):
def init_graph(self, entry_point: Optional[str] = None, **kwargs):
builder = StateGraph(LoopAIState)

builder.add_node("check_required_fields", self.get_check_required_fields_node())
Expand All @@ -151,7 +151,7 @@ def init_graph(self, **kwargs):
builder.add_edge("metric_score", "analyze_metric_report")
builder.add_edge("analyze_metric_report", "finish")

builder.set_entry_point("check_required_fields")
builder.set_entry_point(entry_point or "check_required_fields")
builder.set_finish_point("finish")

self.graph = builder.compile(
Expand All @@ -168,4 +168,4 @@ def __call__(self, **kwargs):
kwargs: keyword arguments to pass to init_graph
"""
self.init_graph(**kwargs)
return self.graph
return self.graph
128 changes: 128 additions & 0 deletions loopai/agents/Analyzer/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,131 @@ Analyzer 与 Judger 配合后,可以形成完整的:
```

闭环流程。

---

## 十、独立运行与断点续跑

Analyzer 现在支持不经过 Starter 独立运行。独立入口只封装运行控制,不改变 Analyzer 原有业务节点、graph 拓扑、state 字段结构、输入格式或输出格式。

### 1. Python runner

可以直接调用:

```python
from loopai.agents.Analyzer import run_analyzer_standalone

result = run_analyzer_standalone(
state={
"task_id": "analyzer-demo",
"output_dir": "./outputs",
"eval": {},
"analyzer": {
"analyze_task_type": "code",
"eval_result_path": "./outputs/result.jsonl",
"analyze_model_path": "gpt-4o-mini",
"analyze_base_url": "http://127.0.0.1:8000/v1",
"analyze_api_key": "EMPTY",
"analyze_temperature": 0.0,
"analyze_top_p": 0.95,
"analyze_batch_size": 20,
"analyze_max_concurrency": 5,
"analyze_chunk_size": 50,
"analyze_sampling_top_k": 5,
"output_brief": True,
"output_suggestion": True,
"quick_brief": True,
"quick_brief_limit": 10
}
},
thread_id="analyzer-demo-001",
)
```

runner 内部仍然使用:

```python
AnalyzerAgent(checkpointer=checkpointer, store=store)
graph.invoke(...)
```

返回值为 Analyzer graph 的最终 state / result。

### 2. CLI 入口

新增命令行入口:

```bash
python examples/scripts/run_analyzer_standalone.py \
--config-path examples/config/starter.yaml \
--thread-id analyzer-test-001
```

CLI 支持参数:

- `--config-path`:YAML / JSON 配置路径。如果配置中包含 `default_states`,则使用 `default_states` 作为输入 state。
- `--thread-id`:LangGraph checkpoint 使用的 thread id。
- `--checkpoint-path`:SQLite checkpoint 文件路径,默认 `outputs/analyzer_checkpoints.sqlite`。
- `--resume`:使用相同 `thread_id` 从 checkpoint 恢复。
- `--from-node`:从指定 Analyzer 节点继续运行。
- `--print-result`:将最终 state / result 打印为 JSON。

### 3. 断点续跑

使用相同 `thread_id` 恢复最近 checkpoint:

```bash
python examples/scripts/run_analyzer_standalone.py \
--config-path examples/config/starter.yaml \
--thread-id analyzer-test-001 \
--checkpoint-path outputs/analyzer_checkpoints.sqlite \
--resume
```

从指定节点继续运行:

```bash
python examples/scripts/run_analyzer_standalone.py \
--config-path examples/config/starter.yaml \
--thread-id analyzer-test-001 \
--checkpoint-path outputs/analyzer_checkpoints.sqlite \
--resume \
--from-node analyze_result_node
```

当前可用 Analyzer 节点名(`--from-node` 同时兼容旧名称,例如 `analyze_result_node`):

```text
check_required_fields
route_eval
eval_model
metric_recommend
metric_score
analyze_metric_report
analyze_result
draw_conclusion
finish
```

`resume` 和 `from_node` 使用同一套恢复入口逻辑:先从 SQLite checkpoint 读取 snapshot,再根据 `snapshot.next`、`state["current"]` 或用户指定的 `--from-node` 决定恢复节点,并临时构建以该节点为 entry point 的 Analyzer graph。这样中断在 `draw_conclusion_node` 时,续跑只会执行 `draw_conclusion_node -> finish_node`,不会重新执行 `eval_model_node` 和 `analyze_result_node`。

### 4. state 兼容性

独立运行入口保持现有 state 格式不变,例如:

```python
{
"task_id": "...",
"output_dir": "...",
"eval": {...},
"analyzer": {...}
}
```

runner 不重命名字段、不删除字段、不改变节点之间传递的 state 结构。

### 5. 注意事项

- CLI 默认使用 SQLite checkpointer,因此可以跨进程续跑。请保持 `--thread-id` 和 `--checkpoint-path` 一致。
- `--print-result` 会脱敏 `api_key`、`analyze_api_key` 和所有 `*_key` 字段。
- 如果 Analyzer 在 standalone 模式下遇到原本要跳回 Starter 父图的配置补全流程,runner 会返回对应的 state update,例如 `exception=ConfigerError`、`next_to=config_node` 和 `configer.configer_error`,便于独立调试。
Loading