Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
return fmt.Errorf("lobby code too long")
}

err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password)
existingPeers, err := p.store.JoinLobby(ctx, p.Game, packet.Lobby, p.ID, packet.Password)
if err != nil {
switch err {
case stores.ErrNotFound:
Expand Down Expand Up @@ -509,7 +509,7 @@ func (p *Peer) HandleJoinPacket(ctx context.Context, packet JoinPacket) error {
return err
}

for _, otherID := range lobby.Peers {
for _, otherID := range existingPeers {
if otherID == p.ID {
continue
}
Expand Down
22 changes: 11 additions & 11 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,18 @@ func (s *PostgresStore) CreateLobby(ctx context.Context, game, lobbyCode, peerID
return nil
}

func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) error {
func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID, password string) ([]string, error) {
if len(peerID) > 20 {
logger := logging.GetLogger(ctx)
logger.Warn("peer id too long", zap.String("peerID", peerID))
return ErrInvalidPeerID
return nil, ErrInvalidPeerID
}

now := util.NowUTC(ctx)

tx, err := s.DB.Begin(ctx)
if err != nil {
return err
return nil, err
}
defer tx.Rollback(context.Background()) //nolint:errcheck

Expand All @@ -239,21 +239,21 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID,
`, lobbyCode, game).Scan(&peerlist, &lobbyPassword, &maxPlayers)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return ErrNotFound
return nil, ErrNotFound
}
return err
return nil, err
}

if lobbyPassword != nil && bcrypt.CompareHashAndPassword(lobbyPassword, []byte(password)) != nil {
return ErrInvalidPassword
return nil, ErrInvalidPassword
}

if maxPlayers > 0 && len(peerlist) >= maxPlayers {
return ErrLobbyIsFull
return nil, ErrLobbyIsFull
}

if slices.Contains(peerlist, peerID) {
return ErrAlreadyInLobby
return nil, ErrAlreadyInLobby
}

_, err = tx.Exec(ctx, `
Expand All @@ -265,15 +265,15 @@ func (s *PostgresStore) JoinLobby(ctx context.Context, game, lobbyCode, peerID,
AND game = $4
`, peerID, now, lobbyCode, game)
if err != nil {
return err
return nil, err
}

err = tx.Commit(ctx)
if err != nil {
return err
return nil, err
}

return nil
return peerlist, nil
}

func (s *PostgresStore) LeaveLobby(ctx context.Context, game, lobbyCode, peerID string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/signaling/stores/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LobbyOptions struct {

type Store interface {
CreateLobby(ctx context.Context, Game, LobbyCode, PeerID string, options LobbyOptions) error
JoinLobby(ctx context.Context, game, lobby, id, password string) error
JoinLobby(ctx context.Context, game, lobby, id, password string) ([]string, error)
LeaveLobby(ctx context.Context, game, lobby, id string) error
GetLobby(ctx context.Context, game, lobby string) (Lobby, error)
ListLobbies(ctx context.Context, game string, country, region string, filter, sort string, limit int) ([]Lobby, error)
Expand Down
8 changes: 8 additions & 0 deletions lib/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,16 @@ export default class Network extends EventEmitter<NetworkListeners> {
* @internal
*/
async _addPeer (id: string, polite: boolean): Promise<void> {
if (this.peers.has(id)) {
return
}

const config = await this.credentials.fillCredentials(this.peerConfig)

if (this.peers.has(id)) {
return
}

config.iceServers = config.iceServers?.filter(server => !(server.urls.includes('turn:') && server.username === undefined))

const peer = new Peer(this, this.signaling, id, config, polite)
Expand Down
38 changes: 32 additions & 6 deletions lib/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default class Peer {
private makingOffer: boolean = false
private ignoreOffer: boolean = false
private isSettingRemoteAnswerPending: boolean = false
private readonly pendingRemoteCandidates: RTCIceCandidate[] = []

// Connection state:
private opened: boolean = false
Expand Down Expand Up @@ -231,20 +232,37 @@ export default class Peer {
void this.signaling.event('rtc', 'error', { target: this.id, error: JSON.stringify(e) })
}

private async addIceCandidate (candidate: RTCIceCandidate): Promise<void> {
try {
await this.conn.addIceCandidate(candidate)
} catch (e) {
if (!this.ignoreOffer) {
throw e
}
}
}

private async drainPendingCandidates (): Promise<void> {
const candidates = this.pendingRemoteCandidates.splice(0)
for (const candidate of candidates) {
await this.addIceCandidate(candidate)
}
}

/**
* @internal
*/
async _onSignalingMessage (packet: SignalingPacketTypes): Promise<void> {
switch (packet.type) {
case 'candidate':
if (packet.candidate != null) {
try {
await this.conn.addIceCandidate(packet.candidate)
} catch (e) {
if (this.conn.remoteDescription == null) {
if (!this.ignoreOffer) {
throw e
this.pendingRemoteCandidates.push(packet.candidate)
}
return
}
await this.addIceCandidate(packet.candidate)
}
break

Expand All @@ -260,9 +278,17 @@ export default class Peer {
if (this.ignoreOffer) {
return
}
if (description.type === 'answer' && this.conn.signalingState !== 'have-local-offer') {
this.network.log('ignoring stale remote answer from', this.id, 'in state', this.conn.signalingState)
return
}
this.isSettingRemoteAnswerPending = description.type === 'answer'
await this.conn.setRemoteDescription(description)
this.isSettingRemoteAnswerPending = false
try {
await this.conn.setRemoteDescription(description)
} finally {
this.isSettingRemoteAnswerPending = false
}
await this.drainPendingCandidates()
if (description.type === 'offer') {
if (process.env.NODE_ENV === 'test') {
await this.conn.setLocalDescription(await this.conn.createAnswer())
Expand Down
61 changes: 51 additions & 10 deletions lib/signaling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
private readonly requests: Map<string, RequestHandler> = new Map()

private pingInterval?: ReturnType<typeof setInterval>
private messageQueue: Promise<void> = Promise.resolve()

constructor (private readonly network: Network, peers: Map<string, Peer>, url: string) {
super()
Expand Down Expand Up @@ -75,9 +76,7 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
}, 100)
}
}
const onMessage = (ev: MessageEvent): void => {
this.handleSignalingMessage(ev.data).catch(_ => {})
}
const onMessage = (ev: MessageEvent): void => this.enqueueSignalingMessage(ev.data)
const onClose = (): void => {
if (!this.network.closing) {
const error = new SignalingError('socket-error', 'signaling socket closed')
Expand Down Expand Up @@ -143,8 +142,8 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
packet.rid = rid
this.network.log('requesting signaling packet:', packet.type)
const data = JSON.stringify(packet)
this.requests.set(rid, { resolve, reject, type: packet.type })
this.ws.send(data)
this.requests.set(rid, { resolve, reject })
})
}

Expand All @@ -169,19 +168,60 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
}
}

private async handleSignalingMessage (data: string): Promise<void> {
private enqueueSignalingMessage (data: string): void {
const packet = this.parseSignalingPacket(data)
if (packet == null) {
return
}

this.resolveImmediateRequest(packet)
const handle = this.handleSignalingMessage.bind(this, packet)
this.messageQueue = this.messageQueue.then(handle, handle)
void this.messageQueue.catch(_ => {})
}

private parseSignalingPacket (data: string): SignalingPacketTypes | undefined {
try {
const packet = JSON.parse(data) as SignalingPacketTypes
this.network.log('signaling packet received:', packet.type)
return packet
} catch (e) {
const error = new SignalingError('unknown-error', e as string)
this.network._onSignalingError(error)
}
}

private resolveImmediateRequest (packet: SignalingPacketTypes): void {
if (packet.rid === undefined) {
return
}

const request = this.requests.get(packet.rid)
// Only credential responses can resolve immediately: _addPeer waits for them
// while handling a queued connect packet, so queueing them would deadlock.
if (request?.type !== 'credentials') {
return
}

this.requests.delete(packet.rid)
this.resolveRequest(packet, request)
}

private resolveRequest (packet: SignalingPacketTypes, request: RequestHandler): void {
if (packet.type === 'error') {
request.reject(new SignalingError('server-error', packet.message, undefined, packet.code))
} else {
request.resolve(packet)
}
}

private async handleSignalingMessage (packet: SignalingPacketTypes): Promise<void> {
try {
if (packet.rid !== undefined) {
const request = this.requests.get(packet.rid)
if (request != null) {
this.requests.delete(packet.rid)
if (packet.type === 'error') {
request.reject(new SignalingError('server-error', packet.message, undefined, packet.code))
} else {
request.resolve(packet)
}
this.resolveRequest(packet, request)
}
}
switch (packet.type) {
Expand Down Expand Up @@ -319,6 +359,7 @@ export default class Signaling extends EventEmitter<SignalingListeners> {
interface RequestHandler {
resolve: (data: SignalingPacketTypes) => void
reject: (reason?: any) => void
type: string // Original request packet type; used to allow safe queue bypasses.
}

export class SignalingError {
Expand Down
Loading