Skip to content
Open
2 changes: 1 addition & 1 deletion internal/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content
return nil, err
}

err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "value", serdes.SchemaRegistryAuth{Token: token}, nil)
err = deserializationProvider.InitDeserializer(srEndpoint, srClusterId, "", "value", serdes.SchemaRegistryAuth{Token: token}, nil)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/kafka/command_topic_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,15 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
subject = schemaRegistryContext
}

log.CliLogger.Tracef("consumeCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

groupHandler := &GroupHandler{
SrClient: srClient,
SrApiKey: srApiKey,
SrApiSecret: srApiSecret,
SrClusterId: srClusterId,
SrClusterEndpoint: srEndpoint,
KafkaClusterId: cluster.ID,
Token: token,
KeyFormat: keyFormat,
ValueFormat: valueFormat,
Expand Down
97 changes: 50 additions & 47 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
cmd.Flags().String("delimiter", ":", "The delimiter separating each key and value.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of configuration overrides ("key=value") for the producer client. For a full list, see https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html`)
pcmd.AddProducerConfigFileFlag(cmd)
cmd.Flags().String("schema-registry-endpoint", "", "Endpoint for Schema Registry cluster.")

Check failure on line 65 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "schema-registry-endpoint" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=707eb69f-ab81-471c-b540-728c05d4249b&open=707eb69f-ab81-471c-b540-728c05d4249b
cmd.Flags().StringSlice("headers", nil, `A comma-separated list of headers formatted as "key:value".`)
cmd.Flags().Bool("schema-id-header", false, "Serialize schema ID in the header instead of the message prefix.")

Expand Down Expand Up @@ -147,12 +147,14 @@
return err
}

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key")
log.CliLogger.Tracef("produceCloud: kafkaClusterId=%q topic=%q", cluster.ID, topic)

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "key", cluster.ID)
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfo(cmd, topic, "value", cluster.ID)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,12 +209,12 @@
func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error {
topic := args[0]

keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key")
keySerializer, keyMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "key", "")
if err != nil {
return err
}

valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value")
valueSerializer, valueMetaInfo, err := c.initSchemaAndGetInfoOnPrem(cmd, topic, "value", "")
if err != nil {
return err
}
Expand Down Expand Up @@ -461,14 +463,12 @@
return "", "", errors.New(missingOrMalformedKeyErrorMsg)
}

func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 466 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 49 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=2a958b39-d356-4a33-b263-4dce01d429c8&open=2a958b39-d356-4a33-b263-4dce01d429c8
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
}

subject := topicNameStrategy(topic, mode)

// Deprecated
var schemaId optional.Int32
if mode == "value" && cmd.Flags().Changed("schema-id") {
Expand All @@ -491,6 +491,46 @@
schemaId = optional.NewInt32(int32(id))
}

srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}

// Resolve subject via SR associations, fall back to TopicNameStrategy on miss.
subject := topicNameStrategy(topic, mode)
if kafkaClusterId != "" && srEndpoint != "" {
if client, err := newSchemaRegistryClient(srEndpoint, srClusterId, srAuth); err == nil {
subject = resolveSubject(client, kafkaClusterId, topic, mode)
}
Comment on lines +526 to +531
Comment on lines +526 to +531
}

var format string
referencePathMap := map[string]string{}
metaInfo := []byte{}
Expand Down Expand Up @@ -559,49 +599,12 @@
}
}

// Fetch the SR client endpoint during schema registration
srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}

var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint(cmd)
if err != nil {
return nil, nil, err
}
}

// Initialize the serializer with the same SR endpoint during registration
// The associated schema ID is also required to initialize the serializer
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
if err != nil {
return nil, nil, err
}
srApiSecret, err := cmd.Flags().GetString("schema-registry-api-secret")
if err != nil {
return nil, nil, err
}
var parsedSchemaId = -1
if len(metaInfo) >= 5 {
parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5]))
}

