Closed chitalian closed 1 year ago
If anyone sees this and needs to get this running on Cloudflare Workers, here is a small code snippet that works for me.
import { InsertParams as ConnectionInsertParams } from "@clickhouse/client/dist/connection";
import { Result } from "./results";
import { InsertResult, TLSParams } from "@clickhouse/client/dist/connection";
import * as http_search_params from "@clickhouse/client/dist/connection/adapter/http_search_params";
import { transformUrl } from "@clickhouse/client/dist/connection/adapter/transform_url";
import { ClickHouseSettings } from "@clickhouse/client/dist/settings";
import { DataFormat, encodeJSON } from "@clickhouse/client/dist/data_formatter";
/** Dataset accepted by {@link ClickhouseClient.insert}: a readonly array, one element per row. */
type InsertValues<T> = ReadonlyArray<T>;

/** Parameters for a single insert request (mirrors the `@clickhouse/client` API shape). */
export interface InsertParams<T = unknown> extends BaseParams {
  /** Name of a table to insert into. */
  table: string;
  /** A dataset to insert. */
  values: InsertValues<T>;
  /** Format of the dataset to insert. */
  format?: DataFormat;
}
/** Options shared by every request type (settings, bindings, cancellation, tracing id). */
interface BaseParams {
  /** ClickHouse settings that can be applied on query level. */
  clickhouse_settings?: ClickHouseSettings;
  /** Parameters for query binding. https://clickhouse.com/docs/en/interfaces/http/#cli-queries-with-parameters */
  query_params?: Record<string, unknown>;
  /** AbortSignal instance (using `node-abort-controller` package) to cancel a request in progress. */
  abort_signal?: AbortSignal;
  /** A specific `query_id` that will be sent with this request.
   * If it is not set, a random identifier will be generated automatically by the client. */
  query_id?: string;
}
/**
 * Parse a host string into a URL, translating any parse failure into a
 * configuration error that names the offending parameter.
 */
function createUrl(host: string): URL {
  let parsed: URL;
  try {
    parsed = new URL(host);
  } catch {
    throw new Error('Configuration parameter "host" contains malformed url.');
  }
  return parsed;
}
/**
 * Fill every optional client option with its default and assemble the
 * canonical configuration object consumed by ClickhouseClient.
 */
function normalizeConfig(config: any) {
  // A client cert/key pair selects mutual TLS; any other tls object is "Basic".
  // config.tls is spread after `type` so its explicit fields win, as before.
  let tls: TLSParams | undefined = undefined;
  if (config.tls) {
    const hasClientPair = "cert" in config.tls && "key" in config.tls;
    if (hasClientPair) {
      tls = { type: "Mutual", ...config.tls };
    } else {
      tls = { type: "Basic", ...config.tls };
    }
  }

  const url = createUrl(config.host ?? "http://localhost:8123");
  const compression = {
    decompress_response: config.compression?.response ?? true,
    compress_request: config.compression?.request ?? false,
  };

  return {
    application_id: config.application,
    url,
    connect_timeout: config.connect_timeout ?? 10_000,
    request_timeout: config.request_timeout ?? 300_000,
    max_open_connections: config.max_open_connections ?? Infinity,
    tls,
    compression,
    username: config.username ?? "default",
    password: config.password ?? "",
    application: config.application ?? "clickhouse-js",
    database: config.database ?? "default",
    clickhouse_settings: config.clickhouse_settings ?? {},
    log: {
      LoggerClass: config.log?.LoggerClass,
    },
    session_id: config.session_id,
  };
}
/**
 * Serialize an insert dataset into the textual payload ClickHouse expects
 * for the given format.
 *
 * @throws Error when `values` is neither an array nor an object.
 */
