apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug][Kafka-to-Hive]Failed to read kafka data and write hive data #8056

Open gaotong521 opened 1 day ago

gaotong521 commented 1 day ago

Search before asking

What happened

Kafka generates data in real time, but only a dozen or so records are written to the Hive table, and the program does not report any errors.

SeaTunnel Version

2.3.5

SeaTunnel Config

{
    "env": {
        "parallelism": 3,
        "job.mode": "STREAMING",
        "checkpoint.interval": 10000,
        "job.name": "seatunnel_1722581282427"
    },
    "source": [
        {
            "plugin_name": "kafka",
            "topic": "ods_vehicle_info_up_stress",
            "consumer.group": "kafka_stress_1",
            "bootstrap.servers": "10.188.15.181:9092",
            "format": "json",
            "schema": {
                "fields": {
                    "acc_long": "double",
                    "device_id": "string",
                    "device_sn": "string",
                    "hea": "double",
                    "spd": "double",
                    "vehicle_speed": "double",
                    "outof_control": "boolean",
                    "vehicle_type": "int",
                    "lane_number": "int",
                    "lon": "double",
                    "abs_activate": "boolean",
                    "endurance": "double",
                    "acc_lat": "double",
                    "vehicle_num": "string",
                    "limit_speed": "int",
                    "acc_vert": "double",
                    "emergency_status": "boolean",
                    "palstance": "double",
                    "drive_status": "int",
                    "acc_yaw": "double",
                    "lat": "double",
                    "region_code": "bigint"
                }
            },
            "result_table_name": "source_kafka_001"
        }
    ],
    "transform": [
        {
            "plugin_name": "FieldMapper",
            "source_table_name": "source_kafka_001",
            "result_table_name": "source_kafka_002",
            "field_mapper": {
                "acc_long": "acc_long",
                "device_id": "device_id",
                "device_sn": "device_sn",
                "hea": "hea",
                "spd": "spd",
                "vehicle_speed": "vehicle_speed",
                "outof_control": "outof_control",
                "vehicle_type": "vehicle_type",
                "lane_number": "lane_number",
                "lon": "lon",
                "abs_activate": "abs_activate",
                "endurance": "endurance",
                "acc_lat": "acc_lat",
                "vehicle_num": "vehicle_num",
                "limit_speed": "limit_speed",
                "acc_vert": "acc_vert",
                "emergency_status": "emergency_status",
                "palstance": "palstance",
                "drive_status": "drive_status",
                "acc_yaw": "acc_yaw",
                "lat": "lat",
                "region_code": "region_code"
            }
        },
        {
            "plugin_name": "Sql",
            "source_table_name": "source_kafka_002",
            "result_table_name": "source_kafka_003",
            "query": "select device_sn,vehicle_type,vehicle_num,drive_status,emergency_status,abs_activate,outof_control,lon,lat,hea,spd,acc_long,acc_lat,acc_vert,acc_yaw,endurance,vehicle_speed,lane_number,limit_speed,palstance,device_id,region_code,null as ele,null as vip_status,null as accelerator_pedal_position,null as distance_traveled,null as fuel_level_input,null as maf,null as stft,null as ltft,null as dtime,null as rsp_id from source_kafka_002"
        }
    ],
    "sink": [
        {
            "plugin_name": "Hive",
            "source_table_name": "source_kafka_003",
            "table_name": "data_pressure_test.ods_vehicle_info_up_60",
            "metastore_uri": "thrift://cdh-node7:9083",
            "krb5_path": "/etc/krb5.conf",
            "kerberos_principal": "sxh_1182736820@GOHIGH.COM",
            "kerberos_keytab_path": "/etc/krb/keytab/sxh_1182736820.keytab",
            "hdfs_site_path": "/home/gh-cloud/apache-seatunnel-2.3.5/hive-conf/hdfs-site.xml",
            "hive_site_path": "/home/gh-cloud/apache-seatunnel-2.3.5/hive-conf/hive-site.xml"
        }
    ]
}

Running Command

SeaTunnel task command: ${SEATUNNEL_HOME}/bin/start-seatunnel-flink-15-connector-v2.sh --config /tmp/dolphinscheduler/exec/process/root/115845810057216/115866505726976_6/245064/246245/seatunnel_245064_246245.json --deploy-mode run -t yarn-per-job

Error Exception

No error is reported and the Kerberos login succeeds, but only a dozen or so records are written; after that the job keeps running without writing any more data.

Zeta or Flink or Spark Version

Flink 1.15

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

gaotong521 commented 1 day ago

No error is reported and the Kerberos login succeeds, but only a dozen or so records are written; after that the job keeps running without writing any more data.

liunaijie commented 1 day ago

No error is reported and the Kerberos login succeeds, but only a dozen or so records are written; after that the job keeps running without writing any more data.

You can switch the sink to Console to verify whether no data is being consumed from Kafka or the Hive writer has an issue.
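
For illustration, a minimal sketch of that check: keep the source and transform sections as posted and swap only the sink block for the Console sink, which simply prints rows to the task log.

    "sink": [
        {
            "plugin_name": "Console",
            "source_table_name": "source_kafka_003"
        }
    ]

If rows keep printing while the Hive table stops growing, the problem is on the Hive writer side rather than in the Kafka consumption.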

liunaijie commented 1 day ago

Also, regarding your config: you use both FieldMapper and Sql transforms; you can merge them into a single Sql transform.

The SQL query would look like:

select col_a as new_col_a, b, col_c as new_col_c from table
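
Applied to the config above, the merged transform section could look roughly like this (a sketch only; since the FieldMapper here maps every field to itself, it can be dropped and the Sql transform can read source_kafka_001 directly):

    "transform": [
        {
            "plugin_name": "Sql",
            "source_table_name": "source_kafka_001",
            "result_table_name": "source_kafka_003",
            "query": "select device_sn,vehicle_type,vehicle_num,drive_status,emergency_status,abs_activate,outof_control,lon,lat,hea,spd,acc_long,acc_lat,acc_vert,acc_yaw,endurance,vehicle_speed,lane_number,limit_speed,palstance,device_id,region_code,null as ele,null as vip_status,null as accelerator_pedal_position,null as distance_traveled,null as fuel_level_input,null as maf,null as stft,null as ltft,null as dtime,null as rsp_id from source_kafka_001"
        }
    ]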
gaotong521 commented 1 day ago

No error is reported and the Kerberos login succeeds, but only a dozen or so records are written; after that the job keeps running without writing any more data.

You can switch the sink to Console to verify whether no data is being consumed from Kafka or the Hive writer has an issue.

I changed the sink to MySQL via the Jdbc connector, and the data was written normally.
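
For reference, a rough sketch of a Jdbc/MySQL sink of that kind; the URL, credentials, and target table below are placeholders rather than the poster's actual values:

    "sink": [
        {
            "plugin_name": "Jdbc",
            "source_table_name": "source_kafka_003",
            "url": "jdbc:mysql://mysql-host:3306/data_pressure_test",
            "driver": "com.mysql.cj.jdbc.Driver",
            "user": "<user>",
            "password": "<password>",
            "generate_sink_sql": true,
            "database": "data_pressure_test",
            "table": "ods_vehicle_info_up_60"
        }
    ]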

flymianmian commented 16 hours ago

Version 2.3.6 fixed several Hive bugs; you can try upgrading and see if that helps.