From af435df8726671c79355dc58f9b7abae2adc5964 Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Sat, 6 Jun 2026 11:02:38 +0200 Subject: [PATCH 1/2] Fix signaling races during peer connection setup Serialize incoming signaling messages so WebRTC descriptions and candidates are handled in order. Queue ICE candidates until a remote description is available, ignore stale answers that no longer match the current signaling state, and avoid creating duplicate Peer instances while async TURN credential lookup is in flight. On the server, have JoinLobby return the peers that existed before the join and only request connections to that snapshot. This avoids racing newly joined peers against each other while the lobby membership changes. Credential request responses can still resolve immediately because _addPeer may wait for them while handling a queued connect packet. --- internal/signaling/peer.go | 4 +- internal/signaling/stores/postgres.go | 22 +++++----- internal/signaling/stores/shared.go | 2 +- lib/network.ts | 8 ++++ lib/peer.ts | 38 ++++++++++++++--- lib/signaling.ts | 61 ++++++++++++++++++++++----- 6 files changed, 105 insertions(+), 30 deletions(-) diff --git a/internal/signaling/peer.go b/internal/signaling/peer.go index 156cb88..bf039d5 100644 --- a/internal/signaling/peer.go +++ b/internal/signaling/peer.go @@ -468,7 +468,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error { return fmt.Errorf("lobby code too long") } - err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password) + existingPeers, err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password) if err != nil { switch err { case stores.ErrNotFound: @@ -509,7 +509,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error { return err } - for _, otherID := range lobby.Peers { + for _, otherID := range existingPeers { if otherID == p.ID { continue } diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index ce50b24..027ad1f 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -212,18 +212,18 @@ func (s *PostgresStore) CreateLobby(ctx context.Context, game, lobbyCode, peerID return nil } -func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) error { +func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) ([]string, error) { if len(peerID) > 20 { logger := logging.GetLogger(ctx) logger.Warn("peer id too long", zap.String("peerID", peerID)) - return ErrInvalidPeerID + return nil, ErrInvalidPeerID } now := util.NowUTC(ctx) tx, err := s.DB.Begin(ctx) if err != nil { - return err + return nil, err } defer tx.Rollback(context.Background()) //nolint:errcheck @@ -239,21 +239,21 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, `, lobbyCode, game).Scan(&peerlist, &lobbyPassword, &maxPlayers) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - return ErrNotFound + return nil, ErrNotFound } - return err + return nil, err } if lobbyPassword != nil && bcrypt.CompareHashAndPassword(lobbyPassword, []byte(password)) != nil { - return ErrInvalidPassword + return nil, ErrInvalidPassword } if maxPlayers > 0 && len(peerlist) >= maxPlayers { - return ErrLobbyIsFull + return nil, ErrLobbyIsFull } if slices.Contains(peerlist, peerID) { - return ErrAlreadyInLobby + return nil, ErrAlreadyInLobby } _, err = tx.Exec(ctx, ` @@ -265,15 +265,15 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, AND game = $4 `, peerID, now, lobbyCode, game) if err != nil { - return err + return nil, err } err = tx.Commit(ctx) if err != nil { - return err + return nil, err } - return nil + return peerlist, nil } func (s *PostgresStore) LeaveLobby(ctx context.Context, game, lobbyCode, peerID string) error { diff --git a/internal/signaling/stores/shared.go b/internal/signaling/stores/shared.go index ee2ae6c..ae466e4 100644 --- a/internal/signaling/stores/shared.go +++ b/internal/signaling/stores/shared.go @@ -26,7 +26,7 @@ type LobbyOptions struct { type Store interface { CreateLobby(ctx context.Context, Game, LobbyCode, PeerID string, options LobbyOptions) error - JoinLobby(ctx context.Context, game, lobby, id, password string) error + JoinLobby(ctx context.Context, game, lobby, id, password string) ([]string, error) LeaveLobby(ctx context.Context, game, lobby, id string) error GetLobby(ctx context.Context, game, lobby string) (Lobby, error) ListLobbies(ctx context.Context, game string, country, region string, filter, sort string, limit int) ([]Lobby, error) diff --git a/lib/network.ts b/lib/network.ts index 542571a..25feb08 100644 --- a/lib/network.ts +++ b/lib/network.ts @@ -157,8 +157,16 @@ export default class Network extends EventEmitter { * @internal */ async _addPeer (id: string, polite: boolean): Promise { + if (this.peers.has(id)) { + return + } + const config = await this.credentials.fillCredentials(this.peerConfig) + if (this.peers.has(id)) { + return + } + config.iceServers = config.iceServers?.filter(server => !(server.urls.includes('turn:') && server.username === undefined)) const peer = new Peer(this, this.signaling, id, config, polite) diff --git a/lib/peer.ts b/lib/peer.ts index e5f7590..1fedbd4 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -14,6 +14,7 @@ export default class Peer { private makingOffer: boolean = false private ignoreOffer: boolean = false private isSettingRemoteAnswerPending: boolean = false + private readonly pendingCandidates: RTCIceCandidate[] = [] // Connection state: private opened: boolean = false @@ -231,6 +232,23 @@ export default class Peer { void this.signaling.event('rtc', 'error', { target: this.id, error: JSON.stringify(e) }) } + private async addIceCandidate (candidate: RTCIceCandidate): Promise { + try { + await this.conn.addIceCandidate(candidate) + } catch (e) { + if (!this.ignoreOffer) { + throw e + } + } + } + + private async drainPendingCandidates (): Promise { + const candidates = this.pendingCandidates.splice(0) + for (const candidate of candidates) { + await this.addIceCandidate(candidate) + } + } + /** * @internal */ @@ -238,13 +256,13 @@ export default class Peer { switch (packet.type) { case 'candidate': if (packet.candidate != null) { - try { - await this.conn.addIceCandidate(packet.candidate) - } catch (e) { + if (this.conn.remoteDescription == null) { if (!this.ignoreOffer) { - throw e + this.pendingCandidates.push(packet.candidate) } + return } + await this.addIceCandidate(packet.candidate) } break @@ -260,9 +278,17 @@ export default class Peer { if (this.ignoreOffer) { return } + if (description.type === 'answer' && this.conn.signalingState !== 'have-local-offer') { + this.network.log('ignoring stale remote answer from', this.id, 'in state', this.conn.signalingState) + return + } this.isSettingRemoteAnswerPending = description.type === 'answer' - await this.conn.setRemoteDescription(description) - this.isSettingRemoteAnswerPending = false + try { + await this.conn.setRemoteDescription(description) + } finally { + this.isSettingRemoteAnswerPending = false + } + await this.drainPendingCandidates() if (description.type === 'offer') { if (process.env.NODE_ENV === 'test') { await this.conn.setLocalDescription(await this.conn.createAnswer()) diff --git a/lib/signaling.ts b/lib/signaling.ts index 92a698c..83727f9 100644 --- a/lib/signaling.ts +++ b/lib/signaling.ts @@ -27,6 +27,7 @@ export default class Signaling extends EventEmitter { private readonly requests: Map = new Map() private pingInterval?: ReturnType + private messageQueue: Promise = Promise.resolve() constructor (private readonly network: Network, peers: Map, url: string) { super() @@ -75,9 +76,7 @@ export default class Signaling extends EventEmitter { }, 100) } } - const onMessage = (ev: MessageEvent): void => { - this.handleSignalingMessage(ev.data).catch(_ => {}) - } + const onMessage = (ev: MessageEvent): void => this.enqueueSignalingMessage(ev.data) const onClose = (): void => { if (!this.network.closing) { const error = new SignalingError('socket-error', 'signaling socket closed') @@ -143,8 +142,8 @@ export default class Signaling extends EventEmitter { packet.rid = rid this.network.log('requesting signaling packet:', packet.type) const data = JSON.stringify(packet) + this.requests.set(rid, { resolve, reject, type: packet.type }) this.ws.send(data) - this.requests.set(rid, { resolve, reject }) }) } @@ -169,19 +168,60 @@ export default class Signaling extends EventEmitter { } } - private async handleSignalingMessage (data: string): Promise { + private enqueueSignalingMessage (data: string): void { + const packet = this.parseSignalingPacket(data) + if (packet == null) { + return + } + + this.resolveImmediateRequest(packet) + const handle = this.handleSignalingMessage.bind(this, packet) + this.messageQueue = this.messageQueue.then(handle, handle) + void this.messageQueue.catch(_ => {}) + } + + private parseSignalingPacket (data: string): SignalingPacketTypes | undefined { try { const packet = JSON.parse(data) as SignalingPacketTypes this.network.log('signaling packet received:', packet.type) + return packet + } catch (e) { + const error = new SignalingError('unknown-error', e as string) + this.network._onSignalingError(error) + } + } + + private resolveImmediateRequest (packet: SignalingPacketTypes): void { + if (packet.rid === undefined) { + return + } + + const request = this.requests.get(packet.rid) + // Only credential responses can resolve immediately: _addPeer waits for them + // while handling a queued connect packet, so queueing them would deadlock. + if (request?.type !== 'credentials') { + return + } + + this.requests.delete(packet.rid) + this.resolveRequest(packet, request) + } + + private resolveRequest (packet: SignalingPacketTypes, request: RequestHandler): void { + if (packet.type === 'error') { + request.reject(new SignalingError('server-error', packet.message, undefined, packet.code)) + } else { + request.resolve(packet) + } + } + + private async handleSignalingMessage (packet: SignalingPacketTypes): Promise { + try { if (packet.rid !== undefined) { const request = this.requests.get(packet.rid) if (request != null) { this.requests.delete(packet.rid) - if (packet.type === 'error') { - request.reject(new SignalingError('server-error', packet.message, undefined, packet.code)) - } else { - request.resolve(packet) - } + this.resolveRequest(packet, request) } } switch (packet.type) { @@ -319,6 +359,7 @@ export default class Signaling extends EventEmitter { interface RequestHandler { resolve: (data: SignalingPacketTypes) => void reject: (reason?: any) => void + type: string // Original request packet type; used to allow safe queue bypasses. } export class SignalingError { From bd0c623a917e77494332a660462297ad364ec8ff Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Mon, 15 Jun 2026 12:58:18 +0200 Subject: [PATCH 2/2] pendingRemoteCandidates --- lib/peer.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/peer.ts b/lib/peer.ts index 1fedbd4..28a9551 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -14,7 +14,7 @@ export default class Peer { private makingOffer: boolean = false private ignoreOffer: boolean = false private isSettingRemoteAnswerPending: boolean = false - private readonly pendingCandidates: RTCIceCandidate[] = [] + private readonly pendingRemoteCandidates: RTCIceCandidate[] = [] // Connection state: private opened: boolean = false @@ -243,7 +243,7 @@ export default class Peer { } private async drainPendingCandidates (): Promise { - const candidates = this.pendingCandidates.splice(0) + const candidates = this.pendingRemoteCandidates.splice(0) for (const candidate of candidates) { await this.addIceCandidate(candidate) } @@ -258,7 +258,7 @@ export default class Peer { if (packet.candidate != null) { if (this.conn.remoteDescription == null) { if (!this.ignoreOffer) { - this.pendingCandidates.push(packet.candidate) + this.pendingRemoteCandidates.push(packet.candidate) } return }