export function encodeValues<T>(
  values: InsertValues<T>,
  format: DataFormat
): string {
  // Row-oriented JSON* formats: each entry becomes one encoded row.
  if (Array.isArray(values)) {
    const rows: string[] = [];
    for (const row of values) {
      rows.push(encodeJSON(row, format));
    }
    return rows.join("");
  }
  // Whole-document formats (JSON & JSONObjectEachRow) encode in one pass.
  if (typeof values === "object") {
    return encodeJSON(values, format);
  }
  throw new Error(
    `Cannot encode values of type ${typeof values} with ${format} format`
  );
}
type NormalizedConfig = ReturnType<typeof normalizeConfig>;
class ClickhouseClient {
private readonly config: NormalizedConfig;
constructor(config: any = {}) {
this.config = normalizeConfig(config);
}
private getBaseParams(params: BaseParams) {
return {
clickhouse_settings: {
...this.config.clickhouse_settings,
...params.clickhouse_settings,
},
query_params: params.query_params,
abort_signal: params.abort_signal,
session_id: this.config.session_id,
query_id: params.query_id,
};
}
async connection_insert(
params: ConnectionInsertParams
): Promise<InsertResult> {
const query_id = params.query_id || crypto.randomUUID();
const thisSearchParams = http_search_params.toSearchParams({
database: this.config.database,
clickhouse_settings: params.clickhouse_settings,
query_params: params.query_params,
query: params.query,
session_id: params.session_id,
query_id,
});
const xParams = {
method: "POST",
url: transformUrl({
url: this.config.url,
pathname: "/",
searchParams: thisSearchParams,
}),
body: params.values,
abort_signal: params.abort_signal,
};
console.log("xParams", xParams);
await fetch(xParams.url.toString(), {
method: xParams.method,
headers: {
"Content-Type": "application/octet-stream",
},
body:
typeof xParams.body === "string" ? xParams.body : xParams.body.read(),
signal: xParams.abort_signal,
});
return { query_id };
}
async insert<T>(params: InsertParams<T>): Promise<InsertResult> {
const format = params.format || "JSONCompactEachRow";
const query = `INSERT INTO ${params.table.trim()} FORMAT ${format}`;
return await this.connection_insert({
query,
values: encodeValues(params.values, format),
...this.getBaseParams(params),
});
}
}
/**
 * Insert rows into a ClickHouse table using the fetch-based client.
 *
 * @param table  Target table name (a key of ClickhouseDB["Tables"]).
 * @param values Rows matching that table's schema.
 * @returns `data` holds the insert's query_id on success; `error` holds a
 *   human-readable message on failure.
 */
export async function dbInsertClickhouse<
  T extends keyof ClickhouseDB["Tables"]
>(
  table: T,
  values: ClickhouseDB["Tables"][T][]
): Promise<Result<string, string>> {
  try {
    const client = new ClickhouseClient({
      host: process.env.CLICKHOUSE_HOST ?? "http://localhost:18123",
      username: process.env.CLICKHOUSE_USER ?? "default",
      password: process.env.CLICKHOUSE_PASSWORD ?? "",
    });
    const queryResult = await client.insert({
      table: table,
      values: values,
      format: "JSONEachRow",
      // Recommended for cluster usage to avoid situations
      // where a query processing error occurred after the response code
      // and HTTP headers were sent to the client.
      // See https://clickhouse.com/docs/en/interfaces/http/#response-buffering
      clickhouse_settings: {
        wait_end_of_query: 1,
      },
    });
    return { data: queryResult.query_id, error: null };
  } catch (err) {
    console.error("error inserting into clickhouse", err);
    // JSON.stringify(new Error(...)) yields "{}" because Error's properties
    // are non-enumerable; extract the message explicitly so callers get a
    // useful error string.
    return {
      data: null,
      error: err instanceof Error ? err.message : JSON.stringify(err),
    };
  }
}
/** Convenience alias for columns that may hold NULL. */
type Nullable<T> = T | null;

/** Row shape of the `response_copy_v1` table. */
interface ResponseCopyV1 {
  response_id: Nullable<string>;
  response_created_at: Nullable<string>;
  // NOTE(review): units are not stated here — presumably milliseconds; confirm against the writer.
  latency: Nullable<number>;
  // NOTE(review): presumably the upstream HTTP status code; confirm.
  status: Nullable<number>;
  completion_tokens: Nullable<number>;
  prompt_tokens: Nullable<number>;
  model: Nullable<string>;
  request_id: string;
  request_created_at: string;
  auth_hash: string;
  user_id: Nullable<string>;
}
/** Row shape of the `properties_copy_v1` table: one key/value property per row. */
interface PropertiesCopyV1 {
  id: number;
  created_at: Nullable<string>;
  user_id: Nullable<string>;
  /** Id of the request this property belongs to. */
  request_id: string;
  auth_hash: string;
  key: Nullable<string>;
  value: Nullable<string>;
}
/** Maps ClickHouse table names to their row types, enabling typed inserts. */
export interface ClickhouseDB {
  Tables: {
    response_copy_v1: ResponseCopyV1;
    properties_copy_v1: PropertiesCopyV1;
  };
}
We have a similar issue (#162) and some progress to discuss; let's continue there. And many thanks for the fetch snippet; this one is useful for the upcoming "browser" implementation.
Describe the bug
I am using Cloudflare Workers, and within their environment there is no support for the `http` module. When we import the client, we get this error:
Steps to reproduce
import { createClient as clickhouseCreateClient } from "@clickhouse/client";
Expected behaviour
We should migrate to `fetch`, or create a new client that uses `fetch` instead of `http`.
Error log