diff --git a/.gitignore b/.gitignore index b0bab0d..a527e34 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ __pycache__/ +.venv/ api_key.txt data/ output/ diff --git a/examples/scripts/run_analyzer_standalone.py b/examples/scripts/run_analyzer_standalone.py new file mode 100644 index 0000000..f8fac08 --- /dev/null +++ b/examples/scripts/run_analyzer_standalone.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import sqlite3 +import sys +from typing import Any, Dict + +from langgraph.checkpoint.sqlite import SqliteSaver +from omegaconf import OmegaConf + + +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +from loopai.agents.Analyzer import ( + ANALYZER_NODE_NAMES, + get_analyzer_checkpoint_state, + run_analyzer_standalone, +) + + +_DEFAULT_CHECKPOINT_PATH = "outputs/analyzer_checkpoints.sqlite" + + +def _load_state(config_path: str) -> Dict[str, Any]: + if config_path.endswith(".json"): + with open(config_path, "r", encoding="utf-8") as f: + cfg = json.load(f) + return cfg.get("default_states", cfg) + + cfg = OmegaConf.load(config_path) + state_cfg = cfg.default_states if "default_states" in cfg else cfg + return OmegaConf.to_container(state_cfg, resolve=True) + + +def _redact_key_fields(value: Any) -> Any: + if isinstance(value, dict): + redacted = {} + for key, child in value.items(): + key_name = str(key).lower() + if key_name in {"api_key", "analyze_api_key"} or key_name.endswith("_key"): + redacted[key] = "***REDACTED***" + else: + redacted[key] = _redact_key_fields(child) + return redacted + if isinstance(value, list): + return [_redact_key_fields(item) for item in value] + return value + + +def _build_sqlite_checkpointer(checkpoint_path: str) -> SqliteSaver: + checkpoint_dir = os.path.dirname(checkpoint_path) + if checkpoint_dir: + os.makedirs(checkpoint_dir, exist_ok=True) + conn = sqlite3.connect(checkpoint_path, check_same_thread=False) + checkpointer = SqliteSaver(conn) + setup = getattr(checkpointer, "setup", None) + if setup is not None: + setup() + return checkpointer + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run AnalyzerAgent directly with LangGraph checkpoint resume support." + ) + parser.add_argument( + "--config-path", + required=True, + help="Path to a YAML/JSON state config. If it contains default_states, that mapping is used as state.", + ) + parser.add_argument( + "--thread-id", + default="analyzer-default", + help="LangGraph checkpoint thread_id.", + ) + parser.add_argument( + "--checkpoint-path", + default=_DEFAULT_CHECKPOINT_PATH, + help="SQLite checkpoint file path for cross-process resume.", + ) + parser.add_argument( + "--resume", + action="store_true", + help="Resume with the same thread_id from the latest checkpoint.", + ) + parser.add_argument( + "--from-node", + default=None, + help=( + "Resume from a specific Analyzer node using checkpoint history when available. " + f"Available nodes: {', '.join(ANALYZER_NODE_NAMES)}. " + "Legacy names like analyze_result_node are also accepted." + ), + ) + parser.add_argument( + "--print-result", + action="store_true", + help="Print the final state/result as JSON.", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + checkpointer = _build_sqlite_checkpointer(args.checkpoint_path) + state = None if args.resume else _load_state(args.config_path) + + if args.resume: + try: + checkpoint_state = get_analyzer_checkpoint_state( + args.thread_id, + checkpointer=checkpointer, + checkpoint_path=args.checkpoint_path, + ) + print( + f"[AnalyzerStandalone] resume checkpoint found: " + f"thread_id={args.thread_id}, " + f"current={checkpoint_state.get('current') or checkpoint_state.get('next_to') or 'unknown'}", + flush=True, + ) + except RuntimeError as exc: + print( + f"[AnalyzerStandalone] {exc}. " + f"Please run once without --resume using thread_id={args.thread_id} " + f"and checkpoint_path={args.checkpoint_path} first.", + flush=True, + ) + raise SystemExit(1) + + try: + result = run_analyzer_standalone( + state=state, + thread_id=args.thread_id, + resume=args.resume, + from_node=args.from_node, + checkpointer=checkpointer, + checkpoint_path=args.checkpoint_path, + ) + except RuntimeError as exc: + print(f"[AnalyzerStandalone] {exc}", flush=True) + raise SystemExit(1) + + if args.print_result: + print(json.dumps(_redact_key_fields(result), ensure_ascii=False, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/loopai/agents/Analyzer/__init__.py b/loopai/agents/Analyzer/__init__.py index 040db30..c6bf3ed 100644 --- a/loopai/agents/Analyzer/__init__.py +++ b/loopai/agents/Analyzer/__init__.py @@ -1 +1,2 @@ -from .analyzer_agent import AnalyzerAgent \ No newline at end of file +from .analyzer_agent import AnalyzerAgent +from .standalone import ANALYZER_NODE_NAMES, get_analyzer_checkpoint_state, run_analyzer_standalone diff --git a/loopai/agents/Analyzer/analyzer_agent.py b/loopai/agents/Analyzer/analyzer_agent.py index 68ad5f4..c47fba3 100644 --- a/loopai/agents/Analyzer/analyzer_agent.py +++ b/loopai/agents/Analyzer/analyzer_agent.py @@ -125,7 +125,7 @@ def finish_node(state: LoopAIState, runtime: Runtime[RuntimeContext]): return state return finish_node - def init_graph(self, **kwargs): + def init_graph(self, entry_point: Optional[str] = None, **kwargs): builder = StateGraph(LoopAIState) builder.add_node("check_required_fields", self.get_check_required_fields_node()) @@ -151,7 +151,7 @@ def init_graph(self, **kwargs): builder.add_edge("metric_score", "analyze_metric_report") builder.add_edge("analyze_metric_report", "finish") - builder.set_entry_point("check_required_fields") + builder.set_entry_point(entry_point or "check_required_fields") builder.set_finish_point("finish") self.graph = builder.compile( @@ -168,4 +168,4 @@ def __call__(self, **kwargs): kwargs: keyword arguments to pass to init_graph """ self.init_graph(**kwargs) - return self.graph \ No newline at end of file + return self.graph diff --git a/loopai/agents/Analyzer/readme.md b/loopai/agents/Analyzer/readme.md index 918e82f..99b6dcc 100644 --- a/loopai/agents/Analyzer/readme.md +++ b/loopai/agents/Analyzer/readme.md @@ -580,3 +580,131 @@ Analyzer 与 Judger 配合后,可以形成完整的: ``` 闭环流程。 + +--- + +## 十、独立运行与断点续跑 + +Analyzer 现在支持不经过 Starter 独立运行。独立入口只封装运行控制,不改变 Analyzer 原有业务节点、graph 拓扑、state 字段结构、输入格式或输出格式。 + +### 1. Python runner + +可以直接调用: + +```python +from loopai.agents import run_analyzer_standalone + +result = run_analyzer_standalone( + state={ + "task_id": "analyzer-demo", + "output_dir": "./outputs", + "eval": {}, + "analyzer": { + "analyze_task_type": "code", + "eval_result_path": "./outputs/result.jsonl", + "analyze_model_path": "gpt-4o-mini", + "analyze_base_url": "http://127.0.0.1:8000/v1", + "analyze_api_key": "EMPTY", + "analyze_temperature": 0.0, + "analyze_top_p": 0.95, + "analyze_batch_size": 20, + "analyze_max_concurrency": 5, + "analyze_chunk_size": 50, + "analyze_sampling_top_k": 5, + "output_brief": True, + "output_suggestion": True, + "quick_brief": True, + "quick_brief_limit": 10 + } + }, + thread_id="analyzer-demo-001", +) +``` + +runner 内部仍然使用: + +```python +AnalyzerAgent(checkpointer=checkpointer, store=store) +graph.invoke(...) +``` + +返回值为 Analyzer graph 的最终 state / result。 + +### 2. CLI 入口 + +新增命令行入口: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 +``` + +CLI 支持参数: + +- `--config-path`:YAML / JSON 配置路径。如果配置中包含 `default_states`,则使用 `default_states` 作为输入 state。 +- `--thread-id`:LangGraph checkpoint 使用的 thread id。 +- `--checkpoint-path`:SQLite checkpoint 文件路径,默认 `outputs/analyzer_checkpoints.sqlite`。 +- `--resume`:使用相同 `thread_id` 从 checkpoint 恢复。 +- `--from-node`:从指定 Analyzer 节点继续运行。 +- `--print-result`:将最终 state / result 打印为 JSON。 + +### 3. 断点续跑 + +使用相同 `thread_id` 恢复最近 checkpoint: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 \ + --checkpoint-path outputs/analyzer_checkpoints.sqlite \ + --resume +``` + +从指定节点继续运行: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 \ + --checkpoint-path outputs/analyzer_checkpoints.sqlite \ + --resume \ + --from-node analyze_result_node +``` + +当前可用 Analyzer 节点名: + +```text +check_required_fields +route_eval +eval_model +metric_recommend +metric_score +analyze_metric_report +analyze_result +draw_conclusion +finish +``` + +`resume` 和 `from_node` 使用同一套恢复入口逻辑:先从 SQLite checkpoint 读取 snapshot,再根据 `snapshot.next`、`state["current"]` 或用户指定的 `--from-node` 决定恢复节点,并临时构建以该节点为 entry point 的 Analyzer graph。这样中断在 `draw_conclusion_node` 时,续跑只会执行 `draw_conclusion_node -> finish_node`,不会重新执行 `eval_model_node` 和 `analyze_result_node`。 + +### 4. state 兼容性 + +独立运行入口保持现有 state 格式不变,例如: + +```python +{ + "task_id": "...", + "output_dir": "...", + "eval": {...}, + "analyzer": {...} +} +``` + +runner 不重命名字段、不删除字段、不改变节点之间传递的 state 结构。 + +### 5. 注意事项 + +- CLI 默认使用 SQLite checkpointer,因此可以跨进程续跑。请保持 `--thread-id` 和 `--checkpoint-path` 一致。 +- `--print-result` 会脱敏 `api_key`、`analyze_api_key` 和所有 `*_key` 字段。 +- 如果 Analyzer 在 standalone 模式下遇到原本要跳回 Starter 父图的配置补全流程,runner 会返回对应的 state update,例如 `exception=ConfigerError`、`next_to=config_node` 和 `configer.configer_error`,便于独立调试。 diff --git a/loopai/agents/Analyzer/standalone.py b/loopai/agents/Analyzer/standalone.py new file mode 100644 index 0000000..d8c32f1 --- /dev/null +++ b/loopai/agents/Analyzer/standalone.py @@ -0,0 +1,164 @@ +from __future__ import annotations + +from typing import Any, Dict, Iterable, Optional + +from langgraph.errors import ParentCommand + +from loopai.memory import checkpointer as default_checkpointer +from loopai.memory import store as default_store +from .analyzer_agent import AnalyzerAgent + + +ANALYZER_NODE_NAMES = ( + "check_required_fields", + "route_eval", + "eval_model", + "metric_recommend", + "metric_score", + "analyze_metric_report", + "analyze_result", + "draw_conclusion", + "finish", +) + + +_NODE_ALIASES = { + "check_required_fields_node": "check_required_fields", + "route_eval_node": "route_eval", + "eval_model_node": "eval_model", + "metric_recommend_node": "metric_recommend", + "metric_score_node": "metric_score", + "analyze_metric_report_node": "analyze_metric_report", + "analyze_result_node": "analyze_result", + "draw_conclusion_node": "draw_conclusion", + "finish_node": "finish", +} + + +def _build_config(thread_id: str) -> Dict[str, Dict[str, str]]: + return {"configurable": {"thread_id": thread_id}} + + +def _state_is_finished(state: Dict[str, Any]) -> bool: + current = _normalize_node_name(str(state.get("current", ""))) + return current == "finish" + + +def _normalize_node_name(node_name: str) -> str: + node_name = str(node_name or "") + if node_name in ANALYZER_NODE_NAMES: + return node_name + if node_name in _NODE_ALIASES: + return _NODE_ALIASES[node_name] + + for alias, graph_node in _NODE_ALIASES.items(): + if alias in node_name: + return graph_node + for graph_node in sorted(ANALYZER_NODE_NAMES, key=len, reverse=True): + if graph_node in node_name: + return graph_node + return node_name + + +def _resume_node_from_state(state: Dict[str, Any]) -> str: + resume_node = _normalize_node_name(state.get("current") or state.get("next_to") or "") + if resume_node not in ANALYZER_NODE_NAMES: + available = ", ".join(ANALYZER_NODE_NAMES) + raise RuntimeError( + f"Cannot resume Analyzer from current={state.get('current')!r}, " + f"next_to={state.get('next_to')!r}. Available nodes: {available}" + ) + return resume_node + + +def _build_graph(checkpointer: Any, store: Any, entry_point: Optional[str] = None) -> Any: + sg = AnalyzerAgent(checkpointer=checkpointer or default_checkpointer, store=store or default_store) + return sg(entry_point=entry_point) + + +def _checkpoint_values( + graph: Any, + config: Dict[str, Any], + thread_id: str, + checkpoint_path: Optional[str] = None, +) -> Dict[str, Any]: + snapshot = graph.get_state(config) + values = getattr(snapshot, "values", None) + if not values: + if checkpoint_path: + raise RuntimeError(f"No checkpoint found for thread_id={thread_id} in checkpoint_path={checkpoint_path}") + raise RuntimeError(f"No checkpoint found for thread_id={thread_id}") + next_nodes = tuple(node for node in _snapshot_next(snapshot) if node in ANALYZER_NODE_NAMES) + if next_nodes: + values = values.copy() + values["current"] = next_nodes[0] + values["next_to"] = next_nodes[0] + return values + + +def _snapshot_next(snapshot: Any) -> Iterable[str]: + next_nodes = getattr(snapshot, "next", None) + return next_nodes or () + + +def get_analyzer_checkpoint_state( + thread_id: str = "analyzer-default", + checkpointer: Any = None, + checkpoint_path: Optional[str] = None, +) -> Dict[str, Any]: + """Return the latest Analyzer checkpoint values for a thread_id.""" + graph = _build_graph(checkpointer, default_store) + config = _build_config(thread_id) + return _checkpoint_values(graph, config, thread_id, checkpoint_path) + + +def _invoke_graph(graph: Any, input_state: Any, config: Dict[str, Any]) -> Dict[str, Any]: + try: + return graph.invoke(input_state, config=config) + except ParentCommand as exc: + command = exc.args[0] if exc.args else None + update = getattr(command, "update", None) + if isinstance(update, dict): + return update + raise + + +def run_analyzer_standalone( + state: Optional[Dict[str, Any]], + thread_id: str = "analyzer-default", + resume: bool = False, + from_node: Optional[str] = None, + checkpointer: Any = None, + store: Any = None, + checkpoint_path: Optional[str] = None, +) -> Dict[str, Any]: + """Run AnalyzerAgent directly with LangGraph checkpoint controls. + + The input state is passed through unchanged. This wrapper only controls + thread_id, checkpoint resume, and optional node-level restart behavior. + """ + + checkpointer = checkpointer or default_checkpointer + store = store or default_store + graph = _build_graph(checkpointer, store) + config = _build_config(thread_id) + + if resume or from_node is not None: + checkpoint_state = _checkpoint_values(graph, config, thread_id, checkpoint_path) + resume_node = _normalize_node_name(from_node) if from_node is not None else _resume_node_from_state(checkpoint_state) + if resume_node not in ANALYZER_NODE_NAMES: + available = ", ".join(ANALYZER_NODE_NAMES) + raise ValueError(f"Unknown Analyzer node: {resume_node}. Available nodes: {available}") + + checkpoint_state = checkpoint_state.copy() + checkpoint_state["current"] = resume_node + checkpoint_state["next_to"] = resume_node + if _state_is_finished(checkpoint_state): + return checkpoint_state + resume_graph = _build_graph(checkpointer, store, entry_point=resume_node) + return _invoke_graph(resume_graph, checkpoint_state, config) + + if state is None: + raise ValueError("state is required when resume is false.") + + return _invoke_graph(graph, state, config) diff --git a/loopai/agents/__init__.py b/loopai/agents/__init__.py index b9e7cdb..f4aed7b 100644 --- a/loopai/agents/__init__.py +++ b/loopai/agents/__init__.py @@ -1,6 +1,34 @@ from .BaseAgent import BaseAgent -from .Starter.starter_agent import StarterAgent -from .Judger.judger_agent import JudgerAgent -from .Analyzer.analyzer_agent import AnalyzerAgent -from .Obtainer.obtainer_agent import ObtainerAgent -from .Trainer.trainer_agent import TrainerAgent + + +__all__ = [ + "BaseAgent", + "StarterAgent", + "JudgerAgent", + "AnalyzerAgent", + "run_analyzer_standalone", + "ObtainerAgent", + "TrainerAgent", +] + + +def __getattr__(name): + if name == "StarterAgent": + from .Starter.starter_agent import StarterAgent + return StarterAgent + if name == "JudgerAgent": + from .Judger.judger_agent import JudgerAgent + return JudgerAgent + if name == "AnalyzerAgent": + from .Analyzer.analyzer_agent import AnalyzerAgent + return AnalyzerAgent + if name == "run_analyzer_standalone": + from .Analyzer.standalone import run_analyzer_standalone + return run_analyzer_standalone + if name == "ObtainerAgent": + from .Obtainer.obtainer_agent import ObtainerAgent + return ObtainerAgent + if name == "TrainerAgent": + from .Trainer.trainer_agent import TrainerAgent + return TrainerAgent + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/setup.py b/setup.py index 23bb18d..a497725 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ packages=find_packages(), install_requires=[ "langgraph>=0.6.7", + "langgraph-checkpoint-sqlite>=3.0.0", "colorlog>=6.10.0", "rich>=13.0.0", "langchain>=0.3.27",