Add concurrent payload visitor #2140

Open
cconstable wants to merge 8 commits into main from payload-traversal
@cconstable cconstable commented Jun 23, 2026

Contributor

What was changed

  • Added a generic payload visitor for the TS SDK that walks over the WorkflowActivation / WorkflowActivationCompletion proto trees.
  • Added a proto/scripts/gen-payload-visitor.ts script that generates the visitors at proto/src/payload-visitor.generated.ts. It runs automatically whenever the protos are regenerated. Roughly 1k lines of checked-in code.
await visitWorkflowActivation(activation, (payloads) => retrieve(payloads), { concurrency: 4 });

Why?

Prerequisite for External Storage store/retrieve (and reusable by codecs / payload validation): we need to find and transform every payload in an arbitrary proto message with bounded concurrency.

Comment on lines +54 to +60
- name: Check generated payload visitor is up to date
  run: |
    pnpm run gen:payload-visitor
    if ! git diff --exit-code -- packages/proto/src/payload-visitor.generated.ts; then
      echo "::error::payload-visitor.generated.ts is out of date with the protos. Run 'pnpm run gen:payload-visitor' and commit the result."
      exit 1
    fi

Contributor Author

Safeguard to make sure the generated visitor stays up to date with the protos.

@cconstable cconstable marked this pull request as ready for review June 23, 2026 23:54
@cconstable cconstable requested a review from a team as a code owner June 23, 2026 23:54
Comment thread package.json
  "rebuild": "pnpm run clean && pnpm run build",
  "build": "pnpm --recursive --stream run build",
- "build:watch": "pnpm run build:protos && tsc --build --watch packages/*/tsconfig.json",
+ "build:watch": "pnpm run build:protos && pnpm run gen:payload-visitor && tsc --build --watch packages/*/tsconfig.json",

Contributor Author

Building the protos package now builds the payload visitor as well. I can't think of a reason why you wouldn't want to keep them in sync but you can still run build:protos to just build the protos.

* A counting semaphore. `acquire` resolves once a permit is available; `release` returns one,
* handing it directly to the longest-waiting acquirer if any.
*/
class Semaphore {

Contributor Author

Hand-rolling concurrency primitives feels bad, but I didn't see any reusable alternatives. I could move this to a different place if folks have opinions.
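For readers following along, the doc comment above describes a fairly standard FIFO counting semaphore. A minimal sketch of that behavior is below. It is not the PR's actual class: the constructor argument, the `waiters` queue, and the `peakConcurrency` helper are assumptions made for illustration.

```typescript
// Hypothetical sketch of a counting semaphore; not the code from this PR.
class Semaphore {
  private permits: number;
  // FIFO queue of resolvers for callers waiting on a permit.
  private readonly waiters: Array<() => void> = [];

  constructor(permits: number) {
    if (!Number.isInteger(permits) || permits < 1) {
      throw new RangeError('permits must be a positive integer');
    }
    this.permits = permits;
  }

  /** Resolves once a permit is available. */
  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  /** Returns a permit, handing it directly to the longest-waiting acquirer if any. */
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // the permit transfers to the waiter, so the count stays unchanged
    } else {
      this.permits++;
    }
  }
}

// Illustration only: runs `tasks` jobs under `limit` permits and reports the
// highest number of jobs that were ever in flight at the same time.
async function peakConcurrency(limit: number, tasks: number): Promise<number> {
  const sem = new Semaphore(limit);
  let active = 0;
  let peak = 0;
  await Promise.all(
    Array.from({ length: tasks }, async () => {
      await sem.acquire();
      try {
        active++;
        peak = Math.max(peak, active);
        await Promise.resolve(); // stand-in for real asynchronous work
      } finally {
        active--;
        sem.release();
      }
    })
  );
  return peak;
}
```

Handing the permit straight to the next waiter, rather than incrementing the count and letting callers race for it, keeps wake-ups in arrival order.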

@jmaeagle99 jmaeagle99 left a comment

Contributor

Haven't read too closely, but I think a few things need to be changed:

  • The visitor shouldn't enforce cardinality. If a callback wants to change the arity of an array, it should be able to (e.g. codecs).
  • There needs to be some kind of context that is passed through while walking and a separate visitor that allows updating/replacing the context on a per-message basis. This would allow things like SerializationContext or StorageDriverStoreContext to change as we iterate commands in a WFT completion.
  • There should be settings for skipping search attributes and headers.

Comment on lines +40 to +41
skipHeaders?: boolean;
skipSearchAttributes?: boolean;

Contributor Author

@jmaeagle99 Added these as requested.

Comment on lines +15 to +19
export type PayloadTransform<Ctx> = (
  payloads: Payload[],
  context: Ctx,
  abortSignal?: AbortSignal
) => Promise<Payload[]>;

@cconstable cconstable Jun 29, 2026

Contributor Author

This was the simplest implementation: a PayloadTransform takes Payload[] in and gives Payload[] back out. However, there is a hidden contract here: when a non-repeated field holding a payload is being transformed, the transform must return exactly one payload. Practically, this means that if payloads.length === 1, the returned array must also have length 1. As @jmaeagle99 pointed out, we can't just not return a payload when the proto message needs one.

A few options:

  1. Leave it as it is and document it in the comments.
  2. Add separate "transform" mechanisms that enforce this rule statically:
export interface PayloadVisitor<Ctx> {
  transformSingle(payload: Payload, context: Ctx): Promise<Payload>;
  transformRepeated(payloads: Payload[], context: Ctx): Promise<Payload[]>;
}
  3. Keep one PayloadTransform but make the type of the input payloads generic so that it encodes the output type. I think this is elegant and I might reach for this option in other languages, but I think it would be a little unwieldy in TS.
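To make the transformSingle / transformRepeated option more concrete, here is a rough sketch of how a caller that only cares about individual payloads might satisfy both methods. The `Payload` shape is a simplified stand-in for the proto type, and `liftPerPayload` is a hypothetical helper that does not exist in this PR.

```typescript
// Simplified stand-in for the SDK's Payload proto type (assumption).
interface Payload {
  metadata?: Record<string, Uint8Array>;
  data?: Uint8Array;
}

// Interface as proposed in the comment above.
interface PayloadVisitor<Ctx> {
  transformSingle(payload: Payload, context: Ctx): Promise<Payload>;
  transformRepeated(payloads: Payload[], context: Ctx): Promise<Payload[]>;
}

// Hypothetical helper: lifts a one-payload function into a full visitor.
// Singular fields cannot be dropped because transformSingle must resolve to a
// Payload; repeated fields are mapped element by element, so length is kept.
function liftPerPayload<Ctx>(
  fn: (payload: Payload, context: Ctx) => Promise<Payload>
): PayloadVisitor<Ctx> {
  return {
    transformSingle: fn,
    transformRepeated: (payloads, context) =>
      Promise.all(payloads.map((payload) => fn(payload, context))),
  };
}
```

A visitor that does want to change the arity of a repeated field would implement `transformRepeated` directly instead of going through a helper like this.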

Contributor Author

Additionally, is returning a different number of payloads from transforming a repeated field something we want to allow for or encourage? Does it break any assumptions that eventual users of this payload visitor might reasonably have?

For example, if a codec using this payload visitor returned a different number of payloads and we didn't enforce any rules this could happen:

let input = ...
let output = decode(encode(input))

input.length === output.length // could be false!

Whether that is "our problem" or not is debatable but we can at least prevent that behavior if we wish to.
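As a contrived illustration of the round trip above, consider a codec whose encode step packs every payload into one blob and whose decode step has no boundary information to split it again. Both functions and the simplified `Payload` shape are hypothetical and only meant to show how the lengths can diverge.

```typescript
// Simplified stand-in for the SDK's Payload proto type (assumption).
interface Payload {
  data?: Uint8Array;
}

// Hypothetical encode step: concatenates all payload bytes into one payload.
async function encode(payloads: Payload[]): Promise<Payload[]> {
  const total = payloads.reduce((sum, p) => sum + (p.data?.length ?? 0), 0);
  const packed = new Uint8Array(total);
  let offset = 0;
  for (const p of payloads) {
    if (p.data) {
      packed.set(p.data, offset);
      offset += p.data.length;
    }
  }
  return [{ data: packed }];
}

// Hypothetical decode step: the original boundaries were never recorded, so
// the best it can do is return the packed payload as-is.
async function decode(payloads: Payload[]): Promise<Payload[]> {
  return payloads;
}

// Round-trips `input` and reports the lengths on both sides.
async function roundTripLengths(input: Payload[]): Promise<{ before: number; after: number }> {
  const output = await decode(await encode(input));
  return { before: input.length, after: output.length };
}
```

With three input payloads, `roundTripLengths` reports `before: 3` and `after: 1`, which is the kind of mismatch a cardinality check in the visitor would surface early.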

Contributor Author

Lastly, I want to point out that map fields containing payloads are visited per entry. The visitor receives each map value as a single-payload batch rather than receiving the whole map as one batch. This means map keys and cardinality are preserved (since we enforce that one payload in means one payload out). Technically, it's OK to "remove" an item from a map, but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).

Just wanted to bring this up in case anyone can think of a reason to allow for changing the structure of a map with the payload visitor.
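A rough sketch of the per-entry behavior described above follows, assuming the map is a plain string-keyed object that is updated in place. `visitPayloadMap` and the simplified `Payload` shape are hypothetical names for illustration; the generated visitor may be structured differently.

```typescript
// Simplified stand-in for the SDK's Payload proto type (assumption).
interface Payload {
  metadata?: Record<string, Uint8Array>;
  data?: Uint8Array;
}

type PayloadTransform = (payloads: Payload[]) => Promise<Payload[]>;

// Hypothetical helper: hands each map value to the transform as its own
// single-payload batch, so keys and entry count cannot change.
async function visitPayloadMap(
  map: Record<string, Payload>,
  transform: PayloadTransform
): Promise<void> {
  await Promise.all(
    Object.entries(map).map(async ([key, value]) => {
      const result = await transform([value]);
      const only = result[0];
      if (result.length !== 1 || only === undefined) {
        throw new Error(`Map entry "${key}" must yield exactly 1 payload, got ${result.length}`);
      }
      map[key] = only; // replace the value in place; the key is untouched
    })
  );
}
```

Because the transform never sees more than one value at a time, it has no way to drop or add map entries, which matches the one-in, one-out rule mentioned above.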

Contributor

Technically, it's OK to "remove" an item from a map but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).

We don't have any need to do this as part of the payload visitor.

Comment on lines +84 to +88
export function boundPayloadTransform<Ctx>(
transform: PayloadTransform<Ctx>,
concurrency: number,
failure: AbortController
): (payloads: Payload[], context: Ctx) => Promise<Payload[]> {

Contributor Author

I chose to wrap the user-supplied "transform" in the concurrency-limiting logic to keep the actual visiting/walking logic synchronous and agnostic to whatever constraints we want to place on concurrent visiting.

@cconstable cconstable force-pushed the payload-traversal branch from 5ce0cb3 to 1978dce Compare July 1, 2026 17:34
@cconstable cconstable force-pushed the payload-traversal branch from 1978dce to 7cf4f4d Compare July 1, 2026 18:56
4 participants