airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

[connector-builder] Schema inference fails on null types #39133

Open tturkenitz opened 2 months ago

tturkenitz commented 2 months ago

Connector Name

connector-builder

Connector Version

0.61.0

What step the error happened?

Configuring a new connector

Relevant information

Environment

Airbyte version: OSS
Step where error happened: Connector Builder: Test

Current Behavior

When testing a Connector Builder configuration, the process fails during schema inference if a node's type is Null or missing.

Stacktrace:

Traceback (most recent call last):
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/connector_builder/connector_builder_handler.py", line 66, in read_stream
    stream_read = handler.get_message_groups(source, config, configured_catalog, state, limits.max_records)
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/connector_builder/message_grouper.py", line 125, in get_message_groups
    schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 209, in get_stream_schema
    self._add_required_properties(self._clean(self.stream_to_builder[stream_name].to_schema()))
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 107, in _clean
    self._clean(value)
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 107, in _clean
    self._clean(value)
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 107, in _clean
    self._clean(value)
  File "/home/airbyte/.pyenv/versions/3.9.11/lib/python3.9/site-packages/airbyte_cdk/utils/schema_inferrer.py", line 112, in _clean
    if isinstance(node["type"], list):
KeyError: 'type'

The issue seems to occur when evaluating schema_inferrer.py at line 112:

if isinstance(node["type"], list):
    if _NULL_TYPE in node["type"]:
        # we want to make sure null is always at the end as it makes schemas more readable
        node["type"].remove(_NULL_TYPE)
    node["type"].append(_NULL_TYPE)
else:
    node["type"] = [node["type"], _NULL_TYPE]

I was able to get around it by modifying the code locally and adding this check before line 112:

if node.get("type", "") == "":
    node["type"] = "string"
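
A minimal standalone sketch of the failure and of the workaround above. `normalize_type` and `raises_key_error` are hypothetical names; the function mirrors only the snippet quoted in this comment, not the CDK's full `_clean` method, and the `"string"` fallback is the local workaround, not an upstream fix.

```python
_NULL_TYPE = "null"


def normalize_type(node: dict, default_type=None) -> dict:
    """Hypothetical stand-in for the quoted snippet: make a node nullable, "null" last."""
    if default_type is not None and node.get("type", "") == "":
        # Local workaround from this comment: fall back to a default type.
        node["type"] = default_type
    if isinstance(node["type"], list):  # raises KeyError when "type" is absent
        if _NULL_TYPE in node["type"]:
            node["type"].remove(_NULL_TYPE)
        node["type"].append(_NULL_TYPE)
    else:
        node["type"] = [node["type"], _NULL_TYPE]
    return node


def raises_key_error(node: dict) -> bool:
    """Report whether normalizing the node without a fallback fails with KeyError."""
    try:
        normalize_type(node)
    except KeyError:
        return True
    return False


print(normalize_type({"type": ["null", "string"]}))              # {'type': ['string', 'null']}
print(raises_key_error({"anyOf": [{"type": "string"}]}))         # True
print(normalize_type({"anyOf": []}, default_type="string")["type"])  # ['string', 'null']
```

Note that the fallback silences the error for any node without a type, including `anyOf` nodes whose branches do carry type information.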

Expected Behavior

I think it would be great if the connector builder could implicitly convert null types to STRING. It would also be nice to get a more detailed error indicating which "Attribute" (aka Node) is causing the issue.

I can't upload the full logs. I noticed that Secrets are included in the logs for the Connector Builder, so I would rather not risk it.

Relevant log output

No response


imrehg commented 2 months ago

I also hit this issue while working on changes for #32814.

I also wanted to add that this happens when parsing the response from the server (i.e. you already have to have something set up and be talking to some endpoint). When it happens, no request or response log is shown, which makes it more difficult to debug: the failure is in response parsing, but it's unclear which response triggers it. For example, in my case most of the tested API endpoints work just fine, but some reliably trigger this issue.

imrehg commented 2 months ago

@tturkenitz could you try the proposed changes in the above PR, if you have a chance? Edit: It's slightly different from your workaround, but the spirit is the same (i.e. setting defaults when "type" is not available), and it should get to the root of the problem; see the comment below.

imrehg commented 2 months ago

I got my debug system working, and it's likely a more complex edge case than the linked MR handles. The node that triggers the error for me has this value:

{
    "anyOf": [
        {"type": ["boolean", "null", "string"]},
        {
            "type": "object",
            "properties": {
                "type": {"type": "string"},
                "interval": {"type": "string"},
                "maxValue": {"type": "number"},
                "minValue": {"type": "number"},
                "currencyCode": {"type": "string"},
            },
        },
    ]
}

Thus what happens is not that "there's no type info", it's that "the type info is not correctly extracted from an anyOf".
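
The shape of this failure can be sketched without the CDK: an `anyOf` node carries its type information inside the branches, so a top-level `node["type"]` lookup has nothing to find. `collect_any_of_types` is a hypothetical helper written for illustration; it is not the approach taken in the linked MR.

```python
def collect_any_of_types(node: dict) -> list:
    """Return the distinct types declared by a node or by its anyOf branches."""
    found = []
    branches = [node] + list(node.get("anyOf", []))
    for branch in branches:
        declared = branch.get("type", [])
        if isinstance(declared, str):
            declared = [declared]
        for type_name in declared:
            if type_name not in found:
                found.append(type_name)
    return found


# Trimmed version of the node shown above.
node = {
    "anyOf": [
        {"type": ["boolean", "null", "string"]},
        {"type": "object", "properties": {"interval": {"type": "string"}}},
    ]
}

print("type" in node)              # False
print(collect_any_of_types(node))  # ['boolean', 'null', 'string', 'object']
```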

I've updated the MR to fix the issue correctly for my case. I do wonder if these changes would work for you too, @tturkenitz (or is your issue triggered by some other cleanup edge case)? Is there a chance that you can test-drive this PR?

tturkenitz commented 2 months ago

Hey @imrehg, thanks a lot for picking this up and providing a solution. I'm going to implement the MR on my end and test if it fixes my issue too. I'll report back with my results shortly.

tturkenitz commented 2 months ago

I've tested the fix on my end, but it doesn't resolve my specific issue. The type field is missing from the document entirely, and re-adding it as STRING resolves the problem. This occurs when I interact with the Coupa ExpenseReports endpoint, and only when extracting more than one record. I suspect a schema difference between the records could be the issue, but I'm not familiar enough with the Airbyte code. My assumption is that Airbyte can reconcile the schemas into one master schema document when this happens, but maybe not in this case?

I can share the schema, but it's quite large at 15k lines, and I’m not sure if it reflects the missing type accurately. It seems to be the schema Airbyte retrieved from the first document.

Adding

if node.get("type", "") == "":
    node["type"] = "string"

does solve the issue for me, but I doubt it's production-ready code 😅

imrehg commented 2 months ago

@tturkenitz for testing could you try a debug step?

In schema_inferrer.py:

https://github.com/airbytehq/airbyte/blob/f1d12954b8ac995dc2ac6205617af98a24bc351f/airbyte-cdk/python/airbyte_cdk/utils/schema_inferrer.py#L95

below that line add:

            if "type" not in node:
                print(node)

which should print the node data in the logs when the cleaning step fails. This would show the offending schema content.

Let me know if any of this is unclear 😬
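
A more targeted variant of that debug step, as a rough sketch: walk a schema and report the path of every schema node that has no "type" key, which is roughly the "which attribute is failing" detail asked for in the issue. `find_untyped_nodes` is a hypothetical helper, not part of the CDK, and it only descends through `properties`, `items` and `anyOf`.

```python
def find_untyped_nodes(schema, path="$"):
    """Yield (path, node) for every schema node that lacks a "type" key."""
    if not isinstance(schema, dict):
        return
    if "type" not in schema:
        yield path, schema
    properties = schema.get("properties")
    if isinstance(properties, dict):
        # Property names are data, so a property called "type" is handled safely here.
        for name, child in properties.items():
            yield from find_untyped_nodes(child, f"{path}.properties.{name}")
    items = schema.get("items")
    if isinstance(items, dict):
        yield from find_untyped_nodes(items, f"{path}.items")
    for index, branch in enumerate(schema.get("anyOf", [])):
        yield from find_untyped_nodes(branch, f"{path}.anyOf[{index}]")


# Made-up schema for illustration; "amount" mimics the anyOf nodes in this thread.
schema = {
    "type": "object",
    "properties": {
        "type": {"type": "string"},
        "amount": {
            "anyOf": [
                {"type": "string"},
                {"type": "object", "properties": {"parent-id": {"type": "null"}}},
            ]
        },
    },
}

untyped_paths = [found_path for found_path, _ in find_untyped_nodes(schema)]
print(untyped_paths)  # ['$.properties.amount']
```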

I would be surprised if the schema inferrer didn't have any info to go on (the type doesn't come from your source, but from the tool that looks at the source's response).

> I suspect a schema difference between the records could be the issue

That's totally the case for my breakage as well, that's when the inferrer would end up with an anyOf entry (a collection of variations of the data encoded in them), and the handling of that anyOf is the problematic bit in the code.

tturkenitz commented 2 months ago

@imrehg,

I captured the malformed schema. It seems I was actually wrong: type does exist in the schema, but it is set to Null. This confuses me, because my solution assumed that type is completely missing and adds it back to the schema. But maybe type is removed by Airbyte as it processes the schema, and my solution re-adds it? Lots of assumptions, sorry!

Here is the node. You can see that there are multiple attributes, like parent-id, salesforce-id and avatar-thumb-url, where type is null.

{
    "anyOf": [
        {
            "type": "string"
        },
        {
            "type": "object",
            "properties": {
                "parent-id": {
                    "type": "null"
                },
                "lookup": {
                    "type": "object",
                    "properties": {
                        "content-groups": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "updated-by": {
                                        "type": "object",
                                        "properties": {
                                            "salesforce-id": {
                                                "type": "null"
                                            },
                                            "avatar-thumb-url": {
                                                "type": "null"
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    ]
}

This is the log row header:

airbyte-connector-builder-server  | 2024-06-06 14:14:47 INFO i.a.w.i.VersionedAirbyteStreamFactory(logMalformedLogMessage):390

imrehg commented 2 months ago

Hey @tturkenitz thanks for passing on the node information!

I'm skeptical of the null causing any issues. I think the schema inferrer would end up with null if all the examples it has seen had null as the value (and APIs can send that back: sending a field name but setting it explicitly to null, rather than only sending the field when it has a non-null value).

Instead, your troublesome node seems to have the same characteristics as mine (an anyOf that isn't just a null and something else as two entries).
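
That distinction can be written down as a small predicate. This is an illustrative sketch under one assumption: the "simple" case means exactly two `anyOf` branches, one of which is `{"type": "null"}`. `is_simple_nullable_any_of` is a hypothetical name, not a CDK function.

```python
NULL_BRANCH = {"type": "null"}


def is_simple_nullable_any_of(node: dict) -> bool:
    """True when anyOf is exactly 'null plus one other branch' (assumed definition)."""
    branches = node.get("anyOf")
    return isinstance(branches, list) and len(branches) == 2 and NULL_BRANCH in branches


simple = {"anyOf": [{"type": "null"}, {"type": "string"}]}
# Same shape as the failing nodes in this thread: two branches, neither is plain null.
troublesome = {"anyOf": [{"type": "string"}, {"type": "object", "properties": {}}]}

print(is_simple_nullable_any_of(simple))       # True
print(is_simple_nullable_any_of(troublesome))  # False
```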

I've tested your node info, and found that:

  1. current master branch indeed breaks with that KeyError
  2. the patch from #39146 works correctly with your example too, so seems to be addressing the issue

Sorry for sounding basic, could you check if you were actually using the patched version of the CDK from that merge request?

Direct testing

For reference, since I don't have access to the source you are using, I've used a simple script for direct testing that feeds the node directly to the cleanup step. Install the version of the CDK in a Python environment and run the script. With the patch I get `Success`, with the CDK from `master` it's the usual failure.

```python
import airbyte_cdk as cdk

node = {
    "anyOf": [
        {"type": "string"},
        {
            "type": "object",
            "properties": {
                "id": {"type": "number"},
                "created-at": {"type": "string"},
                "updated-at": {"type": "string"},
                "active": {"type": "boolean"},
                "name": {"type": "string"},
                "description": {"type": "string"},
                "external-ref-num": {"type": "string"},
                "external-ref-code": {"type": "string"},
                "parent-id": {"type": "null"},
                "lookup-id": {"type": "number"},
                "depth": {"type": "number"},
                "is-default": {"type": "boolean"},
                "approval-group-1": {"type": "string"},
                "approval-user-1": {"type": "string"},
                "approval-group-2": {"type": "string"},
                "approval-user-2": {"type": "string"},
                "custom-fields": {
                    "type": "object",
                    "properties": {
                        "watcher": {"type": "string"},
                        "watcher-group": {"type": "string"},
                        "requester-known-for-invoice": {"type": "string"},
                        "territory": {"type": "string"},
                    },
                },
                "lookup": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "number"},
                        "created-at": {"type": "string"},
                        "updated-at": {"type": "string"},
                        "active": {"type": "boolean"},
                        "name": {"type": "string"},
                        "description": {"type": "string"},
                        "fixed-depth": {"type": "boolean"},
                        "level-1-name": {"type": "string"},
                        "level-2-name": {"type": "string"},
                        "level-3-name": {"type": "string"},
                        "level-4-name": {"type": "string"},
                        "level-5-name": {"type": "string"},
                        "level-6-name": {"type": "string"},
                        "level-7-name": {"type": "string"},
                        "level-8-name": {"type": "string"},
                        "level-9-name": {"type": "string"},
                        "level-10-name": {"type": "string"},
                        "content-groups": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "id": {"type": "number"},
                                    "created-at": {"type": "string"},
                                    "updated-at": {"type": "string"},
                                    "name": {"type": "string"},
                                    "description": {"type": "string"},
                                    "updated-by": {
                                        "type": "object",
                                        "properties": {
                                            "id": {"type": "number"},
                                            "login": {"type": "string"},
                                            "email": {"type": "string"},
                                            "employee-number": {"type": "string"},
                                            "firstname": {"type": "string"},
                                            "lastname": {"type": "string"},
                                            "fullname": {"type": "string"},
                                            "salesforce-id": {"type": "null"},
                                            "avatar-thumb-url": {"type": "null"},
                                            "department-ucf": {"type": "string"},
                                            "role": {"type": "string"},
                                            "uaf": {"type": "string"},
                                            "custom-fields": {
                                                "type": "object",
                                                "properties": {
                                                    "test-employee-number": {"type": "string"},
                                                    "default-cost-center": {"type": "string"},
                                                    "frequent-buyer-training": {"type": "boolean"},
                                                    "approver-training": {"type": "boolean"},
                                                    "starter": {"type": "boolean"},
                                                    "coa-test": {"type": "string"},
                                                },
                                            },
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
                "account-type": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "number"},
                        "name": {"type": "string"},
                    },
                },
            },
        },
    ]
}

# Call the cleanup step directly on the captured node; no instance state is needed here.
cdk.utils.SchemaInferrer._clean(None, node)
print("Success")
```

tturkenitz commented 2 months ago

Hi @imrehg, your patch works fine and behaves as expected.

Looks good, and hopefully it can be merged and released soon. Really appreciate you taking the lead on this 🙇