Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/sql/compile/remoterunServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func CnServerMessageHandler(
// to prevent some strange handle order between 'stop sending message' and others.
// todo: it is tcp connection now. should be very careful, we should listen to stream context next day.
if err == nil {
<-receiver.connectionCtx.Done()
// Also observe messageCtx cancellation so a killed query
// doesn't leave this handler blocked forever waiting for
// the TCP connection to close (issue #25025).
select {
case <-receiver.connectionCtx.Done():
case <-receiver.messageCtx.Done():
}
}
colexec.Get().RemoveRelatedPipeline(receiver.clientSession, receiver.messageId)
}
Expand Down
29 changes: 23 additions & 6 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,21 @@ func (s *Scope) MergeRun(c *Compile) error {
// step 3.
defer func() {
// should wait all the notify-message-routine and preScopes done.
wg.Wait()
// Use a goroutine + select to make wg.Wait() cancelable.
// sync.WaitGroup.Wait() does not observe context cancellation,
// so if a sub-routine fails to call wg.Done(), MergeRun blocks
// forever and the query becomes unkillable (issue #25025).
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-s.Proc.Ctx.Done():
// Context cancelled. Sub-routines that haven't completed
// will be cleaned up by the upper-layer MarkQueryDone.
}

// not necessary, but we still clean the preScope error channel here.
for len(preScopeResultReceiveChan) > 0 {
Expand Down Expand Up @@ -811,14 +825,17 @@ func (r *notifyMessageResult) clean(proc *process.Process) {
func (s *Scope) sendNotifyMessage(wg *sync.WaitGroup, resultChan chan notifyMessageResult) {
// if context has done, it means the user or other part of the pipeline stops this query.
closeWithError := func(err error, reg *process.WaitRegister, sender *messageSenderOnClient) {
reg.Ch2 <- process.NewPipelineSignalToDirectly(nil, err, s.Proc.Mp())

// Use select to avoid blocking forever on Ch2 when the pipeline
// consumer has already stopped (e.g. due to context cancellation).
// Without this, closeWithError blocks -> resultChan never receives
// -> wg.Done() never called -> MergeRun wg.Wait() blocks forever,
// making the query unkillable (issue #25025).
select {
case reg.Ch2 <- process.NewPipelineSignalToDirectly(nil, err, s.Proc.Mp()):
case <-s.Proc.Ctx.Done():
resultChan <- notifyMessageResult{err: nil, sender: sender}
default:
resultChan <- notifyMessageResult{err: err, sender: sender}
}
resultChan <- notifyMessageResult{err: err, sender: sender}

wg.Done()
}

Expand Down
125 changes: 125 additions & 0 deletions pkg/sql/compile/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"path"
"sync"

"testing"
"time"

Expand Down Expand Up @@ -1079,3 +1081,126 @@ func TestBuildScanParallelRunSetsOrderByOnParallelReaders(t *testing.T) {
require.Equal(t, uint64(16), reader.blockLimit)
}
}

// TestCloseWithErrorContextCancel verifies that closeWithError does not block
// forever on sending to Ch2 when the pipeline consumer has stopped.
// Before the fix for issue #25025, closeWithError used a plain channel send
// that could block forever, preventing wg.Done() from being called and
// causing MergeRun's wg.Wait() to hang.
func TestCloseWithErrorContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

// Create an unbuffered Ch2 — simulating a pipeline consumer
// that has already stopped and is not reading.
ch2 := make(chan process.PipelineSignal) // unbuffered!

// Cancel context first — the select should detect this and
// skip the blocking channel send.
cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// This is the exact select pattern added to closeWithError.
// When ctx is cancelled, it should NOT block on ch2 send.
select {
case ch2 <- process.NewPipelineSignalToDirectly(nil, nil, nil):
t.Log("ch2 send succeeded")
case <-ctx.Done():
t.Log("ctx cancelled, skipping ch2 send")
}
}()

// Wait for the goroutine to finish (or timeout).
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
// wg.Wait() returned — the select didn't block on Ch2.
case <-time.After(3 * time.Second):
t.Fatal("goroutine did not finish within 3s — select is still blocked on Ch2 send")
}
}


func TestCnConnectionWaitContextCancel(t *testing.T) {
// Simulate the two contexts: connectionCtx stays alive (TCP connection still open),
// messageCtx gets cancelled (query killed).
connectionCtx, connCancel := context.WithCancel(context.Background())
defer connCancel()
messageCtx, messageCancel := context.WithCancel(context.Background())

// Cancel the message context (simulating KILL).
messageCancel()

// The select should return because messageCtx.Done() fires,
// even though connectionCtx is still alive.
select {
case <-connectionCtx.Done():
t.Fatal("connectionCtx should not be done yet")
case <-messageCtx.Done():
// Success — the select correctly observes messageCtx cancellation.
}
}

// TestMergeRunWgWaitCancelable verifies that MergeRun's wg.Wait() does not
// block forever when context is cancelled. This is a regression test for
// issue #25025 — before the fix, wg.Wait() was uncancelable.
func TestMergeRunWgWaitCancelable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

proc := testutil.NewProcess(t)
proc.Ctx = ctx

c := &Compile{
addr: "127.0.0.1:0",
proc: proc,
}
c.hasMergeOp = true

// Root scope that runs MergeRun with a PreScope that blocks.
root := newScope(Merge)
root.Proc = proc
root.RootOp = connector.NewArgument()
root.RootOp.(*connector.Connector).Reg = &process.WaitRegister{
Ch2: make(chan process.PipelineSignal, 1),
}
root.NodeInfo = engine.Node{Addr: "127.0.0.1:0", Mcpu: 1}

// PreScope that blocks until context is cancelled.
preProc := testutil.NewProcess(t)
preProc.Ctx = ctx
preScope := newScope(Normal)
preScope.Proc = preProc
preScope.RootOp = connector.NewArgument()
preScope.RootOp.(*connector.Connector).Reg = &process.WaitRegister{
Ch2: make(chan process.PipelineSignal, 1),
}
preScope.NodeInfo = engine.Node{Addr: "127.0.0.1:0", Mcpu: 1}
preScope.Magic = Normal
root.PreScopes = []*Scope{preScope}

// Run MergeRun in a goroutine; cancel context; verify it returns.
errCh := make(chan error, 1)
go func() {
errCh <- root.MergeRun(c)
}()

// Give it a moment to start execution.
time.Sleep(100 * time.Millisecond)
cancel()

select {
case err := <-errCh:
// MergeRun returned — success. The important thing is that
// it didn't hang forever on wg.Wait().
_ = err
case <-time.After(10 * time.Second):
t.Fatal("MergeRun did not return within 10s after context cancellation — wg.Wait() still blocks")
}
}
Loading