var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
srAuth := serdes.SchemaRegistryAuth{
ApiKey: srApiKey,
ApiSecret: srApiSecret,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
if err := serializationProvider.InitSerializer(srEndpoint, srClusterId, kafkaClusterId, mode, parsedSchemaId, srAuth); err != nil {
return nil, nil, err
}

Expand All @@ -613,7 +616,7 @@
return serializationProvider, metaInfo, nil
}

func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode string) (serdes.SerializationProvider, []byte, error) {
func (c *command) initSchemaAndGetInfoOnPrem(cmd *cobra.Command, topic, mode, kafkaClusterId string) (serdes.SerializationProvider, []byte, error) {

Check failure on line 619 in internal/kafka/command_topic_produce.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 34 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3373&issues=a75582d2-10d3-4758-a259-ffb3d1a83c6d&open=a75582d2-10d3-4758-a259-ffb3d1a83c6d
schemaDir, err := createTempDir()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -732,7 +735,7 @@
ClientKeyPath: clientKeyPath,
Token: token,
}
err = serializationProvider.InitSerializer(srEndpoint, "", mode, parsedSchemaId, srAuth)
err = serializationProvider.InitSerializer(srEndpoint, "", kafkaClusterId, mode, parsedSchemaId, srAuth)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type GroupHandler struct {
SrApiSecret string
SrClusterId string
SrClusterEndpoint string
KafkaClusterId string
Token string
CertificateAuthorityPath string
ClientCertPath string
Expand Down Expand Up @@ -241,7 +242,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error {
return err
}

err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "key", srAuth, nil)
err = keyDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "key", srAuth, nil)
if err != nil {
return err
}
Expand All @@ -268,7 +269,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error {
return err
}

err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, "value", srAuth, nil)
err = valueDeserializer.InitDeserializer(h.SrClusterEndpoint, h.SrClusterId, h.KafkaClusterId, "value", srAuth, nil)
if err != nil {
return err
}
Expand Down
40 changes: 40 additions & 0 deletions internal/kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (

cmkv2 "github.com/confluentinc/ccloud-sdk-go-v2/cmk/v2"
cckafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
cpkafkarestv3 "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
"github.com/confluentinc/cli/v4/pkg/ccstructs"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/kafkausagelimits"
"github.com/confluentinc/cli/v4/pkg/log"
"github.com/confluentinc/cli/v4/pkg/serdes"
)

func toCreateTopicConfigs(topicConfigsMap map[string]string) []cckafkarestv3.ConfigData {
Expand Down Expand Up @@ -210,6 +213,43 @@ func topicNameStrategy(topic, mode string) string {
return fmt.Sprintf("%s-%s", topic, mode)
}

func newSchemaRegistryClient(srClientUrl, srClusterId string, srAuth serdes.SchemaRegistryAuth) (schemaregistry.Client, error) {
var cfg *schemaregistry.Config
switch {
case srAuth.ApiKey != "" && srAuth.ApiSecret != "":
cfg = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret)
case srAuth.Token != "":
cfg = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "")
default:
cfg = schemaregistry.NewConfig(srClientUrl)
log.CliLogger.Info("initializing schema registry client with no authentication")
}
cfg.SslCaLocation = srAuth.CertificateAuthorityPath
cfg.SslCertificateLocation = srAuth.ClientCertPath
cfg.SslKeyLocation = srAuth.ClientKeyPath
return schemaregistry.NewClient(cfg)
}

// returns the SR subject for (topic, mode) by querying the associations API with the Kafka cluster id
// as resource namespace. Falls backt o default TopicNameStrategy (<topic>-<mode>) if unmatched.
func resolveSubject(client schemaregistry.Client, kafkaClusterId, topic, mode string) string {
fallback := topic + "-" + mode
if kafkaClusterId == "" || client == nil {
return fallback
}
associations, err := client.GetAssociationsByResourceName(topic, kafkaClusterId, "topic", []string{mode}, "", 0, -1)
if err != nil {
log.CliLogger.Tracef("subject resolution: associations lookup failed (topic=%q mode=%q clusterId=%q): %v; using %q", topic, mode, kafkaClusterId, err, fallback)
return fallback
}
if len(associations) == 0 {
log.CliLogger.Tracef("subject resolution: no association for topic=%q mode=%q clusterId=%q; using %q", topic, mode, kafkaClusterId, fallback)
return fallback
}
log.CliLogger.Tracef("subject resolution: resolved associated subject %q (topic=%q mode=%q clusterId=%q)", associations[0].Subject, topic, mode, kafkaClusterId)
return associations[0].Subject
}

