Skip to content

Deduplicate custom background tasks#806

Open
2chanhaeng wants to merge 9 commits into
fedify-dev:feat/custom-workerfrom
2chanhaeng:issue/798
Open

Deduplicate custom background tasks#806
2chanhaeng wants to merge 9 commits into
fedify-dev:feat/custom-workerfrom
2chanhaeng:issue/798

Conversation

@2chanhaeng

Copy link
Copy Markdown
Member

Resolves #798, the second sub-issue of #206.

Important

This branch is stacked on #803 (#797, the first sub-issue), which is not yet merged. Until #803 lands, this PR's range includes #803's commits too, so the diff shown here is the core defineTask / enqueueTask API plus this sub-issue's deduplication work. Please merge #803 first; once it lands on feat/custom-worker, rebasing this branch drops the overlap and leaves this sub-issue's deduplication changes alone.

Summary

Background tasks frequently need at-most-once-per-key enqueue semantics: a digest mailer must not send twice when a request is retried, and a cleanup job should coalesce duplicate triggers. This sub-issue adds an opt-in deduplicationKey to the task enqueue path, mirroring the native-capability flag pattern that nativeRetrial (#250) established—a backend that deduplicates natively owns the check; otherwise Fedify provides a best-effort key–value fallback with the race-condition tradeoff documented explicitly.

It is kept separate from the core API (#797 / #803) so the first PR stays small and the deduplication semantics—including the documented best-effort limitation—get their own reviewable boundary.

Public API

MQ-layer primitives

These are message-queue-layer primitives, not task-layer concepts, so they survive a future Worker extraction unchanged and are reusable by any enqueue path:

interface MessageQueueEnqueueOptions {
  readonly delay?: Temporal.Duration;     // existing
  readonly orderingKey?: string;          // existing
  readonly deduplicationKey?: string;     // new — at-most-once enqueue for messages sharing it
}

interface MessageQueue {
  readonly nativeRetrial?: boolean;       // existing, #250
  readonly nativeDeduplication?: boolean; // new — backend dedups same deduplicationKey natively
  // …
}

InProcessMessageQueue declares nativeDeduplication = false; ParallelMessageQueue inherits the wrapped queue's flag and rejects a batch carrying a deduplicationKey when the wrapped queue has no enqueueMany (it would otherwise fan out to single enqueue() calls that cannot share one key atomically).

Task-API surface

interface TaskEnqueueOptions {
  readonly delay?: Temporal.DurationLike;
  readonly orderingKey?: string;
  readonly deduplicationKey?: string;     // new (optional → non-breaking on top of #803)
}

interface FederationOptions<TContextData> {
  // …
  taskDeduplicationTtl?: Temporal.DurationLike;     // new, default { hours: 1 }
  taskDeduplicationFallback?: "open" | "closed";    // new, default "open"
}

A new taskDeduplication key–value prefix (default ["_fedify", "taskDeduplication"]) holds fallback markers, kept separate from activityIdempotence.

Resolution path

The enqueue pipeline was extracted out of ContextImpl into a dedicated tasks/enqueue.ts module (handle validation, deduplication planning, payload encoding, and queue dispatch live in one place). Deduplication is decided once, before any payload is encoded:

  1. Native backend — when the task's queue declares nativeDeduplication: true, Fedify forwards deduplicationKey in MessageQueueEnqueueOptions and the backend owns the check. The key–value store is never touched.
  2. Key–value fallback — otherwise, when the configured KvStore exposes the optional compare-and-swap (KvStore.cas) primitive, Fedify claims the key under the taskDeduplication prefix with taskDeduplicationTtl. A present marker skips the enqueue; a won claim proceeds. If the subsequent dispatch fails, the marker is rolled back (via a conditional cas clear) so a transient failure does not suppress the retry.
  3. No conditional write — when neither applies (no nativeDeduplication, and a KvStore without cas), behavior follows taskDeduplicationFallback: "open" (default) proceeds without deduplication after a debug-level log; "closed" throws a TypeError before enqueuing.

Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL key–value stores implement cas; PostgreSQL and Redis do not yet, so those deployments take the fallback branch until per-adapter follow-ups add it.

Batch semantics

For enqueueTaskMany, a single deduplicationKey applies to the whole batch—it enqueues as a unit or is skipped as a unit, never partially. Per-item deduplication means calling enqueueTask in a loop, each with its own key. When deduplication is actually applied (a native queue, or a KvStore with cas), a multi-item batch with a deduplicationKey on a queue without enqueueMany is rejected rather than risking duplicates, since fanning the key across separate enqueue() calls cannot enqueue the batch as one unit. Under the "open" fallback no marker is taken, so the batch simply fans out.

Documented limitation

The key–value fallback is best-effort, not transactional: the marker write and the enqueue are separate operations. Fedify rolls the marker back on enqueue failure, but a crash before that rollback, the "open" fallback under concurrency, a non-atomic third-party cas, or key reuse within the TTL window can still admit a duplicate or suppress a task. Cleanup is by TTL expiry, not active deletion on handler success (active cleanup introduces a success→crash-before-delete window; deferred). This is stated in the public JSDoc for deduplicationKey and in a warning callout in docs/manual/tasks.md. Deployments needing strict guarantees use a queue with nativeDeduplication: true.

Out of scope

  • Active key–value cleanup on handler success (TTL-only for now).
  • Adding nativeDeduplication: true / cas to the remaining first-party adapters (PostgreSQL, Redis) — tracked as per-adapter follow-ups; this sub-issue ships the core flag plus the key–value fallback.

Acceptance criteria

  • deduplicationKey on a nativeDeduplication: true queue is forwarded; Fedify does not write to the key–value store.
  • deduplicationKey on a default queue: a second enqueue inside the TTL is skipped; re-enqueue after TTL expiry succeeds.
  • taskDeduplicationFallback: "closed" throws synchronously when no conditional write is available; "open" proceeds with a debug log.
  • taskDeduplication key–value prefix does not collide with activityIdempotence.
  • enqueueTaskMany applies one batch-level deduplicationKey.
  • Best-effort race limitation documented in JSDoc and docs/manual/tasks.md; CHANGES.md updated.

Tests

  • tasks/enqueue.test.ts covers the three resolution paths, TTL skip/expiry, the "open"/"closed" fallback, the prefix isolation, batch-level dedup, the enqueueMany-required rejection, and marker rollback on dispatch failure.
  • mq.test.ts covers the new nativeDeduplication flag and the ParallelMessageQueue batch rejection.
  • The testing mock (@fedify/testing) honors deduplicationKey so applications can assert dedup behavior in their own tests.
  • Tests use @fedify/fixture test() and pass under Deno, Node.js, and Bun.

Notes for reviewers

AI disclosure

Assisted-by: Claude Code:claude-opus-4-8
Assisted-by: Codex:gpt-5.5

Codex was used only in review.

@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Copy link
Copy Markdown

Review Change Stack

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: fb0fb9bb-c396-4eb3-8e59-10071d687b0e

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds a complete custom background task API to Fedify: defineTask() on TaskRegistry/Federatable, Context.enqueueTask()/enqueueTaskMany(), TaskMessage wire format, devalue-based TaskCodec with vocab object support, an enqueue pipeline with KV CAS deduplication, worker dispatch with retry policy, dedicated per-task queue routing, and matching testing utilities and documentation.

Changes

Custom Background Task API

Layer / File(s) Summary
Task type contracts and MessageQueue deduplication primitives
packages/fedify/src/federation/tasks/task.ts, packages/fedify/src/federation/queue.ts, packages/fedify/src/federation/mq.ts, packages/fedify/src/federation/tasks/mod.ts, packages/fedify/deno.json, packages/fedify/package.json, packages/testing/package.json
Defines TaskHandler, TaskDefinitionOptions, TaskDefinition, TaskRegistry, TaskEnqueueOptions, and TaskDefinitionInternal. Adds TaskMessage to the Message discriminated union. Extends MessageQueueEnqueueOptions with deduplicationKey and MessageQueue with nativeDeduplication, implemented on InProcessMessageQueue and ParallelMessageQueue. Adds devalue and @standard-schema/spec dependencies.
TaskCodec: devalue serialization with vocab support
packages/fedify/src/federation/tasks/codec.ts, packages/fedify/src/federation/tasks/codec.test.ts
Implements TaskCodec with serialize/deserialize (devalue with custom APObject/Link replacer/reviver), encode/decode with StandardSchemaV1 validation, and static validate. Defines internal VocabKind/VocabWire/VocabHolder wire model and classReviver factory. Tests cover round-trip, circular references, vocab objects, deep nesting, state isolation, and idempotency requirements.
Federation public surface: Federatable, FederationOptions, Context, Builder
packages/fedify/src/federation/federation.ts, packages/fedify/src/federation/context.ts, packages/fedify/src/federation/builder.ts, packages/fedify/src/federation/mod.ts
Extends Federatable to extend TaskRegistry. Adds "task" to FederationStartQueueOptions.queue. Adds taskRetryPolicy, taskQueueResolution, taskDeduplicationTtl, taskDeduplicationFallback to FederationOptions. Adds enqueueTask/enqueueTaskMany to Context. Wires defineTask and taskDefinitions into FederationBuilderImpl. Re-exports task types from federation module.
Enqueue pipeline: dedup planning, CAS claiming, dispatch
packages/fedify/src/federation/tasks/enqueue.ts, packages/fedify/src/federation/tasks/enqueue.test.ts
Implements enqueueTasks: validates task handle, resolves queue, plans dedup (none/native/cas/open), encodes payloads to TaskMessage envelopes with OTel trace context, claims KV CAS with rollback, auto-starts queue, and dispatches via enqueueMany or parallel single enqueues. Tests cover envelope stamping, all dedup strategies, concurrency, rollback safety, ParallelMessageQueue wrapping, and error precedence.
FederationImpl and ContextImpl task wiring
packages/fedify/src/federation/middleware.ts, packages/fedify/src/federation/middleware.test.ts
Adds FederationQueueOptions.task, FederationKvPrefixes.taskDeduplication, task state and config fields on FederationImpl, resolveTaskQueue(), extended _startQueueInternal for task and per-task queue workers, #listenTaskMessage with decode/retry logic, and ContextImpl codec getter and public enqueue methods. Tests cover enqueueMany fast path, concurrent fallback dispatch, and batch validation abort.
MessageQueue deduplication tests
packages/fedify/src/federation/mq.test.ts
Tests nativeDeduplication on InProcessMessageQueue, ParallelMessageQueue inheritance, deduplicationKey forwarding for single and batch enqueues, rejection of deduplicated batches when wrapped queue lacks enqueueMany, and fan-out of non-deduplicated batches.
Federation task integration tests
packages/fedify/src/federation/tasks/tasks.test.ts
End-to-end tests for defineTask uniqueness and clone semantics, type-level guards, enqueueTask round-trip through handler, queue routing (per-task/outbox/strict), startQueue selection for task workers and per-task queues, and processQueuedTask dispatch paths including retry, policy precedence, and nativeRetrial rethrow.
Testing utilities and mock updates
packages/fedify/src/testing/tasks.ts, packages/fedify/src/testing/context.ts, packages/fedify/src/testing/mod.ts, packages/testing/src/context.ts, packages/testing/src/mock.ts, packages/testing/src/mock.test.ts
Adds MockQueue, makeSchema/stringSchema/numberSchema/envelopeSchema, baseOptions. Updates createContext in both packages to wire enqueueTask/enqueueTaskMany defaults. Extends MockFederation with taskDefinitions/defineTask/task-queue activation and MockContext with schema validation and handler dispatch. Tests verify validation, schema transformation, batch abort, and cross-federation rejection.
Documentation, changelog, and tooling
docs/manual/tasks.md, docs/.vitepress/config.mts, CHANGES.md, AGENTS.md, mise.toml, scripts/check_fixture_usage.ts
Adds docs/manual/tasks.md covering all task API concepts; adds sidebar entry; updates CHANGES.md with feature description; updates AGENTS.md with mise workflow; splits mise.toml install into install:deno/install:pnpm subtasks; updates fixture allowlist to glob-based exemptions.

Sequence Diagram(s)

sequenceDiagram
  participant App
  participant Federatable
  participant Context
  participant enqueueTasks
  participant KvStore
  participant MessageQueue
  participant Worker as Task Worker (`#listenTaskMessage`)
  participant TaskHandler

  App->>Federatable: defineTask("sendDigest", { schema, handler })
  Federatable-->>App: TaskDefinition handle

  App->>Context: enqueueTask(handle, payload, { deduplicationKey: "k" })
  Context->>enqueueTasks: task, [payload], options
  enqueueTasks->>enqueueTasks: planDeduplication → cas plan
  enqueueTasks->>enqueueTasks: encodeTaskMessage (codec.encode + envelope)
  enqueueTasks->>KvStore: cas(taskDeduplication/k, token, ttl)
  KvStore-->>enqueueTasks: claimed (proceed=true)
  enqueueTasks->>MessageQueue: enqueue(TaskMessage)
  MessageQueue-->>enqueueTasks: ok

  MessageQueue-->>Worker: deliver TaskMessage
  Worker->>Worker: codec.decode(schema, message.data)
  Worker->>TaskHandler: handler(context, validatedPayload)
  TaskHandler-->>Worker: ok

  Note over App,KvStore: Duplicate enqueue within TTL
  App->>Context: enqueueTask(handle, payload2, { deduplicationKey: "k" })
  Context->>enqueueTasks: task, [payload2], options
  enqueueTasks->>KvStore: cas(taskDeduplication/k, token2, ttl)
  KvStore-->>enqueueTasks: lost (proceed=false)
  enqueueTasks-->>Context: (no enqueue)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related issues

  • #798 (Custom background tasks: deduplication): This PR directly implements the full specification from #798, including deduplicationKey on TaskEnqueueOptions, nativeDeduplication on MessageQueue, KV CAS fallback with TTL, taskDeduplicationFallback "open"/"closed" modes, taskDeduplicationTtl, the separate taskDeduplication KV prefix, and the documented best-effort concurrency limitation.
  • #797: The PR implements the core custom background tasks API described in this issue, including defineTask(), TaskRegistry/Federatable integration, Context.enqueueTask()/enqueueTaskMany(), TaskMessage wire format, Standard Schema validation, devalue serialization, queue routing, retry behavior, and worker dispatch.

Suggested labels

component/federation, component/mq

Suggested reviewers

  • sij411
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Deduplicate custom background tasks' accurately summarizes the primary change—adding deduplication support to the task enqueue path. It is concise and specific.
Description check ✅ Passed The description thoroughly explains the deduplication feature, design rationale, public API additions, resolution paths, batch semantics, and documented limitations. It is directly related to the changeset.
Linked Issues check ✅ Passed The PR implements all core objectives from #798: deduplicationKey support, native backend forwarding, KV fallback with cas, fallback behavior modes, batch semantics, and documented best-effort limitations. All acceptance criteria are met by the code changes.
Out of Scope Changes check ✅ Passed All changes directly support the deduplication feature and its integration. Documentation updates (AGENTS.md, CHANGES.md, tasks.md), build system changes (mise.toml), and test infrastructure (testing utilities) are all necessary for the feature implementation.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@2chanhaeng

Copy link
Copy Markdown
Member Author

@codex review
@CodeRabbit review
@gemini-code-assist review

@coderabbitai

coderabbitai Bot commented Jun 16, 2026

Copy link
Copy Markdown

@2chanhaeng Sure, I'll review the PR!

✅ Action performed

Review finished.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@chatgpt-codex-connector

Copy link
Copy Markdown

To use Codex here, create an environment for this repo.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a custom background task API for Fedify 2.x.x, allowing applications to define, enqueue, and process arbitrary background jobs. It adds defineTask to Federation and FederationBuilder, and enqueueTask/enqueueTaskMany to Context. Payloads are validated using Standard Schema and serialized with devalue to support complex types and Activity Vocabulary objects. The API supports customizable retry policies, queue routing, and deduplication (either native or via a key-value fallback). Additionally, the PR updates testing utilities, documentation, and mise tasks. Feedback on the changes suggests implementing a depth limit in the recursive revival traversal of TaskCodec to mitigate potential denial-of-service (DoS) attacks or stack overflows from deeply nested payloads.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread packages/fedify/src/federation/tasks/codec.ts
@codecov

codecov Bot commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 97.45763% with 6 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
packages/fedify/src/federation/tasks/enqueue.ts 97.05% 4 Missing and 1 partial ⚠️
packages/fedify/src/federation/middleware.ts 96.66% 0 Missing and 1 partial ⚠️
Files with missing lines Coverage Δ
packages/fedify/src/federation/mq.ts 86.77% <100.00%> (+0.87%) ⬆️
packages/fedify/src/federation/tasks/codec.ts 99.09% <ø> (ø)
packages/fedify/src/federation/tasks/mod.ts 100.00% <100.00%> (ø)
packages/fedify/src/testing/mod.ts 100.00% <100.00%> (ø)
packages/fedify/src/testing/tasks.ts 100.00% <100.00%> (ø)
packages/fedify/src/federation/middleware.ts 90.69% <96.66%> (-0.24%) ⬇️
packages/fedify/src/federation/tasks/enqueue.ts 97.05% <97.05%> (ø)

... and 2 files with indirect coverage changes

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a custom background task API for Fedify, enabling the definition, enqueuing, and background processing of arbitrary application-defined jobs with robust serialization, retry policies, queue routing, and deduplication. The code review feedback highlights several critical compatibility, security, and performance improvements: replacing the ES2024 Array.fromAsync calls with standard loops to maintain compatibility with Node.js 18 and 20, implementing a depth limit in the recursive deserialization function to prevent potential Denial of Service (DoS) attacks, preserving null prototypes during object revival to avoid prototype pollution, and optimizing the enqueue pipeline by validating payloads before claiming deduplication keys.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread packages/fedify/src/federation/tasks/codec.ts
Comment thread packages/fedify/src/federation/tasks/codec.ts
Comment thread packages/fedify/src/federation/tasks/codec.ts
Comment thread packages/fedify/src/federation/tasks/codec.ts
Comment thread packages/fedify/src/federation/tasks/enqueue.ts

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/testing/src/mock.ts (1)

515-527: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

queueStarted should not flip to true for non-outbox queue startup.

Starting only "task" currently sets queueStarted = true, so later sendActivity() is misclassified as queued outbox delivery.

Suggested fix
   async startQueue(
     contextData: TContextData,
     options?: FederationStartQueueOptions,
   ): Promise<void> {
     this.contextData = contextData;
-    this.queueStarted = true;

     // If a specific queue is specified, only activate that one
     if (options?.queue) {
       this.activeQueues.add(options.queue);
     } else {
       // If no specific queue, activate all four
       this.activeQueues.add("inbox");
       this.activeQueues.add("outbox");
       this.activeQueues.add("fanout");
       this.activeQueues.add("task");
     }
+    this.queueStarted = this.activeQueues.has("outbox");
   }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/testing/src/mock.ts` around lines 515 - 527, The queueStarted flag
is being unconditionally set to true when any queue starts, but it should only
be true when the "outbox" queue is activated. This causes sendActivity() to
incorrectly classify non-outbox queue activity as queued outbox delivery. Move
the assignment of this.queueStarted = true to only execute when either no
specific queue is specified (meaning all queues including outbox are activated)
or when the specific queue being activated is "outbox". Update the conditional
logic to check if options?.queue is either undefined or explicitly equals
"outbox" before setting queueStarted to true.
packages/fedify/src/federation/middleware.ts (1)

902-906: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Wire task queues into the existing queue observability path.

Task queues are omitted from #queueDepthGaugeEntries, and message.type === "task" bypasses the span/started/outcome/in-flight wrapper used by inbox/outbox/fanout workers. That makes task backlog and handler failures invisible to the existing queue metrics/traces even though tasks are first-class queued work.

Also applies to: 1239-1240

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/fedify/src/federation/middleware.ts` around lines 902 - 906, Task
queues are not included in the `#queueDepthGaugeEntries` array and the task
message type handler bypasses the observability wrapper
(span/started/outcome/in-flight) used by inbox/outbox/fanout workers, making
task metrics and traces invisible. Add the task queue entry to
`#queueDepthGaugeEntries` following the same pattern as the inbox, outbox, and
fanout entries with role "task" and the corresponding task queue. Additionally,
update the message handler to apply the same observability wrapper and metrics
collection to task queue messages that are currently applied to other queue
types, ensuring all queued work is tracked consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@AGENTS.md`:
- Around line 137-139: Replace the placeholder syntax `mise run
test:<runtime:deno,node,bun>` with concrete, runnable task names. List the
actual commands explicitly as `mise run test:deno`, `mise run test:node`, and
`mise run test:bun` so users can copy and paste them directly without having to
interpret pseudo-syntax. This makes the documentation immediately actionable and
prevents confusion.

In `@packages/fedify/src/federation/context.ts`:
- Around line 460-473: Update the JSDoc comment for the `enqueueTaskMany` method
to accurately document its behavior with deduplicated batches. First, modify the
statement that currently describes falling back to parallel single enqueues to
make it conditional, clarifying that this fallback only occurs when the queue's
bulk enqueue operation is available or in certain situations. Second, add an
additional `@throws {TypeError}` entry documenting that a TypeError is thrown
when atomic `enqueueMany` is unavailable for deduplicated multi-item batches,
ensuring the documentation fully captures all error conditions the method can
encounter.

In `@packages/fedify/src/federation/middleware.test.ts`:
- Around line 10568-10573: The module-scoped taskFederationOptions object reuses
a single MemoryKvStore instance across all tests, causing potential state
pollution and test interdependencies. Move the taskFederationOptions definition
from module scope into each test function or a beforeEach hook, ensuring a fresh
MemoryKvStore instance is created for each test. Then update all test functions
that reference the module-scoped taskFederationOptions to use the local version
instead.

In `@packages/fedify/src/federation/middleware.ts`:
- Around line 944-949: The resolveTaskQueue method correctly identifies which
queue to use for task messages, but the _startQueueInternal method does not
properly handle the "task" selector when that queue is shared with standard
queues or routed as a fallback. Fix _startQueueInternal to map the "task"
selector to its resolved task queue instance by calling
resolveTaskQueue("task"), then ensure that resolved queue is started/listened to
exactly once by tracking started queues appropriately. This ensures that when
queue.task is configured as the same instance as inbox/outbox/fanout, or when
task messages are fallback-routed to outboxQueue, the actual queue receiving
those messages has an active listener.

In `@packages/fedify/src/federation/mq.ts`:
- Around line 454-462: The error check in the enqueueMany method around the
deduplicationKey validation is too strict and prevents single-item batches from
using the deduplicated enqueue fallback. Modify the condition to only throw the
TypeError when both deduplicationKey is present AND the batch contains multiple
items. For single-item batches, allow the code to proceed with the fallback to
the wrapped queue's enqueue() method, since atomicity is maintained for
individual items. Check the length of the messages array being enqueued (this
will be available through the method parameters) and only enforce the
restriction when multiple items are being batched.

In `@packages/fedify/src/federation/tasks/codec.ts`:
- Around line 7-15: The TaskCodec class and its public methods (serialize,
deserialize, and validate) lack JSDoc documentation. Add comprehensive JSDoc
comments for the TaskCodec class constructor describing its purpose and the
options parameter, the serialize method explaining that it converts data to a
JSON string and documenting any potential errors, the deserialize method
explaining that it parses a JSON string back into data and documenting any
parsing errors, and the validate method documenting its validation behavior and
error conditions. Each JSDoc should follow the established pattern in the
codebase by including descriptions of parameters, return types, and any errors
that might be thrown.

In `@packages/fedify/src/federation/tasks/tasks.test.ts`:
- Around line 302-323: The startQueue() task worker test currently only covers
the scenario with a distinct task queue, but lacks coverage for when the
effective task queue falls back to shared instances. Add additional test steps
within the same test function to cover the cases where queue.task is undefined
(falling back to the outbox queue) and where queue.task shares an instance with
inbox, outbox, or fanout. For each fallback scenario, verify that only the
appropriate fallback queue (like outbox when task is undefined) has its
listenCount incremented to 1 while the other queues remain at 0, using the same
assertion pattern as the existing test step but with the appropriate queue
configuration.

In `@packages/fedify/src/testing/tasks.ts`:
- Around line 127-134: The MockQueue.listen method does not handle pre-aborted
signals. If the signal passed via options is already aborted before the event
listener is registered, the abort event will never fire and the Promise will
never resolve, causing tests to hang. Check if options?.signal?.aborted is
already true and resolve the Promise immediately in that case. Only register the
abort event listener when the signal is not already aborted.

In `@packages/testing/src/mock.test.ts`:
- Around line 1728-1830: The tests in mock.test.ts are using the old Deno-style
test harness (`@std/assert`) instead of the required Node.js test harness for the
packages/testing package. Replace the import statements to use `import { test }
from "node:test"` and `import assert from "node:assert/strict"`, then update all
assertion calls throughout the tests: replace each `assertEquals()` call with
`assert.equal()` or `assert.strictEqual()`, replace each `assertRejects()` call
with `assert.rejects()`, and ensure the test function signatures and assertion
call syntax conform to the Node.js assert/strict API.

---

Outside diff comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 902-906: Task queues are not included in the
`#queueDepthGaugeEntries` array and the task message type handler bypasses the
observability wrapper (span/started/outcome/in-flight) used by
inbox/outbox/fanout workers, making task metrics and traces invisible. Add the
task queue entry to `#queueDepthGaugeEntries` following the same pattern as the
inbox, outbox, and fanout entries with role "task" and the corresponding task
queue. Additionally, update the message handler to apply the same observability
wrapper and metrics collection to task queue messages that are currently applied
to other queue types, ensuring all queued work is tracked consistently.

In `@packages/testing/src/mock.ts`:
- Around line 515-527: The queueStarted flag is being unconditionally set to
true when any queue starts, but it should only be true when the "outbox" queue
is activated. This causes sendActivity() to incorrectly classify non-outbox
queue activity as queued outbox delivery. Move the assignment of
this.queueStarted = true to only execute when either no specific queue is
specified (meaning all queues including outbox are activated) or when the
specific queue being activated is "outbox". Update the conditional logic to
check if options?.queue is either undefined or explicitly equals "outbox" before
setting queueStarted to true.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 37da6ef5-700b-44ba-83f7-0f5f4faec66b

📥 Commits

Reviewing files that changed from the base of the PR and between 7bca39c and 7e11d06.

⛔ Files ignored due to path filters (2)
  • deno.lock is excluded by !**/*.lock
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (31)
  • AGENTS.md
  • CHANGES.md
  • docs/.vitepress/config.mts
  • docs/manual/tasks.md
  • mise.toml
  • packages/fedify/deno.json
  • packages/fedify/package.json
  • packages/fedify/src/federation/builder.ts
  • packages/fedify/src/federation/context.ts
  • packages/fedify/src/federation/federation.ts
  • packages/fedify/src/federation/middleware.test.ts
  • packages/fedify/src/federation/middleware.ts
  • packages/fedify/src/federation/mod.ts
  • packages/fedify/src/federation/mq.test.ts
  • packages/fedify/src/federation/mq.ts
  • packages/fedify/src/federation/queue.ts
  • packages/fedify/src/federation/tasks/codec.test.ts
  • packages/fedify/src/federation/tasks/codec.ts
  • packages/fedify/src/federation/tasks/enqueue.test.ts
  • packages/fedify/src/federation/tasks/enqueue.ts
  • packages/fedify/src/federation/tasks/mod.ts
  • packages/fedify/src/federation/tasks/task.ts
  • packages/fedify/src/federation/tasks/tasks.test.ts
  • packages/fedify/src/testing/context.ts
  • packages/fedify/src/testing/mod.ts
  • packages/fedify/src/testing/tasks.ts
  • packages/testing/package.json
  • packages/testing/src/context.ts
  • packages/testing/src/mock.test.ts
  • packages/testing/src/mock.ts
  • scripts/check_fixture_usage.ts

Comment thread AGENTS.md
Comment thread packages/fedify/src/federation/context.ts
Comment thread packages/fedify/src/federation/middleware.test.ts Outdated
Comment thread packages/fedify/src/federation/middleware.ts
Comment thread packages/fedify/src/federation/mq.ts
Comment thread packages/fedify/src/federation/tasks/codec.ts
Comment thread packages/fedify/src/federation/tasks/tasks.test.ts
Comment thread packages/fedify/src/testing/tasks.ts
Comment thread packages/testing/src/mock.test.ts
Context.enqueueTask() and enqueueTaskMany() now accept a
deduplicationKey requesting at-most-once enqueue for tasks that share
it (new TaskEnqueueOptions.deduplicationKey).

Resolution follows the queue and key-value store capabilities:

 -  A queue declaring the new MessageQueue.nativeDeduplication owns the
    check; the key is forwarded through the new
    MessageQueueEnqueueOptions.deduplicationKey.
 -  Otherwise Fedify applies a best-effort guard through the optional
    KvStore.cas primitive under a new taskDeduplication key prefix,
    tunable with the new FederationOptions.taskDeduplicationTtl and
    taskDeduplicationFallback options.

For enqueueTaskMany(), a single key governs the whole batch.  A native
queue that does not implement enqueueMany() cannot express batch-level
at-most-once with a per-message key, so such a multi-item enqueue is
rejected with a TypeError instead of silently leaking duplicates.

Configuration errors that are decidable without a payload (a native
queue lacking enqueueMany, or a closed fallback without cas) are
checked before payloads are validated and encoded, so they reject
before any user schema runs or any key is reserved.

fedify-dev#798

Assisted-by: Claude Code:claude-opus-4-8
The #enqueueTasks and #encodeTaskMessage methods made ContextImpl
oversized, so move the handle validation, deduplication planning,
payload encoding, and queue dispatch into a new tasks/enqueue.ts
module.  ContextImpl now delegates to enqueueTasks(), passing only
the small slice of itself (federation, codec, origin, data) the
pipeline needs.

Pull the shared task-test helpers (the schema factory, stock schemas,
base federation options, and the recording MockQueue) into a new
testing/mq-tasks.ts module, and split the enqueue-specific cases out
of tasks.test.ts into enqueue.test.ts.

Teach the fixture-usage check to expand glob patterns in its
allowlist so the whole testing/ directory is covered by a single
entry instead of one path per file.

Assisted-by: Claude Code:claude-opus-4-8
Two branches both touched the task testing utilities and diverged:
one split MockQueue and the shared schemas/options out into
mq-tasks.ts, while the other kept evolving them in tasks.ts.  After
rebasing the common edits, consolidate everything back into a single
tasks.ts and drop the now-redundant mq-tasks.ts.

Assisted-by: Claude Code:claude-opus-4-8
The key-value deduplication path reserved a marker before dispatching
to the queue but never undid it when the dispatch failed.  A transient
backend failure therefore left the marker behind, so the retry was
silently deduplicated against a task that had never reached the queue.

The cas claim now stores a unique token instead of a bare `true`, and a
failed dispatch conditionally clears it (cas succeeds only while the
stored value is still our token).  The conditional clear keeps a stale
rollback from deleting a marker that another concurrent enqueue has
already re-claimed.  A rollback that itself fails is logged and
swallowed so the original enqueue error still reaches the caller.

The enqueueMany requirement for deduplicated multi-item batches now
keys on whether deduplication is actually applied—a native queue or the
cas fallback—rather than on nativeDeduplication alone.  Under the
"open" fallback (no native dedup, no cas) no marker is taken, so the
batch fans out without deduplication instead of throwing.
ParallelMessageQueue likewise rejects a deduplicated batch when the
wrapped queue lacks enqueueMany, since fanning out cannot carry one key
atomically.

fedify-dev#798

Assisted-by: Claude Code:claude-opus-4-8
startQueue({ queue: "task" }) started no worker when the effective
task queue was the outbox fallback (no dedicated task queue) or an
instance shared with inbox/outbox/fanout: the standard-queue branches
were gated on their own selector, and the task branch skipped a queue
that equalled a standard one.  Enqueued tasks then had no listener.

Rather than patch the branch conditions, _startQueueInternal now lists
every (role, queue) target—including the outbox fallback and each
per-task queue—and starts each instance at most once through a single
identity Set, which also subsumes the four per-role started flags and
the dedicated-task-queue Set into one field.

fedify-dev#806 (comment)
fedify-dev#806 (comment)

Assisted-by: Claude Code:claude-opus-4-8
MockQueue.listen() only resolved from an "abort" event listener, so a
signal already aborted before the call left the promise pending
forever and could hang queue-listener teardown in tests.  Return a
resolved promise when the signal is already aborted, and register the
listener with { once: true }.

fedify-dev#806 (comment)

Assisted-by: Claude Code:claude-opus-4-8
ParallelMessageQueue.enqueueMany threw whenever the wrapped queue
lacked enqueueMany and a deduplicationKey was set, regardless of how
many messages the batch held.  The atomicity limitation it guards
against only applies to multi-item fan-out: a single-item batch can
carry its deduplicationKey straight to the wrapped queue's enqueue()
without splitting it across calls.

Narrow the guard to multi-item batches and fall back to enqueue() for
a single item, so direct ParallelMessageQueue callers are no longer
rejected for a case that is perfectly safe.  The task enqueue path
already routes single items through enqueue() before reaching this
branch, so its behavior is unchanged.

fedify-dev#806 (comment)

Assisted-by: Claude Code:claude-opus-4-8
The task suites in middleware.test.ts spread a module-scope
taskFederationOptions object that held a single shared MemoryKvStore,
so every createFederation() call reused the same key-value store.  As
more task cases are added, a deduplication marker written by one test
could leak into another, making the suite order-dependent and flaky.

Replace the shared constant with a mockOptions() factory that returns
fresh options—including a fresh MemoryKvStore—on every call, and spread
mockOptions() at each use site.

fedify-dev#806 (comment)

Assisted-by: Claude Code:claude-opus-4-8
Address review feedback on the custom background task API:

 -  The enqueueTaskMany JSDoc makes its fallback wording conditional
    and now documents the TypeError thrown when a deduplicated
    multi-item batch cannot be enqueued atomically because the queue
    does not implement bulk enqueue.

 -  TaskCodec is marked @internal, and serialize(), deserialize(), and
    the static validate() helper each gain a one-line JSDoc.

 -  The deduplication section of the manual lists Cloudflare Workers KV
    among the key-value stores that do not yet implement cas.

fedify-dev#806

Assisted-by: Claude Code:claude-opus-4-8
@2chanhaeng 2chanhaeng marked this pull request as ready for review June 22, 2026 14:27
@2chanhaeng 2chanhaeng requested a review from dahlia June 22, 2026 14:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant