Skip to content

fix(plugin): queue notification writes outside request path#2478

Open
riderx wants to merge 17 commits into
mainfrom
codex/plugin-notification-kv-queue
Open

fix(plugin): queue notification writes outside request path#2478
riderx wants to merge 17 commits into
mainfrom
codex/plugin-notification-kv-queue

Conversation

@riderx

@riderx riderx commented Jun 10, 2026

Copy link
Copy Markdown
Member

Summary (AI generated)

  • Queue plugin-triggered notification emails into the existing plugin KV namespace instead of reading/writing notification state during plugin requests.
  • Add a one-minute plugin Worker cron that transfers queued KV notification items to a protected API trigger.
  • Add the plugin_notifications trigger to process transferred items through the existing notification helpers outside the hot plugin path.
  • Keep stats fallbacks, notification direct writes, old channel_self PostgreSQL fallback, and primary DB fallback blocked for plugin requests.

Motivation (AI generated)

Plugin endpoints are hot public device endpoints and should not read or write the primary Supabase database for notification side effects. Queuing notification work in KV lets plugin requests stay replica/KV/Analytics-only while preserving the email workflow through a controlled background trigger.

Business Impact (AI generated)

This reduces primary database pressure and tail latency risk on /updates, /stats, and /channel_self, while keeping billing/plugin/channel-self alert emails flowing through a delayed background path. It also makes missing Worker bindings fail closed instead of silently writing to Supabase from plugin traffic.

Test Plan (AI generated)

  • bunx vitest run tests/plugin-supabase-write-guard.unit.test.ts
  • bun typecheck
  • bun lint:backend
  • bun lint (passes with existing src/services/compatibilityEvents.ts warning)
  • bunx wrangler deploy --dry-run --config cloudflare_workers/plugin/wrangler.jsonc --env prod_eu
  • bunx wrangler deploy --dry-run --config cloudflare_workers/api/wrangler.jsonc --env prod

Generated with AI

Summary by CodeRabbit

  • New Features

    • KV-backed plugin notification queue, new trigger endpoint for processing batches, and a scheduled flush for reliable delivery.
  • Infrastructure

    • Request-level guards to skip or queue legacy writes, optional read-replica enforcement and local read-replica env support, plus cron-driven flush scheduling.
  • Bug Fixes

    • Prevented unintended Postgres fallback for channel-self flows; safer rate-limit recording; stats handler now consistently uses replica routing.
  • Tests

    • Expanded Cloudflare-targeted and unit tests for write-guard policies, queueing, trigger handling, and scheduling.

@coderabbitai

coderabbitai Bot commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a KV-backed plugin notification queue, a trigger POST to process queued items, a scheduled worker flush invoking the trigger, context flags and predicates to gate Supabase fallback writes and require read-replicas, local read-replica env routing, Cloudflare middleware wiring, and aligned unit/integration tests with Cloudflare gating.

Changes

Plugin notification queueing system

