Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions migrations/1781568000000_webhook_refresh_log.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { sql } from "kysely";
import type { Kysely } from "kysely";

export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createTable("webhookRefreshLog")
.addColumn("id", "uuid", (col) =>
col.primaryKey().defaultTo(sql`gen_random_uuid()`),
)
.addColumn("runId", "uuid", (col) => col.notNull())
.addColumn("dataSourceId", "uuid", (col) =>
col.notNull().references("dataSource.id").onDelete("cascade"),
)
.addColumn("dataSourceType", "text", (col) => col.notNull())
.addColumn("enabled", "boolean", (col) => col.notNull())
.addColumn("success", "boolean", (col) => col.notNull())
.addColumn("action", "text", (col) => col.notNull())
.addColumn("oldWebhookIds", "jsonb", (col) =>
col.notNull().defaultTo(sql`'[]'::jsonb`),
)
.addColumn("newWebhookIds", "jsonb", (col) =>
col.notNull().defaultTo(sql`'[]'::jsonb`),
)
.addColumn("details", "jsonb")
.addColumn("error", "text")
.addColumn("createdAt", "text", (col) =>
col.defaultTo(sql`CURRENT_TIMESTAMP`).notNull(),
)
.execute();

await db.schema
.createIndex("webhookRefreshLogDataSourceIdCreatedAtIndex")
.on("webhookRefreshLog")
.columns(["dataSourceId", "createdAt"])
.execute();
}

export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropTable("webhookRefreshLog").execute();
}
28 changes: 28 additions & 0 deletions src/models/WebhookRefreshLog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { z } from "zod";

export const webhookRefreshActionSchema = z.enum([
"created",
"recreated",
"kept",
"removed",
"noop",
"failed",
]);

export const webhookRefreshLogSchema = z.object({
id: z.string(),
runId: z.string(),
dataSourceId: z.string(),
dataSourceType: z.string(),
enabled: z.boolean(),
success: z.boolean(),
action: webhookRefreshActionSchema,
oldWebhookIds: z.array(z.string()),
newWebhookIds: z.array(z.string()),
details: z.record(z.string(), z.unknown()).nullish(),
error: z.string().nullish(),
createdAt: z.string(),
});

export type WebhookRefreshAction = z.infer<typeof webhookRefreshActionSchema>;
export type WebhookRefreshLog = z.infer<typeof webhookRefreshLogSchema>;
9 changes: 8 additions & 1 deletion src/server/adaptors/abstract.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import type { EnrichedRecord } from "@/models/DataRecord";
import type { ExternalRecord, TaggedRecord } from "@/types";

export interface WebhookToggleResult {
action: "created" | "recreated" | "kept" | "removed" | "noop";
oldWebhookIds: string[];
newWebhookIds: string[];
details?: Record<string, unknown>;
}

