apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

Real-time Kafka data fails to be written to Hive #8058

Closed: gaotong521 closed this issue 1 week ago

gaotong521 commented 1 week ago

Search before asking

What happened

Kafka data is read in real time, but incremental writes to Hive do not happen. After the task starts, only one file is ever written to HDFS through the Hive metastore service.
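
For reference, a quick way to confirm the symptom is to list the target table's warehouse directory while the job is running. The warehouse path below is an assumption based on the database and table names in the sink config (the default Hive layout of <db>.db/<table>), and the kinit step reuses the keytab and principal from that config:

# authenticate with Kerberos first, using the keytab/principal from the sink config
kinit -kt /etc/krb/keytab/sxh_1182736820.keytab sxh_1182736820@GOHIGH.COM

# recursively list files under the table's directory (assumed default warehouse layout)
hdfs dfs -ls -R /user/hive/warehouse/data_pressure_test.db/ods_vehicle_info_up_60

If incremental writes were working, new files should keep appearing here after each checkpoint.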

SeaTunnel Version

2.3.5

SeaTunnel Config

{
    "env": {
        "parallelism": 1,
        "job.mode": "STREAMING",
        "checkpoint.interval": 10000,
        "job.name": "seatunnel_1722581282427"
    },
    "source": [
        {
            "plugin_name": "kafka",
            "topic": "ods_vehicle_info_up_stress",
            "consumer.group": "kafka_stress_1",
            "bootstrap.servers": "10.188.15.181:9092",
            "format": "json",
            "schema": {
                "fields": {
                    "acc_long": "double",
                    "device_id": "string",
                    "device_sn": "string",
                    "hea": "double",
                    "spd": "double",
                    "vehicle_speed": "double",
                    "outof_control": "boolean",
                    "vehicle_type": "int",
                    "lane_number": "int",
                    "lon": "double",
                    "abs_activate": "boolean",
                    "endurance": "double",
                    "acc_lat": "double",
                    "vehicle_num": "string",
                    "limit_speed": "int",
                    "acc_vert": "double",
                    "emergency_status": "boolean",
                    "palstance": "double",
                    "drive_status": "int",
                    "acc_yaw": "double",
                    "lat": "double",
                    "region_code": "bigint"
                }
            },
            "result_table_name": "source_kafka_001"
        }
    ],
    "transform": [
        {
            "plugin_name": "FieldMapper",
            "source_table_name": "source_kafka_001",
            "result_table_name": "source_kafka_002",
            "field_mapper": {
                "acc_long": "acc_long",
                "device_id": "device_id",
                "device_sn": "device_sn",
                "hea": "hea",
                "spd": "spd",
                "vehicle_speed": "vehicle_speed",
                "outof_control": "outof_control",
                "vehicle_type": "vehicle_type",
                "lane_number": "lane_number",
                "lon": "lon",
                "abs_activate": "abs_activate",
                "endurance": "endurance",
                "acc_lat": "acc_lat",
                "vehicle_num": "vehicle_num",
                "limit_speed": "limit_speed",
                "acc_vert": "acc_vert",
                "emergency_status": "emergency_status",
                "palstance": "palstance",
                "drive_status": "drive_status",
                "acc_yaw": "acc_yaw",
                "lat": "lat",
                "region_code": "region_code"
            }
        },
        {
            "plugin_name": "Sql",
            "source_table_name": "source_kafka_002",
            "result_table_name": "source_kafka_003",
            "query": "select device_sn,vehicle_type,vehicle_num,drive_status,emergency_status,abs_activate,outof_control,lon,lat,hea,spd,acc_long,acc_lat,acc_vert,acc_yaw,endurance,vehicle_speed,lane_number,limit_speed,palstance,device_id,region_code,null as ele,null as vip_status,null as accelerator_pedal_position,null as distance_traveled,null as fuel_level_input,null as maf,null as stft,null as ltft,null as dtime,null as rsp_id from source_kafka_002"
        }
    ],
    "sink": [
        {
            "plugin_name": "Hive",
            "source_table_name": "source_kafka_003",
            "table_name": "data_pressure_test.ods_vehicle_info_up_60",
            "metastore_uri": "thrift://cdh-node7:9083",
            "krb5_path": "/etc/krb5.conf",
            "kerberos_principal": "sxh_1182736820@GOHIGH.COM",
            "kerberos_keytab_path": "/etc/krb/keytab/sxh_1182736820.keytab",
            "hdfs_site_path": "/home/gh-cloud/apache-seatunnel-2.3.5/hive-conf/hdfs-site.xml",
            "hive_site_path": "/home/gh-cloud/apache-seatunnel-2.3.5/hive-conf/hive-site.xml"
        }
    ]
}
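
For context: in streaming mode, SeaTunnel's file-based sinks (the Hive sink included) commit files as part of the checkpoint cycle, so with checkpoint.interval set to 10000 ms a new file should land in HDFS roughly every 10 seconds. If only one file ever appears, it is worth verifying that checkpoints are actually completing. A minimal sketch using Flink's standard REST endpoint; the host, port, and job id are placeholders to fill in from the YARN application's tracking URL:

# find the running application and its tracking URL
yarn application -list -appStates RUNNING

# query checkpoint statistics for the job (placeholders to be filled in)
curl http://<jobmanager-host>:<rest-port>/jobs/<job-id>/checkpoints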

Running Command

${FLINK_HOME}/bin/flink run -t yarn-per-job -Denv.java.opts=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /home/gh-cloud/apache-seatunnel-2.3.5/starter/seatunnel-flink-15-starter.jar --config /tmp/dolphinscheduler/exec/process/root/115845810057216/115866503969792_17/245377/246565/seatunnel_245377_246565.json --name SeaTunnel
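
For comparison, SeaTunnel 2.3.5 also ships a starter script that wraps the flink run submission. A sketch, assuming the bundled scripts are present under the same install directory; defaults (such as the deployment target) may differ from the explicit command above:

/home/gh-cloud/apache-seatunnel-2.3.5/bin/start-seatunnel-flink-15-connector-v2.sh \
  --config /tmp/dolphinscheduler/exec/process/root/115845810057216/115866503969792_17/245377/246565/seatunnel_245377_246565.json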

Error Exception

No exception stack trace; incremental Kafka real-time data simply cannot be written to Hive.

Zeta or Flink or Spark Version

Flink 1.15

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

gaotong521 commented 1 week ago

[screenshot attached]

liunaijie commented 1 week ago

Closing as a duplicate; tracked in https://github.com/apache/seatunnel/issues/8056