Add v2 Temporal durable workflows#208
Conversation
|
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
There was a problem hiding this comment.
Code Review
This pull request introduces Version 2 of the API routes, integrating them with Temporal workflows for durable execution. It adds a local Temporal service setup, registers new v2 routers for memory, jobs, and scanner operations, and implements the corresponding Temporal workflows, activities, and worker. The review feedback highlights several critical robustness and performance issues, including a timezone mismatch TypeError and a potential None float conversion in scanner.py, a performance bottleneck from connecting to the Temporal client on every request, missing error handling for branch tip retrieval, unhandled RPCErrors during workflow cancellation, and thread-safety concerns when running Playwright's sync API via asyncio.to_thread.
| async def get_temporal_client(): | ||
| try: | ||
| from temporalio.client import Client | ||
| except Exception as exc: # pragma: no cover - depends on optional SDK import | ||
| raise TemporalUnavailable( | ||
| "temporalio is not installed. Install project dependencies first." | ||
| ) from exc | ||
|
|
||
| return await Client.connect( | ||
| settings.temporal_address, | ||
| namespace=settings.temporal_namespace, | ||
| ) |
There was a problem hiding this comment.
Performance Bottleneck: Creating a new Temporal Client connection via Client.connect on every single API request (e.g., starting or cancelling a workflow) is highly inefficient. It introduces significant network latency, overhead, and risks port exhaustion under load.
The Temporal Client is thread-safe and designed to be shared as a singleton across the entire application. You should cache and reuse the client instance.
_temporal_client = None
async def get_temporal_client():
global _temporal_client
try:
from temporalio.client import Client
except Exception as exc: # pragma: no cover - depends on optional SDK import
raise TemporalUnavailable(
"temporalio is not installed. Install project dependencies first."
) from exc
if _temporal_client is None:
_temporal_client = await Client.connect(
settings.temporal_address,
namespace=settings.temporal_namespace,
)
return _temporal_client
|
| Filename | Overview |
|---|---|
| src/api/routes/v2/temporal_client.py | Adds singleton Temporal client with double-checked locking; cancel uses fragile string matching on exception messages instead of Temporal exception types. |
| src/api/routes/v2/workflows.py | Five workflow classes (memory ingest, batch ingest, scrape, scanner scan, scanner phase2); domain activities in MemoryIngestWorkflow run sequentially rather than in parallel. |
| src/api/routes/v2/scanner.py | start_scan_v2 fully wraps start_job_workflow in try/except and resets the scanner code-store on failure; scan_status_v2 uses org_id query param while start_scan_v2 exposes the field as org in responses. |
| src/jobs/durable.py | Adds CANCELLED to TERMINAL_STATUSES, $nin guards on all terminal-state writes, two-step mark_running using attempt_count/retry_count split, reserve_workflow_start, and reset_for_retry with clear_workflow flag. |
| src/api/routes/v2/secrets.py | Fernet encryption for GitHub PATs; _fernet() raises RuntimeError instead of falling back to JWT secret, removing the key-coupling concern from the previous review. |
| src/api/routes/v2/jobs.py | cancel_job now has status guard (queued/running only), try/except around cancel_job_workflow, and calls _mark_scanner_job_cancelled; retry_job wraps start_job_workflow in try/except with mark_failed on failure. |
| src/api/routes/v2/activities.py | All Temporal activities defined; scanner_scan_activity now calls _mark_scanner_scan_failed on PAT resolution failure before re-raising, fixing the stuck-running scanner record issue. |
| src/api/routes/v2/memory.py | _enqueue_and_start helper uses reserve_workflow_start to prevent duplicate workflow starts; WorkflowStartFailed returns the durable job handle in the 503 response so the caller can poll status. |
| tests/test_v2_review_fixes.py | New test file covering PAT round-trip, cancel state transitions, phase status derivation, and transient cancel error handling. |
Reviews (16): Last reviewed commit: "Return job handles on workflow start fai..." | Re-trigger Greptile
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
✅ Staging Deployment Report
🟢 Staging is live and healthy! Test your changes at the staging URL above. Ready to ship? Comment |
🔍 API Schema Diff---REPORT--- 🔄 Modified
Auto-generated by API Schema Diff workflow |
Summary
src/api/routes/v2/while leaving existing v1 memory/scanner route contracts intact.Tests
python -B -c "import ast, pathlib; paths=[...]; [ast.parse(pathlib.Path(p).read_text(encoding='utf-8'), filename=p) for p in paths]; print('ast ok')"ENVIRONMENT=test .venv\\Scripts\\python.exe -m pytest tests\\api\\test_memory_versioning.py tests\\test_durable_jobs.py-> 10 passedENVIRONMENT=test .venv\\Scripts\\python.exe -c "from src.api.app import create_app; app=create_app(); ..."confirmed/v2/memory,/v2/scanner, and/v2/jobsroutes registerNotes
temporalio>=1.10.0is declared, but the local venv does not currently have the SDK installed, so full worker smoke testing still needs a dependency sync plus Temporal server.