feat: serialize tenant-owned writes against tenant data purges#87
Conversation
Tenant data purges now take an exclusive transaction-scoped advisory lock per tenant while every tenant-owned mutation (feedback records, webhooks, embeddings, taxonomy) holds the same key in shared try-lock mode inside a single repository-level helper. Writes for a tenant under purge fail fast with a retryable 409 (code tenant_write_conflict); the purge waits up to TENANT_PURGE_LOCK_TIMEOUT_SECONDS (default 5s) for in-flight writes to drain and then returns the same retryable conflict. Other tenants are unaffected. ID-based mutations now resolve the row's tenant inside the transaction and mutate with tenant-scoped WHERE clauses; the GDPR delete-by-user locks every spanned tenant; tenant-changing webhook updates lock both tenants. Workers skip or retry cleanly on conflicts. Closes ENG-1013 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
✱ Stainless preview buildsThis PR will update the ✅ hub-typescript studio · code
This comment is auto-generated by GitHub Actions and is automatically kept up to date as you push. |
WalkthroughThis PR introduces tenant-level write serialization using Postgres transaction-scoped advisory locks to prevent race conditions between tenant data purges and concurrent mutations. A new 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/workers/webhook_dispatch.go (1)
165-201:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winOnly emit disable-success signals after the disable write succeeds.
RecordWebhookDisabledand the"webhook disabled after max delivery attempts"log still run on the new tenant-conflict path, even though Lines 183-188 explicitly skip the update during a purge. That inflates disable metrics and produces a false success log for a webhook that was never disabled.♻️ Suggested change
if isLastAttempt { if w.metrics != nil { - w.metrics.RecordWebhookDisabled(ctx, "max_attempts") w.metrics.RecordDelivery(ctx, args.EventType, "failed_final") w.metrics.RecordWebhookDeliveryDuration(ctx, time.Since(start), args.EventType, "failed_final") } @@ switch { case updateErr == nil: + if w.metrics != nil { + w.metrics.RecordWebhookDisabled(ctx, "max_attempts") + } + + slog.Error("webhook disabled after max delivery attempts", + "webhook_id", webhook.ID, + "event_id", args.EventID, + "error", err, + ) case errors.Is(updateErr, huberrors.ErrTenantWriteConflict): // A tenant data purge is deleting this webhook; disabling it is moot. slog.Warn("webhook dispatch: disable skipped, tenant purge in progress", "webhook_id", webhook.ID, @@ - slog.Error("webhook disabled after max delivery attempts", - "webhook_id", webhook.ID, - "event_id", args.EventID, - "error", err, - ) - return fmt.Errorf("webhook send (final attempt): %w", err) }Based on PR objectives, webhook dispatch should swallow disable-conflicts during purges while still reporting delivery failures.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/workers/webhook_dispatch.go` around lines 165 - 201, The code currently records a disable metric and emits the "webhook disabled after max delivery attempts" log even when repo.Update(webhook.ID, ...) was skipped due to a tenant purge conflict; move the calls to w.metrics.RecordWebhookDisabled(...) and the final slog.Error("webhook disabled after max delivery attempts", ...) into the successful-update branch (i.e., only when updateErr == nil). Keep recording delivery failure metrics (w.metrics.RecordDelivery / RecordWebhookDeliveryDuration) and emitting the failure log/error when updateErr is a tenant write conflict or other error, but do not signal "disabled" nor call RecordWebhookDisabled unless the Update call actually succeeded.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/repository/feedback_records_repository.go`:
- Around line 455-486: DeleteByUser currently snapshots tenantIDs via
listUserFeedbackTenants before acquiring locks, allowing new tenant writes to
appear after that snapshot and escape deletion; modify DeleteByUser (involving
withTenantWriteTx, listUserFeedbackTenants, tryLockTenantsShared and the
tenantIDs variable) to detect tenant-set drift and retry or fail: after
acquiring locks (or using a user-scoped lock), re-query the tenant set and
compare to the original tenantIDs—if they differ, either expand the deletion to
include the new tenants or return a retryable error so the caller can retry;
alternatively acquire a global/user-scoped lock before listing tenants so the
tenant set cannot change between listUserFeedbackTenants and the DELETE. Ensure
the chosen approach returns a retryable conflict when drift is detected and that
returned DeletedFeedbackRecordsByTenant accurately reflects what was deleted.
In `@internal/repository/taxonomy_repository.go`:
- Around line 985-1002: The function getNodeForUpdate currently accepts a
generic queryer which allows non-transactional calls; change its third parameter
type from queryer to the tx-only interface (e.g., tenantWriteTx) so the compiler
forces callers to pass an explicit tenant-scoped transaction; update the
function signature and any callers (including RenameNode and RemoveNode) to
open/forward the tenantWriteTx created by the tenant write lock helper, and
adjust imports/types as needed so the row lock acquired by the SELECT ... FOR
UPDATE is held for the life of the transaction.
In `@internal/repository/tenant_data_repository.go`:
- Around line 76-81: The deferred rollback handling must ignore spurious
connection-closed errors when acquireTenantPurgeLock cancels the request ctx:
update the cleanup path that calls dbTx.Rollback(ctx) (the rollback invoked
after calling acquireTenantPurgeLock and anywhere dbTx is rolled back on error)
to either call Rollback with a detached context (e.g., context.Background()) or
to treat pgconn.ErrConnClosed (and pgx.ErrTxClosed) as non-fatal and swallow
them instead of logging them as failures; reference acquireTenantPurgeLock,
dbTx.Rollback(ctx), purgeLockTimeout and tenantID to locate the affected
transaction cleanup code and adjust the error handling to ignore these specific
connection-closed/tx-closed errors.
In `@internal/repository/tenant_write_lock.go`:
- Around line 156-160: The acquireTenantPurgeLock helper must validate the
timeout before calling set_config: reject or normalize non-positive durations
(timeout <= 0) and return an error immediately instead of passing
timeout.Milliseconds() to Postgres; update acquireTenantPurgeLock to check the
timeout parameter and return a clear error for zero/negative values prior to
calling exec.Exec(`SELECT set_config('lock_timeout', $1, true)`, ...), and add
regression tests covering timeout == 0 and negative durations to assert the
function fails fast rather than disabling or passing an invalid lock_timeout.
In `@openapi.yaml`:
- Around line 1068-1076: Split the ambiguous 409 response into two distinct 409
cases (distinguished by the problem detail "code" field) so each has the correct
remediation: one for the purge conflict (code tenant_purge_in_progress) whose
description instructs the client to retry after the purge completes and mentions
a Retry-After header or waiting for completion, and a second for concurrent
modification (code tenant_write_conflict) whose description instructs the client
to re-fetch the resource and retry the operation or surface the conflict to the
user (do not tell clients to wait for purge completion); update both response
entries that reference ErrorModel (the 409 block shown and the similar block at
the second location) so the problem detail "code" values and guidance are
explicit and accurate.
In `@tests/tenant_write_lock_test.go`:
- Around line 365-374: The pg_locks check in the require.Eventually block is too
broad and can match any ungranted advisory lock; narrow it to only observe the
purge for tenantA by filtering the query with
repository.TenantWriteLockKey(tenantA) (or by selecting the purge session PID)
so the count only considers the advisory lock key for
TenantWriteLockKey(tenantA). Update the SQL inside the eventually closure (the
QueryRow call) to include the appropriate condition using
TenantWriteLockKey(tenantA) and adjust the Scan/params accordingly so the
waiting > 0 assertion only reflects tenantA's purge lock.
---
Outside diff comments:
In `@internal/workers/webhook_dispatch.go`:
- Around line 165-201: The code currently records a disable metric and emits the
"webhook disabled after max delivery attempts" log even when
repo.Update(webhook.ID, ...) was skipped due to a tenant purge conflict; move
the calls to w.metrics.RecordWebhookDisabled(...) and the final
slog.Error("webhook disabled after max delivery attempts", ...) into the
successful-update branch (i.e., only when updateErr == nil). Keep recording
delivery failure metrics (w.metrics.RecordDelivery /
RecordWebhookDeliveryDuration) and emitting the failure log/error when updateErr
is a tenant write conflict or other error, but do not signal "disabled" nor call
RecordWebhookDisabled unless the Update call actually succeeded.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f32ab145-0c96-4e94-8fe3-14d791ba970e
📒 Files selected for processing (24)
.env.exampleAGENTS.mdcmd/api/app.gointernal/api/response/errors.gointernal/api/response/problem.gointernal/api/response/response_test.gointernal/config/config.gointernal/config/config_test.gointernal/huberrors/errors.gointernal/repository/embeddings_repository.gointernal/repository/feedback_records_repository.gointernal/repository/taxonomy_repository.gointernal/repository/tenant_data_repository.gointernal/repository/tenant_data_repository_test.gointernal/repository/tenant_write_lock.gointernal/repository/tenant_write_lock_test.gointernal/repository/webhooks_repository.gointernal/workers/feedback_embedding.gointernal/workers/feedback_embedding_test.gointernal/workers/webhook_dispatch.gointernal/workers/webhook_dispatch_test.goopenapi.yamltests/integration_test.gotests/tenant_write_lock_test.go
What does this PR do?
Adds a deliberate tenant-write serialization layer so the tenant data purge
(
DELETE /v1/tenants/{tenant_id}/data) can no longer race concurrenttenant-owned writes for the same tenant. Previously the purge deleted in one
transaction but did not serialize against in-flight writes, so a concurrent
create/update could commit mid-purge and survive it.
Closes ENG-1013
Mechanism — Postgres transaction-scoped advisory locks, single source of
truth in
internal/repository/tenant_write_lock.go:takes a shared lock on
hashtextextended('tenant_write|<len>:<tenant_id>', 0).Shared mode keeps same-tenant writers fully concurrent.
TENANT_PURGE_LOCK_TIMEOUT_SECONDS(default 5) for in-flight writers to drain.fail fast; other tenants are unaffected. Both directions return a retryable
409 tenant_write_conflict(new RFC 9457 problem code, distinct from theterminal duplicate-record
conflict).with a tenant-scoped
WHERE; tenant-changing webhook updates lock both old andnew tenant; the GDPR cross-tenant delete locks every spanned tenant.
statement (
INSERT ... SELECT ... WHERE pg_try_advisory_xact_lock_shared(...)),so they serialize in one round trip with the same isolation.
No schema migration (advisory locks are not schema). Since the issue was filed,
taxonomy added six tenant-owned tables; those write paths are covered too.
Scope grew to cover review feedback: deduped the resolve+lock sequence and the
tx/interface helpers, and floored the purge
lock_timeoutto ≥1ms so it cannever disable the timeout.
API behavior change — affected endpoints now document a retryable 409. Example body:
{ "type": "https://hub.formbricks.com/problems/tenant-write-conflict", "title": "Conflict", "status": 409, "detail": "tenant data purge in progress for this tenant; retry later", "instance": "/v1/feedback-records", "code": "tenant_write_conflict", "request_id": "019ebcb4-c817-7582-8017-cbc52258e368" }Accepted, documented tails: webhook deliveries already in flight at purge commit
complete; already-enqueued dispatch jobs no-op post-purge but their payloads
persist until River retention pruning.
How should this be tested?
Config: integration tests need a Postgres+pgvector
test_db(the API serves onPORT, tests useDATABASE_URL,API_KEY).ENG-1013 integration coverage in
tests/tenant_write_lock_test.go— each holdsthe advisory lock directly on a raw connection to make the race deterministic:
TestTenantPurgeWaitsForInFlightWritesThenConflicts— purge waits then 409s; succeeds after releaseTestTenantWritesRejectedDuringPurge— every same-tenant write → 409 (incl. gated creates); other tenants → 201; no events published for rejected writesTestTenantWritesFailFastWhilePurgeQueued—pg_locks-verified: new write rejected while purge is queued; queued purge completes after writers drainTestGDPRDeleteByUserDuringPurge— all-or-nothing 409 across tenants; tenant-scoped delete proceedsTestWebhookTenantMoveLocksTargetTenant,TestRepositoryWritesConflictDuringPurgeAPI contract: build, run, and
make schemathesis(or the API-contract workflow)—
status_code_conformance/response_schema_conformancepass with the new 409sand
tenant_write_conflictenum member.Checklist
Required
make buildmake tests(integration tests intests/)make fmtandmake lint; no new warningsgit pull origin mainAppreciated
docs/if changes were necessary — N/A (endpoint docs are inopenapi.yaml)make tests-coveragefor meaningful logic changes