豆豆友情提示:这是一个非官方 GitHub 代理镜像,主要用于网络测试或访问加速。请勿在此进行登录、注册或处理任何敏感信息。进行这些操作请务必访问官方网站 github.com。 Raw 内容也通过此代理提供。
Skip to content

Commit c498cc3

Browse files
feat(client): add support for binary messages
1 parent fd8868c commit c498cc3

File tree

3 files changed

+136
-36
lines changed

3 files changed

+136
-36
lines changed

src/internal/ws.ts

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,49 @@ export type ReconnectingOverrides<Parameters = Record<string, unknown>> =
3232
};
3333

3434
/**
35-
* A bounded queue for outgoing WebSocket messages. Messages are serialized
36-
* on enqueue and held until the connection is ready to flush them. The queue
37-
* enforces a configurable byte-size limit (measured in UTF-8 bytes) and can
38-
* return the original deserialized messages via {@link drain} when the
39-
* connection permanently closes.
35+
* Raw data types that can be sent over a WebSocket without serialization.
36+
*/
37+
export type RawWebSocketData = string | ArrayBufferLike | ArrayBufferView | Buffer[];
38+
39+
export type UnsentMessage<T> = { type: 'message'; message: T } | { type: 'raw'; data: RawWebSocketData };
40+
41+
type QueueEntry =
42+
| { kind: 'json'; data: string; byteLength: number }
43+
| { kind: 'raw'; data: RawWebSocketData; byteLength: number };
44+
45+
/**
46+
* Flatten `Buffer[]` fragments into a single `Buffer` so that `ws.send()`
47+
* transmits the correct bytes.
48+
*/
49+
export function flattenRawData(data: RawWebSocketData): Exclude<RawWebSocketData, Buffer[]> {
50+
if (Array.isArray(data)) return Buffer.concat(data);
51+
return data;
52+
}
53+
54+
function snapshotRawData(data: RawWebSocketData): Exclude<RawWebSocketData, Buffer[]> {
55+
if (typeof data === 'string') return data;
56+
if (Array.isArray(data)) return Buffer.concat(data);
57+
if (ArrayBuffer.isView(data)) {
58+
return Buffer.from(data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength));
59+
}
60+
return data.slice(0);
61+
}
62+
63+
function rawByteLength(data: RawWebSocketData): number {
64+
if (typeof data === 'string') return Buffer.byteLength(data, 'utf8');
65+
if (Array.isArray(data)) return data.reduce((sum, buf) => sum + buf.byteLength, 0);
66+
if ('byteLength' in data) return data.byteLength;
67+
return 0;
68+
}
69+
70+
/**
71+
* A bounded queue for outgoing WebSocket messages. JSON messages are
72+
* serialized on enqueue; raw messages are stored as-is. The queue enforces
73+
* a configurable byte-size limit and can return the original messages via
74+
* {@link drain} when the connection permanently closes.
4075
*/
4176
export class SendQueue<T = unknown> {
42-
private _queue: { data: string; byteLength: number }[] = [];
77+
private _queue: QueueEntry[] = [];
4378
private _bytes: number = 0;
4479
private _maxBytes: number;
4580

@@ -48,7 +83,7 @@ export class SendQueue<T = unknown> {
4883
}
4984

5085
/**
51-
* Serialize and enqueue a message. Returns `true` if the message was
86+
* Serialize and enqueue a JSON message. Returns `true` if the message was
5287
* accepted, `false` if it would exceed the byte-size limit.
5388
*/
5489
enqueue(event: T): boolean {
@@ -57,7 +92,22 @@ export class SendQueue<T = unknown> {
5792
if (this._bytes + byteLength > this._maxBytes) {
5893
return false;
5994
}
60-
this._queue.push({ data, byteLength });
95+
this._queue.push({ kind: 'json', data, byteLength });
96+
this._bytes += byteLength;
97+
return true;
98+
}
99+
100+
/**
101+
* Enqueue raw data without serialization. Returns `true` if the data was
102+
* accepted, `false` if it would exceed the byte-size limit.
103+
*/
104+
enqueueRaw(data: RawWebSocketData): boolean {
105+
const snapshot = snapshotRawData(data);
106+
const byteLength = rawByteLength(snapshot);
107+
if (this._bytes + byteLength > this._maxBytes) {
108+
return false;
109+
}
110+
this._queue.push({ kind: 'raw', data: snapshot, byteLength });
61111
this._bytes += byteLength;
62112
return true;
63113
}
@@ -67,7 +117,7 @@ export class SendQueue<T = unknown> {
67117
* message and all subsequent messages are re-queued and the error is
68118
* re-thrown so the caller can report it.
69119
*/
70-
flush(send: (data: string) => void): void {
120+
flush(send: (data: string | RawWebSocketData) => void): void {
71121
const pending = this._queue.splice(0);
72122
this._bytes = 0;
73123
for (let i = 0; i < pending.length; i++) {
@@ -83,11 +133,14 @@ export class SendQueue<T = unknown> {
83133
}
84134

85135
/**
86-
* Drain the queue and return the deserialized messages. Resets byte
87-
* tracking to zero.
136+
* Drain the queue and return the unsent messages. JSON messages are
137+
* deserialized back to their original form. Resets byte tracking to zero.
88138
*/
89-
drain(): T[] {
90-
const unsent = this._queue.map((item) => JSON.parse(item.data) as T);
139+
drain(): UnsentMessage<T>[] {
140+
const unsent = this._queue.map((entry): UnsentMessage<T> => {
141+
if (entry.kind === 'raw') return { type: 'raw', data: entry.data };
142+
return { type: 'message', message: JSON.parse(entry.data) as T };
143+
});
91144
this._queue = [];
92145
this._bytes = 0;
93146
return unsent;

src/resources/responses/internal-base.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,20 @@ import { EventEmitter } from '../../core/EventEmitter';
66
import { OpenAIError } from '../../core/error';
77
import { stringifyQuery } from '../../internal/utils';
88

9-
import type { ReconnectingEvent } from '../../internal/ws';
9+
import type { RawWebSocketData, ReconnectingEvent, UnsentMessage } from '../../internal/ws';
1010

1111
export type ResponsesStreamMessage =
1212
| { type: 'connecting' | 'open' | 'closing' }
13-
| { type: 'close'; code: number; reason: string; unsent: ResponsesAPI.ResponsesClientEvent[] }
13+
| {
14+
type: 'close';
15+
code: number;
16+
reason: string;
17+
unsent: UnsentMessage<ResponsesAPI.ResponsesClientEvent>[];
18+
}
1419
| { type: 'reconnecting'; reconnect: ReconnectingEvent }
1520
| { type: 'reconnected' }
1621
| { type: 'message'; message: ResponsesAPI.ResponsesServerEvent }
22+
| { type: 'raw'; data: RawWebSocketData }
1723
| { type: 'error'; error: WebSocketError };
1824

1925
export class WebSocketError extends OpenAIError {
@@ -34,8 +40,9 @@ type Simplify<T> = { [KeyType in keyof T]: T[KeyType] } & {};
3440
type WebSocketEvents = Simplify<
3541
{
3642
event: (event: ResponsesAPI.ResponsesServerEvent) => void;
43+
raw: (data: RawWebSocketData) => void;
3744
error: (error: WebSocketError) => void;
38-
close: (code: number, reason: string, unsent: ResponsesAPI.ResponsesClientEvent[]) => void;
45+
close: (code: number, reason: string, unsent: UnsentMessage<ResponsesAPI.ResponsesClientEvent>[]) => void;
3946
reconnecting: (event: ReconnectingEvent) => void;
4047
reconnected: () => void;
4148
} & {
@@ -51,6 +58,11 @@ export abstract class ResponsesEmitter extends EventEmitter<WebSocketEvents> {
5158
*/
5259
abstract send(event: ResponsesAPI.ResponsesClientEvent): void;
5360

61+
/**
62+
* Send raw data over the WebSocket without JSON serialization.
63+
*/
64+
abstract sendRaw(data: RawWebSocketData): void;
65+
5466
/**
5567
* Close the WebSocket connection.
5668
*/

src/resources/responses/ws.ts

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ import { InternalEventEmitter } from '../../core/EventEmitter';
66
import { sleep } from '../../internal/utils/sleep';
77
import {
88
SendQueue,
9+
flattenRawData,
910
isRecoverableClose,
11+
type RawWebSocketData,
1012
type ReconnectingEvent,
1113
type ReconnectingOverrides,
14+
type UnsentMessage,
1215
} from '../../internal/ws';
1316
import * as ResponsesAPI from './responses';
1417
import { OpenAI } from '../../client';
@@ -77,7 +80,7 @@ export class ResponsesWS extends ResponsesEmitter {
7780
socketSwap: (oldSocket: WS.WebSocket, newSocket: WS.WebSocket) => void;
7881
reconnecting: (event: ReconnectingEvent<Record<string, unknown>>) => void;
7982
reconnected: () => void;
80-
close: (code: number, reason: string, unsent: ResponsesAPI.ResponsesClientEvent[]) => void;
83+
close: (code: number, reason: string, unsent: UnsentMessage<ResponsesAPI.ResponsesClientEvent>[]) => void;
8184
}>();
8285

8386
constructor(client: OpenAI, options?: ResponsesWSClientOptions | null | undefined) {
@@ -110,6 +113,24 @@ export class ResponsesWS extends ResponsesEmitter {
110113
}
111114
}
112115

116+
sendRaw(data: RawWebSocketData) {
117+
if (this._isReconnecting || this.socket.readyState === WS.WebSocket.CONNECTING) {
118+
if (!this._sendQueue.enqueueRaw(data)) {
119+
this._onError(null, 'send queue is full, message discarded', undefined);
120+
}
121+
return;
122+
}
123+
if (this.socket.readyState !== WS.WebSocket.OPEN) {
124+
this._onError(null, 'cannot send on a closed WebSocket', undefined);
125+
return;
126+
}
127+
try {
128+
this.socket.send(flattenRawData(data));
129+
} catch (err) {
130+
this._onError(null, 'could not send data', err);
131+
}
132+
}
133+
113134
close(props?: { code: number; reason: string }) {
114135
this._intentionallyClosed = true;
115136
this._closeCode = props?.code ?? 1000;
@@ -167,6 +188,10 @@ export class ResponsesWS extends ResponsesEmitter {
167188
push({ type: 'message', message: event });
168189
};
169190

191+
const onRaw = (data: RawWebSocketData) => {
192+
push({ type: 'raw', data });
193+
};
194+
170195
// All errors (API + socket) funnel through _onError → 'error' event
171196
const onEmitterError = (err: WebSocketError) => {
172197
push({ type: 'error', error: err });
@@ -190,7 +215,11 @@ export class ResponsesWS extends ResponsesEmitter {
190215
}
191216
};
192217

193-
const onClose = (code: number, reason: string, unsent: ResponsesAPI.ResponsesClientEvent[]) => {
218+
const onClose = (
219+
code: number,
220+
reason: string,
221+
unsent: UnsentMessage<ResponsesAPI.ResponsesClientEvent>[],
222+
) => {
194223
push({ type: 'close', code, reason, unsent });
195224
done = true;
196225
flushResolvers();
@@ -205,6 +234,7 @@ export class ResponsesWS extends ResponsesEmitter {
205234

206235
const cleanup = () => {
207236
this.off('event', onEvent);
237+
this.off('raw', onRaw);
208238
this.off('error', onEmitterError);
209239
currentSocket.off('open', onOpen);
210240
this._internalEvents.off('close', onClose);
@@ -214,6 +244,7 @@ export class ResponsesWS extends ResponsesEmitter {
214244
};
215245

216246
this.on('event', onEvent);
247+
this.on('raw', onRaw);
217248
this.on('error', onEmitterError);
218249
this.socket.on('open', onOpen);
219250
this._internalEvents.on('close', onClose);
@@ -297,25 +328,27 @@ export class ResponsesWS extends ResponsesEmitter {
297328
},
298329
});
299330

300-
socket.on('message', (wsEvent) => {
301-
const event = (() => {
302-
try {
303-
return JSON.parse(wsEvent.toString()) as ResponsesAPI.ResponsesServerEvent;
304-
} catch (err) {
305-
this._onError(null, 'could not parse websocket event', err);
306-
return null;
307-
}
308-
})();
331+
socket.on('message', (wsEvent, isBinary) => {
332+
if (isBinary) {
333+
this._emit('raw', wsEvent as RawWebSocketData);
334+
return;
335+
}
309336

310-
if (event) {
311-
this._emit('event', event);
337+
let event: ResponsesAPI.ResponsesServerEvent;
338+
try {
339+
event = JSON.parse(wsEvent.toString()) as ResponsesAPI.ResponsesServerEvent;
340+
} catch {
341+
this._emit('raw', wsEvent as RawWebSocketData);
342+
return;
343+
}
312344

313-
if (event.type === 'error') {
314-
this._onError(event);
315-
} else {
316-
// @ts-ignore TS isn't smart enough to get the relationship right here
317-
this._emit(event.type, event);
318-
}
345+
this._emit('event', event);
346+
347+
if (event.type === 'error') {
348+
this._onError(event);
349+
} else {
350+
// @ts-ignore TS isn't smart enough to get the relationship right here
351+
this._emit(event.type, event);
319352
}
320353
});
321354

@@ -523,7 +556,9 @@ export class ResponsesWS extends ResponsesEmitter {
523556

524557
private _flushSendQueue(): void {
525558
try {
526-
this._sendQueue.flush((data) => this.socket.send(data));
559+
this._sendQueue.flush((data) =>
560+
this.socket.send(typeof data === 'string' ? data : flattenRawData(data)),
561+
);
527562
} catch (err) {
528563
this._onError(null, 'could not send queued data', err);
529564
}

0 commit comments

Comments
 (0)