diff --git a/pkg/sql/compile/remoterunServer.go b/pkg/sql/compile/remoterunServer.go index e078047c09bbf..8be4fd49cc6a3 100644 --- a/pkg/sql/compile/remoterunServer.go +++ b/pkg/sql/compile/remoterunServer.go @@ -118,7 +118,13 @@ func CnServerMessageHandler( // to prevent some strange handle order between 'stop sending message' and others. // todo: it is tcp connection now. should be very careful, we should listen to stream context next day. if err == nil { - <-receiver.connectionCtx.Done() + // Also observe messageCtx cancellation so a killed query + // doesn't leave this handler blocked forever waiting for + // the TCP connection to close (issue #25025). + select { + case <-receiver.connectionCtx.Done(): + case <-receiver.messageCtx.Done(): + } } colexec.Get().RemoveRelatedPipeline(receiver.clientSession, receiver.messageId) } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 7c6b3f15172f5..f98b9783996f3 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -322,7 +322,21 @@ func (s *Scope) MergeRun(c *Compile) error { // step 3. defer func() { // should wait all the notify-message-routine and preScopes done. - wg.Wait() + // Use a goroutine + select to make wg.Wait() cancelable. + // sync.WaitGroup.Wait() does not observe context cancellation, + // so if a sub-routine fails to call wg.Done(), MergeRun blocks + // forever and the query becomes unkillable (issue #25025). + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-s.Proc.Ctx.Done(): + // Context cancelled. Sub-routines that haven't completed + // will be cleaned up by the upper-layer MarkQueryDone. NOTE(review): when this branch is taken the waiter goroutine started above remains parked in wg.Wait() until every outstanding wg.Done() is called; if one never is, that goroutine is never released - confirm the cleanup path guarantees it. + } // not necessary, but we still clean the preScope error channel here.
for len(preScopeResultReceiveChan) > 0 { @@ -811,14 +825,17 @@ func (r *notifyMessageResult) clean(proc *process.Process) { func (s *Scope) sendNotifyMessage(wg *sync.WaitGroup, resultChan chan notifyMessageResult) { // if context has done, it means the user or other part of the pipeline stops this query. closeWithError := func(err error, reg *process.WaitRegister, sender *messageSenderOnClient) { - reg.Ch2 <- process.NewPipelineSignalToDirectly(nil, err, s.Proc.Mp()) - + // Use select to avoid blocking forever on Ch2 when the pipeline + // consumer has already stopped (e.g. due to context cancellation). + // Without this, closeWithError blocks -> resultChan never receives + // -> wg.Done() never called -> MergeRun wg.Wait() blocks forever, + // making the query unkillable (issue #25025). NOTE(review): unlike the removed branch, err is now forwarded on resultChan even when s.Proc.Ctx is already done (the old code sent err: nil there) - confirm receivers expect that. select { + case reg.Ch2 <- process.NewPipelineSignalToDirectly(nil, err, s.Proc.Mp()): case <-s.Proc.Ctx.Done(): - resultChan <- notifyMessageResult{err: nil, sender: sender} - default: - resultChan <- notifyMessageResult{err: err, sender: sender} } + resultChan <- notifyMessageResult{err: err, sender: sender} + wg.Done() } diff --git a/pkg/sql/compile/scope_test.go b/pkg/sql/compile/scope_test.go index ed8e5077b97cb..72530240c5571 100644 --- a/pkg/sql/compile/scope_test.go +++ b/pkg/sql/compile/scope_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "path" + "sync" + "testing" "time" @@ -1079,3 +1081,126 @@ func TestBuildScanParallelRunSetsOrderByOnParallelReaders(t *testing.T) { require.Equal(t, uint64(16), reader.blockLimit) } } + +// TestCloseWithErrorContextCancel verifies that closeWithError does not block +// forever on sending to Ch2 when the pipeline consumer has stopped. +// Before the fix for issue #25025, closeWithError used a plain channel send +// that could block forever, preventing wg.Done() from being called and +// causing MergeRun's wg.Wait() to hang. NOTE(review): the test body re-creates the select on its own; it does not call closeWithError.
+func TestCloseWithErrorContextCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // Create an unbuffered Ch2 — simulating a pipeline consumer + // that has already stopped and is not reading. + ch2 := make(chan process.PipelineSignal) // unbuffered! + + // Cancel context first — the select should detect this and + // skip the blocking channel send. + cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // This is the exact select pattern added to closeWithError. + // When ctx is cancelled, it should NOT block on ch2 send. + select { + case ch2 <- process.NewPipelineSignalToDirectly(nil, nil, nil): + t.Log("ch2 send succeeded") + case <-ctx.Done(): + t.Log("ctx cancelled, skipping ch2 send") + } + }() + + // Wait for the goroutine to finish (or timeout). + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // wg.Wait() returned — the select didn't block on Ch2. + case <-time.After(3 * time.Second): + t.Fatal("goroutine did not finish within 3s — select is still blocked on Ch2 send") + } +} + +// TestCnConnectionWaitContextCancel checks, on a standalone copy of the select added to CnServerMessageHandler, that cancelling messageCtx unblocks the wait while connectionCtx is still alive. +func TestCnConnectionWaitContextCancel(t *testing.T) { + // Simulate the two contexts: connectionCtx stays alive (TCP connection still open), + // messageCtx gets cancelled (query killed). + connectionCtx, connCancel := context.WithCancel(context.Background()) + defer connCancel() + messageCtx, messageCancel := context.WithCancel(context.Background()) + + // Cancel the message context (simulating KILL). + messageCancel() + + // The select should return because messageCtx.Done() fires, + // even though connectionCtx is still alive. + select { + case <-connectionCtx.Done(): + t.Fatal("connectionCtx should not be done yet") + case <-messageCtx.Done(): + // Success — the select correctly observes messageCtx cancellation. + } +} + +// TestMergeRunWgWaitCancelable verifies that MergeRun's wg.Wait() does not +// block forever when context is cancelled.
This is a regression test for +// issue #25025 — before the fix, wg.Wait() was uncancelable. +func TestMergeRunWgWaitCancelable(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + proc := testutil.NewProcess(t) + proc.Ctx = ctx + + c := &Compile{ + addr: "127.0.0.1:0", + proc: proc, + } + c.hasMergeOp = true + + // Root scope that runs MergeRun with a PreScope that blocks. + root := newScope(Merge) + root.Proc = proc + root.RootOp = connector.NewArgument() + root.RootOp.(*connector.Connector).Reg = &process.WaitRegister{ + Ch2: make(chan process.PipelineSignal, 1), + } + root.NodeInfo = engine.Node{Addr: "127.0.0.1:0", Mcpu: 1} + + // PreScope that blocks until context is cancelled. NOTE(review): the setup below only assigns Proc, RootOp, NodeInfo and Magic; nothing visible here makes the scope block - confirm a connector-only scope really waits on ctx. + preProc := testutil.NewProcess(t) + preProc.Ctx = ctx + preScope := newScope(Normal) + preScope.Proc = preProc + preScope.RootOp = connector.NewArgument() + preScope.RootOp.(*connector.Connector).Reg = &process.WaitRegister{ + Ch2: make(chan process.PipelineSignal, 1), + } + preScope.NodeInfo = engine.Node{Addr: "127.0.0.1:0", Mcpu: 1} + preScope.Magic = Normal + root.PreScopes = []*Scope{preScope} + + // Run MergeRun in a goroutine; cancel context; verify it returns. + errCh := make(chan error, 1) + go func() { + errCh <- root.MergeRun(c) + }() + + // Give it a moment to start execution. NOTE(review): the fixed 100ms sleep does not guarantee MergeRun has reached its deferred wait before cancel(); the select below only asserts that MergeRun returns, so it does not distinguish the two orderings. + time.Sleep(100 * time.Millisecond) + cancel() + + select { + case err := <-errCh: + // MergeRun returned — success. The important thing is that + // it didn't hang forever on wg.Wait(). + _ = err + case <-time.After(10 * time.Second): + t.Fatal("MergeRun did not return within 10s after context cancellation — wg.Wait() still blocks") + } +}