airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com
Other
16.28k stars 4.15k forks source link

Lowcode CDK: Stream Slicer "step" field can't resolve config dict #18767

Closed andnig closed 1 year ago

andnig commented 2 years ago

Environment

Both the check and the read commands fail - but the read command is more verbose in how it fails.

Example: source.yaml

  stream_slicer:
    type: "DatetimeStreamSlicer"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "{{ config['slice_range'] }}"
    datetime_format: "%s"
    cursor_field: "{{ options['stream_cursor_field'] }}"

secrets/config.json

{"access_token": "secret", "backend_url": "secret", "dataset_id": "8f418098-ca28-4df5-9498-0df9fe78eda7", "start_date": "2019-10-21T00:00:00Z", "slice_range": "1d"}

The exception of python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug is as follows:

{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1667298757214.9302, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "", "stack_trace": "Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError
", "failure_type": "system_error"}}}

Expected Behavior

We can use the config to set the "step" variables in the DateTimeStreamSlicer without getting the assertionError

Logs

{"type": "DEBUG", "message": "Debug logs enabled", "data": {}}
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSenseforce"}}
{"type": "DEBUG", "message": "parsed YAML into declarative source", "data": {"parsed_config": "{\"version\": \"0.1.0\", \"definitions\": {\"step\": \"{{ config['slice_range'] }}\", \"selector\": {\"extractor\": {\"field_pointer\": []}}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"base_stream\": {\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}}, \"dataset_stream\": {\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"$options\": {\"name\": \"dataset\", \"primary_key\": [\"Timestamp\", \"Thing\", \"Id\"], \"path\": \"/api/dataset/execute/8f418098-ca28-4df5-9498-0df9fe78eda7\", \"stream_cursor_field\": \"timestamp\"}, \"class_name\": \"airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream\"}}, \"streams\": [{\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"$options\": {\"name\": \"dataset\", \"primary_key\": [\"Timestamp\", \"Thing\", \"Id\"], \"path\": \"/api/dataset/execute/8f418098-ca28-4df5-9498-0df9fe78eda7\", \"stream_cursor_field\": \"timestamp\"}, \"class_name\": \"airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream\"}], \"check\": {\"stream_names\": [\"dataset\"]}}", "path_to_yaml_file": "senseforce.yaml", "source_name": "SourceSenseforce"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "
Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1667298757214.9302, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "", "stack_trace": "Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError
", "failure_type": "system_error"}}}

Steps to Reproduce

  1. Create a source.yaml and configure it with a DateStreamSlicer (example see above)
  2. Add a {{ config['your-config-for-slice-window'] }} to the "step" field of the stream slicer
  3. Run python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug

Note: If I use the exact same example but use step: "1d" instead of the config - dict, everything works as expected.

Example of source.yaml to reproduce

(For an example of config.json - see above)

version: "0.1.0"

definitions:
  step: "{{ config['slice_range'] }}"
  selector:
    extractor:
      field_pointer: []
  requester:
    # url_base: "http://localhost:8080"
    url_base: "{{ config['backend_url'] }}"
    http_method: "POST"
    request_options_provider:
      request_body_data: |
          [{"clause": {"type": "timestamp", "operator": 10, "parameters": 
              [{"value": {{ stream_slice['start_time'] | int * 1000 }} or 0 },
               {"value": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }
              ]

            }, "columnName": "Timestamp"}]/
      request_headers:
        Content-Type: application/json
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['access_token'] }}"
  stream_slicer:
    type: "DatetimeStreamSlicer"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "1d"
    datetime_format: "%s"
    cursor_field: "{{ options['stream_cursor_field'] }}"
  retriever:
    record_selector:
      $ref: "*ref(definitions.selector)"
    paginator:
      type: DefaultPaginator
      url_base: "*ref(definitions.requester.url_base)"
      page_size_option:
        inject_into: "request_parameter"
        field_name: "limit"
      pagination_strategy:
        type: "OffsetIncrement"
        page_size: 10
      page_token_option:
        field_name: "offset"
        inject_into: "request_parameter"
    stream_slicer:
      $ref: "*ref(definitions.stream_slicer)"
    requester:
      $ref: "*ref(definitions.requester)"
  base_stream:
    retriever:
      $ref: "*ref(definitions.retriever)"
  dataset_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "dataset"
      primary_key:
        - "Timestamp"
        - "Thing"
        - "Id"
      path: "/api/dataset/execute/{{ config['dataset_id']}}"
      stream_cursor_field: "timestamp"

streams:
  - "*ref(definitions.dataset_stream)"

check:
  stream_names:
    - "dataset"
maxi297 commented 1 year ago

Implemented in https://github.com/airbytehq/airbyte/pull/21930