Layer / File(s) Summary
Guard context types and DB routing foundation
supabase/functions/_backend/utils/hono.ts, supabase/functions/_backend/utils/cloudflare.ts, supabase/functions/_backend/utils/supabase_write_guard.ts, supabase/functions/_backend/utils/pg.ts, scripts/start-cloudflare-workers.sh
Adds five context guard flags and predicate/log helpers; exposes LOCAL_READ_REPLICA_SUPABASE_DB_URL, adds local-worker read-replica selection and enforcement, and propagates the env var to runtime.
Queue data model and operations
supabase/functions/_backend/utils/plugin_notification_queue.ts
Defines queue item schemas for org and org_members, deterministic SHA-256 keys, short in-memory cache, enqueue wrappers with 7-day TTL, and parsing/validation of queued items.
Queue flush implementation
supabase/functions/_backend/utils/plugin_notification_flush.ts
Implements KV listing with cursor, per-item processing locks, batched POST to trigger endpoint with API secret, atomic deletion of transferred items, failure counting, and exported flush routine returning aggregated counters.
Trigger endpoint: validation & dispatch
supabase/functions/_backend/triggers/plugin_notifications.ts, supabase/functions/triggers/index.ts
Adds POST / handler that validates items and batch size, filters invalid items, processes valid items sequentially using a single pg/drizzle client, tallies processed/failed/invalid, throws on partial failure, and registers the route in triggers routing.
Notification helpers integration
supabase/functions/_backend/utils/notifications.ts, supabase/functions/_backend/utils/org_email_notifications.ts
Adds early-return guards to skip Supabase notification writes, optional injected write-client handling with proper cleanup, and pre-cache routing that enqueues notifications into plugin KV when configured.
Stats fallback gating & channel_self changes
supabase/functions/_backend/utils/stats.ts, supabase/functions/_backend/plugins/channel_self.ts, supabase/functions/_backend/plugins/stats.ts
Applies write-skip guards across stats write paths, removes per-batch custom_id replica selection (always uses replica), adds channel_self early-return guards, safe awaited IP-rate recording, and replica-aware error responses when fallback is disallowed.
Cloudflare worker middleware, scheduled flush, and config
cloudflare_workers/plugin/index.ts, cloudflare_workers/api/index.ts, cloudflare_workers/*/wrangler.jsonc , cloudflare_workers/plugin/wrangler.jsonc
Adds plugin middleware that sets request guard flags, wires scheduled POST /flush-plugin-notifications and export.scheduled to invoke flush, registers /plugin_notifications trigger route, and adds cron triggers and local KV binding updates across wrangler configs.
Tests: trigger, guard, and Cloudflare gating
tests/*
Adds unit tests for the trigger endpoint, plugin write-guard behaviors, KV queue semantics, and many test suites now gated by USE_CLOUDFLARE_WORKERS with Cloudflare-specific assertions ensuring no primary devices/stats writes under plugin policy.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Cap-go/capgo#2266: Modifies channel_self.ts to route legacy “channel self” reads through a read-replica Postgres connection (related read-replica changes).
  • Cap-go/capgo#2275: Related channel_self routing and KV/replica fallback work touching the same code paths.
  • Cap-go/capgo#2467: Adjusts channel self store vs Postgres fallback logic affecting similar override/fallback behavior.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 4.48% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(plugin): queue notification writes outside request path' accurately summarizes the main change: moving notification processing from the hot plugin request path to a background queue.
Description check ✅ Passed The PR description includes a comprehensive summary, motivation, business impact, and test plan that covers all key aspects of the change. The description aligns well with the template structure.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@codspeed-hq

codspeed-hq Bot commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Merging this PR will not alter performance

✅ 43 untouched benchmarks
⏩ 2 skipped benchmarks1


Comparing codex/plugin-notification-kv-queue (8097301) with main (1d9a54c)

Open in CodSpeed

Footnotes

  1. 2 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@riderx riderx force-pushed the codex/plugin-notification-kv-queue branch from e08c0e3 to 492da16 Compare June 10, 2026 02:22
@riderx riderx force-pushed the codex/plugin-notification-kv-queue branch from 492da16 to 2f3da4c Compare June 10, 2026 02:35
@riderx riderx force-pushed the codex/plugin-notification-kv-queue branch from 2f3da4c to 51eac6b Compare June 10, 2026 02:45
@riderx riderx force-pushed the codex/plugin-notification-kv-queue branch from 51eac6b to c1378ed Compare June 10, 2026 03:02
@riderx riderx force-pushed the codex/plugin-notification-kv-queue branch from c1378ed to e0d84d8 Compare June 10, 2026 17:24
@riderx riderx marked this pull request as ready for review June 10, 2026 17:56
@coderabbitai coderabbitai Bot added the codex label Jun 10, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e0d84d859b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/utils/plugin_notification_flush.ts

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cloudflare_workers/api/index.ts`:
- Line 65: The imported alias plugin_notifications should be renamed to
camelCase (e.g., pluginNotifications) and all its references updated: change the
import statement importing app from
'../../supabase/functions/_backend/triggers/plugin_notifications.ts' to use
pluginNotifications, and update where it is wired into appTriggers (the
registration that still uses plugin_notifications and the route
'/plugin_notifications') to reference pluginNotifications while leaving the
route path string unchanged.

In `@cloudflare_workers/plugin/index.ts`:
- Around line 41-54: Scheduled flushes can overlap and cause duplicate
deliveries because flushQueuedPluginNotifications deletes KV only after
postPluginNotificationBatch succeeds; add a single-flight guard or server-side
dedupe. Wrap the scheduled handler invocation of flushQueuedPluginNotifications
(in scheduled() where you call scheduledApp.fetch) with a lock: obtain a
short-lived lock (preferably via a Durable Object or by setting a KV lock key
with a TTL and failing fast if it exists) before running
flushQueuedPluginNotifications, and release it after completion; alternatively
implement idempotent delivery inside
flushQueuedPluginNotifications/postPluginNotificationBatch by writing a
processed-set entry per queue key/uniqId (skip if already present) before
sending. Ensure the lock key and the processed-set use unique names and short
TTLs to avoid deadlocks and reference flushQueuedPluginNotifications,
postPluginNotificationBatch, scheduledApp and the scheduled() handler when
making changes.

In `@supabase/functions/_backend/plugins/channel_self.ts`:
- Around line 615-618: The early return inside the branch checking
canUseReadReplica && shouldSkipChannelSelfPostgresFallback(c) skips the same
device/IP rate-limit counter updates that run in the finally block of
runChannelSelfWithPgClient; move or duplicate the counter-update logic to
execute before calling logSkippedSupabaseWrite and simpleError200 so counters
are incremented for this path too. Concretely, call the same helper that updates
device/IP counters (or extract that logic from runChannelSelfWithPgClient's
finally into a shared function) and invoke it here prior to returning, while
keeping logSkippedSupabaseWrite and simpleError200 as-is.

In `@supabase/functions/_backend/utils/notifications.ts`:
- Around line 275-276: The fallback branch creates a new PG client via
getPgClient(c) wrapped by createDrizzleClient but never closes it, leaking
connections; update the code around effectiveWriteClient and the call to
insertNotificationClaim so that when writeClient is undefined you create the
client, call insertNotificationClaim inside a try block, and always close the
internally-created PG client in finally (i.e., close the pool returned by
getPgClient or call the appropriate teardown on the created drizzle client);
apply the same try/finally pattern to the second occurrence around the other
effectiveWriteClient/createDrizzleClient use at the later site.

In `@supabase/functions/_backend/utils/plugin_notification_flush.ts`:
- Around line 54-127: SonarCloud flags flushQueuedPluginNotifications for high
cognitive complexity; extract the inner batch processing (the block that handles
items array: calling postPluginNotificationBatch, deleting keys on success,
incrementing transferred/failed counts, and catching/logging errors) into a new
helper function processBatch(c: Context, store: KVNamespace, items:
PluginNotificationQueueItem[], itemKeys: string[]): Promise<{
transferred:number; deleted:number; failed:number }>, and call it from the loop
in flushQueuedPluginNotifications; keep existing calls to
parsePluginNotificationQueueItem, postPluginNotificationBatch and
cloudlogErr/cloudlog but move the try/catch and counter updates into
processBatch so the main loop only handles listing, reading, parsing, pushing
items/itemKeys, and aggregating results from processBatch.

In `@supabase/functions/_backend/utils/plugin_notification_queue.ts`:
- Around line 90-110: queuePluginOrgMembersNotification currently takes 8 scalar
parameters (c, eventName, preferenceKey, eventData, orgId, uniqId, cron,
audience), which exceeds the recommended parameter count; refactor its signature
to accept an options object as the second parameter (e.g., options: { eventName,
preferenceKey, eventData, orgId, uniqId, cron, audience }) and update the
function body to read those properties (and pass them into
enqueuePluginNotification either by spreading options or by explicit fields) to
reduce parameter count; note you must also update all call sites to construct
and pass the new options object or defer this change if broader refactoring
across notification helpers is not feasible now.
- Around line 60-63: Remove the unnecessary type assertion on payload: when
constructing const payload = { ...item, enqueuedAt: new Date().toISOString() },
drop the trailing "as PluginNotificationQueueItem" since TypeScript can infer
the discriminated union from item (which already contains type: 'org' |
'org_members') and the added enqueuedAt field; update the payload declaration to
use the plain object and ensure the compiler still type-checks in the
surrounding code that uses payload (references: payload, item, enqueuedAt,
PluginNotificationQueueItem).

In `@supabase/functions/_backend/utils/supabase_write_guard.ts`:
- Around line 15-22: The getBooleanContextFlag function uses an explicit type
assertion on the Context parameter which is unnecessary; remove the long cast
"(c as Context & { get: (key: string) => unknown })" and call c.get(...)
directly (or cast the key to any) inside the existing try/catch so the runtime
behavior stays the same and TypeScript is satisfied; update the function body to
use c.get(key) === true while keeping the catch branch returning false.

In `@tests/stats-download.test.ts`:
- Line 12: The comment above the USE_CLOUDFLARE flag in
tests/stats-download.test.ts is inaccurate; replace or remove the line
referencing "serialize paths for consistency" so it correctly states that
USE_CLOUDFLARE is used to conditionally skip tests that expect primary database
writes (or simply note it toggles Cloudflare-worker behavior), and ensure the
comment directly references the USE_CLOUDFLARE flag so future readers understand
its purpose when reading the test file.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d1a6fe45-caac-4a5a-a4ab-8653bb41252c

📥 Commits

Reviewing files that changed from the base of the PR and between 534b0e2 and e0d84d8.

📒 Files selected for processing (22)
  • cloudflare_workers/api/index.ts
  • cloudflare_workers/plugin/index.ts
  • cloudflare_workers/plugin/wrangler.jsonc
  • scripts/start-cloudflare-workers.sh
  • supabase/functions/_backend/plugins/channel_self.ts
  • supabase/functions/_backend/plugins/stats.ts
  • supabase/functions/_backend/triggers/plugin_notifications.ts
  • supabase/functions/_backend/utils/cloudflare.ts
  • supabase/functions/_backend/utils/hono.ts
  • supabase/functions/_backend/utils/notifications.ts
  • supabase/functions/_backend/utils/org_email_notifications.ts
  • supabase/functions/_backend/utils/pg.ts
  • supabase/functions/_backend/utils/plugin_notification_flush.ts
  • supabase/functions/_backend/utils/plugin_notification_queue.ts
  • supabase/functions/_backend/utils/stats.ts
  • supabase/functions/_backend/utils/supabase_write_guard.ts
  • tests/channel_self.test.ts
  • tests/key_id_e2e.test.ts
  • tests/plugin-supabase-write-guard.unit.test.ts
  • tests/stats-download.test.ts
  • tests/stats.test.ts
  • tests/updates.test.ts
🔗 Linked repositories identified

CodeRabbit considers these linked repositories for cross-repo context during reviews:

  • Cap-go/capacitor-updater (manual)

Comment thread cloudflare_workers/api/index.ts Outdated
Comment thread cloudflare_workers/plugin/index.ts Outdated
Comment thread supabase/functions/_backend/plugins/channel_self.ts
Comment thread supabase/functions/_backend/utils/notifications.ts Outdated
Comment thread supabase/functions/_backend/utils/plugin_notification_flush.ts
Comment thread supabase/functions/_backend/utils/plugin_notification_queue.ts Outdated
Comment thread supabase/functions/_backend/utils/plugin_notification_queue.ts
Comment thread supabase/functions/_backend/utils/supabase_write_guard.ts
Comment thread tests/stats-download.test.ts Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6a6902d91a

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread cloudflare_workers/plugin/wrangler.jsonc Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 726d83be83

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/triggers/plugin_notifications.ts

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3314d7eeed

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread cloudflare_workers/plugin/wrangler.jsonc Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1e1cfdb466

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/triggers/plugin_notifications.ts Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
supabase/functions/_backend/utils/org_email_notifications.ts (1)

593-719: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Close the locally-created write client in sendNotifToOrgMembersOnce.

Line 593 creates a new PG client for this helper, but the function never closes it. Because this path has many early returns, every normal invocation can leak the caller-owned pool/connection unless you mirror the try/finally cleanup pattern already added in notifications.ts.

Suggested fix
-import { getDrizzleClient, getPgClient, logPgError } from './pg.ts'
+import { closeClient, getDrizzleClient, getPgClient, logPgError } from './pg.ts'
-  const writeClient = getDrizzleClient(getPgClient(c))
-  const alreadySentForOrg = await hasNotifOrgClaim(c, writeClient, eventName, orgId, uniqId)
-  if (alreadySentForOrg === null) {
-    cloudlog({
-      requestId: c.get('requestId'),
-      message: 'sendNotifToOrgMembersOnce: org claim lookup failed',
-      eventName,
-      preferenceKey,
-      orgId,
-      uniqId,
-    })
-    return false
-  }
-  if (alreadySentForOrg) {
-    cloudlog({
-      requestId: c.get('requestId'),
-      message: 'sendNotifToOrgMembersOnce: org already claimed',
-      eventName,
-      preferenceKey,
-      orgId,
-      uniqId,
-    })
-    return false
-  }
-
-  const { recipients, resolutionFailed } = await getPreparedEligibleEmailTargets(c, orgId, preferenceKey, writeClient, audience)
-  if (!recipients) {
-    cloudlog({ requestId: c.get('requestId'), message: 'sendNotifToOrgMembersOnce: org not found', orgId })
-    return false
-  }
-  if (resolutionFailed) {
-    cloudlog({
-      requestId: c.get('requestId'),
-      message: 'sendNotifToOrgMembersOnce: recipient resolution failed',
-      eventName,
-      preferenceKey,
-      orgId,
-    })
-    return false
-  }
-
-  const { managementEmail, allEmails, primaryEmail, additionalEmails } = recipients
-  if (allEmails.length === 0 || !primaryEmail) {
-    cloudlog({
-      requestId: c.get('requestId'),
-      message: 'sendNotifToOrgMembersOnce: no eligible recipients',
-      eventName,
-      preferenceKey,
-      orgId,
-    })
-    return false
-  }
-
-  const recipientEmails = [primaryEmail, ...additionalEmails]
-  const recipientEntries: { email: string, recipientUniqId: string, wasAlreadyClaimedBeforeRun: boolean }[] = []
-  for (const email of recipientEmails) {
-    const recipientUniqId = await buildOneTimeRecipientNotifUniqId(uniqId, email)
-    const wasAlreadyClaimedBeforeRun = await hasNotifOrgClaim(c, writeClient, eventName, orgId, recipientUniqId)
-    if (wasAlreadyClaimedBeforeRun === null) {
-      cloudlog({
-        requestId: c.get('requestId'),
-        message: 'sendNotifToOrgMembersOnce: recipient claim lookup failed',
-        eventName,
-        preferenceKey,
-        orgId,
-        recipientUniqId,
-      })
-      return false
-    }
-    recipientEntries.push({ email, recipientUniqId, wasAlreadyClaimedBeforeRun })
-  }
-
-  const sendResults: { cleanupFailed: boolean, email: string, recipientUniqId: string, sent: boolean, wasAlreadyClaimedBeforeRun: boolean }[] = []
-  for (const recipient of recipientEntries) {
-    if (recipient.wasAlreadyClaimedBeforeRun) {
-      sendResults.push({ ...recipient, sent: false, cleanupFailed: false })
-      continue
-    }
-
-    const sendResult = await sendNotifOrgOnce(c, eventName, eventData, orgId, recipient.recipientUniqId, recipient.email, drizzleClient, writeClient)
-    sendResults.push({ ...recipient, ...sendResult })
-  }
-
-  const sentEmails = sendResults
-    .filter(result => result.sent)
-    .map(result => result.email)
-  const cleanupFailedEmails = sendResults
-    .filter(result => result.cleanupFailed)
-    .map(result => result.email)
-  const alreadyClaimedBeforeRunEmails = sendResults
-    .filter(result => result.wasAlreadyClaimedBeforeRun)
-    .map(result => result.email)
-
-  if (cleanupFailedEmails.length > 0) {
-    cloudlog({
-      requestId: c.get('requestId'),
-      message: 'sendNotifToOrgMembersOnce: recipient cleanup failed',
-      eventName,
-      preferenceKey,
-      orgId,
-      cleanupFailedRecipients: cleanupFailedEmails,
-    })
-    return false
-  }
-
-  const unresolvedResults = sendResults.filter(result => !result.sent && !result.wasAlreadyClaimedBeforeRun)
-  if (unresolvedResults.length > 0)
-    return false
-
-  const firstOrgSend = await claimNotifOrgOnce(c, eventName, orgId, uniqId, writeClient)
-
-  cloudlog({
-    requestId: c.get('requestId'),
-    message: 'sendNotifToOrgMembersOnce: delivered',
-    eventName,
-    preferenceKey,
-    orgId,
-    primaryEmail,
-    additionalRecipients: additionalEmails.length,
-    deliveredRecipients: sentEmails.length,
-    alreadyClaimedRecipients: alreadyClaimedBeforeRunEmails.length,
-    firstOrgSend,
-    managementEmailIncluded: !!managementEmail,
-  })
-
-  return firstOrgSend
+  const ownedPgClient = getPgClient(c)
+  const writeClient = getDrizzleClient(ownedPgClient)
+  try {
+    const alreadySentForOrg = await hasNotifOrgClaim(c, writeClient, eventName, orgId, uniqId)
+    if (alreadySentForOrg === null) {
+      cloudlog({
+        requestId: c.get('requestId'),
+        message: 'sendNotifToOrgMembersOnce: org claim lookup failed',
+        eventName,
+        preferenceKey,
+        orgId,
+        uniqId,
+      })
+      return false
+    }
+    if (alreadySentForOrg) {
+      cloudlog({
+        requestId: c.get('requestId'),
+        message: 'sendNotifToOrgMembersOnce: org already claimed',
+        eventName,
+        preferenceKey,
+        orgId,
+        uniqId,
+      })
+      return false
+    }
+
+    const { recipients, resolutionFailed } = await getPreparedEligibleEmailTargets(c, orgId, preferenceKey, writeClient, audience)
+    if (!recipients) {
+      cloudlog({ requestId: c.get('requestId'), message: 'sendNotifToOrgMembersOnce: org not found', orgId })
+      return false
+    }
+    if (resolutionFailed) {
+      cloudlog({
+        requestId: c.get('requestId'),
+        message: 'sendNotifToOrgMembersOnce: recipient resolution failed',
+        eventName,
+        preferenceKey,
+        orgId,
+      })
+      return false
+    }
+
+    const { managementEmail, allEmails, primaryEmail, additionalEmails } = recipients
+    if (allEmails.length === 0 || !primaryEmail) {
+      cloudlog({
+        requestId: c.get('requestId'),
+        message: 'sendNotifToOrgMembersOnce: no eligible recipients',
+        eventName,
+        preferenceKey,
+        orgId,
+      })
+      return false
+    }
+
+    const recipientEmails = [primaryEmail, ...additionalEmails]
+    const recipientEntries: { email: string, recipientUniqId: string, wasAlreadyClaimedBeforeRun: boolean }[] = []
+    for (const email of recipientEmails) {
+      const recipientUniqId = await buildOneTimeRecipientNotifUniqId(uniqId, email)
+      const wasAlreadyClaimedBeforeRun = await hasNotifOrgClaim(c, writeClient, eventName, orgId, recipientUniqId)
+      if (wasAlreadyClaimedBeforeRun === null) {
+        cloudlog({
+          requestId: c.get('requestId'),
+          message: 'sendNotifToOrgMembersOnce: recipient claim lookup failed',
+          eventName,
+          preferenceKey,
+          orgId,
+          recipientUniqId,
+        })
+        return false
+      }
+      recipientEntries.push({ email, recipientUniqId, wasAlreadyClaimedBeforeRun })
+    }
+
+    const sendResults: { cleanupFailed: boolean, email: string, recipientUniqId: string, sent: boolean, wasAlreadyClaimedBeforeRun: boolean }[] = []
+    for (const recipient of recipientEntries) {
+      if (recipient.wasAlreadyClaimedBeforeRun) {
+        sendResults.push({ ...recipient, sent: false, cleanupFailed: false })
+        continue
+      }
+
+      const sendResult = await sendNotifOrgOnce(c, eventName, eventData, orgId, recipient.recipientUniqId, recipient.email, drizzleClient, writeClient)
+      sendResults.push({ ...recipient, ...sendResult })
+    }
+
+    const sentEmails = sendResults
+      .filter(result => result.sent)
+      .map(result => result.email)
+    const cleanupFailedEmails = sendResults
+      .filter(result => result.cleanupFailed)
+      .map(result => result.email)
+    const alreadyClaimedBeforeRunEmails = sendResults
+      .filter(result => result.wasAlreadyClaimedBeforeRun)
+      .map(result => result.email)
+
+    if (cleanupFailedEmails.length > 0) {
+      cloudlog({
+        requestId: c.get('requestId'),
+        message: 'sendNotifToOrgMembersOnce: recipient cleanup failed',
+        eventName,
+        preferenceKey,
+        orgId,
+        cleanupFailedRecipients: cleanupFailedEmails,
+      })
+      return false
+    }
+
+    const unresolvedResults = sendResults.filter(result => !result.sent && !result.wasAlreadyClaimedBeforeRun)
+    if (unresolvedResults.length > 0)
+      return false
+
+    const firstOrgSend = await claimNotifOrgOnce(c, eventName, orgId, uniqId, writeClient)
+
+    cloudlog({
+      requestId: c.get('requestId'),
+      message: 'sendNotifToOrgMembersOnce: delivered',
+      eventName,
+      preferenceKey,
+      orgId,
+      primaryEmail,
+      additionalRecipients: additionalEmails.length,
+      deliveredRecipients: sentEmails.length,
+      alreadyClaimedRecipients: alreadyClaimedBeforeRunEmails.length,
+      firstOrgSend,
+      managementEmailIncluded: !!managementEmail,
+    })
+
+    return firstOrgSend
+  }
+  finally {
+    await closeClient(c, ownedPgClient)
+  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@supabase/functions/_backend/utils/org_email_notifications.ts` around lines
593 - 719, sendNotifToOrgMembersOnce creates a local PG client via
getDrizzleClient(getPgClient(c)) stored in writeClient but never closes it on
early returns; wrap the body of sendNotifToOrgMembersOnce in a try/finally (or
ensure a finally) that calls writeClient.release() / writeClient.end() (matching
how other helpers in notifications.ts clean up the client) so the client is
always closed after use, and keep all existing early-return checks
(hasNotifOrgClaim, getPreparedEligibleEmailTargets, recipient loop, cleanup
failures, etc.) inside the try block; reference functions/variables:
sendNotifToOrgMembersOnce, getDrizzleClient, getPgClient, writeClient, and
claimNotifOrgOnce.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@cloudflare_workers/api/index.ts`:
- Around line 191-194: The scheduled handler currently calls appScheduled.fetch
inside ctx.waitUntil without inspecting the returned Response; update the
scheduled(_controller, env, ctx) flow to await or then-check the fetch Response
from appScheduled.fetch and inspect response.ok and the JSON payload for the
flush result (the endpoint `/flush-plugin-notifications` returns a body that
includes the result of flushQueuedPluginNotifications with a failed count). If
response.ok is false or the parsed JSON indicates failed > 0 (or an explicit
failure status), log/error or throw so the failure is surfaced (and ensure
ctx.waitUntil receives a rejected promise on failure), referencing the scheduled
function and appScheduled.fetch and the flushQueuedPluginNotifications result
payload to find the relevant code to change.

In `@tests/plugin-supabase-write-guard.unit.test.ts`:
- Around line 55-58: The helper readJsoncConfig uses a regex that only strips
full-line // comments and will fail if block comments (/* */) or inline trailing
// are introduced; update readJsoncConfig to use a robust JSONC parser (e.g.,
import and use jsonc-parser's parse or use `@types/jsonc`) to parse the file
contents, or at minimum improve the stripping logic to remove /*...*/ and inline
// comments before JSON.parse; locate the function readJsoncConfig in the test
file and replace the current raw.replace+JSON.parse approach with a
jsonc-parser-based parse call (or an enhanced regex strip) so future comments in
wrangler.jsonc are handled safely.

