diff --git a/internal/signaling/peer.go b/internal/signaling/peer.go index 156cb88..bf039d5 100644 --- a/internal/signaling/peer.go +++ b/internal/signaling/peer.go @@ -468,7 +468,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error { return fmt.Errorf("lobby code too long") } - err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password) + existingPeers, err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password) if err != nil { switch err { case stores.ErrNotFound: @@ -509,7 +509,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error { return err } - for _, otherID := range lobby.Peers { + for _, otherID := range existingPeers { if otherID == p.ID { continue } diff --git a/internal/signaling/stores/postgres.go b/internal/signaling/stores/postgres.go index ce50b24..027ad1f 100644 --- a/internal/signaling/stores/postgres.go +++ b/internal/signaling/stores/postgres.go @@ -212,18 +212,18 @@ func (s *PostgresStore) CreateLobby(ctx context.Context, game, lobbyCode, peerID return nil } -func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) error { +func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) ([]string, error) { if len(peerID) > 20 { logger := logging.GetLogger(ctx) logger.Warn("peer id too long", zap.String("peerID", peerID)) - return ErrInvalidPeerID + return nil, ErrInvalidPeerID } now := util.NowUTC(ctx) tx, err := s.DB.Begin(ctx) if err != nil { - return err + return nil, err } defer tx.Rollback(context.Background()) //nolint:errcheck @@ -239,21 +239,21 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, `, lobbyCode, game).Scan(&peerlist, &lobbyPassword, &maxPlayers) if err != nil { if errors.Is(err, pgx.ErrNoRows) { - return ErrNotFound + return nil, ErrNotFound } - return err + return nil, err } if lobbyPassword != nil && bcrypt.CompareHashAndPassword(lobbyPassword, []byte(password)) != nil { - return ErrInvalidPassword + return nil, ErrInvalidPassword } if maxPlayers > 0 && len(peerlist) >= maxPlayers { - return ErrLobbyIsFull + return nil, ErrLobbyIsFull } if slices.Contains(peerlist, peerID) { - return ErrAlreadyInLobby + return nil, ErrAlreadyInLobby } _, err = tx.Exec(ctx, ` @@ -265,15 +265,15 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, AND game = $4 `, peerID, now, lobbyCode, game) if err != nil { - return err + return nil, err } err = tx.Commit(ctx) if err != nil { - return err + return nil, err } - return nil + return peerlist, nil } func (s *PostgresStore) LeaveLobby(ctx context.Context, game, lobbyCode, peerID string) error { diff --git a/internal/signaling/stores/shared.go b/internal/signaling/stores/shared.go index ee2ae6c..ae466e4 100644 --- a/internal/signaling/stores/shared.go +++ b/internal/signaling/stores/shared.go @@ -26,7 +26,7 @@ type LobbyOptions struct { type Store interface { CreateLobby(ctx context.Context, Game, LobbyCode, PeerID string, options LobbyOptions) error - JoinLobby(ctx context.Context, game, lobby, id, password string) error + JoinLobby(ctx context.Context, game, lobby, id, password string) ([]string, error) LeaveLobby(ctx context.Context, game, lobby, id string) error GetLobby(ctx context.Context, game, lobby string) (Lobby, error) ListLobbies(ctx context.Context, game string, country, region string, filter, sort string, limit int) ([]Lobby, error) diff --git a/lib/network.ts b/lib/network.ts index 542571a..25feb08 100644 --- a/lib/network.ts +++ b/lib/network.ts @@ -157,8 +157,16 @@ export default class Network extends EventEmitter { * @internal */ async _addPeer (id: string, polite: boolean): Promise { + if (this.peers.has(id)) { + return + } + const config = await this.credentials.fillCredentials(this.peerConfig) + if (this.peers.has(id)) { + return + } + config.iceServers = config.iceServers?.filter(server => !(server.urls.includes('turn:') && server.username === undefined)) const peer = new Peer(this, this.signaling, id, config, polite) diff --git a/lib/peer.ts b/lib/peer.ts index e5f7590..28a9551 100644 --- a/lib/peer.ts +++ b/lib/peer.ts @@ -14,6 +14,7 @@ export default class Peer { private makingOffer: boolean = false private ignoreOffer: boolean = false private isSettingRemoteAnswerPending: boolean = false + private readonly pendingRemoteCandidates: RTCIceCandidate[] = [] // Connection state: private opened: boolean = false @@ -231,6 +232,23 @@ export default class Peer { void this.signaling.event('rtc', 'error', { target: this.id, error: JSON.stringify(e) }) } + private async addIceCandidate (candidate: RTCIceCandidate): Promise { + try { + await this.conn.addIceCandidate(candidate) + } catch (e) { + if (!this.ignoreOffer) { + throw e + } + } + } + + private async drainPendingCandidates (): Promise { + const candidates = this.pendingRemoteCandidates.splice(0) + for (const candidate of candidates) { + await this.addIceCandidate(candidate) + } + } + /** * @internal */ @@ -238,13 +256,13 @@ export default class Peer { switch (packet.type) { case 'candidate': if (packet.candidate != null) { - try { - await this.conn.addIceCandidate(packet.candidate) - } catch (e) { + if (this.conn.remoteDescription == null) { if (!this.ignoreOffer) { - throw e + this.pendingRemoteCandidates.push(packet.candidate) } + return } + await this.addIceCandidate(packet.candidate) } break @@ -260,9 +278,17 @@ export default class Peer { if (this.ignoreOffer) { return } + if (description.type === 'answer' && this.conn.signalingState !== 'have-local-offer') { + this.network.log('ignoring stale remote answer from', this.id, 'in state', this.conn.signalingState) + return + } this.isSettingRemoteAnswerPending = description.type === 'answer' - await this.conn.setRemoteDescription(description) - this.isSettingRemoteAnswerPending = false + try { + await this.conn.setRemoteDescription(description) + } finally { + this.isSettingRemoteAnswerPending = false + } + await this.drainPendingCandidates() if (description.type === 'offer') { if (process.env.NODE_ENV === 'test') { await this.conn.setLocalDescription(await this.conn.createAnswer()) diff --git a/lib/signaling.ts b/lib/signaling.ts index 92a698c..83727f9 100644 --- a/lib/signaling.ts +++ b/lib/signaling.ts @@ -27,6 +27,7 @@ export default class Signaling extends EventEmitter { private readonly requests: Map = new Map() private pingInterval?: ReturnType + private messageQueue: Promise = Promise.resolve() constructor (private readonly network: Network, peers: Map, url: string) { super() @@ -75,9 +76,7 @@ export default class Signaling extends EventEmitter { }, 100) } } - const onMessage = (ev: MessageEvent): void => { - this.handleSignalingMessage(ev.data).catch(_ => {}) - } + const onMessage = (ev: MessageEvent): void => this.enqueueSignalingMessage(ev.data) const onClose = (): void => { if (!this.network.closing) { const error = new SignalingError('socket-error', 'signaling socket closed') @@ -143,8 +142,8 @@ export default class Signaling extends EventEmitter { packet.rid = rid this.network.log('requesting signaling packet:', packet.type) const data = JSON.stringify(packet) + this.requests.set(rid, { resolve, reject, type: packet.type }) this.ws.send(data) - this.requests.set(rid, { resolve, reject }) }) } @@ -169,19 +168,60 @@ export default class Signaling extends EventEmitter { } } - private async handleSignalingMessage (data: string): Promise { + private enqueueSignalingMessage (data: string): void { + const packet = this.parseSignalingPacket(data) + if (packet == null) { + return + } + + this.resolveImmediateRequest(packet) + const handle = this.handleSignalingMessage.bind(this, packet) + this.messageQueue = this.messageQueue.then(handle, handle) + void this.messageQueue.catch(_ => {}) + } + + private parseSignalingPacket (data: string): SignalingPacketTypes | undefined { try { const packet = JSON.parse(data) as SignalingPacketTypes this.network.log('signaling packet received:', packet.type) + return packet + } catch (e) { + const error = new SignalingError('unknown-error', e as string) + this.network._onSignalingError(error) + } + } + + private resolveImmediateRequest (packet: SignalingPacketTypes): void { + if (packet.rid === undefined) { + return + } + + const request = this.requests.get(packet.rid) + // Only credential responses can resolve immediately: _addPeer waits for them + // while handling a queued connect packet, so queueing them would deadlock. + if (request?.type !== 'credentials') { + return + } + + this.requests.delete(packet.rid) + this.resolveRequest(packet, request) + } + + private resolveRequest (packet: SignalingPacketTypes, request: RequestHandler): void { + if (packet.type === 'error') { + request.reject(new SignalingError('server-error', packet.message, undefined, packet.code)) + } else { + request.resolve(packet) + } + } + + private async handleSignalingMessage (packet: SignalingPacketTypes): Promise { + try { if (packet.rid !== undefined) { const request = this.requests.get(packet.rid) if (request != null) { this.requests.delete(packet.rid) - if (packet.type === 'error') { - request.reject(new SignalingError('server-error', packet.message, undefined, packet.code)) - } else { - request.resolve(packet) - } + this.resolveRequest(packet, request) } } switch (packet.type) { @@ -319,6 +359,7 @@ export default class Signaling extends EventEmitter { interface RequestHandler { resolve: (data: SignalingPacketTypes) => void reject: (reason?: any) => void + type: string // Original request packet type; used to allow safe queue bypasses. } export class SignalingError {