豆豆友情提示:这是一个非官方 GitHub 代理镜像,主要用于网络测试或访问加速。请勿在此进行登录、注册或处理任何敏感信息。进行这些操作请务必访问官方网站 github.com。 Raw 内容也通过此代理提供。
Skip to content

Commit 210bacd

Browse files
ergunsh and OrKoN
authored
chore: Implement ClearcutSender HTTP transport for telemetry disabled by default (#805)
This PR completes the telemetry system by implementing the transport layer for `ClearcutSender`. It enables actual HTTP communication with the Clearcut backend, handling event batching, rate limiting, and reliable delivery, including robust shutdown handling. **Key Changes:** * **HTTP Transport**: Implemented `fetch`-based transport sending `POST` requests to the Clearcut HTTP server. * **Event Batching**: Events are now buffered and flushed periodically (default: 15 minutes) or on shutdown. * **Reliability & Rate Limiting**: * **Server-Side Backoff**: Respects `next_request_wait_millis` from server responses to handle rate limiting dynamically. * **Transient Error Retries**: Failed requests (5xx, 429) result in events being requeued for the next flush. * **Request Timeouts**: Enforced 30s timeout on requests to prevent hanging processes. * **Session Rotation**: Automatically rotates session IDs every 24 hours. * **Safety & Stability**: * **Buffer Overflow Protection**: Caps the buffer at 1000 events to prevent memory leaks, dropping oldest events if necessary. * **Optimistic Removal**: Prevents race conditions and duplicate events during shutdown by optimistically removing events from the buffer before sending. * **Testing Improvements**: * **E2E Robustness**: Updated E2E tests to use a mock web server instead of relying on the logger to log specific lines. **Implementation Roadmap:** These changes finalize the planned telemetry architecture: 1. **CLI & Opt-out Mechanism ([Merged](#757))** 2. **Logger Scaffolding & Integration ([Merged](#758))** 3. **Persistence Layer ([Merged](#766))** 4. **Watchdog Process Architecture ([Merged](#769))** 5. **Transport, Batching & Retries (This PR):** * Finalized `ClearcutSender` with HTTP transport, batching, and server-directed backoff strategies. --------- Co-authored-by: Alex Rudenko <alexrudenko@chromium.org>
1 parent def02bd commit 210bacd

File tree

9 files changed

+890
-145
lines changed

9 files changed

+890
-145
lines changed

src/cli.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,23 @@ export const cliOptions = {
209209
// Marked as `false` until the feature is ready to be enabled by default.
210210
default: false,
211211
hidden: true,
212-
describe: 'Set to false to opt-out of usage statistics collection.',
212+
describe:
213+
'Set to false to opt-out of usage statistics collection. Google collects usage data to improve the tool, handled under the Google Privacy Policy (https://policies.google.com/privacy). This is independent from Chrome browser metrics.',
214+
},
215+
clearcutEndpoint: {
216+
type: 'string',
217+
hidden: true,
218+
describe: 'Endpoint for Clearcut telemetry.',
219+
},
220+
clearcutForceFlushIntervalMs: {
221+
type: 'number',
222+
hidden: true,
223+
describe: 'Force flush interval in milliseconds (for testing).',
224+
},
225+
clearcutIncludePidHeader: {
226+
type: 'boolean',
227+
hidden: true,
228+
describe: 'Include watchdog PID in Clearcut request headers (for testing).',
213229
},
214230
} satisfies Record<string, YargsOptions>;
215231

src/main.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ if (args.usageStatistics) {
4242
clearcutLogger = new ClearcutLogger({
4343
logFile: args.logFile,
4444
appVersion: VERSION,
45+
clearcutEndpoint: args.clearcutEndpoint,
46+
clearcutForceFlushIntervalMs: args.clearcutForceFlushIntervalMs,
47+
clearcutIncludePidHeader: args.clearcutIncludePidHeader,
4548
});
4649
}
4750

src/telemetry/clearcut-logger.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ export class ClearcutLogger {
3737
logFile?: string;
3838
persistence?: Persistence;
3939
watchdogClient?: WatchdogClient;
40+
clearcutEndpoint?: string;
41+
clearcutForceFlushIntervalMs?: number;
42+
clearcutIncludePidHeader?: boolean;
4043
}) {
4144
this.#persistence = options.persistence ?? new FilePersistence();
4245
this.#watchdog =
@@ -46,6 +49,9 @@ export class ClearcutLogger {
4649
appVersion: options.appVersion,
4750
osType: detectOsType(),
4851
logFile: options.logFile,
52+
clearcutEndpoint: options.clearcutEndpoint,
53+
clearcutForceFlushIntervalMs: options.clearcutForceFlushIntervalMs,
54+
clearcutIncludePidHeader: options.clearcutIncludePidHeader,
4955
});
5056
}
5157

