[APIE-1087] AssociatedNameStrategy support for the Odyssey initiative - CLI#3373
[APIE-1087] AssociatedNameStrategy support for the Odyssey initiative - CLI#3373Cynthia Qin (cqin-confluent) wants to merge 11 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
There was a problem hiding this comment.
Pull request overview
This PR aims to add AssociatedNameStrategy support to the CLI’s Schema Registry serde flows by propagating a Kafka cluster ID into serializer/deserializer initialization and (for produce) resolving the SR subject via the associations API.
Changes:
- Extend serde provider interfaces to accept
kafkaClusterIdand thread it through produce/consume paths. - Configure JSON/Protobuf serializers & deserializers to use
serde.AssociatedNameStrategyTypewhen a Kafka cluster ID is available. - Add helpers to create an SR client and resolve a subject via SR associations (with fallback).
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/serdes/serdes.go | Updates provider interfaces; adds SR client helper and subject resolution via associations. |
| pkg/serdes/protobuf_serialization_provider.go | Enables AssociatedNameStrategy for Protobuf serialization when Kafka cluster ID is provided. |
| pkg/serdes/protobuf_deserialization_provider.go | Enables AssociatedNameStrategy for Protobuf deserialization when Kafka cluster ID is provided. |
| pkg/serdes/json_serialization_provider.go | Enables AssociatedNameStrategy for JSON Schema serialization when Kafka cluster ID is provided. |
| pkg/serdes/json_deserialization_provider.go | Enables AssociatedNameStrategy for JSON Schema deserialization when Kafka cluster ID is provided. |
| pkg/serdes/integer_serialization_provider.go | Adjusts InitSerializer signature to match updated interface. |
| pkg/serdes/integer_deserialization_provider.go | Adjusts InitDeserializer signature to match updated interface. |
| pkg/serdes/double_serialization_provider.go | Adjusts InitSerializer signature to match updated interface. |
| pkg/serdes/double_deserialization_provider.go | Adjusts InitDeserializer signature to match updated interface. |
| internal/kafka/confluent_kafka.go | Threads Kafka cluster ID into deserializer initialization during consume. |
| internal/kafka/command_topic_produce.go | Passes Kafka cluster ID to serializers and resolves subject via associations for registration/lookup. |
| internal/kafka/command_topic_consume.go | Captures Kafka cluster ID in the consume handler for downstream deserializer initialization. |
| internal/asyncapi/command_export.go | Updates deserializer init callsite for the new signature (passes empty Kafka cluster ID). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| type SerializationProvider interface { | ||
| InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error | ||
| InitSerializer(srClientUrl, srClusterId, kafkaClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error | ||
| LoadSchema(string, map[string]string) error |
| type DeserializationProvider interface { | ||
| InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| LoadSchema(string, string, serde.Type, *kafka.Message) error |
| // returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id | ||
| // as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched. | ||
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { |
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { | ||
| return fallback |
| // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. | ||
| subject := topicNameStrategy(topic, mode) | ||
| if kafkaClusterId != "" && srEndpoint != "" { | ||
| if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { | ||
| subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) | ||
| } |
| type DeserializationProvider interface { | ||
| InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error | ||
| LoadSchema(string, string, serde.Type, *kafka.Message) error |
| // returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id | ||
| // as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched. | ||
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { |
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { | ||
| return fallback |
| // Resolve subject via SR associations, fall back to TopicNameStrategy on miss. | ||
| subject := topicNameStrategy(topic, mode) | ||
| if kafkaClusterId != "" && srEndpoint != "" { | ||
| if client, err := serdes.NewSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil { | ||
| subject = serdes.ResolveSubject(client, kafkaClusterId, topic, mode) | ||
| } |
| func ResolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string { | ||
| fallback := topic + "-" + mode | ||
| if kafkaClusterId == "" || client == nil { | ||
| return fallback | ||
| } | ||
| associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1) | ||
| if err != nil || len(associations) == 0 { |
|
|


Release Notes
Breaking Changes
New Features
Bug Fixes
Checklist
Whatsection below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Reviewsection below.Blast Radiussection below.What
Blast Radius
References
Test & Review