Add concurrent payload visitor #2140
| - name: Check generated payload visitor is up to date | ||
| run: | | ||
| pnpm run gen:payload-visitor | ||
| if ! git diff --exit-code -- packages/proto/src/payload-visitor.generated.ts; then | ||
| echo "::error::payload-visitor.generated.ts is out of date with the protos. Run 'pnpm run gen:payload-visitor' and commit the result." | ||
| exit 1 | ||
| fi |
Safeguard to make sure the generated protos stay up to date.
| "rebuild": "pnpm run clean && pnpm run build", | ||
| "build": "pnpm --recursive --stream run build", | ||
| "build:watch": "pnpm run build:protos && tsc --build --watch packages/*/tsconfig.json", | ||
| "build:watch": "pnpm run build:protos && pnpm run gen:payload-visitor && tsc --build --watch packages/*/tsconfig.json", |
Building the protos package now builds the payload visitor as well. I can't think of a reason why you wouldn't want to keep them in sync, but you can still run build:protos to build just the protos.
| * A counting semaphore. `acquire` resolves once a permit is available; `release` returns one, | ||
| * handing it directly to the longest-waiting acquirer if any. | ||
| */ | ||
| class Semaphore { |
Hand-rolling concurrency primitives feels bad, but I didn't see any reusable alternatives. I could move this somewhere else if folks have opinions.
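For readers who want to see the shape of such a primitive, here is a minimal sketch of a counting semaphore with direct hand-off, written from the doc comment above rather than from the PR's code. The constructor argument and the `peakConcurrency` helper are assumptions made for illustration.

```typescript
// Hypothetical sketch of a counting semaphore; not the PR's actual class.
class Semaphore {
  private permits: number;
  // FIFO queue of resolvers for acquirers waiting on a permit.
  private readonly waiters: Array<() => void> = [];

  constructor(permits: number) {
    if (!Number.isInteger(permits) || permits < 1) {
      throw new RangeError('permits must be a positive integer');
    }
    this.permits = permits;
  }

  // Resolves once a permit is available.
  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  // Returns a permit, handing it directly to the longest-waiting acquirer if any.
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // the permit is transferred, so the count stays unchanged
    } else {
      this.permits++;
    }
  }
}

// Illustrative helper: runs `tasks` jobs with at most `limit` in flight
// and reports the highest concurrency observed.
async function peakConcurrency(limit: number, tasks: number): Promise<number> {
  const sem = new Semaphore(limit);
  let inFlight = 0;
  let peak = 0;
  await Promise.all(
    Array.from({ length: tasks }, async () => {
      await sem.acquire();
      try {
        inFlight++;
        peak = Math.max(peak, inFlight);
        await new Promise<void>((resolve) => setTimeout(resolve, 1));
      } finally {
        inFlight--;
        sem.release();
      }
    })
  );
  return peak;
}
```

Handing the permit straight to the next waiter, instead of incrementing the count and letting waiters race for it, keeps the ordering first-in, first-out.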
jmaeagle99
left a comment
Haven't read too closely, but I think a few things need to be changed:
- The visitor shouldn't enforce cardinality. If a callback wants to change the arity of an array, it should be able to (e.g. codecs).
- There needs to be some kind of context that is passed through while walking and a separate visitor that allows updating/replacing the context on a per-message basis. This would allow things like SerializationContext or StorageDriverStoreContext to change as we iterate commands in a WFT completion.
- There should be settings for skipping search attributes and headers.
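To make the three requests concrete, here is a rough sketch over a toy message tree. Everything in it is hypothetical: `ToyMessage`, `walkToy`, and the `enterMessage` hook are invented for illustration, and only the `skipHeaders` and `skipSearchAttributes` option names come from this PR. The real visitor is generated from the protos and walks concurrently; this sketch walks sequentially to keep the context flow easy to follow.

```typescript
// Stand-in for the real proto Payload type (an assumption for this sketch).
interface Payload {
  data?: Uint8Array;
}

// Toy message shape; the real visitor is generated from the protos.
interface ToyMessage {
  name: string;
  headers?: Payload[];
  searchAttributes?: Payload[];
  payloads?: Payload[];
  children?: ToyMessage[];
}

type ToyTransform<Ctx> = (payloads: Payload[], context: Ctx) => Promise<Payload[]>;

interface ToyWalkOptions<Ctx> {
  skipHeaders?: boolean;
  skipSearchAttributes?: boolean;
  // Hypothetical per-message hook: returns the context to use for this subtree.
  enterMessage?: (message: ToyMessage, context: Ctx) => Ctx;
}

async function walkToy<Ctx>(
  message: ToyMessage,
  context: Ctx,
  transform: ToyTransform<Ctx>,
  options: ToyWalkOptions<Ctx> = {}
): Promise<void> {
  // The hook may replace the context for this message and everything below it.
  const ctx = options.enterMessage ? options.enterMessage(message, context) : context;

  // Whatever the transform returns is assigned back, so arity may change.
  if (message.payloads) {
    message.payloads = await transform(message.payloads, ctx);
  }
  if (message.headers && !options.skipHeaders) {
    message.headers = await transform(message.headers, ctx);
  }
  if (message.searchAttributes && !options.skipSearchAttributes) {
    message.searchAttributes = await transform(message.searchAttributes, ctx);
  }
  for (const child of message.children ?? []) {
    await walkToy(child, ctx, transform, options);
  }
}
```

With a hook like this, a per-command context could be derived while iterating the commands of a completion, without the transform itself knowing about the message structure.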
| skipHeaders?: boolean; | ||
| skipSearchAttributes?: boolean; |
| export type PayloadTransform<Ctx> = ( | ||
| payloads: Payload[], | ||
| context: Ctx, | ||
| abortSignal?: AbortSignal | ||
| ) => Promise<Payload[]>; |
This was the simplest implementation: a PayloadTransform takes Payload[] in and gives Payload[] back out. However, there is a hidden contract here: if the field being transformed is a non-repeated payload field, the transform must return exactly one payload. Practically, this means that if payloads.length === 1, the returned array must also have length 1. As @jmaeagle99 pointed out, we can't just not return a payload when the proto message needs one.
A few options:
- Leave it as it is and document it in the comments.
- Add a separate "transform" mechanism that enforces this rule statically:
  export interface PayloadVisitor<Ctx> {
    transformSingle(payload: Payload, context: Ctx): Promise<Payload>;
    transformRepeated(payloads: Payload[], context: Ctx): Promise<Payload[]>;
  }
- Keep one PayloadTransform but make the type of the input payloads generic and encode the output type information. I think this is elegant, and I might reach for this option in other languages, but I think it would be a little unwieldy in TS.
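As an illustration of the last option, here is one way the generic encoding could look in TypeScript. This is a sketch under assumptions, not something proposed verbatim in this PR: `PayloadsIn`, `PayloadsOut`, `ShapedPayloadTransform`, and `copyPayloads` are hypothetical names, and `Payload` is a local stand-in for the proto type.

```typescript
// Stand-in for the real proto Payload type (an assumption for this sketch).
interface Payload {
  metadata?: Record<string, Uint8Array>;
  data?: Uint8Array;
}

// A one-element tuple models a singular field; a plain array models a repeated one.
type PayloadsIn = [Payload] | Payload[];

// Maps input shape to output shape: [Payload] stays [Payload], Payload[] stays Payload[].
type PayloadsOut<T extends PayloadsIn> = T extends [Payload] ? [Payload] : Payload[];

// Hypothetical name: a single transform type whose return type follows its input type.
type ShapedPayloadTransform<Ctx> = <T extends PayloadsIn>(
  payloads: T,
  context: Ctx
) => Promise<PayloadsOut<T>>;

// Example transform that shallow-copies each payload.
async function copyPayloads<T extends PayloadsIn>(
  payloads: T,
  _context: string
): Promise<PayloadsOut<T>> {
  const list: readonly Payload[] = payloads;
  const out = list.map((p) => ({ ...p }));
  // The compiler cannot verify the conditional return type here, hence the cast.
  return out as unknown as PayloadsOut<T>;
}

const shaped: ShapedPayloadTransform<string> = copyPayloads;

async function shapedDemo(): Promise<[number, number]> {
  const single: [Payload] = [{ data: new Uint8Array([1]) }];
  const [only] = await shaped(single, 'ctx'); // statically typed as [Payload]
  const repeated: Payload[] = [{}, {}];
  const many = await shaped(repeated, 'ctx'); // statically typed as Payload[]
  return [only.data?.length ?? 0, many.length];
}
```

The cast inside `copyPayloads` is the unwieldy part: callers get the static guarantee, but every implementation has to assert it by hand.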
Additionally, is returning a different number of payloads from transforming a repeated field something we want to allow for or encourage? Does it break any assumptions that eventual users of this payload visitor might reasonably have?
For example, if a codec using this payload visitor returned a different number of payloads and we didn't enforce any rules this could happen:
let input = ...
let output = decode(encode(input))
input.length === output.length // could be false!
Whether that is "our problem" or not is debatable, but we can at least prevent that behavior if we wish to.
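If we did want to prevent it, one option is a runtime guard around the user-supplied transform. The sketch below is an assumption, not code from this PR: `preserveCardinality` is a hypothetical wrapper, and `Payload` is a local stand-in for the proto type.

```typescript
// Stand-in for the real proto Payload type (an assumption for this sketch).
interface Payload {
  data?: Uint8Array;
}

// Same shape as the PayloadTransform type in this PR.
type PayloadTransform<Ctx> = (
  payloads: Payload[],
  context: Ctx,
  abortSignal?: AbortSignal
) => Promise<Payload[]>;

// Hypothetical wrapper: rejects if the transform changes the number of payloads.
function preserveCardinality<Ctx>(transform: PayloadTransform<Ctx>): PayloadTransform<Ctx> {
  return async (payloads, context, abortSignal) => {
    const out = await transform(payloads, context, abortSignal);
    if (out.length !== payloads.length) {
      throw new Error(
        `payload transform returned ${out.length} payload(s) for ${payloads.length} input(s)`
      );
    }
    return out;
  };
}
```

A guard like this is opt-in by construction, so a codec that legitimately changes arity can simply not be wrapped.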
Lastly, I want to point out that map fields containing payloads are visited per entry. The visitor receives each map value as a single payload rather than receiving the whole map as a batch. This means map keys and cardinality are preserved (since we enforce that one payload in means one payload out). Technically, it's OK to "remove" an item from a map, but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).
Just wanted to bring this up in case anyone can think of a reason to allow for changing the structure of a map with the payload visitor.
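A small sketch of the per-entry behavior described above, assuming a map field is surfaced as a plain object of payloads. `transformPayloadMap` is a hypothetical helper, not the generated visitor code, and `Payload` is a local stand-in for the proto type.

```typescript
// Stand-in for the real proto Payload type (an assumption for this sketch).
interface Payload {
  data?: Uint8Array;
}

type PayloadTransform<Ctx> = (payloads: Payload[], context: Ctx) => Promise<Payload[]>;

// Hypothetical helper: transforms each map value as its own one-element batch,
// so keys and entry count are preserved.
async function transformPayloadMap<Ctx>(
  fields: Record<string, Payload>,
  transform: PayloadTransform<Ctx>,
  context: Ctx
): Promise<Record<string, Payload>> {
  const entries = await Promise.all(
    Object.entries(fields).map(async ([key, value]): Promise<[string, Payload]> => {
      const out = await transform([value], context);
      const first = out[0];
      if (out.length !== 1 || first === undefined) {
        throw new Error(`expected exactly one payload for map key "${key}", got ${out.length}`);
      }
      return [key, first];
    })
  );
  const result: Record<string, Payload> = {};
  for (const [key, value] of entries) {
    result[key] = value;
  }
  return result;
}
```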
> Technically, it's OK to "remove" an item from a map but I'm not sure that is something we want to allow in this visitor (maps are represented as repeated fields on the wire).
We don't have any need to do this as part of the payload visitor.
| export function boundPayloadTransform<Ctx>( | ||
| transform: PayloadTransform<Ctx>, | ||
| concurrency: number, | ||
| failure: AbortController | ||
| ): (payloads: Payload[], context: Ctx) => Promise<Payload[]> { |
I chose to wrap the user-supplied "transform" in the concurrency-limiting logic to keep the actual visiting/walking logic synchronous and agnostic to whatever constraints we want to place on concurrent visiting.
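The wrapping approach can be sketched roughly as follows. This is not the PR's implementation of boundPayloadTransform: the function name `boundTransformSketch`, the inline permit counter, and the exact abort behavior are assumptions chosen to match the signature shown in the diff.

```typescript
// Stand-in for the real proto Payload type (an assumption for this sketch).
interface Payload {
  data?: Uint8Array;
}

type PayloadTransform<Ctx> = (
  payloads: Payload[],
  context: Ctx,
  abortSignal?: AbortSignal
) => Promise<Payload[]>;

// Hypothetical sketch: returns a transform that allows at most `concurrency`
// calls in flight and aborts the shared controller on the first failure.
function boundTransformSketch<Ctx>(
  transform: PayloadTransform<Ctx>,
  concurrency: number,
  failure: AbortController
): (payloads: Payload[], context: Ctx) => Promise<Payload[]> {
  let available = concurrency;
  const waiters: Array<() => void> = [];

  const acquire = (): Promise<void> => {
    if (available > 0) {
      available--;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => waiters.push(resolve));
  };

  const release = (): void => {
    const next = waiters.shift();
    if (next) {
      next(); // hand the permit to the longest-waiting caller
    } else {
      available++;
    }
  };

  return async (payloads, context) => {
    await acquire();
    try {
      // Skip work that was queued before another call failed.
      if (failure.signal.aborted) {
        throw new Error('payload visit aborted');
      }
      return await transform(payloads, context, failure.signal);
    } catch (err) {
      failure.abort(); // signal in-flight and queued calls to stop early
      throw err;
    } finally {
      release();
    }
  };
}
```

Because the limit lives in the wrapper, the walker can start one call per payload field without knowing how many are allowed to run at once.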
5ce0cb3 to 1978dce
…'t enforce strict cardinality checks on payload transforms.
1978dce to 7cf4f4d
What was changed
- WorkflowActivation/WorkflowActivationCompletion proto trees.
- proto/scripts/gen-payload-visitor.ts script that generates the visitors at proto/src/payload-visitor.generated.ts. Runs automatically when new protos are generated. About 1k lines of checked-in code.
Why?
Prerequisite for External Storage store/retrieve (and reusable by codecs / payload validation): we need to find and transform every payload in an arbitrary proto message with bounded concurrency.