src/telemetry/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ export interface LogRequest {
4747
}>;
4848
}
4949

50+
/** Shape that the JSON body of a successful (2xx) log response is cast to by the sender. */
export interface LogResponse {
51+
/**
52+
* If present, the client must wait this many milliseconds before
53+
* issuing the next HTTP request.
54+
*/
55+
next_request_wait_millis?: number;
56+
}
57+
5058
// Enums
5159
export enum OsType {
5260
OS_TYPE_UNSPECIFIED = 0,

src/telemetry/watchdog-client.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ export class WatchdogClient {
2020
appVersion: string;
2121
osType: OsType;
2222
logFile?: string;
23+
clearcutEndpoint?: string;
24+
clearcutForceFlushIntervalMs?: number;
25+
clearcutIncludePidHeader?: boolean;
2326
},
2427
options?: {spawn?: typeof spawn},
2528
) {
@@ -37,6 +40,17 @@ export class WatchdogClient {
3740
if (config.logFile) {
3841
args.push(`--log-file=${config.logFile}`);
3942
}
43+
if (config.clearcutEndpoint) {
44+
args.push(`--clearcut-endpoint=${config.clearcutEndpoint}`);
45+
}
46+
if (config.clearcutForceFlushIntervalMs) {
47+
args.push(
48+
`--clearcut-force-flush-interval-ms=${config.clearcutForceFlushIntervalMs}`,
49+
);
50+
}
51+
if (config.clearcutIncludePidHeader) {
52+
args.push('--clearcut-include-pid-header');
53+
}
4054

4155
const spawner = options?.spawn ?? spawn;
4256
this.#childProcess = spawner(process.execPath, args, {

src/telemetry/watchdog/clearcut-sender.ts

Lines changed: 215 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,53 +7,246 @@
77
import crypto from 'node:crypto';
88

99
import {logger} from '../../logger.js';
10-
import type {ChromeDevToolsMcpExtension, OsType} from '../types.js';
10+
import type {
11+
ChromeDevToolsMcpExtension,
12+
LogRequest,
13+
LogResponse,
14+
OsType,
15+
} from '../types.js';
1116

17+
/** Construction options for {@link ClearcutSender}. */
export interface ClearcutSenderConfig {
  /** Stamped onto every enqueued event as `app_version`. */
  appVersion: string;
  /** Stamped onto every enqueued event as `os_type`. */
  osType: OsType;
  /** Overrides {@link DEFAULT_CLEARCUT_ENDPOINT}. */
  clearcutEndpoint?: string;
  /** Overrides {@link DEFAULT_FLUSH_INTERVAL_MS} (intended for tests). */
  forceFlushIntervalMs?: number;
  /** When true, requests carry an `X-Watchdog-Pid` header (intended for tests). */
  includePidHeader?: boolean;
}

/** Upper bound on buffered events; the oldest events are dropped beyond it. */
const MAX_BUFFER_SIZE = 1000;
const DEFAULT_CLEARCUT_ENDPOINT =
  'https://play.googleapis.com/log?format=json_proto';
const DEFAULT_FLUSH_INTERVAL_MS = 15 * 60 * 1000;

/** Sent as `log_source` in every request body. */
const LOG_SOURCE = 2839;
/** Sent as `client_info.client_type` in every request body. */
const CLIENT_TYPE = 47;
/** Lower bound applied to a server-provided wait hint. */
const MIN_RATE_LIMIT_WAIT_MS = 30_000;
/** A single HTTP request (headers and body) is aborted after this long. */
const REQUEST_TIMEOUT_MS = 30_000;
/** `sendShutdownEvent` stops waiting for the final flush after this long. */
const SHUTDOWN_TIMEOUT_MS = 5_000;
const SESSION_ROTATION_INTERVAL_MS = 24 * 60 * 60 * 1000;

/** An enriched event together with the wall-clock time it was enqueued. */
interface BufferedEvent {
  event: ChromeDevToolsMcpExtension;
  timestamp: number;
}

/**
 * Buffers telemetry events and periodically POSTs them as one batch to the
 * configured Clearcut endpoint.
 *
 * - Transient failures (network errors, 5xx, 429) requeue the batch for the
 *   next flush; other non-2xx statuses drop it.
 * - The buffer never exceeds `MAX_BUFFER_SIZE`; the oldest events go first.
 * - After `sendShutdownEvent()` has been called no further flush timers are
 *   armed, so the sender does not keep the process alive.
 */
export class ClearcutSender {
  #appVersion: string;
  #osType: OsType;
  #clearcutEndpoint: string;
  #flushIntervalMs: number;
  #includePidHeader: boolean;
  #sessionId: string;
  #sessionCreated: number;
  #buffer: BufferedEvent[] = [];
  #flushTimer: ReturnType<typeof setTimeout> | null = null;
  #isFlushing = false;
  #timerStarted = false;
  // Set once by sendShutdownEvent(); suppresses every later timer arm.
  #isShuttingDown = false;

  constructor(config: ClearcutSenderConfig) {
    this.#appVersion = config.appVersion;
    this.#osType = config.osType;
    this.#clearcutEndpoint =
      config.clearcutEndpoint ?? DEFAULT_CLEARCUT_ENDPOINT;
    this.#flushIntervalMs =
      config.forceFlushIntervalMs ?? DEFAULT_FLUSH_INTERVAL_MS;
    this.#includePidHeader = config.includePidHeader ?? false;
    this.#sessionId = crypto.randomUUID();
    this.#sessionCreated = Date.now();
  }

  /**
   * Adds `event` to the buffer, stamped with the current session id, app
   * version and OS type. The first call arms the periodic flush timer.
   *
   * The session id is replaced once it is older than
   * `SESSION_ROTATION_INTERVAL_MS`.
   */
  enqueueEvent(event: ChromeDevToolsMcpExtension): void {
    if (Date.now() - this.#sessionCreated > SESSION_ROTATION_INTERVAL_MS) {
      this.#sessionId = crypto.randomUUID();
      this.#sessionCreated = Date.now();
    }

    this.#addToBuffer({
      ...event,
      session_id: this.#sessionId,
      app_version: this.#appVersion,
      os_type: this.#osType,
    });

    // During shutdown the event is delivered by the final flush; arming the
    // periodic timer here would outlive the shutdown and resend the batch.
    if (!this.#timerStarted && !this.#isShuttingDown) {
      this.#timerStarted = true;
      this.#scheduleFlush(this.#flushIntervalMs);
    }
  }

  /**
   * Enqueues a `server_shutdown` event and makes one last attempt to deliver
   * everything still buffered, waiting at most `SHUTDOWN_TIMEOUT_MS`.
   *
   * Never rejects. On return, neither the periodic flush timer nor the
   * shutdown deadline timer is armed.
   */
  async sendShutdownEvent(): Promise<void> {
    this.#isShuttingDown = true;
    if (this.#flushTimer) {
      clearTimeout(this.#flushTimer);
      this.#flushTimer = null;
    }

    const shutdownEvent: ChromeDevToolsMcpExtension = {
      server_shutdown: {},
    };
    this.enqueueEvent(shutdownEvent);

    let deadlineTimer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<void>(resolve => {
      deadlineTimer = setTimeout(resolve, SHUTDOWN_TIMEOUT_MS);
    });
    try {
      await Promise.race([this.#finalFlush(), deadline]);
    } catch (error) {
      logger('Final flush failed:', error);
    } finally {
      // Without this the deadline timer keeps the event loop alive for the
      // full timeout even when the flush finished immediately.
      clearTimeout(deadlineTimer);
    }
  }

  /** Sends the buffered events as one batch and schedules the next flush. */
  async #flush(): Promise<void> {
    if (this.#isFlushing) {
      return;
    }

    if (this.#buffer.length === 0) {
      this.#scheduleFlush(this.#flushIntervalMs);
      return;
    }

    this.#isFlushing = true;
    let nextDelayMs = this.#flushIntervalMs;

    // Optimistically remove events from the buffer before sending, so that a
    // simultaneous #finalFlush cannot include these same events.
    const eventsToSend = this.#buffer;
    this.#buffer = [];

    try {
      const result = await this.#sendBatch(eventsToSend);

      if (result.success) {
        if (result.nextRequestWaitMs !== undefined) {
          nextDelayMs = Math.max(
            result.nextRequestWaitMs,
            MIN_RATE_LIMIT_WAIT_MS,
          );
        }
      } else if (result.isPermanentError) {
        logger(
          'Permanent error, dropped batch of',
          eventsToSend.length,
          'events',
        );
      } else {
        // Transient error: retry these events on the next flush.
        this.#requeue(eventsToSend);
      }
    } catch (error) {
      // Safety catch for unexpected errors, requeue events.
      this.#requeue(eventsToSend);
      logger('Flush failed unexpectedly:', error);
    } finally {
      this.#isFlushing = false;
      this.#scheduleFlush(nextDelayMs);
    }
  }

  /**
   * Puts `events` back at the front of the buffer (preserving order) and
   * trims the result to `MAX_BUFFER_SIZE` by dropping the oldest entries.
   */
  #requeue(events: BufferedEvent[]): void {
    const merged = [...events, ...this.#buffer];
    const overflow = merged.length - MAX_BUFFER_SIZE;
    if (overflow > 0) {
      merged.splice(0, overflow);
      logger('Telemetry buffer overflow: dropped', overflow, 'oldest events');
    }
    this.#buffer = merged;
  }

  /** Appends one event, evicting the oldest one if the buffer is full. */
  #addToBuffer(event: ChromeDevToolsMcpExtension): void {
    if (this.#buffer.length >= MAX_BUFFER_SIZE) {
      this.#buffer.shift();
      logger('Telemetry buffer overflow: dropped oldest event');
    }
    this.#buffer.push({
      event,
      timestamp: Date.now(),
    });
  }

  /**
   * (Re)arms the flush timer to fire after `delayMs`. Any previously armed
   * timer is cancelled. Does nothing beyond that once shutdown has begun.
   */
  #scheduleFlush(delayMs: number): void {
    if (this.#flushTimer) {
      clearTimeout(this.#flushTimer);
      this.#flushTimer = null;
    }
    if (this.#isShuttingDown) {
      return;
    }
    this.#flushTimer = setTimeout(() => {
      this.#flush().catch(err => {
        logger('Flush error:', err);
      });
    }, delayMs);
  }

  /**
   * POSTs `events` to the endpoint as a single `LogRequest`.
   *
   * @returns `success: true` for any 2xx response (with the server's wait
   * hint when one could be read); `isPermanentError: true` for non-2xx
   * statuses other than 5xx/429; plain `success: false` for 5xx, 429,
   * network errors and timeouts. Never rejects.
   */
  async #sendBatch(events: BufferedEvent[]): Promise<{
    success: boolean;
    isPermanentError?: boolean;
    nextRequestWaitMs?: number;
  }> {
    const requestBody: LogRequest = {
      log_source: LOG_SOURCE,
      request_time_ms: Date.now().toString(),
      client_info: {
        client_type: CLIENT_TYPE,
      },
      log_event: events.map(({event, timestamp}) => ({
        event_time_ms: timestamp.toString(),
        source_extension_json: JSON.stringify(event),
      })),
    };

    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS);
    try {
      const response = await fetch(this.#clearcutEndpoint, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          // Used in E2E tests to confirm that the watchdog process is killed
          ...(this.#includePidHeader
            ? {'X-Watchdog-Pid': process.pid.toString()}
            : {}),
        },
        body: JSON.stringify(requestBody),
        signal: controller.signal,
      });

      if (response.ok) {
        // The batch was accepted. A body that cannot be read must not turn
        // this into a retry, or the same events would be delivered twice.
        return {
          success: true,
          nextRequestWaitMs: await this.#readWaitHint(response),
        };
      }

      const status = response.status;
      if (status >= 500 || status === 429) {
        return {success: false};
      }

      logger('Telemetry permanent error:', status);
      return {success: false, isPermanentError: true};
    } catch (error) {
      logger('Telemetry request failed:', error);
      return {success: false};
    } finally {
      // Cleared only here so the timeout also bounds reading the body.
      clearTimeout(timeoutId);
    }
  }

  /**
   * Extracts `next_request_wait_millis` from a successful response.
   *
   * @returns the hint in milliseconds, or `undefined` when the body is not
   * JSON, the field is absent, or its value is not a finite number.
   */
  async #readWaitHint(response: {
    json(): Promise<unknown>;
  }): Promise<number | undefined> {
    try {
      const data = (await response.json()) as LogResponse | null;
      const raw: unknown = data?.next_request_wait_millis;
      // NOTE(review): the field is declared as a number, but JSON encodings
      // of 64-bit integers are often strings, so both are accepted here.
      // Confirm against the real server response.
      if (typeof raw !== 'number' && (typeof raw !== 'string' || raw === '')) {
        return undefined;
      }
      const waitMs = Number(raw);
      return Number.isFinite(waitMs) ? waitMs : undefined;
    } catch (error) {
      logger('Ignoring unreadable telemetry response body:', error);
      return undefined;
    }
  }

  /**
   * Sends whatever is currently buffered, once, without retry. The buffer
   * itself is left untouched.
   */
  async #finalFlush(): Promise<void> {
    if (this.#buffer.length === 0) {
      return;
    }
    const eventsToSend = [...this.#buffer];
    await this.#sendBatch(eventsToSend);
  }

  /** Cancels the periodic flush timer and allows it to be armed again. */
  stopForTesting(): void {
    if (this.#flushTimer) {
      clearTimeout(this.#flushTimer);
      this.#flushTimer = null;
    }
    this.#timerStarted = false;
  }

  /** Number of events currently buffered. */
  get bufferSizeForTesting(): number {
    return this.#buffer.length;
  }
}

0 commit comments

Comments
 (0)