datahub-project / datahub

The Metadata Platform for your Data and AI Stack
https://datahubproject.io
Apache License 2.0

hive_etl.py Schema Error #1596

Closed liaicheng closed 4 years ago

liaicheng commented 4 years ago

Below is my data:

{
    "auditHeader": null,
    "proposedSnapshot": ["com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
        "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,abel_test.base_indicator_value,PROD)",
        "aspects": [{
            "owners": [{
                "owner": "urn:li:corpuser:test",
                "type": "DATAOWNER"
            }],
            "lastModified": {
                "time": 1584090706,
                "actor": "urn:li:corpuser:test"
            }
        }, {
            "upstreams": [{
                "auditStamp": {
                    "time": 1584090706,
                    "actor": "urn:li:corpuser:test"
                },
                "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,ma(name:indicator_time,PROD)",
                "type": "EXTERNAL_TABLE"
            }]
        }, {
            "elements": [{
                "url": "cdh-10-47-250-3",
                "description": "sample doc to describe upstreams",
                "createStamp": {
                    "time": 1584090706,
                    "actor": "urn:li:corpuser:test"
                }
            }]
        }, {
            "schemaName": "abel_test.base_indicator_value",
            "platform": "urn:li:dataPlatform:hive",
            "version": 0,
            "created": {
                "time": 1531386535,
                "actor": "urn:li:corpuser:test"
            },
            "lastModified": {
                "time": 1584090706,
                "actor": "urn:li:corpuser:test"
            },
            "hash": "",
            "platformSchema": {
                "OtherSchema": "[('person_id', 'bigint', ''), ('indicator_id', 'bigint', '������ID'), ('base_value', 'double', '������������'), ('indicator_value', 'double', ''), ('message', 'string', '������������������������'), ('indicator_time', 'timestamp', '������������'), ('level', 'int', '������������')]"
            },
            "fields": [{
                "fieldPath": "",
                "description": "",
                "nativeDataType": "string",
                "type": {
                    "type": {
                        "com.linkedin.pegasus2avro.schema.StringType": {}
                    }
                }
            }]
        }]
    }],
    "proposedDelta": null
}

When I run hive_etl.py to test, the following error occurs:

avro.io.AvroTypeException: The datum

{
    'auditHeader': None,
    'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot', {
        'urn': 'urn:li:dataset:(urn:li:dataPlatform:hive,abel_test.base_indicator_value,PROD)',
        'aspects': [{
            'owners': [{
                'owner': 'urn:li:corpuser:test',
                'type': 'DATAOWNER'
            }],
            'lastModified': {
                'time': 1584078957,
                'actor': 'urn:li:corpuser:test'
            }
        }, {
            'upstreams': [{
                'auditStamp': {
                    'time': 1584078957,
                    'actor': 'urn:li:corpuser:test'
                },
                'dataset': 'urn:li:dataset:(urn:li:dataPlatform:hive,ma(name:indicator_time,PROD)',
                'type': 'EXTERNAL_TABLE'
            }]
        }, {
            'elements': [{
                'url': 'cdh-10-47-250-3',
                'description': 'sample doc to describe upstreams',
                'createStamp': {
                    'time': 1584078957,
                    'actor': 'urn:li:corpuser:test'
                }
            }]
        }, {
            'schemaName': 'abel_test.base_indicator_value',
            'platform': 'urn:li:dataPlatform:hive',
            'version': 0,
            'created': {
                'time': 1531386535,
                'actor': 'urn:li:corpuser:test'
            },
            'lastModified': {
                'time': 1584078957,
                'actor': 'urn:li:corpuser:test'
            },
            'hash': '',
            'platformSchema': {
                'OtherSchema': "[('person_id', 'bigint', ''), ('indicator_id', 'bigint', 'dfds'), ('base_value', 'double', 'dfds'), ('indicator_value', 'double', ''), ('message', 'string', 'aaa'), ('indicator_time', 'timestamp', 'test'), ('level', 'int', 'da')]"
            },
            'fields': [{
                'fieldPath': '',
                'description': '',
                'nativeDataType': 'string',
                'type': {
                    'type': {
                        'com.linkedin.pegasus2avro.schema.StringType': {}
                    }
                }
            }]
        }]
    }),
    'proposedDelta': None
}

is not an example of the schema

Is there a problem with my data? Does hive_etl.py work for others? Thanks.

RealChrisL commented 4 years ago

Hi @liaicheng,

The datum shown in your error message, produced when you tested with hive_etl.py, passes the Kafka schema validator.

However, in your formatted JSON, the snapshot type should be wrapped in parentheses, ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"), instead of square brackets, ["com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"].
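A note on why the brackets show up: JSON has no tuple type, so a payload loaded with `json.loads` always carries the snapshot as a two-element list. The sketch below shows one way to convert it to the parenthesized (tuple) form before producing the event. It assumes the event is a plain dict loaded from JSON; the helper name `snapshot_as_tuple` and the trimmed sample payload are illustrative and not part of hive_etl.py.

```python
import json

# Trimmed sample event (illustrative): the snapshot is written with square
# brackets, exactly as it would appear in a JSON file.
raw = """
{
    "auditHeader": null,
    "proposedSnapshot": ["com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
        "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,abel_test.base_indicator_value,PROD)",
        "aspects": []
    }],
    "proposedDelta": null
}
"""


def snapshot_as_tuple(mce):
    """Return a shallow copy of the event whose proposedSnapshot is a
    (type_name, record) tuple instead of a two-element list.

    Hypothetical helper: events without a list-valued snapshot are
    returned unchanged.
    """
    fixed = dict(mce)
    snapshot = fixed.get("proposedSnapshot")
    if isinstance(snapshot, list) and len(snapshot) == 2:
        fixed["proposedSnapshot"] = tuple(snapshot)
    return fixed


mce = snapshot_as_tuple(json.loads(raw))
print(type(mce["proposedSnapshot"]).__name__)  # tuple
```

Only the outer snapshot wrapper is converted here; the nested aspects are left untouched.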

Meanwhile, you can always verify the event against mce-cli.py to see whether it is valid.

Thanks.

liaicheng commented 4 years ago

Thanks for your response. I changed my data as you suggested, but it still didn't work. Also, mce-cli.py is just a Kafka consumer and producer script; it didn't give me any useful hints about the schema error. @RealChrisL

RealChrisL commented 4 years ago

@liaicheng, mce-cli.py provides basic debugging support for metadata ingestion, especially for Kafka schema validation.

I am able to ingest the datum:

Producing MetadataChangeEvent records to topic MetadataChangeEvent. ^c to exit.
MCE1: {'auditHeader': None, 'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot', {'urn': 'urn:li:dataset:(urn:li:dataPlatform:hive,abel_test.base_indicator_value,PROD)', 'aspects': [{'owners': [{'owner': 'urn:li:corpuser:test', 'type': 'DATAOWNER'}], 'lastModified': {'time': 1584078957, 'actor': 'urn:li:corpuser:test'}}, {'upstreams': [{'auditStamp': {'time': 1584078957, 'actor': 'urn:li:corpuser:test'}, 'dataset': 'urn:li:dataset:(urn:li:dataPlatform:hive,ma(name:indicator_time,PROD)', 'type': 'EXTERNAL_TABLE'}]}, {'elements': [{'url': 'cdh-10-47-250-3', 'description': 'sample doc to describe upstreams', 'createStamp': {'time': 1584078957, 'actor': 'urn:li:corpuser:test'}}]}, {'schemaName': 'abel_test.base_indicator_value', 'platform': 'urn:li:dataPlatform:hive', 'version': 0, 'created': {'time': 1531386535, 'actor': 'urn:li:corpuser:test'}, 'lastModified': {'time': 1584078957, 'actor': 'urn:li:corpuser:test'}, 'hash': '', 'platformSchema': {'OtherSchema': "[('person_id', 'bigint', ''), ('indicator_id', 'bigint', 'dfds'), ('base_value', 'double', 'dfds'), ('indicator_value', 'double', ''), ('message', 'string', 'aaa'), ('indicator_time', 'timestamp', 'test'), ('level', 'int', 'da')]"}, 'fields': [{'fieldPath': '', 'description': '', 'nativeDataType': 'string', 'type': {'type': {'com.linkedin.pegasus2avro.schema.StringType': {}}}}]}]}), 'proposedDelta': None}

Please also consider breaking the event down and verifying each aspect separately.
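One way to break the event down is to emit one event per aspect, so that a failing aspect can be isolated. The following is a minimal sketch, assuming the event is a Python dict whose `proposedSnapshot` is a (type name, record) pair as in the datum above; the helper name `split_by_aspect` and the trimmed sample event are hypothetical and not part of hive_etl.py or mce-cli.py.

```python
def split_by_aspect(mce):
    """Return one event per aspect, each with the same urn and snapshot type.

    Hypothetical helper: the input event is not modified.
    """
    type_name, snapshot = mce["proposedSnapshot"]
    events = []
    for aspect in snapshot["aspects"]:
        # Copy the snapshot record, keeping only the current aspect.
        single = dict(snapshot, aspects=[aspect])
        events.append({
            "auditHeader": mce.get("auditHeader"),
            "proposedSnapshot": (type_name, single),
            "proposedDelta": mce.get("proposedDelta"),
        })
    return events


# Trimmed sample event (illustrative) with two of the aspects from the issue.
sample = {
    "auditHeader": None,
    "proposedSnapshot": (
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
        {
            "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,abel_test.base_indicator_value,PROD)",
            "aspects": [
                {
                    "owners": [{"owner": "urn:li:corpuser:test", "type": "DATAOWNER"}],
                    "lastModified": {"time": 1584078957, "actor": "urn:li:corpuser:test"},
                },
                {
                    "elements": [{
                        "url": "cdh-10-47-250-3",
                        "description": "sample doc to describe upstreams",
                        "createStamp": {"time": 1584078957, "actor": "urn:li:corpuser:test"},
                    }],
                },
            ],
        },
    ),
    "proposedDelta": None,
}

per_aspect_events = split_by_aspect(sample)
print(len(per_aspect_events))  # 2
```

Each resulting event can then be sent through the same producer path as the full event; whichever one is rejected points to the aspect that does not match the schema.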