From 44eda1afbca4cb936399a58f11863686c16d7183 Mon Sep 17 00:00:00 2001 From: jiangxinmeng Date: Tue, 16 Jun 2026 14:28:19 +0800 Subject: [PATCH 1/3] update --- pkg/cdc/util.go | 84 +++++++++++++++++++++++++++++--------------- pkg/cdc/util_test.go | 50 +++++++++++++++++++++++--- 2 files changed, 101 insertions(+), 33 deletions(-) diff --git a/pkg/cdc/util.go b/pkg/cdc/util.go index 6ee7f0779b661..245720fecc9cf 100644 --- a/pkg/cdc/util.go +++ b/pkg/cdc/util.go @@ -24,11 +24,14 @@ import ( "fmt" "math" "math/rand" + "net" + "net/url" "slices" "strconv" "strings" "time" + "github.com/go-sql-driver/mysql" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/catalog" @@ -557,10 +560,12 @@ func floatArrayToString[T float32 | float64](arr []T) string { var OpenDbConn = func(user, password string, ip string, port int, timeout string) (db *sql.DB, err error) { logutil.Info("cdc.util.open_db_conn", zap.String("timeout", timeout)) - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?readTimeout=%s&timeout=%s&writeTimeout=%s&multiStatements=true", - user, password, ip, port, timeout, timeout, timeout) + cfg, err := makeMysqlConfig(user, password, ip, port, timeout) + if err != nil { + return nil, err + } for i := 0; i < 3; i++ { - if db, err = tryConn(dsn); err == nil { + if db, err = tryConn(cfg); err == nil { // TODO check table existence return } @@ -571,23 +576,40 @@ var OpenDbConn = func(user, password string, ip string, port int, timeout string return } -var openDb = sql.Open +func makeMysqlConfig(user, password string, ip string, port int, timeout string) (*mysql.Config, error) { + timeoutDuration, err := time.ParseDuration(timeout) + if err != nil { + return nil, err + } + cfg := mysql.NewConfig() + cfg.User = user + cfg.Passwd = password + cfg.Net = "tcp" + cfg.Addr = net.JoinHostPort(ip, strconv.Itoa(port)) + cfg.Timeout = timeoutDuration + cfg.ReadTimeout = timeoutDuration + cfg.WriteTimeout = timeoutDuration + cfg.MultiStatements = true + return cfg, nil +} -var tryConn = func(dsn string) (*sql.DB, error) { - db, err := openDb("mysql-mo", dsn) +var openDbWithConnector = sql.OpenDB + +var tryConn = func(cfg *mysql.Config) (*sql.DB, error) { + connector, err := mysql.NewConnector(cfg) + if err != nil { + return nil, err + } + db := openDbWithConnector(connector) + db.SetConnMaxLifetime(time.Minute * 3) + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + time.Sleep(time.Millisecond * 100) + + //ping opens the connection + err = db.Ping() if err != nil { return nil, err - } else { - db.SetConnMaxLifetime(time.Minute * 3) - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - time.Sleep(time.Millisecond * 100) - - //ping opens the connection - err = db.Ping() - if err != nil { - return nil, err - } } return db, err } @@ -816,31 +838,35 @@ func ExtractUriInfo( // compositedUriInfo uri according to the format: mysql://root:111@127.0.0.1:6001 // if valid, return true and extracted info -// !!!NOTE!!! -// user and password does not have the special character ( ':' '@' ) func compositedUriInfo(uri string, uriPrefix string) (bool, UriInfo) { if !uriHasPrefix(uri, uriPrefix) { return false, UriInfo{} } //locate user password rest := uri[len(uriPrefix):] - seps := strings.Split(rest, "@") - if len(seps) != 2 || len(seps[0]) == 0 || len(seps[1]) == 0 { + atIdx := strings.LastIndex(rest, "@") + if atIdx <= 0 || atIdx == len(rest)-1 { return false, UriInfo{} } - seps2 := strings.Split(seps[0], ":") - if len(seps2) < 2 { + userInfo := rest[:atIdx] + hostInfo := rest[atIdx+1:] + + colonIdx := strings.LastIndex(userInfo, ":") + if colonIdx <= 0 || colonIdx == len(userInfo)-1 { + return false, UriInfo{} + } + userName, err := url.PathUnescape(userInfo[:colonIdx]) + if err != nil || userName == "" { return false, UriInfo{} } - userName := strings.Join(seps2[0:len(seps2)-1], ":") - password := seps2[len(seps2)-1] - passwordStart := len(uriPrefix) + len(userName) + 1 - passwordEnd := passwordStart + len(password) - if passwordEnd > len(uri) || password != uri[passwordStart:passwordEnd] { + passwordStart := len(uriPrefix) + colonIdx + 1 + passwordEnd := len(uriPrefix) + atIdx + password, err := url.PathUnescape(uri[passwordStart:passwordEnd]) + if err != nil { return false, UriInfo{} } - sep3 := strings.Split(seps[1], ":") + sep3 := strings.Split(hostInfo, ":") if len(sep3) != 2 || len(sep3[0]) == 0 || len(sep3[1]) == 0 { return false, UriInfo{} } diff --git a/pkg/cdc/util_test.go b/pkg/cdc/util_test.go index 5fc4eb78276da..2d4df01990269 100644 --- a/pkg/cdc/util_test.go +++ b/pkg/cdc/util_test.go @@ -17,12 +17,14 @@ package cdc import ( "context" "database/sql" + "database/sql/driver" "fmt" "math" "testing" "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/go-sql-driver/mysql" "github.com/prashantv/gostub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -795,7 +797,7 @@ func Test_floatArrayToString(t *testing.T) { } func Test_openDbConn(t *testing.T) { - stub := gostub.Stub(&tryConn, func(_ string) (*sql.DB, error) { + stub := gostub.Stub(&tryConn, func(_ *mysql.Config) (*sql.DB, error) { return nil, nil }) defer stub.Reset() @@ -806,7 +808,7 @@ func Test_openDbConn(t *testing.T) { } func Test_openDbConnFailed(t *testing.T) { - stub := gostub.Stub(&tryConn, func(_ string) (*sql.DB, error) { + stub := gostub.Stub(&tryConn, func(_ *mysql.Config) (*sql.DB, error) { return nil, moerr.NewInternalErrorNoCtx("") }) defer stub.Reset() @@ -816,16 +818,33 @@ func Test_openDbConnFailed(t *testing.T) { assert.Nil(t, conn) } +func Test_makeMysqlConfig(t *testing.T) { + cfg, err := makeMysqlConfig("account:user:role", "p@ss", "127.0.0.1", 6001, CDCDefaultSendSqlTimeout) + require.NoError(t, err) + assert.Equal(t, "account:user:role", cfg.User) + assert.Equal(t, "p@ss", cfg.Passwd) + assert.Equal(t, "tcp", cfg.Net) + assert.Equal(t, "127.0.0.1:6001", cfg.Addr) + assert.Equal(t, 10*time.Minute, cfg.Timeout) + assert.Equal(t, 10*time.Minute, cfg.ReadTimeout) + assert.Equal(t, 10*time.Minute, cfg.WriteTimeout) + assert.True(t, cfg.MultiStatements) + assert.True(t, cfg.AllowNativePasswords) +} + func Test_tryConn(t *testing.T) { db, mock, err := sqlmock.New() assert.NoError(t, err) mock.ExpectPing() - stub := gostub.Stub(&openDb, func(_, _ string) (*sql.DB, error) { + stub := gostub.Stub(&openDbWithConnector, func(_ driver.Connector) *sql.DB { return db, nil }) defer stub.Reset() - got, err := tryConn("dsn") + + cfg, err := makeMysqlConfig("user", "password", "host", 1234, CDCDefaultSendSqlTimeout) + require.NoError(t, err) + got, err := tryConn(cfg) assert.NoError(t, err) assert.Equal(t, db, got) } @@ -966,6 +985,29 @@ func Test_compUriInfo(t *testing.T) { ret, _ = compositedUriInfo("prefixroot:111@3:4", "prefix") assert.True(t, ret) + + ret, info := compositedUriInfo("prefixaccount:user:role:p@ss@127.0.0.1:6001", "prefix") + assert.True(t, ret) + assert.Equal(t, "account:user:role", info.User) + assert.Equal(t, "p@ss", info.Password) + assert.Equal(t, "127.0.0.1", info.Ip) + assert.Equal(t, 6001, info.Port) + + ret, info = compositedUriInfo("prefixaccount%3Auser%3Arole:p%40ss@127.0.0.1:6001", "prefix") + assert.True(t, ret) + assert.Equal(t, "account:user:role", info.User) + assert.Equal(t, "p@ss", info.Password) + + ret, info = compositedUriInfo("prefixaccount%3Auser%3Arole:p%2540ss@127.0.0.1:6001", "prefix") + assert.True(t, ret) + assert.Equal(t, "account:user:role", info.User) + assert.Equal(t, "p%40ss", info.Password) + + ret, _ = compositedUriInfo("prefixaccount%zzuser:111@127.0.0.1:6001", "prefix") + assert.False(t, ret) + + ret, _ = compositedUriInfo("prefixaccount:user:role:@127.0.0.1:6001", "prefix") + assert.False(t, ret) } func Test_uriHasPrefix(t *testing.T) { From 76e88dc2a6df4616475f9a3634988f637c386ccd Mon Sep 17 00:00:00 2001 From: jiangxinmeng Date: Tue, 16 Jun 2026 14:46:03 +0800 Subject: [PATCH 2/3] fix cdc connector test stub --- pkg/cdc/util_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cdc/util_test.go b/pkg/cdc/util_test.go index 2d4df01990269..76b34a7a02d7e 100644 --- a/pkg/cdc/util_test.go +++ b/pkg/cdc/util_test.go @@ -838,7 +838,7 @@ func Test_tryConn(t *testing.T) { mock.ExpectPing() stub := gostub.Stub(&openDbWithConnector, func(_ driver.Connector) *sql.DB { - return db, nil + return db }) defer stub.Reset() From 4d2983057faae5c6ae77fa1d51f769af91f989e7 Mon Sep 17 00:00:00 2001 From: jiangxinmeng Date: Tue, 23 Jun 2026 11:54:28 +0800 Subject: [PATCH 3/3] fix cdc mysql driver connector --- pkg/cdc/sinker_v2_executor_test.go | 205 +++++++++++++++++++++++++++++ pkg/cdc/util.go | 2 +- pkg/cdc/util_test.go | 2 +- 3 files changed, 207 insertions(+), 2 deletions(-) diff --git a/pkg/cdc/sinker_v2_executor_test.go b/pkg/cdc/sinker_v2_executor_test.go index 36bb43db18e15..911ae473237f0 100644 --- a/pkg/cdc/sinker_v2_executor_test.go +++ b/pkg/cdc/sinker_v2_executor_test.go @@ -18,6 +18,12 @@ import ( "context" "database/sql" "database/sql/driver" + "encoding/binary" + "errors" + "io" + "net" + "strconv" + "sync" "testing" "time" @@ -29,6 +35,175 @@ import ( "github.com/stretchr/testify/require" ) +type fakeMySQLServer struct { + listener net.Listener + queries chan string + errs chan error + wg sync.WaitGroup +} + +func startFakeMySQLServer(t *testing.T) *fakeMySQLServer { + t.Helper() + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + server := &fakeMySQLServer{ + listener: listener, + queries: make(chan string, 4), + errs: make(chan error, 1), + } + server.wg.Add(1) + go server.serve() + + t.Cleanup(func() { + _ = listener.Close() + server.wg.Wait() + }) + + return server +} + +func (s *fakeMySQLServer) addr(t *testing.T) (string, int) { + t.Helper() + + host, portStr, err := net.SplitHostPort(s.listener.Addr().String()) + require.NoError(t, err) + port, err := strconv.Atoi(portStr) + require.NoError(t, err) + return host, port +} + +func (s *fakeMySQLServer) serve() { + defer s.wg.Done() + + conn, err := s.listener.Accept() + if err != nil { + if !errorsIsNetClosed(err) { + s.reportErr(err) + } + return + } + defer conn.Close() + + if err := writeMySQLPacket(conn, 0, mysqlHandshakePayload()); err != nil { + s.reportErr(err) + return + } + if _, _, err := readMySQLPacket(conn); err != nil { + s.reportErr(err) + return + } + if err := writeMySQLOK(conn, 2); err != nil { + s.reportErr(err) + return + } + + for { + _, payload, err := readMySQLPacket(conn) + if err != nil { + if err != io.EOF && !errorsIsNetClosed(err) { + s.reportErr(err) + } + return + } + if len(payload) == 0 { + continue + } + + switch payload[0] { + case 0x01: // COM_QUIT + return + case 0x03: // COM_QUERY + s.queries <- string(payload[1:]) + if err := writeMySQLOK(conn, 1); err != nil { + s.reportErr(err) + return + } + case 0x0e: // COM_PING + if err := writeMySQLOK(conn, 1); err != nil { + s.reportErr(err) + return + } + default: + s.reportErr(io.ErrUnexpectedEOF) + return + } + } +} + +func (s *fakeMySQLServer) reportErr(err error) { + select { + case s.errs <- err: + default: + } +} + +func errorsIsNetClosed(err error) bool { + return errors.Is(err, net.ErrClosed) || err.Error() == "use of closed network connection" +} + +func mysqlHandshakePayload() []byte { + const ( + clientLongPassword uint32 = 1 << 0 + clientLongFlag uint32 = 1 << 2 + clientProtocol41 uint32 = 1 << 9 + clientTransactions uint32 = 1 << 13 + clientSecureConn uint32 = 1 << 15 + clientMultiStatements uint32 = 1 << 16 + clientPluginAuth uint32 = 1 << 19 + ) + + caps := clientLongPassword | clientLongFlag | clientProtocol41 | + clientTransactions | clientSecureConn | clientMultiStatements | clientPluginAuth + authData := []byte("12345678abcdefghijklmnop") + + payload := []byte{0x0a} + payload = append(payload, []byte("5.7.0-cdc-test")...) + payload = append(payload, 0x00) + payload = binary.LittleEndian.AppendUint32(payload, 1) + payload = append(payload, authData[:8]...) + payload = append(payload, 0x00) + payload = binary.LittleEndian.AppendUint16(payload, uint16(caps)) + payload = append(payload, 0x21) + payload = binary.LittleEndian.AppendUint16(payload, 0x0002) + payload = binary.LittleEndian.AppendUint16(payload, uint16(caps>>16)) + payload = append(payload, 21) + payload = append(payload, make([]byte, 10)...) + payload = append(payload, authData[8:21]...) + payload = append(payload, 0x00) + payload = append(payload, []byte("mysql_native_password")...) + payload = append(payload, 0x00) + return payload +} + +func readMySQLPacket(conn net.Conn) (byte, []byte, error) { + header := make([]byte, 4) + if _, err := io.ReadFull(conn, header); err != nil { + return 0, nil, err + } + + length := int(header[0]) | int(header[1])<<8 | int(header[2])<<16 + payload := make([]byte, length) + if _, err := io.ReadFull(conn, payload); err != nil { + return 0, nil, err + } + return header[3], payload, nil +} + +func writeMySQLPacket(conn net.Conn, sequence byte, payload []byte) error { + header := []byte{byte(len(payload)), byte(len(payload) >> 8), byte(len(payload) >> 16), sequence} + if _, err := conn.Write(header); err != nil { + return err + } + _, err := conn.Write(payload) + return err +} + +func writeMySQLOK(conn net.Conn, sequence byte) error { + return writeMySQLPacket(conn, sequence, []byte{0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00}) +} + func TestExecutor_BeginTx(t *testing.T) { t.Run("SuccessfulBegin", func(t *testing.T) { db, mock, err := sqlmock.New() @@ -461,6 +636,36 @@ func TestExecutor_ExecSQL(t *testing.T) { }) } +func TestExecutor_ExecSQLAfterTryConnUsesReuseQueryBuf(t *testing.T) { + server := startFakeMySQLServer(t) + host, port := server.addr(t) + + cfg, err := makeMysqlConfig("user", "password", host, port, "5s") + require.NoError(t, err) + cfg.MaxAllowedPacket = 64 << 20 + + db, err := tryConn(cfg) + require.NoError(t, err) + + executor := &Executor{conn: db} + defer func() { + require.NoError(t, executor.Close()) + }() + + sqlBuf := append(make([]byte, v2SQLBufReserved), []byte("CREATE DATABASE cdc_regression")...) + err = executor.ExecSQL(context.Background(), nil, sqlBuf, false) + require.NoError(t, err) + + select { + case query := <-server.queries: + require.Equal(t, "CREATE DATABASE cdc_regression", query) + case err := <-server.errs: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for COM_QUERY") + } +} + func TestExecutor_Close(t *testing.T) { t.Run("CloseWithActiveTransaction", func(t *testing.T) { db, mock, err := sqlmock.New() diff --git a/pkg/cdc/util.go b/pkg/cdc/util.go index 5c0a92511a091..b2ac52282e96a 100644 --- a/pkg/cdc/util.go +++ b/pkg/cdc/util.go @@ -31,7 +31,6 @@ import ( "strings" "time" - "github.com/go-sql-driver/mysql" "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/catalog" @@ -47,6 +46,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/txn/client" v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2" "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/mysql" ) // escapeSQLString escapes special characters in SQL string literals to prevent SQL injection. diff --git a/pkg/cdc/util_test.go b/pkg/cdc/util_test.go index 76b34a7a02d7e..4bd8930452fb5 100644 --- a/pkg/cdc/util_test.go +++ b/pkg/cdc/util_test.go @@ -24,7 +24,6 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" - "github.com/go-sql-driver/mysql" "github.com/prashantv/gostub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -40,6 +39,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/testutil" "github.com/matrixorigin/matrixone/pkg/txn/client" "github.com/matrixorigin/matrixone/pkg/vm/engine" + "github.com/matrixorigin/mysql" ) func Test_aes(t *testing.T) {