databricks / dbt-databricks

A dbt adapter for Databricks.
https://databricks.com
Apache License 2.0

Fix streaming tables idempotency issue #711

Closed case-k-git closed 5 months ago

case-k-git commented 5 months ago

Resolves https://github.com/databricks/dbt-databricks/issues/710

Description

As described in the linked issue, running the following model more than once causes dbt run to fail. The refresh step is supposed to be called, but the run fails before reaching it.

SQL

{{
   config(
     materialized='streaming_table'
   )
}}

select
 *
 ,_metadata.file_path as file_path
from stream read_files('s3://path/to/your/data/',
format => 'parquet',header => true)

ERROR LOG

% dbt run
01:49:41  Running with dbt=1.8.1
01:49:41  Registered adapter: databricks=1.8.1
01:49:42  Found 1 model, 1 source, 586 macros
01:49:42  
01:49:43  Concurrency: 1 threads (target='dev')
01:49:43  
01:49:43  1 of 1 START sql streaming_table model schema_demo.daily_model_load_st_2 ....... [RUN]
01:49:45  Unhandled error while executing 
HTTPSConnectionPool(host='hoge.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/pipelines/8cd5b015-2315-48f8-991c-537215d5c989 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1006)')))
01:49:45  1 of 1 ERROR creating sql streaming_table model schema_demo.daily_model_load_st_2  [ERROR in 1.61s]
01:49:46  
01:49:46  Finished running 1 streaming table model in 0 hours 0 minutes and 4.35 seconds (4.35s).
01:49:46  
01:49:46  Completed with 1 error and 0 warnings:
01:49:46  
01:49:46    HTTPSConnectionPool(host='hoge.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/pipelines/8cd5b015-2315-48f8-991c-537215d5c989 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1006)')))
01:49:46  
01:49:46  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

I also confirmed that the existing idempotency integration test is failing.

tests/functional/adapter/streaming_tables/test_st_basic.py::TestStreamingTablesBasic::test_streaming_table_create_idempotent https://github.com/databricks/dbt-databricks/blob/main/tests/functional/adapter/streaming_tables/test_st_basic.py#L131

ERROR LOG

/bin/bash -c '.tox/integration-databricks-uc-sql-endpoint/bin/python -m pytest -v --profile databricks_uc_sql_endpoint -n 10 --dist loadscope tests/functional/adapter/streaming_tables/test_st_basic.py::TestStreamingTablesBasic::test_streaming_table_create_idempotent  ; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'

================================================================ test session starts =================================================================
platform darwin -- Python 3.11.5, pytest-8.2.2, pluggy-1.5.0 -- .tox/integration-databricks-uc-sql-endpoint/bin/python
cachedir: .pytest_cache
rootdir: /Users/hoge/Library/CloudStorage/OneDrive-hoge/Desktop/workspace/dbt-databricks
configfile: pytest.ini
plugins: csv-3.0.0, flaky-3.8.1, dotenv-0.5.2, xdist-3.6.1
10 workers [1 item]       
scheduling tests via LoadScopeScheduling

tests/functional/adapter/streaming_tables/test_st_basic.py::TestStreamingTablesBasic::test_streaming_table_create_idempotent 

[gw0] [100%] FAILED tests/functional/adapter/streaming_tables/test_st_basic.py::TestStreamingTablesBasic::test_streaming_table_create_idempotent 

====================================================================== FAILURES ======================================================================
__________________________________________ TestStreamingTablesBasic.test_streaming_table_create_idempotent ___________________________________________
[gw0] darwin -- Python 3.11.5 .tox/integration-databricks-uc-sql-endpoint/bin/python

self = <test_st_basic.TestStreamingTablesBasic object at 0x164317fd0>, project = <dbt.tests.fixtures.project.TestProjInfo object at 0x1647ae050>
my_streaming_table = <DatabricksRelation `catalog_demo`.`test17192800526881969844_test_st_basic`.`my_streaming_table`>

    def test_streaming_table_create_idempotent(self, project, my_streaming_table):
        assert self.query_relation_type(project, my_streaming_table) == "streaming_table"
>       util.run_dbt(["run", "--models", my_streaming_table.identifier])

tests/functional/adapter/streaming_tables/test_st_basic.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = ['run', '--models', 'my_streaming_table', '--project-dir', '/private/var/folders/sn/382bdzv15pndfxx0_ld4l50m0000gp/T/pytest-of-hoge/pytest-27/popen-gw0/project0', '--profiles-dir', ...]
expect_pass = True

    def run_dbt(
        args: Optional[List[str]] = None,
        expect_pass: bool = True,
    ):
        # Ignore logbook warnings
        warnings.filterwarnings("ignore", category=DeprecationWarning, module="logbook")

        # reset global vars
        reset_metadata_vars()

        # The logger will complain about already being initialized if
        # we don't do this.
        log_manager.reset_handlers()
        if args is None:
            args = ["run"]

        print("\n\nInvoking dbt with {}".format(args))
        from dbt.flags import get_flags

        flags = get_flags()
        project_dir = getattr(flags, "PROJECT_DIR", None)
        profiles_dir = getattr(flags, "PROFILES_DIR", None)
        if project_dir and "--project-dir" not in args:
            args.extend(["--project-dir", project_dir])
        if profiles_dir and "--profiles-dir" not in args:
            args.extend(["--profiles-dir", profiles_dir])

        dbt = dbtRunner()
        res = dbt.invoke(args)

        # the exception is immediately raised to be caught in tests
        # using a pattern like `with pytest.raises(SomeException):`
        if res.exception is not None:
            raise res.exception

        if expect_pass is not None:
>           assert res.success == expect_pass, "dbt exit state did not match expected"
E           AssertionError: dbt exit state did not match expected

.tox/integration-databricks-uc-sql-endpoint/lib/python3.11/site-packages/dbt/tests/util.py:110: AssertionError
--------------------------------------------------------------- Captured stdout setup ----------------------------------------------------------------

=== Test project_root: /private/var/folders/sn/382bdzv15pndfxx0_ld4l50m0000gp/T/pytest-of-hoge/pytest-27/popen-gw0/project0

Invoking dbt with ['seed']
01:47:34  Running with dbt=1.8.2
01:47:34  Registered adapter: databricks=1.8.1
01:47:34  Unable to do partial parsing because saved manifest not found. Starting full parse.
01:47:34  Found 3 models, 1 seed, 585 macros
01:47:34  
01:47:36  Concurrency: 1 threads (target='default')
01:47:36  
01:47:36  1 of 1 START seed file test17192800526881969844_test_st_basic.my_seed .......... [RUN]
01:47:40  1 of 1 OK loaded seed file test17192800526881969844_test_st_basic.my_seed ...... [INSERT 3 in 4.21s]
01:47:41  
01:47:41  Finished running 1 seed in 0 hours 0 minutes and 6.62 seconds (6.62s).
01:47:41  
01:47:41  Completed successfully
01:47:41  
01:47:41  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Invoking dbt with ['run', '--models', 'my_streaming_table', '--full-refresh']
01:47:41  Running with dbt=1.8.2
01:47:41  Registered adapter: databricks=1.8.1
01:47:41  Found 3 models, 1 seed, 585 macros
01:47:41  
01:47:43  Concurrency: 1 threads (target='default')
01:47:43  
01:47:43  1 of 1 START sql streaming_table model test17192800526881969844_test_st_basic.my_streaming_table  [RUN]
01:48:55  1 of 1 OK created sql streaming_table model test17192800526881969844_test_st_basic.my_streaming_table  [OK in 72.08s]
01:48:56  
01:48:56  Finished running 1 streaming table model in 0 hours 1 minutes and 15.23 seconds (75.23s).
01:48:56  
01:48:56  Completed successfully
01:48:56  
01:48:56  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
---------------------------------------------------------------- Captured stdout call ----------------------------------------------------------------

Invoking dbt with ['run', '--models', 'my_streaming_table']
01:48:57  Running with dbt=1.8.2
01:48:58  Registered adapter: databricks=1.8.1
01:48:58  Found 3 models, 1 seed, 585 macros
01:48:58  
01:48:59  Concurrency: 1 threads (target='default')
01:48:59  
01:48:59  1 of 1 START sql streaming_table model test17192800526881969844_test_st_basic.my_streaming_table  [RUN]
01:49:01  Unhandled error while executing 
HTTPSConnectionPool(host='hoge-prod.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/pipelines/7c9d204a-4f63-4afa-bdf3-20e483d9404f (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1006)')))
01:49:01  1 of 1 ERROR creating sql streaming_table model test17192800526881969844_test_st_basic.my_streaming_table  [ERROR in 1.96s]
01:49:02  
01:49:02  Finished running 1 streaming table model in 0 hours 0 minutes and 4.55 seconds (4.55s).
01:49:02  
01:49:02  Completed with 1 error and 0 warnings:
01:49:02  
01:49:02    HTTPSConnectionPool(host='hoge-prod.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/pipelines/7c9d204a-4f63-4afa-bdf3-20e483d9404f (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1006)')))
01:49:02  
01:49:02  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
============================================================== short test summary info ===============================================================
FAILED tests/functional/adapter/streaming_tables/test_st_basic.py::TestStreamingTablesBasic::test_streaming_table_create_idempotent - AssertionError: dbt exit state did not match expected
============================================================ 1 failed in 99.80s (0:01:39) ============================================================

Investigation

Investigation steps:

The task fails when the get_streaming_table_configuration_changes method is called, so it fails before the refresh_streaming_table method is executed. Inside get_streaming_table_configuration_changes, the failure occurs in the call to adapter.get_relation_config. Tracing into get_relation_config, I found that the get_from_relation method is what triggers the error.
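
The ordering described in the investigation can be illustrated with a simplified model. This is only a sketch, not the adapter's code: the two functions reuse the method names from the investigation, but their bodies, their signatures, the `run_existing_streaming_table` wrapper, and the `FakeConnectionError` class are hypothetical stand-ins.

```python
class FakeConnectionError(Exception):
    """Hypothetical stand-in for the SSL failure raised by the pipelines API call."""


def get_streaming_table_configuration_changes(fetch_config):
    # In the real run this step ends up calling the pipelines API.
    # Here `fetch_config` is an injected callable so the failure can be simulated.
    return fetch_config()


def refresh_streaming_table(calls):
    # Records that the refresh step was reached.
    calls.append("refresh")


def run_existing_streaming_table(fetch_config, calls):
    # Config-change detection runs first, so an exception raised here
    # means the refresh below is never reached.
    calls.append("detect_changes")
    get_streaming_table_configuration_changes(fetch_config)
    refresh_streaming_table(calls)


def failing_fetch():
    # Simulates the certificate error seen in the logs.
    raise FakeConnectionError("certificate verify failed")


calls = []
error = None
try:
    run_existing_streaming_table(failing_fetch, calls)
except FakeConnectionError as exc:
    error = str(exc)

print(calls)
print(error)
```

In this model the refresh step is never recorded, which matches the observation that the run fails before refresh_streaming_table is executed.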

Checklist

Confirmed that all streaming_tables tests pass

dbt-databricks % /bin/bash -c '/Users/username/Desktop/workspace/dbt-databricks/.tox/integration-databricks-uc-sql-endpoint/bin/python -m pytest -v --profile databricks_uc_sql_endpoint -n 10 --dist loadscope tests/functional/adapter/streaming_tables/* ; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret'

[Screenshot: streaming_dbt_test_results]

benc-db commented 5 months ago

The core issue is cert related, not idempotency. We should debug why the certificate is invalid, not remove an API call.

benc-db commented 5 months ago

I recommend filing a ticket with Databricks describing the error you get when calling this API: https://docs.databricks.com/api/workspace/pipelines/get
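
To gather details for such a ticket, it may help to call the same endpoint outside dbt. The following is a minimal sketch using only the Python standard library; the helper names (`pipeline_url`, `get_pipeline`, `is_cert_failure`, `check`) are hypothetical, and it assumes authentication with a personal access token sent as a bearer token. Note that the error in the logs comes from an `HTTPSConnectionPool`, and `requests` trusts the `certifi` bundle by default, while the standard library uses OpenSSL's default paths, so the two can behave differently on the same machine.

```python
import json
import ssl
import urllib.error
import urllib.request


def pipeline_url(host: str, pipeline_id: str) -> str:
    # Same path that appears in the error log.
    return f"https://{host}/api/2.0/pipelines/{pipeline_id}"


def get_pipeline(host: str, pipeline_id: str, token: str, timeout: float = 10.0) -> dict:
    # Assumption: the workspace accepts a personal access token as a bearer token.
    request = urllib.request.Request(
        pipeline_url(host, pipeline_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    context = ssl.create_default_context()  # verifies against OpenSSL default paths
    with urllib.request.urlopen(request, timeout=timeout, context=context) as response:
        return json.load(response)


def is_cert_failure(exc: BaseException) -> bool:
    # urllib wraps TLS errors in URLError; the original error is in `.reason`.
    reason = getattr(exc, "reason", exc)
    return isinstance(reason, ssl.SSLCertVerificationError)


def check(host: str, pipeline_id: str, token: str) -> str:
    # Classifies the outcome so it can be pasted into a ticket.
    try:
        get_pipeline(host, pipeline_id, token)
    except urllib.error.URLError as exc:
        if is_cert_failure(exc):
            return "certificate verification failed"
        return f"other error: {exc.reason}"
    return "ok"
```

Calling `check("hoge.cloud.databricks.com", "<pipeline-id>", "<token>")` with real values would report whether certificate verification fails independently of dbt.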

kdazzle commented 5 months ago

The cert issue occurs because your Python installation isn't connected to your certificate store. If you're on macOS, running commands like these should help:

open /Applications/Python\ 3.11/Install\ Certificates.command
pip install --upgrade certifi
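
To see which certificate sources a given interpreter is using, a small sketch like the following may help. The function name `trust_store_report` is hypothetical, and `certifi` is treated as optional since it may not be installed in every environment.

```python
import ssl


def trust_store_report() -> dict:
    # Where the standard library's default SSL context looks for CA certificates.
    paths = ssl.get_default_verify_paths()
    report = {
        "openssl_cafile": paths.cafile,
        "openssl_capath": paths.capath,
        "certifi_bundle": None,
    }
    # `requests` uses the certifi bundle by default, so report it when available.
    try:
        import certifi
    except ImportError:
        pass
    else:
        report["certifi_bundle"] = certifi.where()
    return report


for name, value in trust_store_report().items():
    print(f"{name}: {value}")
```

If the certificate chain includes a certificate that is trusted only by the operating system's store, it will not appear in any of these locations, which would be consistent with the "self-signed certificate in certificate chain" error.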

case-k-git commented 5 months ago

@benc-db @kdazzle Thank you for your review. I see, so this is an issue with my environment. I tried the command to upgrade certifi but still have this issue. Let me look into it. Thank you.

case-k-git commented 5 months ago

This solved the issue in my case, thank you!

pip install pip-system-certs

https://github.com/dbt-labs/dbt-core/issues/8554