datahub-project / datahub

The Metadata Platform for your Data Stack
https://datahubproject.io
Apache License 2.0

metadata-ingestion script performance is too slow when the database contains too many tables #1520

Closed: clojurians-org closed this issue 4 years ago

clojurians-org commented 4 years ago

I use WhereHows/metadata-ingestion/mysql_etl.py to load data, and I load Oracle data with a similar script (metadata-ingestion/rdbms_etl.py). But when the database contains too many tables, it is too slow to finish.

I think it would be better to load all schema information in one step, then group it to build the final per-table schema in a streaming way, rather than querying each table individually. A sketch of this idea follows.
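
For illustration, here is a minimal hypothetical sketch of that batched approach, assuming Oracle's ALL_TAB_COLUMNS view and the build_rdbms_dataset_mce function from the script attached below; the ingest_all helper is a made-up name, and owner filtering is omitted for brevity:

from itertools import groupby

# Hypothetical sketch: fetch every column in one round trip and regroup the
# rows per table, instead of issuing one query per table.
BATCH_SQL = """
  select OWNER, TABLE_NAME, COLUMN_NAME, DATA_TYPE
  from ALL_TAB_COLUMNS
  order by OWNER, TABLE_NAME, COLUMN_ID"""

def ingest_all(cursor):
    cursor.execute(BATCH_SQL)
    # groupby only merges adjacent rows, which the ORDER BY guarantees.
    # fetchall() is used for simplicity; iterating the cursor directly
    # would keep memory flat for very large schemas.
    for (owner, table), cols in groupby(cursor.fetchall(), key=lambda r: (r[0], r[1])):
        schema = [(c[2], c[3]) for c in cols]  # (COLUMN_NAME, DATA_TYPE) pairs
        build_rdbms_dataset_mce(owner + "." + table, schema)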

I attached the simple Oracle script for completeness.

#! /usr/bin/python
import sys
import time
import dbms

HOST = ''
DATABASE = ''
USER = ''
PASSWORD = ''
PORT = '1521'

AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'

def build_rdbms_dataset_mce(dataset_name, schema):
    """
    Build the MetadataChangeEvent for one dataset from its
    (column_name, data_type) rows and hand it to the Kafka producer.
    """

    actor, fields, sys_time = "urn:li:corpuser:datahub", [], int(time.time())  # seconds since epoch

    owner = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":0,"actor":actor}}

    # One SchemaField per column; every native type is mapped to StringType here.
    for column_name, data_type in schema:
        fields.append({"fieldPath":str(column_name),"nativeDataType":str(data_type),"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}})

    schema_metadata = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:oracle","version":0,"created":{"time":sys_time,"actor":actor},
               "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"tableSchema":str(schema)},
               "fields":fields}

    mce = {"auditHeader": None,
           "proposedSnapshot":("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
                               {"urn": "urn:li:dataset:(urn:li:dataPlatform:oracle,"+ dataset_name +",PROD)","aspects": [owner, schema_metadata]}),
           "proposedDelta": None}

    # Produce the MetadataChangeEvent to Kafka.
    produce_rdbms_dataset_mce(mce)

def produce_rdbms_dataset_mce(mce):
    """
    Produce MetadataChangeEvent records.
    """
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.poll(0)
        sys.stdout.write('\n%s has been successfully produced!\n' % mce)
    except ValueError as e:
        sys.stdout.write('Message serialization failed %s\n' % e)
    # Flushing after every message is simple but adds latency; with many
    # tables, a single flush after the main loop would be noticeably faster.
    producer.flush()

try:
    # Leverage DBMS wrapper to build the connection with the underlying RDBMS,
    # which currently supports IBM DB2, Firebird, MSSQL Server, MySQL, Oracle,
    # PostgreSQL, SQLite and ODBC connections.
    # https://sourceforge.net/projects/pydbms/
    connection = dbms.connect.oracle(USER, PASSWORD, DATABASE, HOST, PORT)

    # Execute platform-specific queries with cursor to retrieve the metadata.
    # Examples can be found in ../mysql-etl/mysql_etl.py
    cursor = connection.cursor()

    ignored_owner_regex = 'ANONYMOUS|PUBLIC|SYS|SYSTEM|DBSNMP|MDSYS|CTXSYS|XDB|TSMSYS|ORACLE.*|APEX.*|TEST.*|GG_.*|\$'

    cursor.execute("""select owner, table_name from ALL_TABLES where NOT REGEXP_LIKE(OWNER, '%s')""" % ignored_owner_regex)
    datasets = cursor.fetchall()

    for database, table in datasets:
      print("send meta info: " + database + "." + table)
      # One metadata query per table: this N+1 round-trip pattern is what
      # makes the script slow on databases with many tables.
      column_info_sql = """
        select COLUMN_NAME, DATA_TYPE
        from ALL_TAB_COLUMNS
        where OWNER = '%s' and TABLE_NAME = '%s'
        order by COLUMN_ID"""

      cursor.execute(column_info_sql % (database, table))
      schema = cursor.fetchall()

      # Build and produce the MetadataChangeEvent for this table.
      build_rdbms_dataset_mce(database + "." + table, schema)

except Exception as e:
    sys.stdout.write('Error while connecting to RDBMS: %s\n' % e)

sys.exit(0)
mars-lan commented 4 years ago

Thank you for the suggestion, @clojurians-org. Would you like to directly create a PR for this improvement?

clojurians-org commented 4 years ago

I'm glad to do it, but currently I use Clojure for scripting in the JVM ecosystem, and I don't know whether you guys will accept it. Related documentation: https://clojure.org/guides/deps_and_cli

A simple command to run the script is: clj jdbc.clj. It consists of two files: deps.edn and jdbc.clj.

[op@my-200 jdbc-etl]$ cat deps.edn
{:mvn/repos
  { "maven-repos" {:url "http://10.132.37.56:8081/repository/maven-central/"}}
 :deps
  { org.clojure/java.jdbc {:mvn/version "0.7.10"}
    postgresql/postgresql {:mvn/version "9.1-901-1.jdbc4"} }
}
[op@my-200 jdbc-etl]$ cat jdbc.clj
(require '[clojure.java.jdbc :as j])

(def pg-db {:dbtype "postgresql"
            :dbname "monitor"
            :host "10.132.37.201"
            :user "monitor"
            :password "monitor"})

(println (j/query pg-db "select * from information_schema.columns limit 10") ) 
clojurians-org commented 4 years ago

I already wrote a draft version, in case you're interested! NOTE: there are some remaining adjustments I will make in the next few days. I'll also add Clojure scripting for the mce/mae jobs!

https://github.com/clojurians-org/simple-datahub/blob/master/metadata-ingestion/clj-etl/dataset-jdbc.clj

cd metadata-ingestion/clj-etl && clj jdbc.clj
mars-lan commented 4 years ago

This looks great. Could you please create a PR to put your script and a simple README under https://github.com/linkedin/WhereHows/tree/datahub/contrib/metadata-ingestion? Thanks.

clojurians-org commented 4 years ago

BTW: I benchmarked the old way and the new way; the new way is more than 100 times faster (117m30s is about 7050 s vs. 29.5 s, roughly a 240x speedup). The ingestion-rate bottleneck is now the consumer job.

total table count: 7218

python:
real    117m30.338s
user    2m5.205s
sys     0m10.801s

clojure:
real    0m29.489s
user    0m33.958s
sys     0m2.039s
clojurians-org commented 4 years ago

I'll submit it over the weekend.

mars-lan commented 4 years ago

Thanks. Looking forward to the PR!

mars-lan commented 4 years ago

This will likely be taken care of by @jplaisted as part of https://github.com/linkedin/datahub/issues/1743. Closing it for now.

jplaisted commented 4 years ago

Noted to verify :)