Issue comes from:
Tagged the original reviewers and committers.
The issue seems to be that neither the Bulk API nor the REST API can cover tables with "many" columns and "special" data types: the REST API has a request-size (URI length) limit, while the Bulk API does NOT support the "object" and "base64" types.
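To make the scale concrete, a rough back-of-the-envelope check with hypothetical numbers (not connector code): the REST API sends the SOQL SELECT in the GET URL, so on a wide object the comma-joined field names alone approach the URI limit.

# Hypothetical numbers: ~800 custom fields with typical API names.
field_names = [f"Custom_Field_{i}__c" for i in range(800)]
joined = ",".join(field_names)
print(len(joined))  # 15889 characters of field names alone, before "SELECT ... FROM Account" and the base URL

With a couple of thousand characters of URL overhead on top, that is already past the ~16 KB request-size budget the connector snippet further down compares against.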
Can we exclude certain columns to bypass the issue? I specified a limited number of columns in the ConfiguredCatalog, but that does not seem to change the behavior.
@poolmaster is there a query I could run in Salesforce Workbench or elsewhere to output the columns and their data types, so we can diagnose the issue in relation to the Bulk vs REST API calls?
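One way to list the columns and data types, as a hedged sketch: the sObject describe REST endpoint returns every field's name and type. The instance URL, API version, and access token below are placeholders; in Workbench, a SOQL query on FieldDefinition filtered by EntityDefinition.QualifiedApiName should give a similar listing.

import requests

INSTANCE_URL = "https://your-instance.my.salesforce.com"  # placeholder
ACCESS_TOKEN = "..."                                      # placeholder
OBJECT_NAME = "Account"

resp = requests.get(
    f"{INSTANCE_URL}/services/data/v57.0/sobjects/{OBJECT_NAME}/describe",
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
resp.raise_for_status()
fields = resp.json()["fields"]
for field in fields:
    print(field["name"], field["type"])  # base64 and compound (address/location) types are the ones Bulk cannot load
print(len(fields), "fields;", len(",".join(f["name"] for f in fields)), "characters of comma-joined names")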
Is there any way we can update the logic to ignore the columns beyond a certain point for now, to resolve the blocking issue temporarily?
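As a temporary unblock along those lines, a rough sketch (not connector code) that trims the property set until the comma-joined field names fit the REST budget while keeping the primary key; REQUEST_SIZE_LIMITS and the 2000-character overhead mirror the values in the connector snippet quoted below.

REQUEST_SIZE_LIMITS = 16_384  # same limit the connector compares against
URL_OVERHEAD = 2_000          # same headroom the connector adds for the rest of the URL

def trim_properties(properties: dict, primary_key: str = "Id") -> dict:
    """Drop trailing non-primary-key properties until the field list fits a REST GET URL."""
    kept = dict(properties)
    while len(",".join(kept)) + URL_OVERHEAD > REQUEST_SIZE_LIMITS and len(kept) > 1:
        for name in reversed(list(kept)):
            if name != primary_key:
                kept.pop(name)
                break
    return kept

Dropped columns would simply not be synced, which is why this could only ever be a stopgap.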
@koconder I don't have one, but I'm pretty sure that was the root cause. It basically blocks the connector from working for some critical tables. I think we need to support either:
Option 2 seems better IMHO
@poolmaster are you willing to raise a PR for option 2? I'm happy to help review, test and push it through.
This is my first time using Airbyte (I am testing it as a potential solution for our infrastructure), and I am bumping into this same issue. Our SFDC schema is very complex and has a lot of fields. I think this is the bug I am running into: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py#L69
I changed that code to this:
@classmethod
def _get_api_type(cls, stream_name, properties):
    # Salesforce BULK API currently does not support loading fields with data type base64 and compound data
    properties_not_supported_by_bulk = {
        key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
    }
    properties_length = len(",".join(p for p in properties))
    rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
    # If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
    # For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
    bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
    print(f"xxxxxxxxxx bulk_required: {bulk_required} properties_lengthL {properties_length} + LIMIT: {Salesforce.REQUEST_SIZE_LIMITS}")
    if rest_required and not bulk_required:
        return "rest"
    if not rest_required:
        return "bulk"
And this is the output:
2023-02-09 05:45:12 INFO i.a.w.i.DefaultAirbyteStreamFactory(parseJson):125 - xxxxxxxxxx bulk_required: True properties_lengthL 14846 + LIMIT: 16384
airbyte-worker | 2023-02-09 05:45:12 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):163 - None: Stream Account cannot be processed by REST or BULK API.
airbyte-worker | Traceback (most recent call last):
airbyte-worker | File "/airbyte/integration_code/main.py", line 13, in <module>
airbyte-worker | launch(source, sys.argv[1:])
airbyte-worker | File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
airbyte-worker | for message in source_entrypoint.run(parsed_args):
airbyte-worker | File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 116, in run
airbyte-worker | catalog = self.source.discover(self.logger, config)
airbyte-worker | File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 75, in discover
airbyte-worker | streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
airbyte-worker | File "/airbyte/integration_code/source_salesforce/source.py", line 110, in streams
airbyte-worker | streams = self.generate_streams(config, stream_objects, sf)
airbyte-worker | File "/airbyte/integration_code/source_salesforce/source.py", line 95, in generate_streams
airbyte-worker | raise Exception(f"{api_type}: Stream {stream_name} cannot be processed by REST or BULK API.")
The return boolean logic is just a little off: it should return "bulk" but is returning None in this case because the code falls through. This is not the perfect solution, but to test, I changed the return to this, and it worked for me:
rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
# If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
# For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
print(f"xxxxxxxxxx bulk_required: {bulk_required} properties_lengthL {properties_length} + LIMIT: {Salesforce.REQUEST_SIZE_LIMITS}")
if rest_required and not bulk_required:
    return "rest"
if not rest_required:
    return "bulk"
return "bulk"  # this case needs to be handled better
This looks like it was likely resolved in the PR: https://github.com/airbytehq/airbyte/pull/22597/files#diff-1180da547d1ffa8dbd6c895174d912b0023e3ccfce51920fab55913a0629baa8
Seems like #22597 has added new issues:
airbytehq/oncall/issues/1403 cc @davydov-d
2023-02-12 22:50:25 source > Syncing stream: Account
2023-02-12 22:50:25 source > <h1>Bad Message 431</h1><pre>reason: Request Header Fields Too Large</pre>
2023-02-12 22:50:25 source > Encountered an exception while reading stream Account
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/requests/models.py", line 971, in json
return complexjson.loads(self.text, **kwargs)
File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
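A side note on reading that traceback: the 431 response body is an HTML error page rather than JSON, so the subsequent .json() call raises the JSONDecodeError and partly masks the real failure (the oversized request). A defensive sketch, assuming the plain requests API rather than the connector's exact call site:

import requests

def get_json(url: str, **kwargs) -> dict:
    resp = requests.get(url, **kwargs)
    # Surface HTTP errors such as "431 Request Header Fields Too Large" instead of letting
    # json.decoder.JSONDecodeError("Expecting value ...") hide them.
    if resp.status_code != 200 or "json" not in resp.headers.get("Content-Type", ""):
        raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}")
    return resp.json()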
@koconder may I ask you to share your credentials if possible so I could reproduce this? or at least the log file of the failed sync. Thanks!
nevermind, I have found the root cause and prepared a patch for that: https://github.com/airbytehq/airbyte/pull/22896
Fixed in https://github.com/airbytehq/airbyte/pull/22896
Please feel free to reopen or contact me in case of any questions
Still an issue @davydov-d
raise e
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 120, in read
yield from self._read_stream(
File "/airbyte/integration_code/source_salesforce/source.py", line 138, in _read_stream
yield from super()._read_stream(logger, stream_instance, configured_stream, state_manager, internal_config)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 189, in _read_stream
for record in record_iterator:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 256, in _read_incremental
for message_counter, record_data_or_message in enumerate(records, start=1):
File "/airbyte/integration_code/source_salesforce/streams.py", line 162, in read_records
yield from self._read_pages(
File "/airbyte/integration_code/source_salesforce/streams.py", line 209, in _read_pages
chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}
File "/airbyte/integration_code/source_salesforce/streams.py", line 209, in <dictcomp>
chunk_page_records = {record[self.primary_key]: record for record in chunk_page_records}
KeyError: 'Id'
2023-02-17 04:30:21 destination > Airbyte message consumer: succeeded.
2023-02-17 04:30:21 destination > executing on success close procedure.
2023-02-17 04:30:21 destination > Flushing all 0 current buffers (0 bytes in total)
2023-02-17 04:30:21 destination > Completed integration: io.airbyte.integrations.destination.s3.S3Destination
2023-02-17 04:30:21 destination > Completed destination: io.airbyte.integrations.destination.s3.S3Destination
I will send you the full log on Slack
oh dang, I think I know what the problem is
Still buggy, new bug introduced:
Inconsistent record with primary key 0014a000007hTpOAAU found. It consists of 1 chunks instead of 2. Skipping it.
The issue above repeats for every row in the first synced object, Account, which was the original subject of this issue.
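For context on the two symptoms above (the KeyError: 'Id' traceback and the "1 chunks instead of 2" warning), a guess at the mechanism with made-up records: when the field list is split into chunks, every chunk's query has to include the primary key, otherwise its records cannot be keyed and re-assembled with the other chunks.

chunk_1 = [{"Id": "0014a000007hTpOAAU", "Name": "Acme"}]
chunk_2 = [{"Industry": "Retail"}]  # hypothetical chunk whose field list is missing "Id"
merged = {}
for chunk in (chunk_1, chunk_2):
    for record in chunk:
        # Keying by the primary key fails with KeyError: 'Id' for chunk_2; and if the chunks do not
        # line up record-for-record, a record ends up with fewer chunks than expected and is skipped
        # as "inconsistent".
        merged.setdefault(record["Id"], {}).update(record)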
the fix has been released
@davydov-d I'm testing now... so far the chunking has started, fingers crossed. One error so far for each chunk:
2023-03-03 09:25:26 ERROR c.n.s.DateTimeValidator(tryParse):82 - Invalid date-time: Invalid timezone offset: +0000
@koconder that is more of a warning, and it used to be logged even before chunks were introduced. If it turns out to be something critical, please report a new issue, since it's not related to this one.
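For anyone curious about that warning: RFC 3339 (which the JSON-schema date-time format follows) writes the UTC offset with a colon, +00:00, so a strict validator flags +0000 even though ISO 8601 allows it. A small normalization sketch in Python, independent of the connector:

import re
from datetime import datetime

raw = "2023-03-03T09:25:26+0000"
parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S%z")  # %z accepts the colon-less offset
print(parsed.isoformat())                               # 2023-03-03T09:25:26+00:00
print(re.sub(r"([+-]\d{2})(\d{2})$", r"\1:\2", raw))    # same result without parsing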
https://github.com/airbytehq/oncall/issues/1571 confirmed it works