databricks / dbt-databricks

A dbt adapter for Databricks.
https://databricks.com
Apache License 2.0

dbt session connection times out after 15 minutes with Python models #324

Closed artemsheiko closed 7 months ago

artemsheiko commented 1 year ago

Describe the bug

We get "Invalid SessionHandle: SessionHandle" errors when dbt runs on Databricks all-purpose clusters.

We tried to set the following configurations for a cluster:

Steps To Reproduce

The process can be described as follows:

System information

The output of dbt --version:

dbt-core        1.4.5
dbt-databricks  1.4.1
dbt-spark       1.4.1
susodapop commented 1 year ago

We think this has to do with the Databricks SQL gateway expiring sessions after fifteen minutes of inactivity. There is a race condition when a DAG includes both Python and SQL models, since these models often use different connections: SQL models use a SQL warehouse, whereas Python models use an all-purpose cluster.

While processing the DAG, Python models run against their connection and SQL models wait for the Python models to complete. If this takes longer than fifteen minutes, the session used for SQL models times out and raises this exception.

One obvious fix is to implement a keep-alive for the SQL connection. We're investigating this now.
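
As a rough illustration of the keep-alive idea (not the adapter's actual implementation), the sketch below calls a caller-supplied ping function on a background thread while some long-running work is in progress. The KeepAlive class, the ping callable, and the interval are all hypothetical names chosen for this example; in practice the ping might run a trivial statement such as SELECT 1 on the idle SQL connection, assuming that connection can safely be used from a second thread.

```python
import threading
from typing import Callable


class KeepAlive:
    """Call `ping` every `interval` seconds on a background thread until stopped."""

    def __init__(self, ping: Callable[[], None], interval: float) -> None:
        self._ping = ping
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait returns False on timeout (time to ping)
        # and True once the stop event has been set.
        while not self._stop.wait(self._interval):
            try:
                self._ping()
            except Exception:
                # A failing ping suggests the session is already gone;
                # stop pinging instead of crashing the background thread.
                break

    def __enter__(self) -> "KeepAlive":
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> None:
        self._stop.set()
        self._thread.join()


# Hypothetical usage (cursor and run_python_model are placeholders):
#
#     with KeepAlive(lambda: cursor.execute("SELECT 1"), interval=300):
#         run_python_model()
```

The interval only needs to stay comfortably below the fifteen-minute idle timeout described above.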

azorej commented 1 year ago

@susodapop I made a minimal example to reproduce the bug (and added description): https://github.com/azorej/dbt_python_model_connection_timeout/tree/main

I think the problem with the settings hive.server2.idle.session.timeout, spark.hadoop.hive.server2.idle.session.timeout, and spark.hive.server2.idle.session.timeout is specific to Databricks clusters: they work as expected if I set them in the cluster settings section (before the cluster starts), but they don't work if I set them through SQL (which is expected, since Python models don't use standard dbt connections) or through PySpark.

> While processing the DAG, Python models run against their connection and SQL models wait for the Python models to complete. If this takes longer than fifteen minutes, the session used for SQL models times out and raises this exception.

Not quite right. The problem is in macros: dbt macros take connections from the pool by model name (e.g. model.dbt_python_model_connection_timeout.long_python_model), but Python models don't use the standard dbt connection mechanism. So if we have, for example, the chain of executions get_columns_in_relation_raw -> python_model -> get_columns_in_relation_raw, then the session connection can expire if python_model runs too long.

I see two solutions here:

  1. Poll the model connection while the Python model is running (not the best option, as there can be two different clusters: one for the Python model and one for macros/incremental merge)
  2. Recreate standard model connection after python model ends
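
The second option could be sketched roughly as follows. This is only an illustration under stated assumptions, not how dbt-databricks manages its connection pool: RefreshableConnection, open_conn, close_conn, and run_python_model are hypothetical names, and the open/close callables stand in for whatever actually opens and closes the model's SQL session.

```python
from typing import Callable, Generic, Optional, TypeVar

C = TypeVar("C")
R = TypeVar("R")


class RefreshableConnection(Generic[C]):
    """Lazily opens a connection and can discard it so the next use reopens it."""

    def __init__(self, open_conn: Callable[[], C], close_conn: Callable[[C], None]) -> None:
        self._open = open_conn
        self._close = close_conn
        self._conn: Optional[C] = None

    def get(self) -> C:
        # Open on first use, then reuse until invalidated.
        if self._conn is None:
            self._conn = self._open()
        return self._conn

    def invalidate(self) -> None:
        # Forget the current connection; closing is best effort because
        # the server may already have expired the session.
        conn, self._conn = self._conn, None
        if conn is not None:
            try:
                self._close(conn)
            except Exception:
                pass


def run_python_model(connection: RefreshableConnection, model: Callable[[], R]) -> R:
    """Run a long Python model, then drop the possibly stale SQL connection."""
    try:
        return model()
    finally:
        connection.invalidate()
```

With this shape, a macro that runs after the Python model calls get() and receives a fresh session instead of the one that sat idle.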
leo-schick commented 1 year ago

I get this error, too. In my use case, the stacktrace of the error always looks like this:

Exception ignored in: <function Connection.__del__ at 0x7f9f3e330e50>
Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/databricks/sql/client.py", line 209, in __del__
    self._close(close_cursors=False)
  File "/databricks/python/lib/python3.9/site-packages/databricks/sql/client.py", line 247, in _close
    self.thrift_backend.close_session(self._session_handle)
  File "/databricks/python/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 494, in close_session
    self.make_request(self._client.CloseSession, req)
  File "/databricks/python/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 407, in make_request
    ThriftBackend._check_response_for_error(response)
  File "/databricks/python/lib/python3.9/site-packages/databricks/sql/thrift_backend.py", line 209, in _check_response_for_error
    raise DatabaseError(response.status.errorMessage)
databricks.sql.exc.DatabaseError: Invalid SessionHandle: SessionHandle [716d501b-e581-4a78-a99e-7005ff253d98]

@susodapop It looks to me like this is related to the session close operation, which you mentioned you already fixed for Metabase. Is that something you can implement for databricks-sql-connector as well? I guess that would fix this issue.

benc-db commented 12 months ago

@artemsheiko @leo-schick does this still reproduce with 1.6.3?

leo-schick commented 11 months ago

@benc-db I must say that I can't verify it anymore, since I moved the model logic where this happened over to a relational database, and now it is blazing fast. I was probably using Spark in the wrong way.

Maybe @artemsheiko can give an answer on this.

leo-schick commented 11 months ago

@benc-db Unfortunately, this issue is not solved. I still run some Python models here and there, and this is what I got now:

Environment:

dbt-databricks==1.6.4
dbt-core==1.6.2
Databricks Runtime 11.3 LTS

Python Model Code

from datetime import date
from pyspark.sql.functions import col

def model(dbt, session):
    dbt.config(
        submission_method="all_purpose_cluster",
        #location_root='/mnt/lakehouse/Finance'
        #create_notebook=True,
        #cluster_id="abcd-1234-wxyz"
    )

    # make sure that upstreams are there
    dbt.ref('CostAllocationTransaction')  # make sure that the table is created before we start inserting data into it
    #dbt.ref('CostAllocationTransaction_CreateFromRule')

    from_date = date(2020, 8, 1)

    cursor = dbt.ref('CostAllocationRule').join(
        dbt.source('Finance_static', 'Company'),
        (col('CostAllocationRule.CompanyNum') == col('Company.CompanyNum')),
        'inner').filter(
            (col('Company.IsConsolidateCompany') == False) &
            (col('CostAllocationRule.ApplyOn').isin([
                1, # Actuals
                3 # Actuals and Budget
            ]))
        ).orderBy(
            col('CostAllocationRule.SortOrder')
        ).select(
            col('CostAllocationRule.CompanyNum'),
            col('CostAllocationRule.RuleNum')
        )

    def process_rule(rule):
        print(f'Rule: {rule["RuleNum"]}')

        session.sql(f"""
INSERT INTO Finance.CostAllocationTransaction
(
    CompanyNum,
    CostAllocationRuleNum,
    LedgerAccountNum,
    TransDate,
    BookingText,

    AmountACY,
    DimensionDepartmentNum,
    DimensionCostCenterNum,
    DimensionGroupNum,
    DimensionStoreNum,
    IsCorrection,
    IsCrediting
)
SELECT
    '{rule["CompanyNum"]}' AS CompanyNum,
    {rule["RuleNum"]} AS CostAllocationRuleNum,
    LedgerAccountNum,
    TransDate,
    BookingText,

    AmountACY,
    DimensionDepartmentNum,
    DimensionCostCenterNum,
    DimensionGroupNum,
    DimensionStoreNum,
    false AS IsCorrection,
    CASE WHEN AmountACY >= 0 THEN false ELSE true END AS IsCrediting
FROM Finance.CostAllocationTransaction_CreateFromRule('{rule["CompanyNum"]}', {rule["RuleNum"]}, '{from_date.strftime('%Y-%m-%d')}')
""")

    for row in cursor.rdd.collect():
        process_rule(row)

    return cursor

Output of the console log:

05:15:45  Running with dbt=1.6.2
05:15:47  Registered adapter: databricks=1.6.4
05:15:47  Unable to do partial parsing because saved manifest not found. Starting full parse.
05:16:00  Found 855 models, 12 tests, 410 sources, 0 exposures, 0 metrics, 729 macros, 0 groups, 0 semantic models
05:16:00  
05:16:19  Concurrency: 14 threads (target='default')

[...]

05:49:46  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [73d1c770-45bc-4f24-961a-60be2a8360c3]
05:49:46  31 of 48 ERROR creating python table model Finance.CostAllocationTransaction_rebuild  [ERROR in 1841.94s]

[...]

05:49:47  Finished running 46 table models, 1 view model, 1 incremental model in 0 hours 33 minutes and 46.23 seconds (2026.23s).
05:49:47  
05:49:47  Completed with 1 error and 0 warnings:
05:49:47  
05:49:47    Runtime Error in model CostAllocationTransaction_rebuild (models/Finance/CostAllocationTransaction_rebuild.py)
  Python model failed with traceback as:
  ---------------------------------------------------------------------------
  AnalysisException                         Traceback (most recent call last)
  <command--1> in <cell line: 193>()
      191 )
      192 
  --> 193 writer.saveAsTable("`Finance`.`CostAllocationTransaction_rebuild`")
      194 
      195 

  /databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
       46             start = time.perf_counter()
       47             try:
  ---> 48                 res = func(*args, **kwargs)
       49                 logger.log_success(
       50                     module_name, class_name, function_name, time.perf_counter() - start, signature

  /databricks/spark/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy, **options)
     1039         if format is not None:
     1040             self.format(format)
  -> 1041         self._jwrite.saveAsTable(name)
     1042 
     1043     def json(

  /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
     1319 
     1320         answer = self.gateway_client.send_command(command)
  -> 1321         return_value = get_return_value(
     1322             answer, self.gateway_client, self.target_id, self.name)
     1323 

  /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
      200                 # Hide where the exception came from that shows a non-Pythonic
      201                 # JVM exception message.
  --> 202                 raise converted from None
      203             else:
      204                 raise

  AnalysisException: The location of the existing table `spark_catalog`.`Finance`.`CostAllocationTransaction_rebuild` is `dbfs:/user/hive/warehouse/finance.db/costallocationtransaction_rebuild`. It doesn't match the specified location `dbfs:/mnt/lakehouse/Finance/CostAllocationTransaction_rebuild`.
05:49:47  
05:49:47  Done. PASS=43 WARN=0 ERROR=1 SKIP=4 TOTAL=48
sfwatergit commented 9 months ago

This issue continues to plague us. Is there a planned fix or workaround?

benc-db commented 9 months ago

@sfwatergit can you share what it looks like when it fails for you? The original report is from 1.4.x; what version are you using? Depending on exactly how it's failing, there is a change coming in 1.7.3 that might help.

sfwatergit commented 9 months ago

Sure. We're currently on 1.6.6

It looks like this (from a dbt log where the issue shows up):


13:36:42.884438 [debug] [MainThread]: Command end result
13:36:43.303606 [info ] [MainThread]: 
13:36:43.304056 [info ] [MainThread]: Completed with 1 error and 0 warnings:
13:36:43.304294 [info ] [MainThread]: 
13:36:43.304514 [error] [MainThread]: Runtime Error in model stg_telemetry_messages (models/staging/evh/stg_telemetry_messages.py)
13:36:43.304736 [error] [MainThread]:   Runtime Error
13:36:43.304951 [error] [MainThread]:     Invalid SessionHandle: SessionHandle [b757ec9c-04d8-4ce0-a5ac-cf6226364b60]
13:36:43.305204 [info ] [MainThread]: 
13:36:43.305439 [info ] [MainThread]: Done. PASS=35 WARN=0 ERROR=1 SKIP=23 TOTAL=59
13:36:43.305951 [debug] [MainThread]: Command `dbt build` failed at 13:36:43.305883 after 1348.04 seconds

Where stg_telemetry_messages.py is a Python model that often runs for over 15 minutes.

Are you referring to the DBT_DATABRICKS_LONG_SESSIONS variable in dbt/connections.py? I was just looking at that. Would setting this environment variable to True address the issue? Until 1.7.3 is released, we should be able to test if that helps by pointing our dependency to this GitHub repo, right?
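
For anyone wanting to repeat this experiment, a minimal sketch of launching dbt with the variable set is shown below. The variable name comes from the question above; the value "True" and the way the adapter interprets it are assumptions, and long_sessions_env and run_dbt are hypothetical helper names.

```python
import os
import subprocess
from typing import Dict, List, Mapping, Optional


def long_sessions_env(base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of the environment with DBT_DATABRICKS_LONG_SESSIONS set."""
    env = dict(os.environ if base is None else base)
    # Assumption: the adapter treats the string "True" as enabling the feature.
    env["DBT_DATABRICKS_LONG_SESSIONS"] = "True"
    return env


def run_dbt(args: List[str]) -> int:
    """Run the dbt CLI (assumed to be on PATH) with the variable set."""
    completed = subprocess.run(["dbt", *args], env=long_sessions_env(), check=False)
    return completed.returncode


# Hypothetical usage:
#
#     exit_code = run_dbt(["build"])
```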

benc-db commented 9 months ago

Yes, if you can test with the version in main on GitHub and let me know what happens, that would be great.

sfwatergit commented 9 months ago

@benc-db

No luck. Still broken:

03:37:33  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [d298b344-47ef-49e0-90f4-570e2f71942e]
03:37:34  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [57e56dbd-d2ea-4f8b-89c5-46f75aebc9c3]
03:37:35  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [60071703-6c07-43dc-a5e0-b5186125a77a]
03:37:35  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [f1337f95-a00a-4230-8f68-d12f5d8e8809]
03:37:36  
03:37:36  Running 1 on-run-end hook
03:37:52  1 of 1 START hook: elementary.on-run-end.0 ..................................... [RUN]
03:37:52  1 of 1 OK hook: elementary.on-run-end.0 ........................................ [OK in 0.00s]
03:37:52  
03:37:52  databricks-sql-connector adapter: Attempted to close session that was already closed: Invalid SessionHandle: SessionHandle [3bcdfd2b-214f-4a42-9b46-813562e5ed3c]
03:37:52  
03:37:52  Finished running 7 incremental models, 17 table models, 1 view model, 2 hooks in 4 hours 17 minutes and 38.88 seconds (15458.88s).
03:37:52  
03:37:52  Completed with 1 error and 0 warnings:
03:37:52  
03:37:52    Runtime Error in model stg_telemetry_messages (models/staging/evh/stg_telemetry_messages.py)
  Runtime Error
    Invalid SessionHandle: SessionHandle [3bcdfd2b-214f-4a42-9b46-813562e5ed3c]
03:37:52  
03:37:52  Done. PASS=0 WARN=0 ERROR=1 SKIP=24 TOTAL=25
benc-db commented 9 months ago

@rcypher-databricks @susodapop any insight here?

leo-schick commented 8 months ago

Unfortunately, yes: image

Environment:

Databricks Runtime 13.3 LTS
dbt-core==1.7.4
dbt-databricks==1.7.1
benc-db commented 7 months ago

This should be fixed in the latest version, which will probably release next week.