diff --git a/server/internal/database/replication_slot_advance_from_cts_resource.go b/server/internal/database/replication_slot_advance_from_cts_resource.go index 4b3fb710..48bb324b 100644 --- a/server/internal/database/replication_slot_advance_from_cts_resource.go +++ b/server/internal/database/replication_slot_advance_from_cts_resource.go @@ -133,9 +133,11 @@ func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc * return fmt.Errorf("failed to query target replication slot lsn: %w", err) } - if targetLSN <= currentLSN { - // No need to advance if the slot is already ahead of the commit - // timestamp + atOrBefore, err := postgres.LsnAtOrBefore(targetLSN, currentLSN).Scalar(ctx, conn) + if err != nil { + return fmt.Errorf("failed to compare LSNs: %w", err) + } + if atOrBefore { return nil } diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index cd6e0107..5a52bb2b 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -448,6 +448,16 @@ func SpockProgressReachedLSN(peerNodeName, targetLSN string) Query[bool] { } } +// LsnAtOrBefore reports whether lsn1 <= lsn2 using PostgreSQL's pg_lsn type. +// Use this instead of Go string comparison — LSNs are hex-formatted and string +// ordering produces wrong results across segment boundaries (e.g. "F/..." > "10/..."). +func LsnAtOrBefore(lsn1, lsn2 string) Query[bool] { + return Query[bool]{ + SQL: "SELECT @lsn1::pg_lsn <= @lsn2::pg_lsn", + Args: pgx.NamedArgs{"lsn1": lsn1, "lsn2": lsn2}, + } +} + // GetSubscriptionStatus returns the current status of a specific subscription func GetSubscriptionStatus(providerNode, subscriberNode string) Query[string] { return Query[string]{