Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
383 changes: 254 additions & 129 deletions apps/ade-cli/src/cli.ts

Large diffs are not rendered by default.

649 changes: 649 additions & 0 deletions apps/ade-cli/src/services/sync/brainProjectActionsSyncHandler.ts

Large diffs are not rendered by default.

25 changes: 23 additions & 2 deletions apps/ade-cli/src/services/sync/sharedSyncListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export type SharedSyncListener = {
* host service.
*/
setConnectionHandler(handler: SharedSyncListenerConnectionHandler): () => void;
/**
* Machine-wide handler used when no project sync host currently owns new
* sockets. Project hosts still take precedence once they attach.
*/
setFallbackConnectionHandler(handler: SharedSyncListenerConnectionHandler | null): () => void;
/** Park live peer sockets for adoption by the next host service. */
depositPeers(snapshots: SyncPeerHandoffSnapshot[]): void;
/** Claim every parked socket (deposited peers + connections that arrived handler-less). */
Expand Down Expand Up @@ -129,6 +134,8 @@ export function createSharedSyncListener(options: {
let server: WebSocketServer | null = null;
let listeningPromise: Promise<number> | null = null;
let handler: SharedSyncListenerConnectionHandler | null = null;
let fallbackHandler: SharedSyncListenerConnectionHandler | null = null;
let fallbackSuppressedUntilMs = 0;
let closed = false;
const parked = new Map<WebSocket, ParkedEntry>();

Expand Down Expand Up @@ -252,8 +259,10 @@ export function createSharedSyncListener(options: {
remoteAddress: request.socket.remoteAddress ?? null,
remotePort: request.socket.remotePort ?? null,
};
if (handler) {
handler(connection);
const fallbackSuppressed = fallbackSuppressedUntilMs > Date.now();
const activeHandler = handler ?? (fallbackSuppressed ? null : fallbackHandler);
if (activeHandler) {
activeHandler(connection);
return;
}
// No host service owns the listener right now (mid project switch
Expand Down Expand Up @@ -318,9 +327,20 @@ export function createSharedSyncListener(options: {

setConnectionHandler(nextHandler: SharedSyncListenerConnectionHandler): () => void {
handler = nextHandler;
fallbackSuppressedUntilMs = 0;
return () => {
if (handler === nextHandler) {
handler = null;
fallbackSuppressedUntilMs = Date.now() + parkedPeerGraceMs;
}
};
},

setFallbackConnectionHandler(nextHandler: SharedSyncListenerConnectionHandler | null): () => void {
fallbackHandler = nextHandler;
return () => {
if (fallbackHandler === nextHandler) {
fallbackHandler = null;
}
};
},
Expand Down Expand Up @@ -354,6 +374,7 @@ export function createSharedSyncListener(options: {
if (closed) return;
closed = true;
handler = null;
fallbackHandler = null;
if (listeningPromise) {
await listeningPromise.catch(() => {});
}
Expand Down
256 changes: 255 additions & 1 deletion apps/ade-cli/src/services/sync/syncHostService.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { WebSocket } from "ws";
import { WebSocket, WebSocketServer } from "ws";
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type {
AgentChatEventEnvelope,
Expand All @@ -21,9 +21,12 @@ import {
resolveSyncHostInboundProjectScope,
selectChangesetBatchChunk,
} from "./syncHostService";
import { createBrainProjectActionsSyncHandler } from "./brainProjectActionsSyncHandler";
import { buildChangesetBatchPayload } from "./changesetPump";
import { createSharedSyncListener } from "./sharedSyncListener";
import { createSyncPinStore } from "./syncPinStore";
import { encodeSyncEnvelope, parseSyncEnvelope, wsDataToText, type ParsedSyncEnvelope } from "./syncProtocol";
import { EncryptedFileCredentialStore } from "../credentials/credentialStore";

// The sync host now binds to all interfaces (0.0.0.0) by default so phones on
// the LAN can reach it. These tests assert the LOOPBACK-only posture (no LAN
Expand Down Expand Up @@ -181,6 +184,7 @@ describe("buildSyncHostHelloOkPayload", () => {
pollIntervalMs: 400,
projectCatalog: { projects: [project] },
projectCatalogEnabled: true,
projectActionsEnabled: false,
remoteCommandSupportedActions: [remoteCommand.action],
remoteCommandDescriptors: [remoteCommand],
localCommandDescriptors: [localPresenceCommand],
Expand All @@ -192,6 +196,7 @@ describe("buildSyncHostHelloOkPayload", () => {
expect(payload.serverDbVersion).toBe(7);
expect(payload.projects).toEqual([project]);
expect(payload.features.projectCatalog).toEqual({ enabled: true });
expect(payload.features.projectActions).toEqual({ enabled: false });
expect(payload.features.fileAccess).toBe(true);
expect(payload.features.terminalStreaming).toBe(true);
expect(payload.features.chatStreaming).toEqual({ enabled: true });
Expand Down Expand Up @@ -317,6 +322,213 @@ describe("buildChangesetBatchPayload", () => {
});
});

describe("brain project actions fallback handler", () => {
it("does not send a second switch result if post-response project completion fails", async () => {
const { projectRoot, cleanup } = createTempProjectRoot();
const secretsDir = path.join(projectRoot, "secrets");
fs.mkdirSync(secretsDir, { recursive: true });
const credentialStore = new EncryptedFileCredentialStore({
secretsDir,
keyMaterialProvider: () => null,
});
credentialStore.setSync("test.bootstrap", "bootstrap-token");

const project = createDiscoveryProject({
id: "project-1",
rootPath: projectRoot,
isOpen: true,
});
const logger = createDiscoveryLogger();
const handler = createBrainProjectActionsSyncHandler({
logger,
projectCatalogProvider: {
listProjects: vi.fn(async () => ({ projects: [project] })),
prepareProjectConnection: vi.fn(async () => ({
ok: true,
project,
connection: null,
})),
completeProjectConnection: vi.fn(async () => {
throw new Error("activation failed");
}),
},
bootstrapCredentialStore: credentialStore,
bootstrapTokenKey: "test.bootstrap",
pairingSecretsPath: path.join(secretsDir, "sync-paired-devices.json"),
pinPath: path.join(secretsDir, "sync-pin.json"),
localDeviceIdPath: path.join(secretsDir, "sync-device-id"),
localSiteIdPath: path.join(secretsDir, "sync-site-id"),
});
const server = new WebSocketServer({ host: "127.0.0.1", port: 0 });
server.on("connection", (ws, request) => {
handler({
ws,
remoteAddress: request.socket.remoteAddress ?? null,
remotePort: request.socket.remotePort ?? null,
});
});

let client: WebSocket | null = null;
try {
await new Promise<void>((resolve, reject) => {
server.once("listening", () => resolve());
server.once("error", reject);
});
const address = server.address();
expect(typeof address).toBe("object");
const port = typeof address === "object" && address ? address.port : 0;
expect(port).toBeGreaterThan(0);

client = new WebSocket(`ws://127.0.0.1:${port}`);
const { envelopes } = trackClientEnvelopes(client);
await new Promise<void>((resolve, reject) => {
client!.once("open", () => resolve());
client!.once("error", reject);
});
sendHello(client, "bootstrap-token");
await waitForValue(
() => envelopes.find((envelope) => envelope.type === "hello_ok"),
"fallback hello_ok",
);

client.send(encodeSyncEnvelope({
type: "project_switch_request",
requestId: "switch-1",
payload: { projectId: "project-1" },
}));
const result = await waitForEnvelope(envelopes, "project_switch_result", "switch-1");
expect(result.payload).toMatchObject({ ok: true });

await new Promise((resolve) => setTimeout(resolve, 100));
const switchResults = envelopes.filter(
(envelope) => envelope.type === "project_switch_result" && envelope.requestId === "switch-1",
);
expect(switchResults).toHaveLength(1);
expect(logger.warn).toHaveBeenCalledWith(
"sync_brain.project_switch_failed",
expect.objectContaining({ message: "activation failed" }),
);
} finally {
try {
client?.close();
} catch {
// ignore
}
await new Promise<void>((resolve) => server.close(() => resolve()));
cleanup();
}
});

it("rate-limits failed PIN pairing attempts before a project host owns the listener", async () => {
const { projectRoot, cleanup } = createTempProjectRoot();
const secretsDir = path.join(projectRoot, "secrets");
fs.mkdirSync(secretsDir, { recursive: true });
const pinPath = path.join(secretsDir, "sync-pin.json");
createSyncPinStore({ filePath: pinPath }).setPin("428193");

const logger = createDiscoveryLogger();
const bootstrapTokenPath = path.join(secretsDir, "sync-bootstrap-token");
const handler = createBrainProjectActionsSyncHandler({
logger,
projectCatalogProvider: {
listProjects: vi.fn(async () => ({ projects: [] })),
prepareProjectConnection: vi.fn(async () => ({
ok: false,
message: "No hosted project is ready.",
})),
},
bootstrapCredentialStore: new EncryptedFileCredentialStore({
secretsDir,
keyMaterialProvider: () => null,
}),
pairingSecretsPath: path.join(secretsDir, "sync-paired-devices.json"),
pinPath,
localDeviceIdPath: path.join(secretsDir, "sync-device-id"),
localSiteIdPath: path.join(secretsDir, "sync-site-id"),
});
expect(fs.existsSync(bootstrapTokenPath)).toBe(false);
expect(fs.existsSync(path.join(secretsDir, "credentials.json.enc"))).toBe(true);
const server = new WebSocketServer({ host: "127.0.0.1", port: 0 });
server.on("connection", (ws, request) => {
handler({
ws,
remoteAddress: request.socket.remoteAddress ?? null,
remotePort: request.socket.remotePort ?? null,
});
});

try {
await new Promise<void>((resolve, reject) => {
server.once("listening", () => resolve());
server.once("error", reject);
});
const address = server.address();
expect(typeof address).toBe("object");
const port = typeof address === "object" && address ? address.port : 0;
expect(port).toBeGreaterThan(0);

const sendPairingRequest = async (requestId: string, code: string, deviceId: string) => {
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
const envelopes: ParsedSyncEnvelope[] = [];
ws.on("message", (raw) => {
envelopes.push(parseSyncEnvelope(wsDataToText(raw)));
});
await new Promise<void>((resolve, reject) => {
ws.once("open", () => resolve());
ws.once("error", reject);
});
const closed = new Promise<{ code: number; reason: string }>((resolve) => {
ws.once("close", (closeCode, reason) => {
resolve({ code: closeCode, reason: reason.toString("utf8") });
});
});
ws.send(encodeSyncEnvelope({
type: "pairing_request",
requestId,
payload: {
code,
peer: {
deviceId,
deviceName: "Fallback iPhone",
platform: "iOS",
deviceType: "phone",
siteId: `${deviceId}-site`,
dbVersion: 0,
},
},
}));
const response = await waitForValue(
() => envelopes.find((envelope) => envelope.type === "pairing_result"),
`pairing_result ${requestId}`,
);
return {
payload: response.payload as {
ok: boolean;
error?: { code?: string; message?: string };
},
closed: await closed,
};
};

for (let attempt = 0; attempt < 5; attempt += 1) {
const failed = await sendPairingRequest(`bad-pin-${attempt}`, "000000", `ios-bad-${attempt}`);
expect(failed.payload.ok).toBe(false);
expect(failed.payload.error?.code).toBe("invalid_pin");
expect(failed.closed.code).toBe(4003);
}

const limited = await sendPairingRequest("bad-pin-cooldown", "000000", "ios-rate-limited");
expect(limited.payload.ok).toBe(false);
expect(limited.payload.error?.code).toBe("pairing_failed");
expect(limited.payload.error?.message).toMatch(/Too many failed PIN attempts/i);
expect(limited.closed.code).toBe(4004);
} finally {
await new Promise<void>((resolve) => server.close(() => resolve()));
cleanup();
}
});
});

function createDiscoveryLogger() {
return {
debug: vi.fn(),
Expand Down Expand Up @@ -918,6 +1130,48 @@ describe("sync host handoff over a shared listener", () => {
}
});

it("parks new connections during handler handoff instead of dispatching them to the fallback handler", async () => {
const listener = createSharedSyncListener({ bindHost: "127.0.0.1", parkedPeerGraceMs: 500 });
const fallbackHandler = vi.fn((connection: { ws: WebSocket }) => {
connection.ws.close(4010, "Fallback claimed socket");
});
const primaryHandler = vi.fn((connection: { ws: WebSocket }) => {
connection.ws.close(4011, "Primary claimed socket");
});
let client: WebSocket | null = null;

try {
const port = await listener.ensureListening([0]);
const detachPrimary = listener.setConnectionHandler(primaryHandler);
listener.setFallbackConnectionHandler(fallbackHandler);
detachPrimary();

client = new WebSocket(`ws://127.0.0.1:${port}`);
const { closeEvents } = trackClientEnvelopes(client);
await new Promise<void>((resolve, reject) => {
client!.once("open", () => resolve());
client!.once("error", reject);
});
client.send("hello-during-handoff");
await new Promise((resolve) => setTimeout(resolve, 50));

expect(primaryHandler).not.toHaveBeenCalled();
expect(fallbackHandler).not.toHaveBeenCalled();
const parkedPeers = listener.takePeers();
expect(parkedPeers).toHaveLength(1);
expect(parkedPeers[0]?.bufferedMessages).toHaveLength(1);
expect(wsDataToText(parkedPeers[0]!.bufferedMessages![0]!.data)).toBe("hello-during-handoff");
expect(closeEvents).toEqual([]);
} finally {
try {
client?.close();
} catch {
// ignore
}
await listener.close();
}
});

it("closes parked peers with 4002 when no host adopts them in time", async () => {
const listener = createSharedSyncListener({ bindHost: "127.0.0.1", parkedPeerGraceMs: 150 });
let client: WebSocket | null = null;
Expand Down
Loading
Loading