-
Notifications
You must be signed in to change notification settings - Fork 29
KSQL-14849: add 'confluent ksql cluster update --csu N' command #3368
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b6e73f5
afda034
ff9181a
88d7aa6
6ae6583
78ed219
aebb8ce
2b01059
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| package ksql | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "sort" | ||
|
|
||
| "github.com/spf13/cobra" | ||
|
|
||
| pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
| "github.com/confluentinc/cli/v4/pkg/errors" | ||
| "github.com/confluentinc/cli/v4/pkg/examples" | ||
| "github.com/confluentinc/cli/v4/pkg/output" | ||
| ) | ||
|
|
||
| // validCsuSizes mirrors cc-control-plane-ksql's authoritative list. | ||
| // | ||
| //nolint:gochecknoglobals | ||
| var validCsuSizes = []int32{4, 8, 12, 16, 20, 24, 28} | ||
|
|
||
| //nolint:gochecknoglobals | ||
| var maxSelfServeCSU = func() int32 { | ||
| maxCsu := int32(0) | ||
| for _, v := range validCsuSizes { | ||
| if v > maxCsu { | ||
| maxCsu = v | ||
| } | ||
| } | ||
| return maxCsu | ||
| }() | ||
|
|
||
| func csuSupportTicketMessage() string { | ||
| return fmt.Sprintf( | ||
| "CSU values above %d require a support ticket. "+ | ||
| "Please contact Confluent Support to request a larger cluster size.", | ||
| maxSelfServeCSU) | ||
| } | ||
|
|
||
| // buildUpdateExamples returns the customer-facing help examples for the | ||
| // update command — extracted so it's directly unit-testable. | ||
| func buildUpdateExamples() string { | ||
| return examples.BuildExampleString( | ||
| examples.Example{ | ||
| Text: `Expand ksqlDB cluster "lksqlc-12345" to 8 CSUs.`, | ||
| Code: "confluent ksql cluster update lksqlc-12345 --csu 8", | ||
| }, | ||
| examples.Example{ | ||
| Text: `Shrink ksqlDB cluster "lksqlc-12345" to 4 CSUs.`, | ||
| Code: "confluent ksql cluster update lksqlc-12345 --csu 4", | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| // buildCsuFlagUsage returns the help text for the --csu flag — extracted | ||
| // so it's directly unit-testable. | ||
| func buildCsuFlagUsage() string { | ||
| return fmt.Sprintf("Target number of CSUs for the cluster. Valid values: %s.", | ||
| formatCsuList(validCsuSizes)) | ||
| } | ||
|
|
||
| func (c *ksqlCommand) newUpdateCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "update <id>", | ||
| Short: "Update a ksqlDB cluster.", | ||
| Long: buildUpdateLongDescription(), | ||
| Args: cobra.ExactArgs(1), | ||
| ValidArgsFunction: pcmd.NewValidArgsFunction(c.validArgs), | ||
| RunE: c.update, | ||
| Hidden: true, // until cc-api #2507 merges + public SDK regenerates | ||
| Example: buildUpdateExamples(), | ||
| } | ||
|
|
||
| cmd.Flags().Int32("csu", 0, buildCsuFlagUsage()) | ||
| pcmd.AddContextFlag(cmd, c.CLICommand) | ||
| pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
| pcmd.AddOutputFlag(cmd) | ||
|
|
||
| cobra.CheckErr(cmd.MarkFlagRequired("csu")) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func buildUpdateLongDescription() string { | ||
| return fmt.Sprintf( | ||
| "Update an existing ksqlDB cluster. Currently only the CSU count may be modified. "+ | ||
| "Both expansion (increase) and shrink (decrease) are supported.\n\n"+ | ||
| "Valid CSU values are %s. Larger sizes require a support ticket. "+ | ||
| "The cluster will undergo a rolling restart to apply the new size; "+ | ||
| "the command returns once the resize has been accepted by the control plane. "+ | ||
| "Shrink requests are precondition-checked against the cluster's running "+ | ||
| "persistent-query count and refused if the new size cannot host them; "+ | ||
| "drop excess queries with `TERMINATE <query_id>` and retry.", | ||
| formatCsuList(validCsuSizes)) | ||
|
Comment on lines
+82
to
+92
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in |
||
| } | ||
|
|
||
| func (c *ksqlCommand) update(cmd *cobra.Command, args []string) error { | ||
| csu, err := cmd.Flags().GetInt32("csu") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err := validateCsuForUpdate(csu); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| environmentId, err := c.Context.EnvironmentId() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| clusterId := args[0] | ||
|
|
||
| // Client-side no-op short-circuit; direction is server-arbitrated. | ||
| current, err := c.V2Client.DescribeKsqlCluster(clusterId, environmentId) | ||
| if err != nil { | ||
| return errors.CatchKSQLNotFoundError(err, clusterId) | ||
| } | ||
| currentCsu := current.Spec.GetCsu() | ||
| if currentCsu == csu { | ||
| return fmt.Errorf("ksqlDB cluster %q is already at %d CSUs; no change requested", | ||
| clusterId, csu) | ||
| } | ||
|
|
||
| cluster, err := c.V2Client.UpdateKsqlCluster(clusterId, environmentId, csu) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Rolling-restart notice prints only AFTER the PATCH was accepted. | ||
| output.ErrPrintf(c.Config.EnableColor, | ||
| "Resizing ksqlDB cluster %q from %d to %d CSUs. A rolling restart will be "+ | ||
| "performed asynchronously; the cluster will continue serving queries during the resize.\n", | ||
| clusterId, currentCsu, csu) | ||
|
|
||
| table := output.NewTable(cmd) | ||
| table.Add(c.formatClusterForDisplayAndList(&cluster)) | ||
| return table.Print() | ||
| } | ||
|
|
||
| // validateCsuForUpdate fail-fast checks before issuing the API call. | ||
| func validateCsuForUpdate(csu int32) error { | ||
| if csu > maxSelfServeCSU { | ||
| return fmt.Errorf("%d CSUs: %s", csu, csuSupportTicketMessage()) | ||
| } | ||
|
Comment on lines
+139
to
+142
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in |
||
| for _, valid := range validCsuSizes { | ||
| if csu == valid { | ||
| return nil | ||
| } | ||
| } | ||
| return fmt.Errorf("%d is not a valid CSU size for cluster update. Valid sizes are %s", | ||
| csu, formatCsuList(validCsuSizes)) | ||
| } | ||
|
|
||
| func formatCsuList(sizes []int32) string { | ||
| sorted := append([]int32(nil), sizes...) | ||
| sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) | ||
| out := "" | ||
| for i, s := range sorted { | ||
| if i > 0 { | ||
| out += ", " | ||
| } | ||
| out += fmt.Sprintf("%d", s) | ||
| } | ||
| return out | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| package ksql | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestValidateCsuForUpdate(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| csu int32 | ||
| expectErr bool | ||
| errContains string | ||
| }{ | ||
| {name: "valid 4", csu: 4}, | ||
| {name: "valid 8", csu: 8}, | ||
| {name: "valid 12", csu: 12}, | ||
| {name: "valid 16", csu: 16}, | ||
| {name: "valid 20", csu: 20}, | ||
| {name: "valid 24", csu: 24}, | ||
| {name: "valid 28", csu: 28}, | ||
| { | ||
| name: "legacy size 1 rejected", | ||
| csu: 1, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "legacy size 2 rejected", | ||
| csu: 2, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "in-range but non-canonical (5) rejected", | ||
| csu: 5, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "in-range but non-canonical (10) rejected", | ||
| csu: 10, | ||
| expectErr: true, | ||
| errContains: "not a valid CSU size", | ||
| }, | ||
| { | ||
| name: "above 28 routes to support-ticket message", | ||
| csu: 32, | ||
| expectErr: true, | ||
| errContains: "support ticket", | ||
| }, | ||
| { | ||
| name: "well above ceiling routes to support-ticket message", | ||
| csu: 128, | ||
| expectErr: true, | ||
| errContains: "support ticket", | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range tests { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| err := validateCsuForUpdate(tc.csu) | ||
| if tc.expectErr { | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), tc.errContains) | ||
| } else { | ||
| require.NoError(t, err) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestFormatCsuList(t *testing.T) { | ||
| require.Equal(t, "4, 8, 12, 16, 20, 24, 28", formatCsuList(validCsuSizes)) | ||
| // Input order should not matter; output is sorted ascending. | ||
| require.Equal(t, "4, 8, 16", formatCsuList([]int32{16, 4, 8})) | ||
| } | ||
|
|
||
| // TestBuildUpdateLongDescription pins the customer-facing help text. KSQL-15168 | ||
| // rewrote this to advertise both expansion and shrink (and the TERMINATE | ||
| // remediation when the server refuses a shrink). A future change that | ||
| // reverts to expand-only wording, drops the TERMINATE guidance, or drops | ||
| // the valid CSU listing would break this test. | ||
| func TestBuildUpdateLongDescription(t *testing.T) { | ||
| long := buildUpdateLongDescription() | ||
|
|
||
| require.Contains(t, long, "Both expansion (increase) and shrink (decrease) are supported", | ||
| "long description must advertise both directions") | ||
| require.Contains(t, long, "TERMINATE <query_id>", | ||
| "long description must surface the customer-side remediation for a refused shrink") | ||
| require.Contains(t, long, "4, 8, 12, 16, 20, 24, 28", | ||
| "long description must enumerate valid CSU sizes (kept in sync with validCsuSizes)") | ||
| require.Contains(t, long, "rolling restart", | ||
| "long description must call out the rolling-restart behavior") | ||
| } | ||
|
|
||
| // TestCsuSupportTicketMessage pins the support-ticket fallback message. | ||
| func TestCsuSupportTicketMessage(t *testing.T) { | ||
| msg := csuSupportTicketMessage() | ||
| require.Contains(t, msg, "support ticket") | ||
| require.Contains(t, msg, "28", "message must name the self-serve ceiling") | ||
| } | ||
|
|
||
| // TestBuildUpdateExamples pins the customer-facing help examples — covers | ||
| // both expand and shrink lines. | ||
| func TestBuildUpdateExamples(t *testing.T) { | ||
| out := buildUpdateExamples() | ||
| require.Contains(t, out, "Expand ksqlDB cluster") | ||
| require.Contains(t, out, "Shrink ksqlDB cluster") | ||
| require.Contains(t, out, "--csu 8") | ||
| require.Contains(t, out, "--csu 4") | ||
| require.Contains(t, out, "lksqlc-12345") | ||
| } | ||
|
|
||
| // TestBuildCsuFlagUsage pins the --csu flag's help text. | ||
| func TestBuildCsuFlagUsage(t *testing.T) { | ||
| out := buildCsuFlagUsage() | ||
| require.Contains(t, out, "Target number of CSUs") | ||
| require.Contains(t, out, "4, 8, 12, 16, 20, 24, 28", | ||
| "flag usage must list all valid CSU sizes") | ||
| } | ||
|
|
||
| // TestMaxSelfServeCSU verifies the cap is derived from validCsuSizes. | ||
| func TestMaxSelfServeCSU(t *testing.T) { | ||
| require.Equal(t, int32(28), maxSelfServeCSU) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in
9b9eee25: addedHidden: trueon the cobra command. The subcommand is still reachable for testing and review, but won't appear inksql cluster --helpor in customer-facing discovery. DropHiddenwhen the SDK is regenerated and the shim is replaced with the real PATCH call.