diff --git a/migrations/1781568000000_webhook_refresh_log.ts b/migrations/1781568000000_webhook_refresh_log.ts new file mode 100644 index 000000000..eedf624d2 --- /dev/null +++ b/migrations/1781568000000_webhook_refresh_log.ts @@ -0,0 +1,41 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { sql } from "kysely"; +import type { Kysely } from "kysely"; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable("webhookRefreshLog") + .addColumn("id", "uuid", (col) => + col.primaryKey().defaultTo(sql`gen_random_uuid()`), + ) + .addColumn("runId", "uuid", (col) => col.notNull()) + .addColumn("dataSourceId", "uuid", (col) => + col.notNull().references("dataSource.id").onDelete("cascade"), + ) + .addColumn("dataSourceType", "text", (col) => col.notNull()) + .addColumn("enabled", "boolean", (col) => col.notNull()) + .addColumn("success", "boolean", (col) => col.notNull()) + .addColumn("action", "text", (col) => col.notNull()) + .addColumn("oldWebhookIds", "jsonb", (col) => + col.notNull().defaultTo(sql`'[]'::jsonb`), + ) + .addColumn("newWebhookIds", "jsonb", (col) => + col.notNull().defaultTo(sql`'[]'::jsonb`), + ) + .addColumn("details", "jsonb") + .addColumn("error", "text") + .addColumn("createdAt", "text", (col) => + col.defaultTo(sql`CURRENT_TIMESTAMP`).notNull(), + ) + .execute(); + + await db.schema + .createIndex("webhookRefreshLogDataSourceIdCreatedAtIndex") + .on("webhookRefreshLog") + .columns(["dataSourceId", "createdAt"]) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable("webhookRefreshLog").execute(); +} diff --git a/src/models/WebhookRefreshLog.ts b/src/models/WebhookRefreshLog.ts new file mode 100644 index 000000000..9a7bb4c0a --- /dev/null +++ b/src/models/WebhookRefreshLog.ts @@ -0,0 +1,28 @@ +import { z } from "zod"; + +export const webhookRefreshActionSchema = z.enum([ + "created", + "recreated", + "kept", + "removed", + "noop", + "failed", +]); + +export const webhookRefreshLogSchema = z.object({ + id: z.string(), + runId: z.string(), + dataSourceId: z.string(), + dataSourceType: z.string(), + enabled: z.boolean(), + success: z.boolean(), + action: webhookRefreshActionSchema, + oldWebhookIds: z.array(z.string()), + newWebhookIds: z.array(z.string()), + details: z.record(z.string(), z.unknown()).nullish(), + error: z.string().nullish(), + createdAt: z.string(), +}); + +export type WebhookRefreshAction = z.infer; +export type WebhookRefreshLog = z.infer; diff --git a/src/server/adaptors/abstract.ts b/src/server/adaptors/abstract.ts index 4ac486c10..74603b329 100644 --- a/src/server/adaptors/abstract.ts +++ b/src/server/adaptors/abstract.ts @@ -1,6 +1,13 @@ import type { EnrichedRecord } from "@/models/DataRecord"; import type { ExternalRecord, TaggedRecord } from "@/types"; +export interface WebhookToggleResult { + action: "created" | "recreated" | "kept" | "removed" | "noop"; + oldWebhookIds: string[]; + newWebhookIds: string[]; + details?: Record; +} + export interface DataSourceAdaptor { extractExternalRecordIdsFromWebhookBody( body: unknown, @@ -10,7 +17,7 @@ export interface DataSourceAdaptor { fetchFirst(): Promise; fetchByExternalId(externalIds: string[]): Promise; removeDevWebhooks(): Promise; - toggleWebhook(enable: boolean): Promise; + toggleWebhook(enable: boolean): Promise; updateRecords(enrichedRecords: EnrichedRecord[]): Promise; tagRecords(records: TaggedRecord[]): Promise; deleteColumn(columnName: string): Promise; diff --git a/src/server/adaptors/actionnetwork.ts b/src/server/adaptors/actionnetwork.ts index d0c16a5cf..135232c61 100644 --- a/src/server/adaptors/actionnetwork.ts +++ b/src/server/adaptors/actionnetwork.ts @@ -4,7 +4,7 @@ import { ENRICHMENT_COLUMN_PREFIX, } from "@/constants"; import logger from "@/server/services/logger"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { EnrichedRecord } from "@/models/DataRecord"; import type { ExternalRecord, TaggedRecord } from "@/types"; @@ -277,8 +277,9 @@ export class ActionNetworkAdaptor implements DataSourceAdaptor { } // eslint-disable-next-line @typescript-eslint/no-unused-vars - async toggleWebhook(enable: boolean): Promise { + async toggleWebhook(enable: boolean): Promise { logger.debug("Cannot toggle webhooks for Action Network data source"); + return { action: "noop", oldWebhookIds: [], newWebhookIds: [] }; } async updateRecords(enrichedRecords: EnrichedRecord[]): Promise { diff --git a/src/server/adaptors/airtable.ts b/src/server/adaptors/airtable.ts index 1f130e14c..f7eb76784 100644 --- a/src/server/adaptors/airtable.ts +++ b/src/server/adaptors/airtable.ts @@ -11,7 +11,7 @@ import { import logger from "@/server/services/logger"; import { getPublicUrl } from "@/server/services/urls"; import { batch } from "@/server/utils"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { EnrichedRecord } from "@/models/DataRecord"; import type { ExternalRecord, TaggedRecord } from "@/types"; @@ -411,9 +411,10 @@ export class AirtableAdaptor implements DataSourceAdaptor { await this.removeWebhooks(webhooks); } - async toggleWebhook(enable: boolean): Promise { + async toggleWebhook(enable: boolean): Promise { const publicUrl = await getPublicUrl(); const webhooks = await this.listWebhooks(publicUrl); + const oldWebhookIds = webhooks.map((wh) => wh.id); // Remove webhooks on user request if (!enable) { @@ -421,7 +422,11 @@ export class AirtableAdaptor implements DataSourceAdaptor { `Removing Airtable webhooks for data source ${this.dataSourceId}`, ); await this.removeWebhooks(webhooks); - return; + return { + action: oldWebhookIds.length ? "removed" : "noop", + oldWebhookIds, + newWebhookIds: [], + }; } // Skip recreating webhook that has at least 2 days of validity @@ -435,7 +440,12 @@ export class AirtableAdaptor implements DataSourceAdaptor { logger.info( `Airtable webhook exists for data source ${this.dataSourceId}`, ); - return; + return { + action: "kept", + oldWebhookIds: [], + newWebhookIds: [webhooks[0].id], + details: { expirationTime: webhooks[0].expirationTime.toISOString() }, + }; } // Cleanup expired webhooks @@ -478,6 +488,18 @@ export class AirtableAdaptor implements DataSourceAdaptor { } throw Error(`Bad webhooks response: ${response.status}, ${responseText}`); } + + const created = (await response.json()) as { + id: string; + expirationTime?: string; + }; + + return { + action: oldWebhookIds.length ? "recreated" : "created", + oldWebhookIds, + newWebhookIds: created.id ? [created.id] : [], + details: { expirationTime: created.expirationTime }, + }; } async removeWebhooks(webhooks: Webhook[]): Promise { diff --git a/src/server/adaptors/csv.ts b/src/server/adaptors/csv.ts index 50e3e2c8f..99db5890c 100644 --- a/src/server/adaptors/csv.ts +++ b/src/server/adaptors/csv.ts @@ -6,7 +6,7 @@ import { ENRICHMENT_COLUMN_PREFIX } from "@/constants"; import logger from "@/server/services/logger"; import { getAbsoluteUrl } from "@/utils/appUrl"; import { getBaseDir } from "../utils"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { ExternalRecord } from "@/types"; export function decodeBuffer(buffer: Buffer): Buffer { @@ -152,7 +152,7 @@ export class CSVAdaptor implements DataSourceAdaptor { async toggleWebhook( // eslint-disable-next-line @typescript-eslint/no-unused-vars enable: boolean, - ): Promise { + ): Promise { throw new Error("Unimplemented"); } diff --git a/src/server/adaptors/googlesheets.ts b/src/server/adaptors/googlesheets.ts index ee4c4de30..9946f0679 100644 --- a/src/server/adaptors/googlesheets.ts +++ b/src/server/adaptors/googlesheets.ts @@ -8,7 +8,7 @@ import logger from "@/server/services/logger"; import { getPublicUrl } from "@/server/services/urls"; import { batch } from "@/server/utils"; import { enqueue } from "../services/queue"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { EnrichedRecord } from "@/models/DataRecord"; import type { googleOAuthCredentialsSchema } from "@/models/DataSource"; import type { ExternalRecord, TaggedRecord } from "@/types"; @@ -431,7 +431,7 @@ export class GoogleSheetsAdaptor implements DataSourceAdaptor { } } - async toggleWebhook(enable: boolean): Promise { + async toggleWebhook(enable: boolean): Promise { try { const notificationUrl = await getPublicUrl( `/api/data-sources/${this.dataSourceId}/webhook`, @@ -441,13 +441,37 @@ export class GoogleSheetsAdaptor implements DataSourceAdaptor { const notificationDomain = new URL(notificationUrl).hostname; const webhookSheetName = `Mapped Webhook: ${notificationDomain}/${this.dataSourceId}`; + const existingSheets = await this.listSheets(); + const oldSheet = existingSheets.find( + (sheet) => sheet.properties.title === webhookSheetName, + ); + const oldWebhookIds = oldSheet ? [oldSheet.properties.sheetId] : []; + if (!enable) { await this.deleteSheet(webhookSheetName); - return; + return { + action: oldSheet ? "removed" : "noop", + oldWebhookIds, + newWebhookIds: [], + details: { sheetName: webhookSheetName }, + }; } await this.ensureSheet(webhookSheetName); await this.prepareWebhookSheet(webhookSheetName, notificationUrl); + + const refreshedSheets = await this.listSheets(); + const newSheet = refreshedSheets.find( + (sheet) => sheet.properties.title === webhookSheetName, + ); + const newSheetId = newSheet?.properties.sheetId; + + return { + action: oldSheet ? "kept" : "created", + oldWebhookIds, + newWebhookIds: newSheetId ? [newSheetId] : [], + details: { sheetName: webhookSheetName, sheetId: newSheetId }, + }; } catch (e) { if (e instanceof Error && e.message.includes("PERMISSION_DENIED")) { throw new Error( diff --git a/src/server/adaptors/mailchimp.ts b/src/server/adaptors/mailchimp.ts index 6266a9469..34efee290 100644 --- a/src/server/adaptors/mailchimp.ts +++ b/src/server/adaptors/mailchimp.ts @@ -7,7 +7,7 @@ import { import logger from "@/server/services/logger"; import { getPublicUrl } from "@/server/services/urls"; import { batch } from "@/server/utils"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { EnrichedRecord } from "@/models/DataRecord"; import type { ExternalRecord, TaggedRecord } from "@/types"; @@ -415,9 +415,10 @@ export class MailchimpAdaptor implements DataSourceAdaptor { await this.removeWebhooks(webhooks); } - async toggleWebhook(enable: boolean): Promise { + async toggleWebhook(enable: boolean): Promise { const publicUrl = await getPublicUrl(); const webhooks = await this.listWebhooks(publicUrl); + const oldWebhookIds = webhooks.map((wh) => wh.id); // Remove webhooks on user request if (!enable) { @@ -425,7 +426,11 @@ export class MailchimpAdaptor implements DataSourceAdaptor { `Removing Mailchimp webhooks for data source ${this.dataSourceId}`, ); await this.removeWebhooks(webhooks); - return; + return { + action: oldWebhookIds.length ? "removed" : "noop", + oldWebhookIds, + newWebhookIds: [], + }; } // If we already have a webhook, don't create another @@ -433,7 +438,11 @@ export class MailchimpAdaptor implements DataSourceAdaptor { logger.info( `Mailchimp webhook already exists for data source ${this.dataSourceId}`, ); - return; + return { + action: "kept", + oldWebhookIds: [], + newWebhookIds: oldWebhookIds, + }; } const url = `${this.getListUrl()}/webhooks`; @@ -477,6 +486,12 @@ export class MailchimpAdaptor implements DataSourceAdaptor { logger.info( `Created Mailchimp webhook for data source ${this.dataSourceId}: ${webhook.id}`, ); + + return { + action: "created", + oldWebhookIds, + newWebhookIds: webhook.id ? [webhook.id] : [], + }; } async removeWebhooks(webhooks: Webhook[]): Promise { diff --git a/src/server/adaptors/zetkin.ts b/src/server/adaptors/zetkin.ts index 36fe841e7..8c77ad5ca 100644 --- a/src/server/adaptors/zetkin.ts +++ b/src/server/adaptors/zetkin.ts @@ -7,7 +7,7 @@ import { DataSourceType } from "@/models/DataSource"; import { updateDataSource } from "@/server/repositories/DataSource"; import logger from "@/server/services/logger"; import { slugify } from "@/utils/text"; -import type { DataSourceAdaptor } from "./abstract"; +import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract"; import type { EnrichedRecord } from "@/models/DataRecord"; import type { ZetkinOAuthCredentials } from "@/models/DataSource"; import type { ExternalRecord, TaggedRecord } from "@/types"; @@ -235,8 +235,9 @@ export class ZetkinAdaptor implements DataSourceAdaptor { // Zetkin does not support webhooks } - async toggleWebhook(): Promise { + async toggleWebhook(): Promise { logger.debug("Cannot toggle webhooks for Zetkin data source"); + return { action: "noop", oldWebhookIds: [], newWebhookIds: [] }; } async updateRecords(enrichedRecords: EnrichedRecord[]): Promise { diff --git a/src/server/jobs/refreshWebhooks.ts b/src/server/jobs/refreshWebhooks.ts index fbc39af27..91cc5302f 100644 --- a/src/server/jobs/refreshWebhooks.ts +++ b/src/server/jobs/refreshWebhooks.ts @@ -1,3 +1,4 @@ +import { v4 as uuidv4 } from "uuid"; import { DataSourceType, airtableConfigSchema, @@ -9,14 +10,54 @@ import { findDataSourceById, findDataSourcesByType, } from "@/server/repositories/DataSource"; +import { createWebhookRefreshLog } from "@/server/repositories/WebhookRefreshLog"; import { getDataSourceAdaptor } from "../adaptors"; import logger from "../services/logger"; +import type { WebhookToggleResult } from "@/server/adaptors/abstract"; + +const logRefresh = async ({ + runId, + dataSourceId, + dataSourceType, + enabled, + result, + error, +}: { + runId: string; + dataSourceId: string; + dataSourceType: DataSourceType; + enabled: boolean; + result?: WebhookToggleResult; + error?: unknown; +}): Promise => { + try { + await createWebhookRefreshLog({ + runId, + dataSourceId, + dataSourceType, + enabled, + success: !error, + action: error ? "failed" : result?.action || "noop", + oldWebhookIds: result?.oldWebhookIds || [], + newWebhookIds: result?.newWebhookIds || [], + details: result?.details, + error: error ? String(error) : undefined, + }); + } catch (logError) { + logger.warn( + `Failed to write webhook refresh log for data source ${dataSourceId}`, + { error: logError }, + ); + } +}; const refreshWebhooks = async (args: object | null): Promise => { if (args && "dataSourceId" in args && typeof args.dataSourceId === "string") { return refreshWebhook(args.dataSourceId); } + const runId = uuidv4(); + const airtableDataSources = await findDataSourcesByType( DataSourceType.Airtable, ); @@ -38,12 +79,26 @@ const refreshWebhooks = async (args: object | null): Promise => { ); const enable = source.autoEnrich || source.autoImport; try { - await adaptor.toggleWebhook(enable); + const toggleResult = await adaptor.toggleWebhook(enable); + await logRefresh({ + runId, + dataSourceId: source.id, + dataSourceType: DataSourceType.Airtable, + enabled: enable, + result: toggleResult, + }); } catch (error) { logger.warn( `Failed to refresh airtable webhook for data source ${source.id}`, { error }, ); + await logRefresh({ + runId, + dataSourceId: source.id, + dataSourceType: DataSourceType.Airtable, + enabled: enable, + error, + }); continue; } } @@ -69,18 +124,32 @@ const refreshWebhooks = async (args: object | null): Promise => { ); const enable = source.autoEnrich || source.autoImport; try { - await adaptor.toggleWebhook(enable); + const toggleResult = await adaptor.toggleWebhook(enable); if (enable) { const hasErrors = await adaptor.hasWebhookErrors(); if (hasErrors) { await adaptor.repairWebhook(); } } + await logRefresh({ + runId, + dataSourceId: source.id, + dataSourceType: DataSourceType.GoogleSheets, + enabled: enable, + result: toggleResult, + }); } catch (error) { logger.warn( `Failed to refresh Google Sheets webhook for data source ${source.id}`, { error }, ); + await logRefresh({ + runId, + dataSourceId: source.id, + dataSourceType: DataSourceType.GoogleSheets, + enabled: enable, + error, + }); continue; } } @@ -104,7 +173,28 @@ const refreshWebhook = async (dataSourceId: string): Promise => { ); return false; } - await adaptor.toggleWebhook(enable); + + const runId = uuidv4(); + const dataSourceType = dataSource.config.type; + try { + const result = await adaptor.toggleWebhook(enable); + await logRefresh({ + runId, + dataSourceId, + dataSourceType, + enabled: enable, + result, + }); + } catch (error) { + await logRefresh({ + runId, + dataSourceId, + dataSourceType, + enabled: enable, + error, + }); + throw error; + } return true; }; diff --git a/src/server/models/WebhookRefreshLog.ts b/src/server/models/WebhookRefreshLog.ts new file mode 100644 index 000000000..4dd6a18b8 --- /dev/null +++ b/src/server/models/WebhookRefreshLog.ts @@ -0,0 +1,12 @@ +import type { WebhookRefreshLog } from "@/models/WebhookRefreshLog"; +import type { ColumnType, Generated, Insertable, Updateable } from "kysely"; + +export type WebhookRefreshLogTable = Omit< + WebhookRefreshLog, + "id" | "createdAt" +> & { + id: Generated; + createdAt: ColumnType; +}; +export type NewWebhookRefreshLog = Insertable; +export type WebhookRefreshLogUpdate = Updateable; diff --git a/src/server/repositories/WebhookRefreshLog.ts b/src/server/repositories/WebhookRefreshLog.ts new file mode 100644 index 000000000..22fce9340 --- /dev/null +++ b/src/server/repositories/WebhookRefreshLog.ts @@ -0,0 +1,19 @@ +import { db } from "@/server/services/database"; +import type { NewWebhookRefreshLog } from "@/server/models/WebhookRefreshLog"; + +export function createWebhookRefreshLog(log: NewWebhookRefreshLog) { + return db + .insertInto("webhookRefreshLog") + .values(log) + .returningAll() + .executeTakeFirstOrThrow(); +} + +export function findWebhookRefreshLogsByDataSourceId(dataSourceId: string) { + return db + .selectFrom("webhookRefreshLog") + .where("dataSourceId", "=", dataSourceId) + .selectAll() + .orderBy("createdAt", "desc") + .execute(); +} diff --git a/src/server/services/database/index.ts b/src/server/services/database/index.ts index 0f2270924..a0f7fd57a 100644 --- a/src/server/services/database/index.ts +++ b/src/server/services/database/index.ts @@ -23,6 +23,7 @@ import type { PlacedMarkerTable } from "@/server/models/PlacedMarker"; import type { PublicMapTable } from "@/server/models/PublicMap"; import type { TurfTable } from "@/server/models/Turf"; import type { UserTable } from "@/server/models/User"; +import type { WebhookRefreshLogTable } from "@/server/models/WebhookRefreshLog"; export const pool = new Pool({ connectionString: process.env.DATABASE_URL, @@ -74,6 +75,7 @@ export interface Database { publicMap: PublicMapTable; turf: TurfTable; user: UserTable; + webhookRefreshLog: WebhookRefreshLogTable; "pgboss.job": JobTable; } diff --git a/src/server/services/database/schema.ts b/src/server/services/database/schema.ts index 5ea67f09b..3101eb112 100644 --- a/src/server/services/database/schema.ts +++ b/src/server/services/database/schema.ts @@ -195,6 +195,25 @@ export interface AirtableWebhook { createdAt: string; // text, DEFAULT CURRENT_TIMESTAMP, NOT NULL } +/** + * webhookRefreshLog Table + * Audit log of refreshWebhooks cron runs, one row per (run, data source) + */ +export interface WebhookRefreshLog { + id: string; // uuid, PRIMARY KEY, DEFAULT gen_random_uuid() + runId: string; // uuid, NOT NULL + dataSourceId: string; // uuid, NOT NULL, FK -> dataSource(id) ON DELETE CASCADE + dataSourceType: string; // text, NOT NULL + enabled: boolean; // boolean, NOT NULL + success: boolean; // boolean, NOT NULL + action: string; // text, NOT NULL (created | recreated | kept | removed | noop | failed) + oldWebhookIds: string[]; // jsonb, NOT NULL, DEFAULT '[]' + newWebhookIds: string[]; // jsonb, NOT NULL, DEFAULT '[]' + details: Record | null; // jsonb, nullable + error: string | null; // text, nullable + createdAt: string; // text, DEFAULT CURRENT_TIMESTAMP, NOT NULL +} + // ============================================================================ // MAPS & VIEWS // ============================================================================ @@ -409,6 +428,7 @@ export interface Database { // Webhooks & Integrations airtableWebhook: AirtableWebhook; + webhookRefreshLog: WebhookRefreshLog; // Caches geocodeCache: GeocodeCache; diff --git a/tests/feature/webhookRefreshLog.test.ts b/tests/feature/webhookRefreshLog.test.ts new file mode 100644 index 000000000..74b75a29a --- /dev/null +++ b/tests/feature/webhookRefreshLog.test.ts @@ -0,0 +1,172 @@ +import { v4 as uuidv4 } from "uuid"; +import { afterAll, afterEach, describe, expect, inject, test } from "vitest"; +import { + DataSourceRecordType, + DataSourceType, + GeocodingType, +} from "@/models/DataSource"; +import { AirtableAdaptor } from "@/server/adaptors/airtable"; +import refreshWebhooks from "@/server/jobs/refreshWebhooks"; +import { + createDataSource, + deleteDataSource, +} from "@/server/repositories/DataSource"; +import { upsertOrganisation } from "@/server/repositories/Organisation"; +import { + createWebhookRefreshLog, + findWebhookRefreshLogsByDataSourceId, +} from "@/server/repositories/WebhookRefreshLog"; + +const credentials = inject("credentials"); + +describe("webhookRefreshLog tests", () => { + const toRemove: string[] = []; + + afterEach(async () => { + // Webhooks are filtered by public URL (not data source id), so a webhook + // left over from one test would leak into the next on the same base. + // Clear them after each test so every test starts from a clean base. + try { + await new AirtableAdaptor( + "webhook-log-cleanup", + credentials.airtable.apiKey, + credentials.airtable.baseId, + credentials.airtable.tableId, + ).toggleWebhook(false); + } catch { + // best-effort cleanup + } + }); + + afterAll(async () => { + // Deleting the data sources cascades to their webhookRefreshLog rows + for (const id of toRemove) { + await deleteDataSource(id); + } + }); + + test("createWebhookRefreshLog round-trips arrays and details through JSONB", async () => { + const org = await upsertOrganisation({ name: "Webhook Log Test Org" }); + const dataSource = await createDataSource({ + name: "Webhook Log CSV Source", + autoEnrich: false, + autoImport: false, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.CSV, + url: "file://tests/resources/members.csv", + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { type: GeocodingType.None }, + organisationId: org.id, + public: false, + }); + toRemove.push(dataSource.id); + + const runId = uuidv4(); + const inserted = await createWebhookRefreshLog({ + runId, + dataSourceId: dataSource.id, + dataSourceType: DataSourceType.CSV, + enabled: true, + success: true, + action: "recreated", + oldWebhookIds: ["oldHook1", "oldHook2"], + newWebhookIds: ["newHook1"], + details: { expirationTime: "2026-07-01T00:00:00.000Z" }, + }); + + // Defaults are populated by the database + expect(inserted.id).toBeTruthy(); + expect(inserted.createdAt).toBeTruthy(); + + const logs = await findWebhookRefreshLogsByDataSourceId(dataSource.id); + expect(logs).toHaveLength(1); + const log = logs[0]; + expect(log.runId).toBe(runId); + expect(log.action).toBe("recreated"); + expect(log.enabled).toBe(true); + expect(log.success).toBe(true); + // JSONB arrays and objects come back as real JS values, not strings + expect(log.oldWebhookIds).toEqual(["oldHook1", "oldHook2"]); + expect(log.newWebhookIds).toEqual(["newHook1"]); + expect(log.details).toEqual({ + expirationTime: "2026-07-01T00:00:00.000Z", + }); + }); + + test("refreshWebhooks logs a row for the processed Airtable data source", async () => { + const org = await upsertOrganisation({ name: "Webhook Refresh Test Org" }); + const dataSource = await createDataSource({ + name: "Webhook Refresh Airtable Source", + autoEnrich: false, + autoImport: true, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.Airtable, + apiKey: credentials.airtable.apiKey, + baseId: credentials.airtable.baseId, + tableId: credentials.airtable.tableId, + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { type: GeocodingType.None }, + organisationId: org.id, + public: false, + }); + toRemove.push(dataSource.id); + + await refreshWebhooks(null); + + const logs = await findWebhookRefreshLogsByDataSourceId(dataSource.id); + expect(logs.length).toBeGreaterThanOrEqual(1); + const log = logs[0]; + expect(log.dataSourceType).toBe(DataSourceType.Airtable); + expect(log.enabled).toBe(true); + expect(log.success).toBe(true); + // autoImport is on with no valid webhook yet, so it is created (or + // recreated/kept if one already existed for this base + public URL) + expect(["created", "recreated", "kept"]).toContain(log.action); + expect(log.newWebhookIds.length).toBeGreaterThanOrEqual(1); + }, 30000); + + test("refreshWebhooks logs a row when invoked for a single data source", async () => { + const org = await upsertOrganisation({ name: "Webhook Single Test Org" }); + const dataSource = await createDataSource({ + name: "Webhook Single Airtable Source", + autoEnrich: false, + autoImport: true, + recordType: DataSourceRecordType.Members, + config: { + type: DataSourceType.Airtable, + apiKey: credentials.airtable.apiKey, + baseId: credentials.airtable.baseId, + tableId: credentials.airtable.tableId, + }, + columnDefs: [], + columnMetadata: [], + columnRoles: { nameColumns: [] }, + enrichments: [], + geocodingConfig: { type: GeocodingType.None }, + organisationId: org.id, + public: false, + }); + toRemove.push(dataSource.id); + + // This is the path the Google Sheets adaptor enqueues on row-count changes + await refreshWebhooks({ dataSourceId: dataSource.id }); + + const logs = await findWebhookRefreshLogsByDataSourceId(dataSource.id); + expect(logs.length).toBeGreaterThanOrEqual(1); + const log = logs[0]; + expect(log.dataSourceType).toBe(DataSourceType.Airtable); + expect(log.enabled).toBe(true); + expect(log.success).toBe(true); + expect(["created", "recreated", "kept"]).toContain(log.action); + }, 30000); +}); diff --git a/tests/setup.ts b/tests/setup.ts index 466eaf02b..2c26ced3e 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -27,16 +27,25 @@ export async function setup() { // Start a server to handle webhooks (does nothing and returns OK) // Required because some CRMs do not allow the webhook to be created // if it does not return an OK response, which causes some tests to fail. - let server = null; + let server: http.Server | null = null; try { await startPublicTunnel("http"); server = http.createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/plain" }); res.end("OK"); }); - server.listen(3000); + // listen() reports failures (e.g. EADDRINUSE when a dev server is already + // on port 3000) via an async "error" event, which a plain try/catch cannot + // catch — left unhandled it crashes the whole test run. Await it explicitly + // so a busy port degrades gracefully instead of aborting collection. + const webhookServer = server; + await new Promise((resolve, reject) => { + webhookServer.once("error", reject); + webhookServer.listen(3000, resolve); + }); } catch (error) { logger.warn("Could not start public tunnel", { error }); + server = null; } return async () => { diff --git a/tests/unit/server/adaptors/airtable.test.ts b/tests/unit/server/adaptors/airtable.test.ts index 568a17e8b..fe48ad9da 100644 --- a/tests/unit/server/adaptors/airtable.test.ts +++ b/tests/unit/server/adaptors/airtable.test.ts @@ -184,18 +184,41 @@ describe("Airtable adaptor tests", () => { credentials.airtable.baseId, credentials.airtable.tableId, ); - await adaptor.toggleWebhook(true); + + // Start from a clean slate so the reported lifecycle is deterministic + await adaptor.toggleWebhook(false); + + // Enabling with no existing webhook creates one and reports the new id + const created = await adaptor.toggleWebhook(true); + expect(created.action).toBe("created"); + expect(created.oldWebhookIds).toEqual([]); + expect(created.newWebhookIds).toHaveLength(1); let result = await adaptor.listWebhooks(await getPublicUrl()); expect(Array.isArray(result)).toBe(true); expect(result.length).toBeGreaterThan(0); - await adaptor.toggleWebhook(false); + // Enabling again keeps the freshly-created webhook (well over 2 days valid) + const kept = await adaptor.toggleWebhook(true); + expect(kept.action).toBe("kept"); + expect(kept.newWebhookIds).toEqual(created.newWebhookIds); + + // Disabling removes it and reports the removed id as an old id + const removed = await adaptor.toggleWebhook(false); + expect(removed.action).toBe("removed"); + expect(removed.oldWebhookIds).toEqual(created.newWebhookIds); + expect(removed.newWebhookIds).toEqual([]); result = await adaptor.listWebhooks(await getPublicUrl()); expect(Array.isArray(result)).toBe(true); expect(result.length).toBe(0); - }); + + // Disabling again when nothing is left is a no-op, not a removal + const noop = await adaptor.toggleWebhook(false); + expect(noop.action).toBe("noop"); + expect(noop.oldWebhookIds).toEqual([]); + expect(noop.newWebhookIds).toEqual([]); + }, 30000); test("updateRecords updates a record", async () => { const adaptor = new AirtableAdaptor(