---

Duplicate comments:
In `@supabase/functions/_backend/utils/org_email_notifications.ts`:
- Around line 593-719: sendNotifToOrgMembersOnce creates a local PG client via
getDrizzleClient(getPgClient(c)) stored in writeClient but never closes it on
early returns; wrap the body of sendNotifToOrgMembersOnce in a try/finally (or
ensure a finally) that calls writeClient.release() / writeClient.end() (matching
how other helpers in notifications.ts clean up the client) so the client is
always closed after use, and keep all existing early-return checks
(hasNotifOrgClaim, getPreparedEligibleEmailTargets, recipient loop, cleanup
failures, etc.) inside the try block; reference functions/variables:
sendNotifToOrgMembersOnce, getDrizzleClient, getPgClient, writeClient, and
claimNotifOrgOnce.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: dfa7d8bb-9280-48cb-bb5a-44035a79ab62

📥 Commits

Reviewing files that changed from the base of the PR and between 3314d7e and 1e1cfdb.

📒 Files selected for processing (7)
  • cloudflare_workers/api/index.ts
  • cloudflare_workers/api/wrangler.jsonc
  • cloudflare_workers/plugin/index.ts
  • cloudflare_workers/plugin/wrangler.jsonc
  • supabase/functions/_backend/utils/notifications.ts
  • supabase/functions/_backend/utils/org_email_notifications.ts
  • tests/plugin-supabase-write-guard.unit.test.ts
