From 186ece6df8d4b7f9763da7adf85e91527927cf5c Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Thu, 29 Jan 2026 11:27:03 +0800 Subject: [PATCH 1/9] Get team_id in containers label for reconciliation / orphan lookup. --- app/workers/tasks/deployment.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app/workers/tasks/deployment.py b/app/workers/tasks/deployment.py index 8c443cbb..31dfd518 100644 --- a/app/workers/tasks/deployment.py +++ b/app/workers/tasks/deployment.py @@ -154,6 +154,7 @@ async def start_deployment(ctx, deployment_id: str): "traefik.docker.network": "devpush_runner", "devpush.deployment_id": deployment.id, "devpush.project_id": deployment.project_id, + "devpush.team_id": deployment.project.team_id, "devpush.environment_id": deployment.environment_id, "devpush.branch": deployment.branch, } From b98dd3f5fb16452ad1864e9a39730f3360bb32e1 Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Thu, 29 Jan 2026 12:00:31 +0800 Subject: [PATCH 2/9] Add restart policy to deployments Rebase: fix formating on README for new ENV VARS --- .env.dev.example | 2 ++ .env.example | 2 ++ README.md | 2 ++ app/config.py | 2 ++ app/workers/tasks/deployment.py | 11 +++++++++++ 5 files changed, 19 insertions(+) diff --git a/.env.dev.example b/.env.dev.example index aabfa9ec..217bf0eb 100644 --- a/.env.dev.example +++ b/.env.dev.example @@ -45,6 +45,8 @@ RESEND_API_KEY= # Optional if SMTP is configured (see below) # JOB_TIMEOUT_SECONDS=320 # JOB_MAX_TRIES=3 # DEPLOYMENT_TIMEOUT_SECONDS=300 +# DEPLOYMENT_RESTART_POLICY=on-failure +# DEPLOYMENT_RESTART_MAX_RETRIES=5 # MAGIC_LINK_TTL_SECONDS=900 # AUTH_TOKEN_TTL_DAYS=30 # AUTH_TOKEN_REFRESH_THRESHOLD_DAYS=1 diff --git a/.env.example b/.env.example index fd0bfb07..30727507 100644 --- a/.env.example +++ b/.env.example @@ -51,6 +51,8 @@ RESEND_API_KEY= # Optional if SMTP is configured (see below) # JOB_TIMEOUT_SECONDS=320 # JOB_MAX_TRIES=3 # DEPLOYMENT_TIMEOUT_SECONDS=300 +# DEPLOYMENT_RESTART_POLICY=on-failure +# DEPLOYMENT_RESTART_MAX_RETRIES=5 # MAGIC_LINK_TTL_SECONDS=900 # AUTH_TOKEN_TTL_DAYS=30 # AUTH_TOKEN_REFRESH_THRESHOLD_DAYS=1 diff --git a/README.md b/README.md index 389a37cf..73cdad0d 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,8 @@ See [ARCHITECTURE.md](ARCHITECTURE.md) for codebase structure. | `JOB_TIMEOUT_SECONDS` | Job timeout (seconds). Default: `320`. | | `JOB_MAX_TRIES` | Max retries per background job. Default: `3`. | | `DEPLOYMENT_TIMEOUT_SECONDS` | Deployment timeout (seconds). Default: `300`. | +| `DEPLOYMENT_RESTART_POLICY` | Docker restart policy for deployment containers. Default: `on-failure`. | +| `DEPLOYMENT_RESTART_MAX_RETRIES` | Max retries when restart policy is `on-failure`. Default: `5`. | | `CONTAINER_DELETE_GRACE_SECONDS` | Wait before deleting containers after stop/failure to let logs ship. Default: `3`. | | `LOG_STREAM_GRACE_SECONDS` | Grace window for deployment log streaming (when to connect/close SSE around terminal states). Default: `5`. | | `LOG_LEVEL` | Logging level. Default: `WARNING`. | diff --git a/app/config.py b/app/config.py index c6d3440a..a8d59b7a 100644 --- a/app/config.py +++ b/app/config.py @@ -57,6 +57,8 @@ class Settings(BaseSettings): job_timeout_seconds: int = 320 job_completion_wait_seconds: int = 300 deployment_timeout_seconds: int = 300 + deployment_restart_policy: str = "on-failure" + deployment_restart_max_retries: int = 5 container_delete_grace_seconds: int = 15 log_stream_grace_seconds: int = 5 service_uid: int = 1000 diff --git a/app/workers/tasks/deployment.py b/app/workers/tasks/deployment.py index 31dfd518..8afac60e 100644 --- a/app/workers/tasks/deployment.py +++ b/app/workers/tasks/deployment.py @@ -292,6 +292,17 @@ async def start_deployment(ctx, deployment_id: str): ), **({"Binds": mounts} if mounts else {}), "SecurityOpt": ["no-new-privileges:true"], + "RestartPolicy": { + "Name": settings.deployment_restart_policy, + **( + { + "MaximumRetryCount": settings.deployment_restart_max_retries, + } + if settings.deployment_restart_policy + == "on-failure" + else {} + ), + }, "LogConfig": { "Type": "json-file", "Config": {"max-size": "10m", "max-file": "5"}, From d1812eee52db7e1a1f6702ddb5599a6ed01d99fd Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Wed, 4 Feb 2026 10:42:04 +0800 Subject: [PATCH 3/9] Add observed state + update model accordingly --- .../6b0c7d2a9e1f_deployment_observed_state.py | 75 +++++++++++++++++++ app/models.py | 19 +++++ 2 files changed, 94 insertions(+) create mode 100644 app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py diff --git a/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py b/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py new file mode 100644 index 00000000..efc3af63 --- /dev/null +++ b/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py @@ -0,0 +1,75 @@ +"""Deployment observed state + +Revision ID: 6b0c7d2a9e1f +Revises: f45484bf96b0 +Create Date: 2026-01-29 00:00:00.000000 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "6b0c7d2a9e1f" +down_revision: Union[str, Sequence[str], None] = "f45484bf96b0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +observed_status_enum = sa.Enum( + "running", + "exited", + "dead", + "paused", + "not_found", + name="deployment_observed_status", +) + + +def upgrade() -> None: + """Upgrade schema.""" + observed_status_enum.create(op.get_bind(), checkfirst=True) + + op.add_column( + "deployment", + sa.Column("observed_status", observed_status_enum, nullable=True), + ) + op.add_column( + "deployment", + sa.Column("observed_exit_code", sa.Integer(), nullable=True), + ) + op.add_column( + "deployment", + sa.Column("observed_at", sa.DateTime(), nullable=True), + ) + op.add_column( + "deployment", + sa.Column("observed_reason", sa.Text(), nullable=True), + ) + op.add_column( + "deployment", + sa.Column("observed_last_seen_at", sa.DateTime(), nullable=True), + ) + op.add_column( + "deployment", + sa.Column( + "observed_missing_count", + sa.Integer(), + nullable=False, + server_default="0", + ), + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_column("deployment", "observed_missing_count") + op.drop_column("deployment", "observed_last_seen_at") + op.drop_column("deployment", "observed_reason") + op.drop_column("deployment", "observed_at") + op.drop_column("deployment", "observed_exit_code") + op.drop_column("deployment", "observed_status") + + observed_status_enum.drop(op.get_bind(), checkfirst=True) diff --git a/app/models.py b/app/models.py index 3c93bbf2..ef8a7878 100644 --- a/app/models.py +++ b/app/models.py @@ -3,6 +3,7 @@ BigInteger, Boolean, Enum as SQLAEnum, + Integer, JSON, String, Text, @@ -756,6 +757,24 @@ class Deployment(Base): SQLAEnum("running", "stopped", "removed", name="deployment_container_status"), nullable=True, ) + observed_status: Mapped[str | None] = mapped_column( + SQLAEnum( + "running", + "exited", + "dead", + "paused", + "not_found", + name="deployment_observed_status", + ), + nullable=True, + ) + observed_exit_code: Mapped[int | None] = mapped_column(Integer, nullable=True) + observed_at: Mapped[datetime | None] = mapped_column(nullable=True) + observed_reason: Mapped[str | None] = mapped_column(Text, nullable=True) + observed_last_seen_at: Mapped[datetime | None] = mapped_column(nullable=True) + observed_missing_count: Mapped[int] = mapped_column( + Integer, nullable=False, default=0 + ) status: Mapped[str] = mapped_column( SQLAEnum( "prepare", From cf65eb5b3c5fd35cb4070e261105b13762ca69d7 Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Wed, 4 Feb 2026 14:54:23 +0800 Subject: [PATCH 4/9] Add reconcile script and wrapper to sync the DB status whenever needed - allowing to track drift --- app/services/reconcile.py | 145 ++++++++++++++++++++++++++++++++++++++ scripts/reconcile.sh | 92 ++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 app/services/reconcile.py create mode 100755 scripts/reconcile.sh diff --git a/app/services/reconcile.py b/app/services/reconcile.py new file mode 100644 index 00000000..4028a705 --- /dev/null +++ b/app/services/reconcile.py @@ -0,0 +1,145 @@ +import logging +from datetime import datetime, timezone + +import aiodocker +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from models import Deployment + +logger = logging.getLogger(__name__) + +# Keep in sync with models.Deployment.observed_status enum values. +OBSERVED_STATUSES = {"running", "exited", "dead", "paused", "not_found"} + + +async def reconcile_deployments( + db: AsyncSession, + docker_client: aiodocker.Docker, + deployment_ids: list[str] | None = None, +) -> dict[str, int]: + now = datetime.now(timezone.utc) + counts = {"processed": 0, "observed": 0, "missing": 0} + + query = select(Deployment) + if deployment_ids: + query = query.where(Deployment.id.in_(deployment_ids)) + else: + query = query.where(Deployment.container_id.isnot(None)) + + result = await db.execute(query) + deployments = result.scalars().all() + if not deployments: + logger.info("Reconcile: no deployments found to process.") + return counts + + try: + containers = await docker_client.containers.list( + all=True, filters={"label": ["devpush.deployment_id"]} + ) + except aiodocker.DockerError as error: + logger.error("Failed to list devpush containers: %s", error) + return counts + + container_by_deployment: dict[str, str] = {} + container_by_id: dict[str, str] = {} + + for info in containers: + container_id = None + labels = {} + if hasattr(info, "show"): + details = await info.show() + container_id = details.get("Id") + labels = ( + details.get("Config", {}).get("Labels") + or details.get("Labels") + or {} + ) + else: + container_id = info.get("Id") + labels = info.get("Labels") or {} + + deployment_id = labels.get("devpush.deployment_id") + if deployment_id and container_id: + container_by_deployment[deployment_id] = container_id + if container_id: + container_by_id[container_id] = container_id + + for deployment in deployments: + counts["processed"] += 1 + container_id = None + if deployment.container_id and deployment.container_id in container_by_id: + container_id = deployment.container_id + else: + container_id = container_by_deployment.get(deployment.id) + + if not container_id: + deployment.observed_status = "not_found" + deployment.observed_at = now.replace(tzinfo=None) + deployment.observed_missing_count = (deployment.observed_missing_count or 0) + 1 + counts["missing"] += 1 + logger.info( + "Reconcile: deployment %s container not found (missing_count=%s).", + deployment.id, + deployment.observed_missing_count, + ) + continue + + try: + container = await docker_client.containers.get(container_id) + details = await container.show() + except aiodocker.DockerError as error: + if getattr(error, "status", None) == 404: + deployment.observed_status = "not_found" + deployment.observed_at = now.replace(tzinfo=None) + deployment.observed_missing_count = ( + deployment.observed_missing_count or 0 + ) + 1 + counts["missing"] += 1 + logger.info( + "Reconcile: deployment %s container 404 (missing_count=%s).", + deployment.id, + deployment.observed_missing_count, + ) + continue + logger.warning("Failed to inspect container %s: %s", container_id, error) + deployment.observed_status = "not_found" + deployment.observed_at = now.replace(tzinfo=None) + deployment.observed_missing_count = (deployment.observed_missing_count or 0) + 1 + counts["missing"] += 1 + logger.info( + "Reconcile: deployment %s container inspect failed (missing_count=%s).", + deployment.id, + deployment.observed_missing_count, + ) + continue + + state = details.get("State", {}) + status = state.get("Status") + if status not in OBSERVED_STATUSES: + logger.warning( + "Unknown container status '%s' for deployment %s", + status, + deployment.id, + ) + status = "not_found" + + deployment.observed_status = status + deployment.observed_exit_code = state.get("ExitCode") + deployment.observed_at = now.replace(tzinfo=None) + deployment.observed_last_seen_at = now.replace(tzinfo=None) + deployment.observed_missing_count = 0 + counts["observed"] += 1 + logger.info( + "Reconcile: deployment %s observed_status=%s exit_code=%s.", + deployment.id, + deployment.observed_status, + deployment.observed_exit_code, + ) + + await db.commit() + logger.info( + "Reconcile: updated observed state for %s deployment(s).", + len(deployments), + ) + return counts diff --git a/scripts/reconcile.sh b/scripts/reconcile.sh new file mode 100755 index 00000000..fc7689f9 --- /dev/null +++ b/scripts/reconcile.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +set -Eeuo pipefail +IFS=$'\n\t' + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "$SCRIPT_DIR/lib.sh" + +init_script_logging "reconcile" + +usage(){ + cat <] [-h|--help] + +Run a one-off deployment reconciliation (observed state only). + + --deployment Reconcile a single deployment (optional) + -h, --help Show this help +USG + exit 0 +} + +# Parse CLI flags +deployment_id="" +while [[ $# -gt 0 ]]; do + case "$1" in + --deployment) deployment_id="$2"; shift 2 ;; + -h|--help) usage ;; + *) err "Unknown option: $1"; usage ;; + esac +done + +cd "$APP_DIR" || { err "App dir not found: $APP_DIR"; exit 1; } + +docker info >/dev/null 2>&1 || { err "Docker not accessible. Run with sudo or add your user to the docker group."; exit 1; } + +# Build compose args +set_compose_base + +# Run reconcile +printf '\n' +if [[ -n "$deployment_id" ]]; then + prev_verbose="${VERBOSE:-0}" + VERBOSE=1 run_cmd "Reconciling deployment $deployment_id" "${COMPOSE_BASE[@]}" exec -T app \ + uv run python - < None: + settings = get_settings() + async with AsyncSessionLocal() as db: + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + counts = await reconcile_deployments( + db, + docker_client, + deployment_ids=["$deployment_id"], + ) + print(f"processed={counts['processed']} observed={counts['observed']} missing={counts['missing']}") + + +asyncio.run(main()) +PY + VERBOSE="$prev_verbose" +else + prev_verbose="${VERBOSE:-0}" + VERBOSE=1 run_cmd "Reconciling all deployments" "${COMPOSE_BASE[@]}" exec -T app \ + uv run python - < None: + settings = get_settings() + async with AsyncSessionLocal() as db: + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + counts = await reconcile_deployments(db, docker_client) + print(f"processed={counts['processed']} observed={counts['observed']} missing={counts['missing']}") + + +asyncio.run(main()) +PY + VERBOSE="$prev_verbose" +fi + +printf '\n' +printf "${GRN}Reconcile completed. ✔${NC}\n" From 44eb8e12d172f1f7fa41e2e30885d925e42728eb Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Wed, 4 Feb 2026 16:16:26 +0800 Subject: [PATCH 5/9] Add cron task to worker, allowing to reconciliate efficiently containers and deployments --- .env.dev.example | 1 + .env.example | 1 + app/config.py | 1 + app/services/reconcile.py | 12 ++++++++++-- app/workers/jobs.py | 20 ++++++++++++++++++++ app/workers/tasks/reconcile.py | 22 ++++++++++++++++++++++ scripts/reconcile.sh | 11 +++++++++-- 7 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 app/workers/tasks/reconcile.py diff --git a/.env.dev.example b/.env.dev.example index 217bf0eb..f26c1e8d 100644 --- a/.env.dev.example +++ b/.env.dev.example @@ -45,6 +45,7 @@ RESEND_API_KEY= # Optional if SMTP is configured (see below) # JOB_TIMEOUT_SECONDS=320 # JOB_MAX_TRIES=3 # DEPLOYMENT_TIMEOUT_SECONDS=300 +# RECONCILE_INTERVAL_SECONDS=60 # DEPLOYMENT_RESTART_POLICY=on-failure # DEPLOYMENT_RESTART_MAX_RETRIES=5 # MAGIC_LINK_TTL_SECONDS=900 diff --git a/.env.example b/.env.example index 30727507..42f33c34 100644 --- a/.env.example +++ b/.env.example @@ -51,6 +51,7 @@ RESEND_API_KEY= # Optional if SMTP is configured (see below) # JOB_TIMEOUT_SECONDS=320 # JOB_MAX_TRIES=3 # DEPLOYMENT_TIMEOUT_SECONDS=300 +# RECONCILE_INTERVAL_SECONDS=60 # DEPLOYMENT_RESTART_POLICY=on-failure # DEPLOYMENT_RESTART_MAX_RETRIES=5 # MAGIC_LINK_TTL_SECONDS=900 diff --git a/app/config.py b/app/config.py index a8d59b7a..85820698 100644 --- a/app/config.py +++ b/app/config.py @@ -57,6 +57,7 @@ class Settings(BaseSettings): job_timeout_seconds: int = 320 job_completion_wait_seconds: int = 300 deployment_timeout_seconds: int = 300 + reconcile_interval_seconds: int = 60 deployment_restart_policy: str = "on-failure" deployment_restart_max_retries: int = 5 container_delete_grace_seconds: int = 15 diff --git a/app/services/reconcile.py b/app/services/reconcile.py index 4028a705..ad259650 100644 --- a/app/services/reconcile.py +++ b/app/services/reconcile.py @@ -2,7 +2,7 @@ from datetime import datetime, timezone import aiodocker -from sqlalchemy import select +from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from models import Deployment @@ -17,6 +17,7 @@ async def reconcile_deployments( db: AsyncSession, docker_client: aiodocker.Docker, deployment_ids: list[str] | None = None, + full_scan: bool = False, ) -> dict[str, int]: now = datetime.now(timezone.utc) counts = {"processed": 0, "observed": 0, "missing": 0} @@ -24,8 +25,15 @@ async def reconcile_deployments( query = select(Deployment) if deployment_ids: query = query.where(Deployment.id.in_(deployment_ids)) - else: + elif full_scan: query = query.where(Deployment.container_id.isnot(None)) + else: + query = query.where( + or_( + Deployment.container_status.in_(["running", "stopped"]), + Deployment.observed_status == "running", + ) + ) result = await db.execute(query) deployments = result.scalars().all() diff --git a/app/workers/jobs.py b/app/workers/jobs.py index 3c937cf2..5722c725 100644 --- a/app/workers/jobs.py +++ b/app/workers/jobs.py @@ -1,4 +1,5 @@ import logging +from arq import cron from arq.connections import RedisSettings from workers.tasks.deployment import ( start_deployment, @@ -8,6 +9,7 @@ cleanup_inactive_containers, ) from workers.tasks.project import delete_project +from workers.tasks.reconcile import reconcile_deployments_tick from workers.tasks.storage import provision_storage, deprovision_storage, reset_storage from workers.tasks.team import delete_team from workers.tasks.user import delete_user @@ -25,6 +27,22 @@ settings = get_settings() +def _build_reconcile_cron() -> list: + interval = max(1, settings.reconcile_interval_seconds) + if interval < 60 and 60 % interval == 0: + seconds = set(range(0, 60, interval)) + return [cron(reconcile_deployments_tick, second=seconds)] + minutes = max(1, interval // 60) + if interval % 60 != 0: + logger.warning( + "RECONCILE_INTERVAL_SECONDS=%s is not a clean minute; rounding to %s minute(s).", + interval, + minutes, + ) + minute_values = set(range(0, 60, minutes)) + return [cron(reconcile_deployments_tick, minute=minute_values, second=0)] + + class WorkerSettings: functions = [ start_deployment, @@ -42,7 +60,9 @@ class WorkerSettings: pull_all_runner_images, clear_runner_image, clear_all_runner_images, + reconcile_deployments_tick, ] + cron_jobs = _build_reconcile_cron() redis_settings = RedisSettings.from_dsn(settings.redis_url) max_jobs = 8 job_timeout_seconds = settings.job_timeout_seconds diff --git a/app/workers/tasks/reconcile.py b/app/workers/tasks/reconcile.py new file mode 100644 index 00000000..b86ba8f2 --- /dev/null +++ b/app/workers/tasks/reconcile.py @@ -0,0 +1,22 @@ +import logging + +import aiodocker + +from config import get_settings +from db import AsyncSessionLocal +from services.reconcile import reconcile_deployments + +logger = logging.getLogger(__name__) + + +async def reconcile_deployments_tick(ctx) -> None: + settings = get_settings() + async with AsyncSessionLocal() as db: + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + counts = await reconcile_deployments(db, docker_client) + logger.info( + "Reconcile tick completed processed=%s observed=%s missing=%s.", + counts["processed"], + counts["observed"], + counts["missing"], + ) diff --git a/scripts/reconcile.sh b/scripts/reconcile.sh index fc7689f9..957f598f 100755 --- a/scripts/reconcile.sh +++ b/scripts/reconcile.sh @@ -9,11 +9,12 @@ init_script_logging "reconcile" usage(){ cat <] [-h|--help] +Usage: reconcile.sh [--deployment ] [--full] [-h|--help] Run a one-off deployment reconciliation (observed state only). --deployment Reconcile a single deployment (optional) + --full Reconcile all deployments (default: running/stopped) -h, --help Show this help USG exit 0 @@ -21,9 +22,11 @@ USG # Parse CLI flags deployment_id="" +full=0 while [[ $# -gt 0 ]]; do case "$1" in --deployment) deployment_id="$2"; shift 2 ;; + --full) full=1; shift ;; -h|--help) usage ;; *) err "Unknown option: $1"; usage ;; esac @@ -79,7 +82,11 @@ async def main() -> None: settings = get_settings() async with AsyncSessionLocal() as db: async with aiodocker.Docker(url=settings.docker_host) as docker_client: - counts = await reconcile_deployments(db, docker_client) + counts = await reconcile_deployments( + db, + docker_client, + full_scan=bool($full), + ) print(f"processed={counts['processed']} observed={counts['observed']} missing={counts['missing']}") From f3dac27eef77b2235962fc6e68112d6cd5ad491a Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Wed, 4 Feb 2026 18:56:35 +0800 Subject: [PATCH 6/9] Add API end point to trigger reconcile, send event to redis on status change, add computed_status logic --- app/models.py | 25 ++++++++++++++++ app/routers/project.py | 42 +++++++++++++++++++++++++++ app/services/reconcile.py | 53 ++++++++++++++++++++++++++++++++++ app/workers/tasks/reconcile.py | 7 ++++- 4 files changed, 126 insertions(+), 1 deletion(-) diff --git a/app/models.py b/app/models.py index ef8a7878..ab03fe1e 100644 --- a/app/models.py +++ b/app/models.py @@ -824,6 +824,31 @@ def __init__(self, *args, project: "Project", environment_id: str, **kwargs): environment = project.get_environment_by_id(environment_id) self.env_vars = project.get_env_vars(environment["slug"]) if environment else [] + @property + def computed_status(self) -> str: + observed = self.observed_status + expected = self.container_status + if expected in {"stopped", "removed"}: + if observed == "running": + return "orphaned" + return expected + + if expected == "running": + if observed == "not_found": + return "missing" + if observed == "exited": + return "stopped" if self.observed_exit_code == 0 else "crashed" + if observed in {"paused", "dead"}: + return observed + return "running" + + if observed == "exited": + return "stopped" if self.observed_exit_code == 0 else "crashed" + if observed in {"running", "paused", "dead", "not_found"}: + return observed + + return self.status + @property def environment(self) -> dict | None: """Get environment configuration""" diff --git a/app/routers/project.py b/app/routers/project.py index df803b0b..1fcf3bfa 100644 --- a/app/routers/project.py +++ b/app/routers/project.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends, Request, Query, HTTPException import httpx +import aiodocker from fastapi.responses import Response, RedirectResponse from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession @@ -66,6 +67,7 @@ from services.github import GitHubService from services.github_installation import GitHubInstallationService from services.deployment import DeploymentService +from services.reconcile import reconcile_deployments from services.domain import DomainService from services.preset_detector import PresetDetector from services.registry import RegistryService @@ -2231,6 +2233,46 @@ async def project_deployment( ) +@router.post( + "/{team_slug}/projects/{project_name}/deployments/{deployment_id}/reconcile", + name="project_deployment_reconcile", +) +async def project_deployment_reconcile( + project: Project = Depends(get_project_by_name), + current_user: User = Depends(get_current_user), + role: str = Depends(get_role), + team_and_membership: tuple[Team, TeamMember] = Depends(get_team_by_slug), + db: AsyncSession = Depends(get_db), + deployment: Deployment = Depends(get_deployment_by_id), + settings: Settings = Depends(get_settings), +): + team, membership = team_and_membership + + if deployment.project_id != project.id: + raise HTTPException(status_code=404, detail="Deployment not found") + + async with aiodocker.Docker(url=settings.docker_host) as docker_client: + await reconcile_deployments(db, docker_client, deployment_ids=[deployment.id]) + await db.refresh(deployment) + + def _dt(value: datetime | None) -> str | None: + if not value: + return None + return value.replace(tzinfo=timezone.utc).isoformat() + + return { + "deployment_id": deployment.id, + "status": deployment.status, + "container_status": deployment.container_status, + "observed_status": deployment.observed_status, + "observed_exit_code": deployment.observed_exit_code, + "observed_at": _dt(deployment.observed_at), + "observed_last_seen_at": _dt(deployment.observed_last_seen_at), + "observed_missing_count": deployment.observed_missing_count, + "computed_status": deployment.computed_status, + } + + @router.get( "/{team_slug}/projects/{project_name}/logs", name="project_logs", diff --git a/app/services/reconcile.py b/app/services/reconcile.py index ad259650..d65d6b83 100644 --- a/app/services/reconcile.py +++ b/app/services/reconcile.py @@ -3,6 +3,7 @@ import aiodocker from sqlalchemy import select, or_ +from redis.asyncio import Redis from sqlalchemy.ext.asyncio import AsyncSession from models import Deployment @@ -13,11 +14,31 @@ OBSERVED_STATUSES = {"running", "exited", "dead", "paused", "not_found"} +async def _emit_observed_update( + redis_client: Redis, deployment: Deployment, now: datetime +) -> None: + fields = { + "event_type": "deployment_observed_update", + "project_id": deployment.project_id, + "deployment_id": deployment.id, + "observed_status": deployment.observed_status, + "observed_exit_code": deployment.observed_exit_code, + "computed_status": deployment.computed_status, + "timestamp": now.isoformat(), + } + await redis_client.xadd( + f"stream:project:{deployment.project_id}:deployment:{deployment.id}:status", + fields, + ) + await redis_client.xadd(f"stream:project:{deployment.project_id}:updates", fields) + + async def reconcile_deployments( db: AsyncSession, docker_client: aiodocker.Docker, deployment_ids: list[str] | None = None, full_scan: bool = False, + redis_client: Redis | None = None, ) -> dict[str, int]: now = datetime.now(timezone.utc) counts = {"processed": 0, "observed": 0, "missing": 0} @@ -51,6 +72,7 @@ async def reconcile_deployments( container_by_deployment: dict[str, str] = {} container_by_id: dict[str, str] = {} + changed_deployments: dict[str, Deployment] = {} for info in containers: container_id = None @@ -81,6 +103,10 @@ async def reconcile_deployments( else: container_id = container_by_deployment.get(deployment.id) + prev_status = deployment.observed_status + prev_exit_code = deployment.observed_exit_code + prev_missing = deployment.observed_missing_count + if not container_id: deployment.observed_status = "not_found" deployment.observed_at = now.replace(tzinfo=None) @@ -91,6 +117,12 @@ async def reconcile_deployments( deployment.id, deployment.observed_missing_count, ) + if ( + prev_status != deployment.observed_status + or prev_exit_code != deployment.observed_exit_code + or prev_missing != deployment.observed_missing_count + ): + changed_deployments[deployment.id] = deployment continue try: @@ -109,6 +141,12 @@ async def reconcile_deployments( deployment.id, deployment.observed_missing_count, ) + if ( + prev_status != deployment.observed_status + or prev_exit_code != deployment.observed_exit_code + or prev_missing != deployment.observed_missing_count + ): + changed_deployments[deployment.id] = deployment continue logger.warning("Failed to inspect container %s: %s", container_id, error) deployment.observed_status = "not_found" @@ -120,6 +158,12 @@ async def reconcile_deployments( deployment.id, deployment.observed_missing_count, ) + if ( + prev_status != deployment.observed_status + or prev_exit_code != deployment.observed_exit_code + or prev_missing != deployment.observed_missing_count + ): + changed_deployments[deployment.id] = deployment continue state = details.get("State", {}) @@ -144,10 +188,19 @@ async def reconcile_deployments( deployment.observed_status, deployment.observed_exit_code, ) + if ( + prev_status != deployment.observed_status + or prev_exit_code != deployment.observed_exit_code + or prev_missing != deployment.observed_missing_count + ): + changed_deployments[deployment.id] = deployment await db.commit() logger.info( "Reconcile: updated observed state for %s deployment(s).", len(deployments), ) + if redis_client and changed_deployments: + for deployment in changed_deployments.values(): + await _emit_observed_update(redis_client, deployment, now) return counts diff --git a/app/workers/tasks/reconcile.py b/app/workers/tasks/reconcile.py index b86ba8f2..15fa089f 100644 --- a/app/workers/tasks/reconcile.py +++ b/app/workers/tasks/reconcile.py @@ -11,9 +11,14 @@ async def reconcile_deployments_tick(ctx) -> None: settings = get_settings() + redis_client = ctx.get("redis") async with AsyncSessionLocal() as db: async with aiodocker.Docker(url=settings.docker_host) as docker_client: - counts = await reconcile_deployments(db, docker_client) + counts = await reconcile_deployments( + db, + docker_client, + redis_client=redis_client, + ) logger.info( "Reconcile tick completed processed=%s observed=%s missing=%s.", counts["processed"], From 4aa4bf2d4b1e421e5ff3f52bd7a51fcb144d91e9 Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Thu, 5 Feb 2026 08:35:07 +0800 Subject: [PATCH 7/9] Update documentation --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 73cdad0d..668889ef 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,7 @@ See [ARCHITECTURE.md](ARCHITECTURE.md) for codebase structure. | `MAX_MEMORY_MB` | Maximum allowed memory override per project. Used only when `DEFAULT_MEMORY_MB` is set. Required to let user customize memory. | | `JOB_TIMEOUT_SECONDS` | Job timeout (seconds). Default: `320`. | | `JOB_MAX_TRIES` | Max retries per background job. Default: `3`. | +| `RECONCILE_INTERVAL_SECONDS` | Reconcile interval (seconds) for observed deployment state. Default: `60`. | | `DEPLOYMENT_TIMEOUT_SECONDS` | Deployment timeout (seconds). Default: `300`. | | `DEPLOYMENT_RESTART_POLICY` | Docker restart policy for deployment containers. Default: `on-failure`. | | `DEPLOYMENT_RESTART_MAX_RETRIES` | Max retries when restart policy is `on-failure`. Default: `5`. | From 345d6e7d33c12972b8dd204880a93436fc48e580 Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Thu, 5 Feb 2026 08:56:58 +0800 Subject: [PATCH 8/9] Change fallback return status of computed_status --- app/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models.py b/app/models.py index ab03fe1e..82818382 100644 --- a/app/models.py +++ b/app/models.py @@ -847,7 +847,7 @@ def computed_status(self) -> str: if observed in {"running", "paused", "dead", "not_found"}: return observed - return self.status + return observed or expected @property def environment(self) -> dict | None: From e8fe7ed71670c829d75531ed4b29e70126c4e536 Mon Sep 17 00:00:00 2001 From: Vincent VIALLET Date: Thu, 5 Feb 2026 09:48:53 +0800 Subject: [PATCH 9/9] Update migration to apply on the head --- .../versions/6b0c7d2a9e1f_deployment_observed_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py b/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py index efc3af63..d4051565 100644 --- a/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py +++ b/app/migrations/versions/6b0c7d2a9e1f_deployment_observed_state.py @@ -1,7 +1,7 @@ """Deployment observed state Revision ID: 6b0c7d2a9e1f -Revises: f45484bf96b0 +Revises: 4fe4c96ad3dd Create Date: 2026-01-29 00:00:00.000000 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. revision: str = "6b0c7d2a9e1f" -down_revision: Union[str, Sequence[str], None] = "f45484bf96b0" +down_revision: Union[str, Sequence[str], None] = "4fe4c96ad3dd" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None