diff --git a/src/index.ts b/src/index.ts index 1ed86882..9ad781d7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,7 @@ export { FallthroughProvider, } from './providers/FallthroughProvider'; export { jsonRpcProvider, JsonRpcProvider } from './providers/JsonRpcProvider'; +export { WebSocketProvider } from './providers/WebSocketProvider'; // TinyBig and big.js removed in favor of native bigint export { BlockResponse, BlockTag, RPCBlock } from './types/Block.types'; export { diff --git a/src/providers/WebSocketProvider.ts b/src/providers/WebSocketProvider.ts new file mode 100644 index 00000000..2e151d33 --- /dev/null +++ b/src/providers/WebSocketProvider.ts @@ -0,0 +1,364 @@ +import { BaseProvider } from './BaseProvider'; + +type SubscriptionType = 'newHeads' | 'logs' | 'newPendingTransactions'; + +type EventCallback = (...args: any[]) => void; + +interface PendingRequest { + resolve: (value: any) => void; + reject: (reason: any) => void; +} + +/** + * Resolves a WebSocket constructor. Uses the global `WebSocket` (browsers, Node 21+), + * falls back to the `ws` npm package for older Node versions. + */ +function getWebSocketConstructor(): typeof WebSocket { + if (typeof WebSocket !== 'undefined') { + return WebSocket; + } + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + return require('ws') as typeof WebSocket; + } catch { + throw new Error( + 'WebSocket is not available. Install the "ws" package for Node < 21.', + ); + } +} + +/** + * A JSON-RPC provider that communicates over WebSocket instead of HTTP. + * + * Supports all the same methods as {@link JsonRpcProvider} (getBalance, getBlock, etc.) + * plus real-time subscriptions via `eth_subscribe` / `eth_unsubscribe`. + * @example + * ```javascript + * const provider = new WebSocketProvider('wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY'); + * + * // Standard JSON-RPC methods work the same as JsonRpcProvider + * const block = await provider.getBlock('latest'); + * + * // Real-time subscriptions + * const subId = await provider.subscribe('newHeads'); + * provider.on('block', (blockHeader) => { + * console.log('New block:', blockHeader.number); + * }); + * + * // Clean up + * await provider.unsubscribe(subId); + * provider.destroy(); + * ``` + */ +export class WebSocketProvider extends BaseProvider { + private _ws: WebSocket | null = null; + private _wsUrl: string; + private _requestId = 0; + private _pendingRequests: Map = new Map(); + private _subscriptions: Map> = new Map(); + private _eventListeners: Map> = new Map(); + private _subscriptionIdToEvent: Map = new Map(); + private _destroyed = false; + private _reconnectAttempts = 0; + private _maxReconnectAttempts = 3; + private _ready: Promise; + private _readyResolve!: () => void; + private _readyReject!: (err: any) => void; + + /** + * @param wsUrl WebSocket URL to connect to (e.g. `wss://eth-mainnet.g.alchemy.com/v2/...`) + */ + constructor(wsUrl: string) { + super([wsUrl]); + this._wsUrl = wsUrl; + this._ready = this._createReadyPromise(); + this._connect(); + } + + private _createReadyPromise(): Promise { + return new Promise((resolve, reject) => { + this._readyResolve = resolve; + this._readyReject = reject; + }); + } + + private _connect(): void { + const WS = getWebSocketConstructor(); + this._ws = new WS(this._wsUrl); + + this._ws.onopen = () => { + this._reconnectAttempts = 0; + this._readyResolve(); + }; + + this._ws.onmessage = (event: MessageEvent) => { + let data: any; + try { + data = JSON.parse( + typeof event.data === 'string' ? event.data : String(event.data), + ); + } catch { + return; // ignore unparseable messages + } + + // Handle subscription notifications + if (data.method === 'eth_subscription' && data.params) { + const subId: string = data.params.subscription; + const result = data.params.result; + + // Direct subscription callbacks + const callbacks = this._subscriptions.get(subId); + if (callbacks) { + callbacks.forEach((cb) => { + try { + cb(result); + } catch { + // don't let user callbacks break the provider + } + }); + } + + // Event-based listeners + const eventName = this._subscriptionIdToEvent.get(subId); + if (eventName) { + const listeners = this._eventListeners.get(eventName); + if (listeners) { + listeners.forEach((cb) => { + try { + cb(result); + } catch { + // don't let user callbacks break the provider + } + }); + } + } + return; + } + + // Handle JSON-RPC response + if (data.id != null) { + const pending = this._pendingRequests.get(data.id); + if (pending) { + this._pendingRequests.delete(data.id); + if (data.error) { + pending.reject( + new Error( + data.error.message || + `JSON-RPC error: ${JSON.stringify(data.error)}`, + ), + ); + } else { + pending.resolve(data.result); + } + } + } + }; + + this._ws.onerror = () => { + // onerror is always followed by onclose; reconnection is handled there + }; + + this._ws.onclose = () => { + // Reject all pending requests + for (const [id, pending] of this._pendingRequests) { + pending.reject(new Error('WebSocket connection closed')); + this._pendingRequests.delete(id); + } + + if (!this._destroyed) { + this._attemptReconnect(); + } + }; + } + + private _attemptReconnect(): void { + if (this._reconnectAttempts >= this._maxReconnectAttempts) { + this._readyReject( + new Error( + `WebSocket reconnection failed after ${this._maxReconnectAttempts} attempts`, + ), + ); + return; + } + + const delay = Math.pow(2, this._reconnectAttempts) * 1000; // 1s, 2s, 4s + this._reconnectAttempts++; + + this._ready = this._createReadyPromise(); + + setTimeout(() => { + if (!this._destroyed) { + this._connect(); + } + }, delay); + } + + /** + * Send a raw JSON-RPC request over the WebSocket and wait for the response. + * @param body JSON-RPC request body. The `id` field is overwritten with an auto-incrementing counter. + * @returns the `result` field from the JSON-RPC response + */ + private async _sendRequest(body: Record): Promise { + await this._ready; + + const id = ++this._requestId; + const payload = { ...body, id }; + + return new Promise((resolve, reject) => { + this._pendingRequests.set(id, { resolve, reject }); + + if (!this._ws || this._ws.readyState !== WebSocket.OPEN) { + this._pendingRequests.delete(id); + reject(new Error('WebSocket is not open')); + return; + } + + this._ws.send(JSON.stringify(payload)); + }); + } + + // ---- BaseProvider abstract implementations ---- + + /** @ignore */ + selectRpcUrl(): string { + return this._wsUrl; + } + + /** @ignore */ + post(body: Record): Promise { + return this._sendRequest(body); + } + + // ---- Subscription API ---- + + /** + * Subscribe to real-time Ethereum events via `eth_subscribe`. + * @param type The subscription type: `'newHeads'`, `'logs'`, or `'newPendingTransactions'` + * @param params Optional parameters (e.g. filter object for `'logs'`) + * @returns The subscription ID returned by the node + * @example + * ```javascript + * const subId = await provider.subscribe('newHeads'); + * ``` + * @example + * ```javascript + * const subId = await provider.subscribe('logs', { + * address: '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', + * topics: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'], + * }); + * ``` + */ + async subscribe(type: SubscriptionType, params?: object): Promise { + const rpcParams: unknown[] = [type]; + if (params) { + rpcParams.push(params); + } + + const subscriptionId = (await this._sendRequest({ + jsonrpc: '2.0', + method: 'eth_subscribe', + params: rpcParams, + })) as string; + + // Initialize callback set for this subscription + this._subscriptions.set(subscriptionId, new Set()); + + // Map well-known types to event names + const eventMap: Record = { + newHeads: 'block', + newPendingTransactions: 'pending', + logs: 'logs', + }; + const eventName = eventMap[type] || type; + this._subscriptionIdToEvent.set(subscriptionId, eventName); + + return subscriptionId; + } + + /** + * Unsubscribe from a previously created subscription. + * @param subscriptionId The subscription ID to cancel (returned by {@link subscribe}) + * @returns `true` if successfully unsubscribed, `false` otherwise + */ + async unsubscribe(subscriptionId: string): Promise { + const result = (await this._sendRequest({ + jsonrpc: '2.0', + method: 'eth_unsubscribe', + params: [subscriptionId], + })) as boolean; + + // Clean up maps + this._subscriptions.delete(subscriptionId); + this._subscriptionIdToEvent.delete(subscriptionId); + + return result; + } + + /** + * Register an event listener for subscription events. + * + * Event names correspond to subscription types: + * - `'block'` → `newHeads` subscription notifications + * - `'pending'` → `newPendingTransactions` subscription notifications + * - `'logs'` → `logs` subscription notifications + * - Any subscription ID → direct subscription callbacks + * @param event The event name or subscription ID + * @param callback The function to call when the event fires + */ + on(event: string, callback: EventCallback): this { + if (!this._eventListeners.has(event)) { + this._eventListeners.set(event, new Set()); + } + this._eventListeners.get(event)!.add(callback); + return this; + } + + /** + * Remove an event listener. If no callback is provided, removes all listeners for that event. + * @param event The event name or subscription ID + * @param callback The specific callback to remove, or omit to remove all + */ + off(event: string, callback?: EventCallback): this { + if (!callback) { + this._eventListeners.delete(event); + } else { + const listeners = this._eventListeners.get(event); + if (listeners) { + listeners.delete(callback); + if (listeners.size === 0) { + this._eventListeners.delete(event); + } + } + } + return this; + } + + /** + * Close the WebSocket connection cleanly and prevent reconnection. + * After calling `destroy()`, this provider instance cannot be reused. + */ + destroy(): void { + this._destroyed = true; + if (this._ws) { + this._ws.close(); + this._ws = null; + } + + // Reject any remaining pending requests + for (const [id, pending] of this._pendingRequests) { + pending.reject(new Error('Provider destroyed')); + this._pendingRequests.delete(id); + } + + this._subscriptions.clear(); + this._eventListeners.clear(); + this._subscriptionIdToEvent.clear(); + } + + /** + * Returns a promise that resolves when the WebSocket connection is open and ready. + */ + get ready(): Promise { + return this._ready; + } +} diff --git a/src/providers/test/WebSocketProvider.test.ts b/src/providers/test/WebSocketProvider.test.ts new file mode 100644 index 00000000..680df26d --- /dev/null +++ b/src/providers/test/WebSocketProvider.test.ts @@ -0,0 +1,335 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ---- WebSocket mock ---- + +type WSHandler = (...args: any[]) => void; + +class MockWebSocket { + static OPEN = 1; + static CLOSED = 3; + static instances: MockWebSocket[] = []; + + url: string; + readyState: number = MockWebSocket.OPEN; + onopen: WSHandler | null = null; + onmessage: WSHandler | null = null; + onerror: WSHandler | null = null; + onclose: WSHandler | null = null; + sent: string[] = []; + + constructor(url: string) { + this.url = url; + MockWebSocket.instances.push(this); + // Simulate async open + setTimeout(() => { + if (this.onopen) this.onopen({} as Event); + }, 0); + } + + send(data: string) { + this.sent.push(data); + } + + close() { + this.readyState = MockWebSocket.CLOSED; + if (this.onclose) this.onclose({} as CloseEvent); + } + + // Test helpers + simulateMessage(data: any) { + if (this.onmessage) { + this.onmessage({ data: JSON.stringify(data) } as MessageEvent); + } + } + + simulateClose() { + this.readyState = MockWebSocket.CLOSED; + if (this.onclose) this.onclose({} as CloseEvent); + } +} + +// Install mock before module is imported +(globalThis as any).WebSocket = MockWebSocket; + +// Now import the provider (it will see our global WebSocket) +import { WebSocketProvider } from '../WebSocketProvider'; + +describe('WebSocketProvider', () => { + let provider: WebSocketProvider; + + beforeEach(async () => { + MockWebSocket.instances = []; + vi.useFakeTimers({ shouldAdvanceTime: true }); + provider = new WebSocketProvider('wss://example.com/ws'); + // Wait for the connection to be "opened" + await vi.advanceTimersByTimeAsync(1); + await provider.ready; + }); + + afterEach(() => { + provider.destroy(); + vi.useRealTimers(); + }); + + function latestWs(): MockWebSocket { + return MockWebSocket.instances[MockWebSocket.instances.length - 1]; + } + + it('connects to the correct URL', () => { + expect(latestWs().url).toBe('wss://example.com/ws'); + }); + + it('sends JSON-RPC requests with incrementing IDs', async () => { + const ws = latestWs(); + + // Fire off two requests concurrently + const p1 = provider.getBlockNumber(); + const p2 = provider.getGasPrice(); + + // Give microtasks time to flush + await vi.advanceTimersByTimeAsync(1); + + expect(ws.sent.length).toBe(2); + + const req1 = JSON.parse(ws.sent[0]); + const req2 = JSON.parse(ws.sent[1]); + + expect(req1.id).toBe(1); + expect(req2.id).toBe(2); + expect(req1.method).toBe('eth_blockNumber'); + expect(req2.method).toBe('eth_gasPrice'); + + // Respond to both + ws.simulateMessage({ jsonrpc: '2.0', id: 1, result: '0xa' }); // 10 + ws.simulateMessage({ jsonrpc: '2.0', id: 2, result: '0x3b9aca00' }); // 1 gwei + + const blockNumber = await p1; + const gasPrice = await p2; + + expect(blockNumber).toBe(10); + expect(gasPrice).toBe(1000000000n); + }); + + it('routes subscription notifications to correct callbacks', async () => { + const ws = latestWs(); + + const subscribePromise = provider.subscribe('newHeads'); + await vi.advanceTimersByTimeAsync(1); + + // Respond with subscription ID + const subReq = JSON.parse(ws.sent[ws.sent.length - 1]); + ws.simulateMessage({ + jsonrpc: '2.0', + id: subReq.id, + result: '0xabc123', + }); + + const subId = await subscribePromise; + expect(subId).toBe('0xabc123'); + + // Register event listener + const blockCallback = vi.fn(); + provider.on('block', blockCallback); + + // Simulate a subscription notification + ws.simulateMessage({ + jsonrpc: '2.0', + method: 'eth_subscription', + params: { + subscription: '0xabc123', + result: { number: '0xf', hash: '0xdeadbeef' }, + }, + }); + + expect(blockCallback).toHaveBeenCalledTimes(1); + expect(blockCallback).toHaveBeenCalledWith({ + number: '0xf', + hash: '0xdeadbeef', + }); + }); + + it('unsubscribe cleans up and returns result', async () => { + const ws = latestWs(); + + // Subscribe first + const subPromise = provider.subscribe('newPendingTransactions'); + await vi.advanceTimersByTimeAsync(1); + const subReq = JSON.parse(ws.sent[ws.sent.length - 1]); + ws.simulateMessage({ + jsonrpc: '2.0', + id: subReq.id, + result: '0xsub1', + }); + const subId = await subPromise; + + // Now unsubscribe + const unsubPromise = provider.unsubscribe(subId); + await vi.advanceTimersByTimeAsync(1); + const unsubReq = JSON.parse(ws.sent[ws.sent.length - 1]); + expect(unsubReq.method).toBe('eth_unsubscribe'); + expect(unsubReq.params).toEqual(['0xsub1']); + + ws.simulateMessage({ + jsonrpc: '2.0', + id: unsubReq.id, + result: true, + }); + + const result = await unsubPromise; + expect(result).toBe(true); + }); + + it('off() removes specific callback', async () => { + const cb1 = vi.fn(); + const cb2 = vi.fn(); + provider.on('block', cb1); + provider.on('block', cb2); + + provider.off('block', cb1); + + const ws = latestWs(); + + // Set up a subscription mapped to 'block' + const subPromise = provider.subscribe('newHeads'); + await vi.advanceTimersByTimeAsync(1); + const subReq = JSON.parse(ws.sent[ws.sent.length - 1]); + ws.simulateMessage({ jsonrpc: '2.0', id: subReq.id, result: '0xevt' }); + await subPromise; + + // Fire a notification + ws.simulateMessage({ + jsonrpc: '2.0', + method: 'eth_subscription', + params: { subscription: '0xevt', result: { test: true } }, + }); + + expect(cb1).not.toHaveBeenCalled(); + expect(cb2).toHaveBeenCalledTimes(1); + }); + + it('off() without callback removes all listeners for event', () => { + const cb1 = vi.fn(); + const cb2 = vi.fn(); + provider.on('pending', cb1); + provider.on('pending', cb2); + provider.off('pending'); + + // Internal check: no listeners should remain (indirectly tested by no callbacks firing) + // We can't directly inspect private fields, but we confirm no errors and both cbs stay uncalled + expect(cb1).not.toHaveBeenCalled(); + expect(cb2).not.toHaveBeenCalled(); + }); + + it('destroy() closes the connection', () => { + const ws = latestWs(); + expect(ws.readyState).toBe(MockWebSocket.OPEN); + + provider.destroy(); + + expect(ws.readyState).toBe(MockWebSocket.CLOSED); + }); + + it('destroy() rejects pending requests', async () => { + const ws = latestWs(); + + // Start a request that won't be answered + const promise = provider.getBlockNumber(); + await vi.advanceTimersByTimeAsync(1); + + // Destroy before responding + provider.destroy(); + + await expect(promise).rejects.toThrow(); + }); + + it('attempts reconnection on unexpected close with exponential backoff', async () => { + const ws1 = latestWs(); + const instanceCountBefore = MockWebSocket.instances.length; + + // Simulate unexpected close (not from destroy) + ws1.simulateClose(); + + // After 1s delay (2^0 * 1000), first reconnect attempt + await vi.advanceTimersByTimeAsync(1000); + expect(MockWebSocket.instances.length).toBe(instanceCountBefore + 1); + + const ws2 = latestWs(); + expect(ws2.url).toBe('wss://example.com/ws'); + + // Simulate second close + ws2.simulateClose(); + + // After 2s delay (2^1 * 1000), second reconnect attempt + await vi.advanceTimersByTimeAsync(2000); + expect(MockWebSocket.instances.length).toBe(instanceCountBefore + 2); + + const ws3 = latestWs(); + + // Simulate third close + ws3.simulateClose(); + + // After 4s delay (2^2 * 1000), third reconnect attempt + await vi.advanceTimersByTimeAsync(4000); + expect(MockWebSocket.instances.length).toBe(instanceCountBefore + 3); + }); + + it('on() returns this for chaining', () => { + const result = provider.on('block', vi.fn()); + expect(result).toBe(provider); + }); + + it('off() returns this for chaining', () => { + const result = provider.off('block'); + expect(result).toBe(provider); + }); + + it('handles JSON-RPC errors from the node', async () => { + const ws = latestWs(); + + const promise = provider.getBlockNumber(); + await vi.advanceTimersByTimeAsync(1); + + const req = JSON.parse(ws.sent[ws.sent.length - 1]); + ws.simulateMessage({ + jsonrpc: '2.0', + id: req.id, + error: { code: -32600, message: 'Invalid request' }, + }); + + await expect(promise).rejects.toThrow('Invalid request'); + }); + + it('subscribe("logs") maps to "logs" event', async () => { + const ws = latestWs(); + const logCb = vi.fn(); + provider.on('logs', logCb); + + const subPromise = provider.subscribe('logs', { + address: '0xabc', + }); + await vi.advanceTimersByTimeAsync(1); + + const subReq = JSON.parse(ws.sent[ws.sent.length - 1]); + // Verify params include the filter + expect(subReq.params).toEqual(['logs', { address: '0xabc' }]); + + ws.simulateMessage({ + jsonrpc: '2.0', + id: subReq.id, + result: '0xlogsub', + }); + await subPromise; + + ws.simulateMessage({ + jsonrpc: '2.0', + method: 'eth_subscription', + params: { + subscription: '0xlogsub', + result: { address: '0xabc', data: '0x1234' }, + }, + }); + + expect(logCb).toHaveBeenCalledTimes(1); + expect(logCb).toHaveBeenCalledWith({ address: '0xabc', data: '0x1234' }); + }); +});