apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Zeta] If the data exceeds 1 million, duplicate data will appear #5610

Closed: matianhe3 closed this issue 2 months ago.

matianhe3 commented 10 months ago

Search before asking

What happened

When the source holds more than 1 million rows, the data appears to be duplicated. I end up with more than 2 million rows in StarRocks, and SeaTunnel does not stop: it keeps running and the number of duplicate rows keeps growing.

SeaTunnel Version

2.3.3 zeta

SeaTunnel Config

env {
  execution.parallelism = 3
  job.mode = BATCH
  job.name = mkt
}

source {
  Jdbc {
    result_table_name = source
    url = "jdbc:sqlserver://"
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    connection_check_timeout_sec = 100
    user = 
    password = ""
    query = """select id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
    server_time, m_usercode, m_username, m_roomid, m_channel, m_addtime, m_updatetime,
    m_searchcode, m_type, dt
    FROM YS_BI.marketing.marketing_share_room_record
    """
  }
}

transform {
}

sink {
  StarRocks {
    source_table_name = [result]
    nodeUrls = ["clickhouse01:8030", "clickhouse02:8030", "clickhouse03:8030"]
    base-url = "jdbc:mysql:loadBalance://clickhouse01:9030,clickhouse02:9030,clickhouse03:9030"
    username = root
    password = ""
    database = dws
    table = mkt_record
    save_mode_create_template = """
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
  server_time DATETIME NOT NULL,
  dt DATE NOT NULL,
  ${rowtype_fields}
) ENGINE = OLAP
DUPLICATE KEY(server_time)
PARTITION BY (dt)
DISTRIBUTED BY HASH(dt)
"""
  }
}

Running Command

seatunnel.sh -c mkt.conf

Error Exception

duplicate data

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

java 1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

zhilinli123 commented 10 months ago

Can you give us more details, for example the StarRocks version and the MySQL & StarRocks DDL?

matianhe3 commented 10 months ago

StarRocks 3.1

This may be related to the problem. For now, as a workaround, I add `where dt between '' and ''` to the SQL so that each run imports fewer than 1 million rows, and I run the job many times.
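The workaround above can be sketched as a source block that reuses the original query and adds a `dt` window. This is a minimal sketch, not a fix: the `'YYYY-MM-DD'` values are placeholders for the elided dates, `user`/`password` are left out, and it assumes each window is chosen so that it holds fewer than 1 million rows.

```hocon
source {
  Jdbc {
    result_table_name = "source"
    url = "jdbc:sqlserver://"
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    # Workaround sketch: restrict each run to one dt window below 1 million rows.
    # The dates are placeholders; pick a new, non-overlapping window for every run.
    query = """select id, keyword, city, realcity, ver, imei, channel, uid, os, lands,
    server_time, m_usercode, m_username, m_roomid, m_channel, m_addtime, m_updatetime,
    m_searchcode, m_type, dt
    FROM YS_BI.marketing.marketing_share_room_record
    WHERE dt BETWEEN 'YYYY-MM-DD' AND 'YYYY-MM-DD'
    """
  }
}
```

Because the windows must not overlap, a row can only be loaded twice if the same window is run twice.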

CREATE TABLE `mkt_record` (
  `server_time` datetime NOT NULL COMMENT "",
  `dt` date NOT NULL COMMENT "",
  `id` bigint(20) NULL COMMENT "",
  `keyword` varchar(65533) NULL COMMENT "",
  `city` varchar(65533) NULL COMMENT "",
  `realcity` varchar(65533) NULL COMMENT "",
  `ver` varchar(65533) NULL COMMENT "",
  `imei` varchar(65533) NULL COMMENT "",
  `channel` varchar(65533) NULL COMMENT "",
  `uid` int(11) NULL COMMENT "",
  `os` varchar(65533) NULL COMMENT "",
  `lands` int(11) NULL COMMENT "",
  `m_usercode` varchar(65533) NULL COMMENT "",
  `m_username` varchar(65533) NULL COMMENT "",
  `m_roomid` int(11) NULL COMMENT "",
  `m_channel` varchar(65533) NULL COMMENT "",
  `m_addtime` datetime NULL COMMENT "",
  `m_updatetime` datetime NULL COMMENT "",
  `m_searchcode` varchar(65533) NULL COMMENT "",
  `m_type` int(11) NULL COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`server_time`)
PARTITION BY date_trunc('day', dt)
DISTRIBUTED BY HASH(`dt`)
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"partition_live_number" = "100",
"compression" = "LZ4"
);

github-actions[bot] commented 9 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

igaotang commented 9 months ago

I encountered this problem in version 2.3.3 when syncing from Dameng to GaussDB. The cluster has 3 nodes. During the run, every source record was written with 4 duplicates.

liugddx commented 8 months ago

When parallelism > 1, you need to set `partition_column`, `partition_upper_bound` and `partition_lower_bound`. See https://seatunnel.apache.org/docs/2.3.3/connector-v2/source/Jdbc#partition_column-string
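A minimal sketch of what that could look like for the original SQL Server source. It assumes `id` is a numeric column with values spread across the configured range; the bounds are hypothetical placeholders (substitute the table's real `MIN(id)` and `MAX(id)`), the column list is shortened, and `user`/`password` are omitted. Whether this alone removes the duplication is not confirmed in this thread.

```hocon
env {
  execution.parallelism = 3
  job.mode = "BATCH"
}

source {
  Jdbc {
    result_table_name = "source"
    url = "jdbc:sqlserver://"
    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    # Column list shortened for brevity.
    query = "select id, keyword, city, server_time, dt FROM YS_BI.marketing.marketing_share_room_record"
    # Assumption: id is numeric, so the range can be split into disjoint intervals.
    partition_column = "id"
    # Hypothetical bounds: replace with the real MIN(id) / MAX(id).
    partition_lower_bound = 1
    partition_upper_bound = 3000000
    # Number of splits; matched to the parallelism in this sketch.
    partition_num = 3
  }
}
```

With explicit bounds, each reader should receive its own `id` interval instead of reading the full query result.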

SbloodyS commented 4 months ago

I also encounter this problem on the Spark engine with SeaTunnel 2.3.3. My config is:

{
    "env" : {
        "job.mode" : "BATCH",
        "job.name" : "ods_wxzj_t_keywords_message",
        "parallelism" : 1
    },
    "source" : [
        {
            "driver" : "com.mysql.cj.jdbc.Driver",
            "url" : "jdbc:mysql://*:3306/weiya_chat?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&tinyInt1isBit=false&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=round",
            "user" : "",
            "password" : "",
            "query" : "SELECT\nid\n,fromUser as fromuser\n,tolist\n,msgtype\n,content\n,keyword\n,status\n,fromUserName as fromusername\n,tolistName as tolistname\n,fromUserId as fromuserid\n,roomId as roomid\n,isRoomChat as isroomchat\n,departmentName as departmentname\n,departmentId as departmentid\n,type\n,msgTime as msgtime\n,createdAt as createdat\n,logId as logid\n,msgStatus as msgstatus\n,groupType as grouptype\n,containMobile as containmobile\n,containAddr as containaddr\nFROM\nweiya_chat.t_keywords_message",
            "plugin_name" : "Jdbc",
            "partition_column" : "id",
            "partition_num" : 8
        }
    ],
    "sink" : [
        {
            "plugin_name" : "Hive",
            "table_name" : "tmp.tmp_dcp_dw_source_ods_wxzj_t_keywords_message_wyqy_seatunnel_3000983",
            "metastore_uri" : "thrift://*:9083"
        }
    ]
}

SbloodyS commented 4 months ago

When I set the parallelism to 1, the problem still exists...