Skip to content

Commit b7162df

Browse files
committed
feat(otel): allow clickhouse task events to be inserted using async_insert via env vars
1 parent f8977a7 commit b7162df

File tree

3 files changed

+30
-1
lines changed

3 files changed

+30
-1
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,10 @@ const EnvironmentSchema = z
11221122
EVENTS_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
11231123
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
11241124
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
1125+
EVENTS_CLICKHOUSE_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
1126+
EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT: z.string().default("1"),
1127+
EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760),
1128+
EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000),
11251129
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
11261130
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
11271131
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ export type ClickhouseEventRepositoryConfig = {
6464
clickhouse: ClickHouse;
6565
batchSize?: number;
6666
flushInterval?: number;
67+
insertStrategy?: "insert" | "insert_async";
68+
waitForAsyncInsert?: boolean;
69+
asyncInsertMaxDataSize?: number;
70+
asyncInsertBusyTimeoutMs?: number;
6771
tracer?: Tracer;
6872
maximumTraceSummaryViewCount?: number;
6973
maximumTraceDetailedSummaryViewCount?: number;
@@ -119,7 +123,11 @@ export class ClickhouseEventRepository implements IEventRepository {
119123
});
120124
}
121125

122-
const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events);
126+
const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, {
127+
params: {
128+
clickhouse_settings: this.#getClickhouseInsertSettings(),
129+
},
130+
});
123131

124132
if (insertError) {
125133
throw insertError;
@@ -134,6 +142,19 @@ export class ClickhouseEventRepository implements IEventRepository {
134142
});
135143
}
136144

145+
#getClickhouseInsertSettings() {
146+
if (this._config.insertStrategy === "insert") {
147+
return {};
148+
} else {
149+
return {
150+
async_insert: 1 as const,
151+
async_insert_max_data_size: this._config.asyncInsertMaxDataSize?.toString() ?? "10485760",
152+
async_insert_busy_timeout_ms: this._config.asyncInsertBusyTimeoutMs ?? 5000,
153+
wait_for_async_insert: this._config.waitForAsyncInsert ? (1 as const) : (0 as const),
154+
};
155+
}
156+
}
157+
137158
async #publishToRedis(events: TaskEventV1Input[]) {
138159
if (events.length === 0) return;
139160
await tracePubSub.publish(events.map((e) => e.trace_id));

apps/webapp/app/v3/eventRepository/clickhouseEventRepositoryInstance.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ function initializeClickhouseRepository() {
4343
maximumTraceDetailedSummaryViewCount:
4444
env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT,
4545
maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING,
46+
insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY,
47+
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
48+
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
49+
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
4650
});
4751

4852
return repository;

0 commit comments

Comments
 (0)