apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [InfluxDB] Cannot read data when lower_bound and upper_bound are not configured #6739

Open 15767714253 opened 2 months ago

15767714253 commented 2 months ago

What happened

2024-04-22 15:58:45,262 INFO org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 1 has no restore state.
2024-04-22 15:58:45,262 INFO org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 0 has no restore state.
2024-04-22 15:58:45,612 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,612 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Register reader 1 to InfluxDBSourceSplitEnumerator.
2024-04-22 15:58:45,617 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - connect influxdb successful. sever version :1.8.10.
2024-04-22 15:58:45,617 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Register reader 0 to InfluxDBSourceSplitEnumerator.
2024-04-22 15:58:45,618 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8) switched from INITIALIZING to RUNNING.
2024-04-22 15:58:45,618 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0) switched from INITIALIZING to RUNNING.
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Assign pendingSplits to readers [1]
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - Assign pendingSplits to readers [0]
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - No more splits to assign. Sending NoMoreSplitsEvent to reader [1].
2024-04-22 15:58:45,622 DEBUG org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator [] - No more splits to assign. Sending NoMoreSplitsEvent to reader [0].
2024-04-22 15:58:45,622 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Reader received NoMoreSplits event.
2024-04-22 15:58:45,622 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Reader received NoMoreSplits event.
2024-04-22 15:58:45,627 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Closed the bounded influxDB source
2024-04-22 15:58:45,627 INFO org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader [] - Closed the bounded influxDB source
2024-04-22 15:58:45,632 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Parallel source runs complete.
2024-04-22 15:58:45,632 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Parallel source runs complete.
2024-04-22 15:58:45,632 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Finished task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0
2024-04-22 15:58:45,632 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Finished task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0
2024-04-22 15:58:45,633 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Cancel the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,633 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Cancel the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the thread pool resource.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the thread pool resource.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the split enumerator for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the split enumerator for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the data reader for the Apache SeaTunnel source.
2024-04-22 15:58:45,634 DEBUG org.apache.seatunnel.translation.source.ParallelSource [] - Close the data reader for the Apache SeaTunnel source.
2024-04-22 15:58:45,635 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Close the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,635 DEBUG org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Close the SeaTunnelSourceFunction of Flink.
2024-04-22 15:58:45,636 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Closed operators for task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0
2024-04-22 15:58:45,636 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Closed operators for task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0) switched from RUNNING to FINISHED.
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8) switched from RUNNING to FINISHED.
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 (973fad63e088d82ecf6bbdcf1169a6d0).
2024-04-22 15:58:45,639 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 (3ab54d7a6da584f3a24702c9740a02a8).
2024-04-22 15:58:45,639 DEBUG org.apache.flink.runtime.taskmanager.Task [] - Release task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (2/2)#0 network resources (state: FINISHED).
2024-04-22 15:58:45,639 DEBUG org.apache.flink.runtime.taskmanager.Task [] - Release task Source: SeaTunnel InfluxDBSource -> (Map -> to: Row, Map -> select: (/(time, 1000000) AS time, device_inst_code, active_power, apparent_power, frequency, la, power_factor, p_total_cap, reactive_power, r_total_cap, ua, create_time) -> to: Tuple2 -> Filter -> Map -> Sink Writer: Doris) (1/2)#0 network resources (state: FINISHED).

SeaTunnel Version

2.3.0

SeaTunnel Config

env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {

    InfluxDB {
        result_table_name = "akr1_aux_meter_data"
        url = "https://ts-wz94cg9q58l69yn36.influxdata.tsdb.aliyuncs.com:8086"
        sql = "SELECT time,device_inst_code,active_power,apparent_power,frequency,la,power_factor,p_total_cap,reactive_power,r_total_cap,ua,create_time from akr1_aux_meter_data"
        database = "ess"
        username = "readess"
        password = "Hd@20230407"
        query_timeout_sec = 60
        connect_timeout_ms = 150000
        tag_key = "device_inst_code"
        tag_sql = "show tag values from akr1_aux_meter_data with key = \"device_inst_code\""
        partition_num = 100
        fields {
            time = BIGINT
            device_inst_code = string
            active_power = DOUBLE
            apparent_power = DOUBLE
            frequency = DOUBLE
            la = DOUBLE
            power_factor = DOUBLE
            p_total_cap = DOUBLE
            reactive_power = DOUBLE
            r_total_cap = DOUBLE
            ua = DOUBLE
            create_time = DOUBLE
        }
    }

}

transform {
    sql {
        source_table_name = "akr1_aux_meter_data"
        sql = "SELECT `time`/1000000 as `time`,device_inst_code,active_power,apparent_power,frequency,la,power_factor,p_total_cap,reactive_power,r_total_cap,ua,create_time from akr1_aux_meter_data"
    }
}

sink {
    Doris {
        nodeUrls = ["172.18.180.224:8030"]
        username = root
        password = "HDYL@2023#0614"
        database = "ess"
        table = "akr1_aux_meter_data"
        batch_max_rows = 2000
        sink.properties.format = "JSON"
        sink.properties.strip_outer_array = true
    }
}

Running Command

./start-seatunnel-flink-connector-v2.sh -c ../jobs/influxdb_prod_20240422/akr1_aux_meter_data.yaml

Error Exception

There are no errors at all. No data is read, and the job simply finishes.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

EricJoy2048 commented 2 months ago

Does the SQL query SELECT time,device_inst_code,active_power,apparent_power,frequency,la,power_factor,p_total_cap,reactive_power,r_total_cap,ua,create_time from akr1_aux_meter_data return any results?

15767714253 commented 2 months ago

Through remote debugging, I found that if lower_bound and upper_bound are not configured, the number of splits is always 0, so no query is ever executed.
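The finding above can be illustrated with a simplified Java model of range-based split generation. This is a sketch under stated assumptions, not the connector's actual code: it assumes that unset lower_bound/upper_bound default to 0, that partition_num = 0 means "run the whole query as a single split", and that the partition count is clamped to upper_bound - lower_bound when that range is narrower than partition_num. The class SplitSketch, its method, and the split column name split_col are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of range-based split generation; NOT the SeaTunnel source code.
final class SplitSketch {

    private SplitSketch() {}

    /**
     * Returns one query per split. Assumes the base SQL has no WHERE clause and
     * that splitColumn is an integer field; each split covers [start, end).
     */
    static List<String> buildSplitQueries(String sql, String splitColumn,
                                          long lowerBound, long upperBound,
                                          int partitionNum) {
        List<String> queries = new ArrayList<>();
        if (partitionNum <= 0) {
            // Assumed behavior: no partitioning requested, run the whole query as one split.
            queries.add(sql);
            return queries;
        }
        long span = upperBound - lowerBound;
        // Assumed clamp: never create more partitions than the range can hold.
        long parts = Math.min(partitionNum, span);
        if (parts <= 0) {
            // Bounds left at the assumed default 0/0: zero splits, so nothing is queried.
            return queries;
        }
        long size = (span + parts - 1) / parts; // ceiling division, always >= 1 here
        for (long start = lowerBound; start < upperBound; start += size) {
            long end = Math.min(start + size, upperBound);
            queries.add(sql + " WHERE " + splitColumn + " >= " + start
                    + " AND " + splitColumn + " < " + end);
        }
        return queries;
    }

    public static void main(String[] args) {
        String sql = "SELECT active_power FROM akr1_aux_meter_data";
        // partition_num = 100, bounds unset (modeled as 0): prints 0
        System.out.println(buildSplitQueries(sql, "split_col", 0, 0, 100).size());
        // partition_num unset (modeled as 0): prints 1
        System.out.println(buildSplitQueries(sql, "split_col", 0, 0, 0).size());
        // explicit bounds 0..1000 with partition_num = 100: prints 100
        System.out.println(buildSplitQueries(sql, "split_col", 0, 1000, 100).size());
    }
}
```

Under these assumptions, the reported config (partition_num = 100 with no bounds) produces zero splits, which is consistent with the log: the enumerator sends NoMoreSplitsEvent right after the readers register, and both readers close without reading anything. In the same model, leaving partition_num unset yields one split that runs the original query unchanged.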

EricJoy2048 commented 2 months ago

I think this is a bug. Can you fix it?

15767714253 commented 2 months ago

I'm not sure it's a bug: the job used to run fine and return data, but now it returns nothing, even though I haven't changed the version or the configuration at all. I'm quite puzzled.

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.