func getLimitsForSku(cluster *cmkv2.CmkV2Cluster, usageLimits *kafkausagelimits.UsageLimits) *kafkausagelimits.Limits {
if isDedicated(cluster) {
return usageLimits.GetCkuLimit(cluster.Status.GetCku())
Expand Down
74 changes: 74 additions & 0 deletions internal/kafka/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package kafka

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
)

const mockSRUrl = "mock://"

func newMockSRClient(t *testing.T) schemaregistry.Client {
t.Helper()
client, err := schemaregistry.NewClient(schemaregistry.NewConfig(mockSRUrl))
require.NoError(t, err)
return client
}

func seedAssociation(t *testing.T, client schemaregistry.Client, topic, kafkaClusterId, mode, subject string) {
t.Helper()
// The mock requires the subject to have a registered schema before it
// will accept an association referencing it.
_, err := client.Register(subject, schemaregistry.SchemaInfo{
Schema: `{"type":"record","name":"R","fields":[{"name":"f","type":"int"}]}`,
SchemaType: "AVRO",
}, false)
require.NoError(t, err)
_, err = client.CreateOrUpdateAssociation(schemaregistry.AssociationCreateOrUpdateRequest{
ResourceName: topic,
ResourceNamespace: kafkaClusterId,
ResourceID: topic + ":" + kafkaClusterId,
ResourceType: "topic",
Associations: []schemaregistry.AssociationCreateOrUpdateInfo{{
Subject: subject,
AssociationType: mode,
}},
})
require.NoError(t, err)
}

func TestResolveSubject(t *testing.T) {
t.Run("nil client falls back to TopicNameStrategy", func(t *testing.T) {
require.Equal(t, "topic1-value", resolveSubject(nil, "lkc-123", "topic1", "value"))
})

t.Run("empty kafkaClusterId falls back to TopicNameStrategy", func(t *testing.T) {
client := newMockSRClient(t)
require.Equal(t, "topic1-value", resolveSubject(client, "", "topic1", "value"))
})

t.Run("no association falls back to TopicNameStrategy", func(t *testing.T) {
client := newMockSRClient(t)
require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value"))
})

t.Run("matching association returns its subject", func(t *testing.T) {
client := newMockSRClient(t)
seedAssociation(t, client, "topic1", "lkc-123", "value", "custom-value-subject")
require.Equal(t, "custom-value-subject", resolveSubject(client, "lkc-123", "topic1", "value"))
})

t.Run("association for other mode falls back", func(t *testing.T) {
client := newMockSRClient(t)
seedAssociation(t, client, "topic1", "lkc-123", "key", "custom-key-subject")
require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value"))
})

t.Run("association under different cluster id falls back", func(t *testing.T) {
client := newMockSRClient(t)
seedAssociation(t, client, "topic1", "lkc-other", "value", "should-not-be-used")
require.Equal(t, "topic1-value", resolveSubject(client, "lkc-123", "topic1", "value"))
})
}
4 changes: 3 additions & 1 deletion pkg/serdes/avro_deserialization_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type AvroDeserializationProvider struct {
deser *avrov3.Deserializer
}

func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, kafkaClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error {
// Note: Now Serializer/Deserializer are tightly coupled with Schema Registry
// If existingClient is not nil, we should share this client between ser and deser.
// As the shared client is referred as mock client to store the same set of schemas in cache
Expand All @@ -34,6 +34,8 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId,
}
}

serdeConfig.SubjectNameStrategyType, serdeConfig.SubjectNameStrategyConfig = subjectStrategy(kafkaClusterId)

var serdeType serde.Type
switch mode {
case "key":
Expand Down
Loading