🔗 Linked repositories identified

CodeRabbit considers these linked repositories for cross-repo context during reviews:

  • Cap-go/capacitor-updater (manual)
💤 Files with no reviewable changes (1)
  • cloudflare_workers/plugin/wrangler.jsonc

Comment thread cloudflare_workers/api/index.ts
Comment thread tests/plugin-supabase-write-guard.unit.test.ts
Comment thread supabase/functions/_backend/utils/notifications.ts Fixed

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5ab7ecda18

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/utils/notifications.ts

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: bb6263fa05

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/utils/plugin_notification_flush.ts Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a9ebe16d86

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread supabase/functions/_backend/triggers/plugin_notifications.ts Outdated
@sonarqubecloud

Copy link
Copy Markdown

@jinhongliang991013 jinhongliang991013 left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the current queue/trigger path after the previous reliability fixes. One remaining delivery gap affects org-member notifications with more than one eligible recipient.

if (item.type === 'org')
return await sendNotifOrg(c, item.eventName, item.eventData, item.orgId, item.uniqId, item.cron, item.managementEmail, drizzleClient)

return await sendNotifToOrgMembers(c, item.eventName, item.preferenceKey, item.eventData, item.orgId, item.uniqId, item.cron, drizzleClient, item.audience)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not acknowledge this queued item until every intended recipient has a durable delivery result. sendNotifToOrgMembers awaits Bento only for primaryEmail; each additionalEmail is launched through backgroundTask(c, trackBentoEvent(...)) and the helper immediately returns true. This trigger converts that true to delivered, so the flusher deletes the KV item. If Bento returns false for any additional admin/billing recipient, that result exists only inside waitUntil and is never observed or retried; the queue still reports success and that recipient permanently misses the email. The queued path needs awaited/per-recipient delivery tracking (and idempotency) before returning delivered, rather than reusing the helper's fire-and-forget contract.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants