Closed saravmajestic closed 4 months ago
Here are the code search results. I'm now analyzing these search results to write the PR.
src/dbt_client/dbtCoreIntegration.ts
---
+++
@@ -1,9 +1,13 @@
async applyDeferConfig(): Promise<void> {
const { deferToProduction, manifestPath, favorState } =
await this.getDeferConfig();
- await this.python?.lock<void>(
- (python) =>
- python!`project.set_defer_config(${deferToProduction}, ${manifestPath}, ${favorState})`,
- );
- await this.rebuildManifest();
+ try {
+ await this.python?.lock<void>(
+ (python) =>
+ python!`project.set_defer_config(${deferToProduction}, ${manifestPath}, ${favorState})`,
+ );
+ await this.rebuildManifest();
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
src/webview_provider/insightsPanel.ts
---
+++
@@ -14,9 +14,14 @@
cancellable: false,
},
async () => {
- await this.dbtProjectContainer
- .findDBTProject(Uri.file(params.projectRoot))
- ?.applyDeferConfig();
+ const dbtProject = this.dbtProjectContainer
+ .findDBTProject(Uri.file(params.projectRoot));
+ await dbtProject?.applyDeferConfig();
+
+ // Close any Python threads created by the dbt project
+ if (dbtProject instanceof DBTCoreProjectIntegration) {
+ await dbtProject.executionInfrastructure.closePythonBridge(dbtProject.python);
+ }
},
);
}
src/dbt_client/dbtCoreIntegration.ts
---
+++
@@ -10,34 +10,36 @@
queryThread.kill(2);
},
async () => {
- await this.createPythonDbtProject(queryThread);
- await queryThread.ex`project.init_project()`;
- // compile query
- const compiledQuery = await this.unsafeCompileQuery(limitQuery);
- // execute query
- let result: ExecuteSQLResult;
try {
- result = await queryThread!.lock<ExecuteSQLResult>(
- (python) => python`to_dict(project.execute_sql(${compiledQuery}))`,
- );
- const { manifestPathType } =
- this.deferToProdService.getDeferConfigByProjectRoot(
- this.projectRoot.fsPath,
+ await this.createPythonDbtProject(queryThread);
+ await queryThread.ex`project.init_project()`;
+ // compile query
+ const compiledQuery = await this.unsafeCompileQuery(limitQuery);
+ // execute query
+ let result: ExecuteSQLResult;
+ try {
+ result = await queryThread!.lock<ExecuteSQLResult>(
+ (python) => python`to_dict(project.execute_sql(${compiledQuery}))`,
);
- if (manifestPathType === ManifestPathType.REMOTE) {
- this.altimateRequest.sendDeferToProdEvent(ManifestPathType.REMOTE);
+ const { manifestPathType } =
+ this.deferToProdService.getDeferConfigByProjectRoot(
+ this.projectRoot.fsPath,
+ );
+ if (manifestPathType === ManifestPathType.REMOTE) {
+ this.altimateRequest.sendDeferToProdEvent(ManifestPathType.REMOTE);
+ }
+ } catch (err) {
+ const message = `Error while executing sql: ${compiledQuery}`;
+ this.dbtTerminal.error("dbtCore:executeSQL", message, err);
+ if (err instanceof PythonException) {
+ throw new ExecuteSQLError(err.exception.message, compiledQuery!);
+ }
+ throw new ExecuteSQLError((err as Error).message, compiledQuery!);
}
- } catch (err) {
- const message = `Error while executing sql: ${compiledQuery}`;
- this.dbtTerminal.error("dbtCore:executeSQL", message, err);
- if (err instanceof PythonException) {
- throw new ExecuteSQLError(err.exception.message, compiledQuery!);
- }
- throw new ExecuteSQLError((err as Error).message, compiledQuery!);
+ return { ...result, compiled_stmt: compiledQuery };
} finally {
- await queryThread.end();
+ await this.executionInfrastructure.closePythonBridge(queryThread);
}
- return { ...result, compiled_stmt: compiledQuery };
},
);
}
src/dbt_client/dbtCoreIntegration.ts
---
+++
@@ -1,33 +1,49 @@
async unsafeCompileNode(modelName: string): Promise<string> {
this.throwBridgeErrorIfAvailable();
- const output = await this.python?.lock<CompilationResult>(
- (python) =>
- python!`to_dict(project.compile_node(project.get_ref_node(${modelName})))`,
- );
- return output.compiled_sql;
+ try {
+ const output = await this.python?.lock<CompilationResult>(
+ (python) =>
+ python!`to_dict(project.compile_node(project.get_ref_node(${modelName})))`,
+ );
+ return output.compiled_sql;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
async unsafeCompileQuery(query: string): Promise<string> {
this.throwBridgeErrorIfAvailable();
- const output = await this.python?.lock<CompilationResult>(
- (python) => python!`to_dict(project.compile_sql(${query}))`,
- );
- return output.compiled_sql;
+ try {
+ const output = await this.python?.lock<CompilationResult>(
+ (python) => python!`to_dict(project.compile_sql(${query}))`,
+ );
+ return output.compiled_sql;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
async validateSql(query: string, dialect: string, models: any) {
this.throwBridgeErrorIfAvailable();
- const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
- (python) =>
- python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
- );
- return result;
+ try {
+ const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
+ (python) =>
+ python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
+ );
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
async validateSQLDryRun(query: string) {
this.throwBridgeErrorIfAvailable();
- const result = await this.python?.lock<{ bytes_processed: string }>(
- (python) => python!`to_dict(project.validate_sql_dry_run(${query}))`,
- );
- return result;
+ try {
+ const result = await this.python?.lock<{ bytes_processed: string }>(
+ (python) => python!`to_dict(project.validate_sql_dry_run(${query}))`,
+ );
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
src/dbt_client/dbtCoreIntegration.ts
---
+++
@@ -1,37 +1,45 @@
async getColumnsOfModel(modelName: string) {
this.throwBridgeErrorIfAvailable();
- // Get database and schema
- const node = (await this.python?.lock(
- (python) => python!`to_dict(project.get_ref_node(${modelName}))`,
- )) as ResolveReferenceNodeResult;
- // Get columns
- if (!node) {
- return [];
+ try {
+ // Get database and schema
+ const node = (await this.python?.lock(
+ (python) => python!`to_dict(project.get_ref_node(${modelName}))`,
+ )) as ResolveReferenceNodeResult;
+ // Get columns
+ if (!node) {
+ return [];
+ }
+ // TODO: fix this type
+ return await this.getColumsOfRelation(
+ node.database,
+ node.schema,
+ node.alias || modelName,
+ );
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- // TODO: fix this type
- return this.getColumsOfRelation(
- node.database,
- node.schema,
- node.alias || modelName,
- );
}
async getColumnsOfSource(sourceName: string, tableName: string) {
this.throwBridgeErrorIfAvailable();
- // Get database and schema
- const node = (await this.python?.lock(
- (python) =>
- python!`to_dict(project.get_source_node(${sourceName}, ${tableName}))`,
- )) as ResolveReferenceSourceResult;
- // Get columns
- if (!node) {
- return [];
+ try {
+ // Get database and schema
+ const node = (await this.python?.lock(
+ (python) =>
+ python!`to_dict(project.get_source_node(${sourceName}, ${tableName}))`,
+ )) as ResolveReferenceSourceResult;
+ // Get columns
+ if (!node) {
+ return [];
+ }
+ return await this.getColumsOfRelation(
+ node.database,
+ node.schema,
+ node.identifier,
+ );
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return this.getColumsOfRelation(
- node.database,
- node.schema,
- node.identifier,
- );
}
private async getColumsOfRelation(
@@ -40,10 +48,14 @@
objectName: string,
): Promise<DBColumn[]> {
this.throwBridgeErrorIfAvailable();
- return this.python?.lock<DBColumn[]>(
- (python) =>
- python!`to_dict(project.get_columns_in_relation(project.create_relation(${database}, ${schema}, ${objectName})))`,
- );
+ try {
+ return await this.python?.lock<DBColumn[]>(
+ (python) =>
+ python!`to_dict(project.get_columns_in_relation(project.create_relation(${database}, ${schema}, ${objectName})))`,
+ );
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
async getBulkSchema(
@@ -51,27 +63,35 @@
cancellationToken: CancellationToken,
): Promise<Record<string, DBColumn[]>> {
const result: Record<string, DBColumn[]> = {};
- for (const n of nodes) {
- if (cancellationToken.isCancellationRequested) {
- break;
+ try {
+ for (const n of nodes) {
+ if (cancellationToken.isCancellationRequested) {
+ break;
+ }
+ if (n.resource_type === DBTProject.RESOURCE_TYPE_SOURCE) {
+ const source = n as SourceNode;
+ result[n.unique_id] = await this.getColumnsOfSource(
+ source.name,
+ source.table,
+ );
+ } else {
+ const model = n as Node;
+ result[n.unique_id] = await this.getColumnsOfModel(model.name);
+ }
}
- if (n.resource_type === DBTProject.RESOURCE_TYPE_SOURCE) {
- const source = n as SourceNode;
- result[n.unique_id] = await this.getColumnsOfSource(
- source.name,
- source.table,
- );
- } else {
- const model = n as Node;
- result[n.unique_id] = await this.getColumnsOfModel(model.name);
- }
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return result;
}
async getCatalog(): Promise<Catalog> {
this.throwBridgeErrorIfAvailable();
- return await this.python?.lock<Catalog>(
- (python) => python!`to_dict(project.get_catalog())`,
- );
+ try {
+ return await this.python?.lock<Catalog>(
+ (python) => python!`to_dict(project.get_catalog())`,
+ );
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
src/dbt_client/dbtCoreIntegration.ts
---
+++
@@ -17,6 +17,6 @@
);
return result;
} finally {
- healthCheckThread.end();
+ await this.executionInfrastructure.closePythonBridge(healthCheckThread);
}
}
src/dbt_client/dbtCloudIntegration.ts
---
+++
@@ -23,38 +23,43 @@
cancellationTokenSource.cancel();
},
async () => {
- const { stdout, stderr } = await showCommand.execute(
- cancellationTokenSource.token,
- );
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await showCommand.execute(
+ cancellationTokenSource.token,
+ );
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ const parsedLines = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()));
+ const previewLine = parsedLines.filter((line) =>
+ line.data.hasOwnProperty("preview"),
+ );
+ const compiledSqlLines = parsedLines.filter((line) =>
+ line.data.hasOwnProperty("sql"),
+ );
+ const preview = JSON.parse(previewLine[0].data.preview);
+ const compiledSql =
+ compiledSqlLines[compiledSqlLines.length - 1].data.sql;
+ return {
+ table: {
+ column_names: preview.length > 0 ? Object.keys(preview[0]) : [],
+ column_types:
+ preview.length > 0
+ ? Object.keys(preview[0]).map((obj: any) => "string")
+ : [],
+ rows: preview.map((obj: any) => Object.values(obj)),
+ },
+ compiled_sql: compiledSql,
+ raw_sql: query,
+ };
+ } finally {
+ // Close the Python bridge used by the dbt cloud command
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- const parsedLines = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()));
- const previewLine = parsedLines.filter((line) =>
- line.data.hasOwnProperty("preview"),
- );
- const compiledSqlLines = parsedLines.filter((line) =>
- line.data.hasOwnProperty("sql"),
- );
- const preview = JSON.parse(previewLine[0].data.preview);
- const compiledSql =
- compiledSqlLines[compiledSqlLines.length - 1].data.sql;
- return {
- table: {
- column_names: preview.length > 0 ? Object.keys(preview[0]) : [],
- column_types:
- preview.length > 0
- ? Object.keys(preview[0]).map((obj: any) => "string")
- : [],
- rows: preview.map((obj: any) => Object.values(obj)),
- },
- compiled_sql: compiledSql,
- raw_sql: query,
- };
},
);
}
src/dbt_client/dbtCloudIntegration.ts
---
+++
@@ -12,17 +12,21 @@
"json",
]),
);
- const { stdout, stderr } = await compileQueryCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await compileQueryCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return compiledLine[0].data.compiled;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return compiledLine[0].data.compiled;
}
async unsafeCompileQuery(query: string): Promise<string> {
@@ -39,17 +43,21 @@
"json",
]),
);
- const { stdout, stderr } = await compileQueryCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await compileQueryCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return compiledLine[0].data.compiled;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return compiledLine[0].data.compiled;
}
async validateSql(
@@ -59,11 +67,15 @@
): Promise<ValidateSqlParseErrorResponse> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
- const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
- (python) =>
- python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
- );
- return result;
+ try {
+ const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
+ (python) =>
+ python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
+ );
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
async validateSQLDryRun(query: string): Promise<{ bytes_processed: string }> {
@@ -80,15 +92,19 @@
"json",
]),
);
- const { stdout, stderr } = await validateSqlCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await validateSqlCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return JSON.parse(compiledLine[0].data.compiled);
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return JSON.parse(compiledLine[0].data.compiled);
}
src/dbt_client/dbtCloudIntegration.ts
---
+++
@@ -15,17 +15,21 @@
"json",
]),
);
- const { stdout, stderr } = await compileQueryCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await compileQueryCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return JSON.parse(compiledLine[0].data.compiled);
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return JSON.parse(compiledLine[0].data.compiled);
}
async getColumnsOfModel(modelName: string): Promise<DBColumn[]> {
@@ -42,15 +46,19 @@
"json",
]),
);
- const { stdout, stderr } = await compileQueryCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await compileQueryCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return JSON.parse(compiledLine[0].data.compiled);
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return JSON.parse(compiledLine[0].data.compiled);
}
src/dbt_client/dbtCloudIntegration.ts
---
+++
@@ -35,18 +35,22 @@
"json",
]),
);
- const { stdout, stderr } =
- await compileQueryCommand.execute(cancellationToken);
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } =
+ await compileQueryCommand.execute(cancellationToken);
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ return JSON.parse(compiledLine[0].data.compiled);
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- return JSON.parse(compiledLine[0].data.compiled);
}
async getCatalog(): Promise<Catalog> {
@@ -55,9 +59,9 @@
const bulkModelQuery = `
{% set result = [] %}
{% for n in graph.nodes.values() %}
- {% if n.resource_type == "test" or
- n.resource_type == "analysis" or
- n.resource_type == "sql_operation" or
+ {% if n.resource_type == "test" or
+ n.resource_type == "analysis" or
+ n.resource_type == "sql_operation" or
n.config.materialized == "ephemeral" %}
{% continue %}
{% endif %}
@@ -97,16 +101,20 @@
"json",
]),
);
- const { stdout, stderr } = await compileQueryCommand.execute();
- const compiledLine = stdout
- .trim()
- .split("\n")
- .map((line) => JSON.parse(line.trim()))
- .filter((line) => line.data.hasOwnProperty("compiled"));
- const exception = this.processJSONErrors(stderr);
- if (exception) {
- throw exception;
+ try {
+ const { stdout, stderr } = await compileQueryCommand.execute();
+ const compiledLine = stdout
+ .trim()
+ .split("\n")
+ .map((line) => JSON.parse(line.trim()))
+ .filter((line) => line.data.hasOwnProperty("compiled"));
+ const exception = this.processJSONErrors(stderr);
+ if (exception) {
+ throw exception;
+ }
+ const result: Catalog = JSON.parse(compiledLine[0].data.compiled);
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
}
- const result: Catalog = JSON.parse(compiledLine[0].data.compiled);
- return result;
}
src/dbt_client/dbtCloudIntegration.ts
---
+++
@@ -5,9 +5,13 @@
configPath,
}: HealthcheckArgs): Promise<ProjectHealthcheck> {
this.throwBridgeErrorIfAvailable();
- const result = await this.python?.lock<ProjectHealthcheck>(
- (python) =>
- python!`to_dict(project_healthcheck(${manifestPath}, ${catalogPath}, ${configPath}, ${config}))`,
- );
- return result;
+ try {
+ const result = await this.python?.lock<ProjectHealthcheck>(
+ (python) =>
+ python!`to_dict(project_healthcheck(${manifestPath}, ${catalogPath}, ${configPath}, ${config}))`,
+ );
+ return result;
+ } finally {
+ await this.executionInfrastructure.closePythonBridge(this.python);
+ }
}
Your changes have been successfully made to the branch sweep/getting_error_io_error_file_already_open
. I have validated these changes using a syntax checker and a linter.
[!TIP] To recreate the pull request, edit the issue title or description.
This is an automated message generated by Sweep AI.
Works fine in dbt1.8
Expected behavior
dbt build command should pass
Actual behavior
dbt build command fails
Steps To Reproduce
When user updates defer config, config is saved in vscode settings here: src\webview_provider\insightsPanel.ts -> updateDeferConfig function. After the config is saved, if user executes
dbt build
command in terminal or usebuild dbt model
in actions menu, python exception thrown. "IO error: file already open in python.exe". This happens when user uses duckdb. It is possible that a python thread is created somewhere in code and it connects to db, but the thread is not closed after the process is completed which makes the db locked. Find the root cause of this issue and gracefully close the thread after the process is completed.Log output/Screenshots
No response
Operating System
MacOs
dbt version
1.7
dbt Adapter
dbt-duckdb
dbt Power User version
any
Are you willing to submit PR?