export interface DataSourceAdaptor {
extractExternalRecordIdsFromWebhookBody(
body: unknown,
Expand All @@ -10,7 +17,7 @@ export interface DataSourceAdaptor {
fetchFirst(): Promise<ExternalRecord | null>;
fetchByExternalId(externalIds: string[]): Promise<ExternalRecord[]>;
removeDevWebhooks(): Promise<void>;
toggleWebhook(enable: boolean): Promise<void>;
toggleWebhook(enable: boolean): Promise<WebhookToggleResult>;
updateRecords(enrichedRecords: EnrichedRecord[]): Promise<void>;
tagRecords(records: TaggedRecord[]): Promise<void>;
deleteColumn(columnName: string): Promise<void>;
Expand Down
5 changes: 3 additions & 2 deletions src/server/adaptors/actionnetwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
ENRICHMENT_COLUMN_PREFIX,
} from "@/constants";
import logger from "@/server/services/logger";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { EnrichedRecord } from "@/models/DataRecord";
import type { ExternalRecord, TaggedRecord } from "@/types";

Expand Down Expand Up @@ -277,8 +277,9 @@ export class ActionNetworkAdaptor implements DataSourceAdaptor {
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async toggleWebhook(enable: boolean): Promise<void> {
async toggleWebhook(enable: boolean): Promise<WebhookToggleResult> {
logger.debug("Cannot toggle webhooks for Action Network data source");
return { action: "noop", oldWebhookIds: [], newWebhookIds: [] };
}

async updateRecords(enrichedRecords: EnrichedRecord[]): Promise<void> {
Expand Down
30 changes: 26 additions & 4 deletions src/server/adaptors/airtable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import logger from "@/server/services/logger";
import { getPublicUrl } from "@/server/services/urls";
import { batch } from "@/server/utils";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { EnrichedRecord } from "@/models/DataRecord";
import type { ExternalRecord, TaggedRecord } from "@/types";

Expand Down Expand Up @@ -411,17 +411,22 @@ export class AirtableAdaptor implements DataSourceAdaptor {
await this.removeWebhooks(webhooks);
}

async toggleWebhook(enable: boolean): Promise<void> {
async toggleWebhook(enable: boolean): Promise<WebhookToggleResult> {
const publicUrl = await getPublicUrl();
const webhooks = await this.listWebhooks(publicUrl);
const oldWebhookIds = webhooks.map((wh) => wh.id);

// Remove webhooks on user request
if (!enable) {
logger.info(
`Removing Airtable webhooks for data source ${this.dataSourceId}`,
);
await this.removeWebhooks(webhooks);
return;
return {
action: oldWebhookIds.length ? "removed" : "noop",
oldWebhookIds,
newWebhookIds: [],
};
}
Comment on lines 420 to 430

// Skip recreating webhook that has at least 2 days of validity
Expand All @@ -435,7 +440,12 @@ export class AirtableAdaptor implements DataSourceAdaptor {
logger.info(
`Airtable webhook exists for data source ${this.dataSourceId}`,
);
return;
return {
action: "kept",
oldWebhookIds: [],
newWebhookIds: [webhooks[0].id],
details: { expirationTime: webhooks[0].expirationTime.toISOString() },
};
}

// Cleanup expired webhooks
Expand Down Expand Up @@ -478,6 +488,18 @@ export class AirtableAdaptor implements DataSourceAdaptor {
}
throw Error(`Bad webhooks response: ${response.status}, ${responseText}`);
}

const created = (await response.json()) as {
id: string;
expirationTime?: string;
};

return {
action: oldWebhookIds.length ? "recreated" : "created",
oldWebhookIds,
newWebhookIds: created.id ? [created.id] : [],
details: { expirationTime: created.expirationTime },
};
}

async removeWebhooks(webhooks: Webhook[]): Promise<void> {
Expand Down
4 changes: 2 additions & 2 deletions src/server/adaptors/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ENRICHMENT_COLUMN_PREFIX } from "@/constants";
import logger from "@/server/services/logger";
import { getAbsoluteUrl } from "@/utils/appUrl";
import { getBaseDir } from "../utils";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { ExternalRecord } from "@/types";

export function decodeBuffer(buffer: Buffer): Buffer {
Expand Down Expand Up @@ -152,7 +152,7 @@ export class CSVAdaptor implements DataSourceAdaptor {
async toggleWebhook(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
enable: boolean,
): Promise<void> {
): Promise<WebhookToggleResult> {
throw new Error("Unimplemented");
}

Expand Down
30 changes: 27 additions & 3 deletions src/server/adaptors/googlesheets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import logger from "@/server/services/logger";
import { getPublicUrl } from "@/server/services/urls";
import { batch } from "@/server/utils";
import { enqueue } from "../services/queue";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { EnrichedRecord } from "@/models/DataRecord";
import type { googleOAuthCredentialsSchema } from "@/models/DataSource";
import type { ExternalRecord, TaggedRecord } from "@/types";
Expand Down Expand Up @@ -431,7 +431,7 @@ export class GoogleSheetsAdaptor implements DataSourceAdaptor {
}
}

async toggleWebhook(enable: boolean): Promise<void> {
async toggleWebhook(enable: boolean): Promise<WebhookToggleResult> {
try {
const notificationUrl = await getPublicUrl(
`/api/data-sources/${this.dataSourceId}/webhook`,
Expand All @@ -441,13 +441,37 @@ export class GoogleSheetsAdaptor implements DataSourceAdaptor {
const notificationDomain = new URL(notificationUrl).hostname;
const webhookSheetName = `Mapped Webhook: ${notificationDomain}/${this.dataSourceId}`;

const existingSheets = await this.listSheets();
const oldSheet = existingSheets.find(
(sheet) => sheet.properties.title === webhookSheetName,
);
const oldWebhookIds = oldSheet ? [oldSheet.properties.sheetId] : [];

if (!enable) {
await this.deleteSheet(webhookSheetName);
return;
return {
action: oldSheet ? "removed" : "noop",
oldWebhookIds,
newWebhookIds: [],
details: { sheetName: webhookSheetName },
};
}

await this.ensureSheet(webhookSheetName);
await this.prepareWebhookSheet(webhookSheetName, notificationUrl);

const refreshedSheets = await this.listSheets();
const newSheet = refreshedSheets.find(
(sheet) => sheet.properties.title === webhookSheetName,
);
const newSheetId = newSheet?.properties.sheetId;

return {
action: oldSheet ? "kept" : "created",
oldWebhookIds,
newWebhookIds: newSheetId ? [newSheetId] : [],
details: { sheetName: webhookSheetName, sheetId: newSheetId },
};
} catch (e) {
if (e instanceof Error && e.message.includes("PERMISSION_DENIED")) {
throw new Error(
Expand Down
23 changes: 19 additions & 4 deletions src/server/adaptors/mailchimp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import logger from "@/server/services/logger";
import { getPublicUrl } from "@/server/services/urls";
import { batch } from "@/server/utils";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { EnrichedRecord } from "@/models/DataRecord";
import type { ExternalRecord, TaggedRecord } from "@/types";

Expand Down Expand Up @@ -415,25 +415,34 @@ export class MailchimpAdaptor implements DataSourceAdaptor {
await this.removeWebhooks(webhooks);
}

async toggleWebhook(enable: boolean): Promise<void> {
async toggleWebhook(enable: boolean): Promise<WebhookToggleResult> {
const publicUrl = await getPublicUrl();
const webhooks = await this.listWebhooks(publicUrl);
const oldWebhookIds = webhooks.map((wh) => wh.id);

// Remove webhooks on user request
if (!enable) {
logger.info(
`Removing Mailchimp webhooks for data source ${this.dataSourceId}`,
);
await this.removeWebhooks(webhooks);
return;
return {
action: oldWebhookIds.length ? "removed" : "noop",
oldWebhookIds,
newWebhookIds: [],
};
}
Comment on lines 423 to 434

// If we already have a webhook, don't create another
if (webhooks.length > 0) {
logger.info(
`Mailchimp webhook already exists for data source ${this.dataSourceId}`,
);
return;
return {
action: "kept",
oldWebhookIds: [],
newWebhookIds: oldWebhookIds,
};
}

const url = `${this.getListUrl()}/webhooks`;
Expand Down Expand Up @@ -477,6 +486,12 @@ export class MailchimpAdaptor implements DataSourceAdaptor {
logger.info(
`Created Mailchimp webhook for data source ${this.dataSourceId}: ${webhook.id}`,
);

return {
action: "created",
oldWebhookIds,
newWebhookIds: webhook.id ? [webhook.id] : [],
};
}

async removeWebhooks(webhooks: Webhook[]): Promise<void> {
Expand Down
5 changes: 3 additions & 2 deletions src/server/adaptors/zetkin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { DataSourceType } from "@/models/DataSource";
import { updateDataSource } from "@/server/repositories/DataSource";
import logger from "@/server/services/logger";
import { slugify } from "@/utils/text";
import type { DataSourceAdaptor } from "./abstract";
import type { DataSourceAdaptor, WebhookToggleResult } from "./abstract";
import type { EnrichedRecord } from "@/models/DataRecord";
import type { ZetkinOAuthCredentials } from "@/models/DataSource";
import type { ExternalRecord, TaggedRecord } from "@/types";
Expand Down Expand Up @@ -235,8 +235,9 @@ export class ZetkinAdaptor implements DataSourceAdaptor {
// Zetkin does not support webhooks
}

async toggleWebhook(): Promise<void> {
async toggleWebhook(): Promise<WebhookToggleResult> {
logger.debug("Cannot toggle webhooks for Zetkin data source");
return { action: "noop", oldWebhookIds: [], newWebhookIds: [] };
}
Comment thread
joaquimds marked this conversation as resolved.

async updateRecords(enrichedRecords: EnrichedRecord[]): Promise<void> {
Expand Down
Loading
Loading