airbytehq / PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer.
https://docs.airbyte.com/pyairbyte

PyAirbyteNameNormalizationError: PyAirbyteNameNormalizationError: Name cannot be empty after normalization. #344

KaifAhmad1 closed this issue 1 week ago

KaifAhmad1 commented 3 weeks ago

Connector Name

S3

Connector Version

NA

At what step did the error happen?

Configuring a new connector

Relevant information

import airbyte as ab

source = ab.get_source(
    "source-s3",
    config={
        "streams": [
            {
                "name": "",
                "format": {
                    "filetype": "csv",
                    "ignore_errors_on_fields_mismatch": True,
                },
                "globs": ["**"],
                "legacy_prefix": "",
                "validation_policy": "Emit Record",
            }
        ],
        "bucket": ab.get_secret("S3_BUCKET_NAME"),
        "aws_access_key_id": ab.get_secret("AWS_ACCESS_KEY"),
        "aws_secret_access_key": ab.get_secret("AWS_SECRET_KEY"),
        "region_name": ab.get_secret("AWS_REGION")
    }
)

source.check()
# Output: Connection check succeeded for `source-s3`.
source.select_all_streams() # Select all streams
read_result = source.read() # Read the data

Relevant log output

Sync Progress: source-s3 -> DuckDBCache
Started reading from source at 14:59:07:

Read 770 records over 4.0 seconds (193.5 records / second).

Cached 770 records into 1 local cache file(s).

Finished reading from source at 14:59:14.

Started cache processing at 14:59:14:

Processed 0 cache file(s) over 0.00 seconds.
Failed `source-s3 -> DuckDBCache` sync at `14:59:14`.
---------------------------------------------------------------------------
PyAirbyteNameNormalizationError           Traceback (most recent call last)
<ipython-input-4-5da58c248444> in <cell line: 2>()
      1 source.select_all_streams() # Select all streams
----> 2 read_result = source.read() # Read the data

/usr/local/lib/python3.10/dist-packages/airbyte/sources/base.py in read(self, cache, streams, write_strategy, force_full_refresh, skip_validation)
    642 
    643         try:
--> 644             result = self._read_to_cache(
    645                 cache=cache,
    646                 catalog_provider=CatalogProvider(self.configured_catalog),

/usr/local/lib/python3.10/dist-packages/airbyte/sources/base.py in _read_to_cache(self, cache, catalog_provider, stream_names, state_provider, state_writer, write_strategy, force_full_refresh, skip_validation, progress_tracker)
    729             state_writer=state_writer,
    730         )
--> 731         cache_processor.process_airbyte_messages(
    732             messages=airbyte_message_iterator,
    733             write_strategy=write_strategy,

/usr/local/lib/python3.10/dist-packages/airbyte/_future_cdk/record_processor.py in process_airbyte_messages(self, messages, write_strategy, progress_tracker)
    241         # We've finished processing input data.
    242         # Finalize all received records and state messages:
--> 243         self.write_all_stream_data(
    244             write_strategy=write_strategy,
    245             progress_tracker=progress_tracker,

/usr/local/lib/python3.10/dist-packages/airbyte/_future_cdk/record_processor.py in write_all_stream_data(self, write_strategy, progress_tracker)
    259         """
    260         for stream_name in sorted(self.catalog_provider.stream_names):
--> 261             self.write_stream_data(
    262                 stream_name,
    263                 write_strategy=write_strategy,

/usr/local/lib/python3.10/dist-packages/airbyte/_future_cdk/sql_processor.py in write_stream_data(self, stream_name, write_strategy, progress_tracker)
    503             # Make sure the target schema and target table exist.
    504             self._ensure_schema_exists()
--> 505             final_table_name = self._ensure_final_table_exists(
    506                 stream_name,
    507                 create_if_missing=True,

/usr/local/lib/python3.10/dist-packages/airbyte/_future_cdk/sql_processor.py in _ensure_final_table_exists(self, stream_name, create_if_missing)
    407         Return the table name.
    408         """
--> 409         table_name = self.get_sql_table_name(stream_name)
    410         did_exist = self._table_exists(table_name)
    411         if not did_exist and create_if_missing:

/usr/local/lib/python3.10/dist-packages/airbyte/_future_cdk/sql_processor.py in get_sql_table_name(self, stream_name)
    207         """Return the name of the SQL table for the given stream."""
    208         table_prefix = self.sql_config.table_prefix
--> 209         return self.normalizer.normalize(
    210             f"{table_prefix}{stream_name}",
    211         )

/usr/local/lib/python3.10/dist-packages/airbyte/_util/name_normalizers.py in normalize(name)
     79 
     80         if not result.replace("_", ""):
---> 81             raise exc.PyAirbyteNameNormalizationError(
     82                 message="Name cannot be empty after normalization.",
     83                 raw_name=name,

PyAirbyteNameNormalizationError: PyAirbyteNameNormalizationError: Name cannot be empty after normalization.
    Raw Name: ''
    Normalization Result: ''


aaronsteers commented 1 week ago

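@KaifAhmad1 - This is a configuration error on the source connector. The stream's "name" is an empty string, and since the cache table name is built from the table prefix plus the stream name, it is still empty after normalization.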

You should be able to replace this:

        "streams": [
            {
                "name": "",

With this:

        "streams": [
            {
                "name": "my_stream",

Where "my_stream" can be anything you'd like to name the stream.
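
To catch this before a sync runs, the config can be checked for unusable stream names up front. The sketch below is only an illustration: `find_unnamed_streams` is a hypothetical helper, not part of PyAirbyte, and its "no letters or digits" test only approximates the check shown in the traceback (`result.replace("_", "")` being empty). The real normalizer may treat some characters differently.

```python
from typing import Any


def find_unnamed_streams(config: dict[str, Any]) -> list[int]:
    """Return the positions of stream entries whose name has no letters or digits.

    Hypothetical helper (not a PyAirbyte API). It approximates the normalizer's
    "empty after normalization" condition by requiring at least one
    alphanumeric character in the stream name.
    """
    unnamed = []
    for index, stream in enumerate(config.get("streams", [])):
        name = stream.get("name") or ""
        if not any(ch.isalnum() for ch in name):
            unnamed.append(index)
    return unnamed


# Trimmed-down config with only the fields relevant to this check.
config = {
    "streams": [
        {"name": "", "globs": ["**"]},           # same problem as in the report
        {"name": "my_stream", "globs": ["**"]},  # fine
    ],
}

unnamed = find_unnamed_streams(config)
if unnamed:
    print(f"Streams at positions {unnamed} need a non-empty name.")
```

Running a check like this before `ab.get_source(...)` surfaces the problem immediately, rather than after the records have already been read and cached.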

aaronsteers commented 1 week ago

Closing as resolved, but please re-open or let me know if you run into any further issues.