Unable to export a dataframe to BigQuery in a streaming pipeline
To reproduce
Create a streaming pipeline
Use the Google Cloud Pub/Sub emulator as the source
Consume data and transform it into a dataframe
Export the dataframe to BigQuery
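For context, the transform step was roughly of this shape (an illustrative sketch, not the exact block code; the `transform` function and the sample payload are assumptions based on the message shown in the logs):

```python
from typing import Dict, List

import pandas as pd


def transform(messages: List[Dict]) -> pd.DataFrame:
    """Turn a batch of consumed Pub/Sub messages into a dataframe.

    Building the frame from dicts keeps string column labels;
    building it from bare lists would leave an integer RangeIndex,
    which the BigQuery exporter's column cleanup cannot handle.
    """
    return pd.DataFrame(messages)


# Sample payload matching the message shown in the logs
df = transform([{'spacy': ['this is really spacy']}])
print(list(df.columns))  # ['spacy']
```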
Expected behavior
A new table is created in BigQuery with the data
Screenshots
[righteous_night] [GoogleCloudPubSubSource] Subscription already exists: projects/spacy/subscriptions/spacy-sub
[righteous_night] BigQuery initialized
[righteous_night] └─ Connecting to BigQuery warehouse...
[righteous_night] DONE
[righteous_night] [GoogleCloudPubSubSource] Start consuming batch messages.
[righteous_night] [GoogleCloudPubSubSource] Number of received messages: 1
[gracious_rogue] {'spacy': ['this is really spacy']}
[righteous_night] [GenericIOSink] Batch ingest 1 records, time=1708880014.3901374. Sample: spacy
[righteous_night] 0 this is really spacy
[righteous_night]
[righteous_night] ├─
[righteous_night] └─ Exporting data to table ''...
Pipeline righteous_night execution failed with error:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/mage_ai/server/websocket_server.py", line 116, in run_pipeline
pipeline.execute_sync(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/pipeline.py", line 616, in execute_sync
StreamingPipelineExecutor(self).execute(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 97, in execute
raise e
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 87, in execute
self.__execute_in_python(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 206, in __execute_in_python
source.batch_read(handler=handle_batch_events)
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sources/google_cloud_pubsub.py", line 149, in batch_read
handler(message_values) # Handle the received messages.
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 177, in handle_batch_events
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 165, in handle_batch_events_recursively
handle_batch_events_recursively(
File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/executors/streaming_pipeline_executor.py", line 162, in handle_batch_events_recursively
sinks_by_uuid[downstream_block.uuid].batch_write(
File "/usr/local/lib/python3.10/site-packages/mage_ai/streaming/sinks/generic_io.py", line 73, in batch_write
self.io_client.export(
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py", line 319, in export
__process(database=database)
File "/usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py", line 313, in __process
df.columns = df.columns.str.replace(' ', '_')
File "/usr/local/lib/python3.10/site-packages/pandas/core/accessor.py", line 182, in __get__
accessor_obj = self._accessor(obj)
File "/usr/local/lib/python3.10/site-packages/pandas/core/strings/accessor.py", line 181, in __init__
self._inferred_dtype = self._validate(data)
File "/usr/local/lib/python3.10/site-packages/pandas/core/strings/accessor.py", line 235, in _validate
raise AttributeError("Can only use .str accessor with string values!")
AttributeError: Can only use .str accessor with string values!
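The failure comes from `mage_ai/io/bigquery.py` calling `df.columns.str.replace(' ', '_')`: pandas only allows the `.str` accessor on string-valued indexes, so a dataframe whose columns are a default integer `RangeIndex` (for example, one built from a bare list of values) raises exactly this `AttributeError`. A minimal reproduction and workaround, assuming the non-string column labels are the culprit:

```python
import pandas as pd

# A frame built from a bare list gets integer column labels
# (a RangeIndex), not strings.
df = pd.DataFrame([['this is really spacy']])

try:
    # This mirrors the column cleanup in mage_ai/io/bigquery.py.
    df.columns = df.columns.str.replace(' ', '_')
except AttributeError as err:
    print(err)  # Can only use .str accessor with string values!

# Possible workaround in the transformer block: cast the labels to
# strings (or name the columns explicitly) before the sink exports.
df.columns = df.columns.astype(str)
df.columns = df.columns.str.replace(' ', '_')  # now succeeds
```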
Mage version
v0.9.64
Operating system
Running Mage on Docker
Additional context
No response