apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

SinkRequest SQL stitching Bug #2931

Closed caogaoshuai closed 1 year ago

caogaoshuai commented 1 year ago

Java Version

1.8

Scala Version

2.11.x

StreamPark Version

1.2.3

Flink Version

1.13.6

deploy mode

yarn-application

What happened

When I insert data into ClickHouse through streamx-flink-connector-clickhouse_2.11, some inserts fail. I have tracked down the cause: in the class org.apache.streampark.flink.connector.failover.SinkRequest, the regular expression INSERT_REGEXP does not always extract the values to be inserted correctly.

 lazy val sqlStatement: String = {
    val prefixMap: Map[String, List[String]] = Map[String, List[String]]()
    records.foreach(
      x => {
        val valueMatcher = INSERT_REGEXP.matcher(x)
        if (valueMatcher.find()) {
          val prefix = valueMatcher.group(1)
          prefixMap.get(prefix) match {
            case Some(value) => value.add(valueMatcher.group(3))
            case None => prefixMap.put(prefix, List(valueMatcher.group(3)))
          }
        } else {
          logWarn(s"ignore record: $x")
        }
      })

For example, when the insert statement is "INSERT INTO table(a) value(' value of')", the program is expected to extract the string (' value of'), but it actually extracts of'), which causes a stitching error in the SQL statement. As another example, when the insert statement is "INSERT INTO table(a) values('a')", the program is expected to extract ('a'), but it actually extracts s('a'). I think a more robust extraction method is needed, but I'm out of good ideas.
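The two cases above can be reproduced outside Flink with a few lines of Scala. This is only a sketch: the issue does not quote the definition of INSERT_REGEXP, so the pattern below is an assumption, chosen because it yields exactly the two extractions reported here. The object and method names are hypothetical as well.

```scala
import java.util.regex.Pattern

object InsertRegexRepro {
  // ASSUMPTION: this pattern is not copied from StreamPark. It is a guess that
  // reproduces both results described in this issue.
  val AssumedInsertRegexp: Pattern =
    Pattern.compile("^(.*)\\s+(value|values)(.*)", Pattern.CASE_INSENSITIVE)

  // Returns group(3), the text that sqlStatement collects per prefix.
  def extractValues(sql: String): Option[String] = {
    val m = AssumedInsertRegexp.matcher(sql)
    if (m.find()) Some(m.group(3)) else None
  }

  def main(args: Array[String]): Unit = {
    // The string literal contains " value", so the greedy prefix swallows the real keyword.
    println(extractValues("INSERT INTO table(a) value(' value of')"))
    // "value" is tried before "values", so the trailing "s" leaks into group(3).
    println(extractValues("INSERT INTO table(a) values('a')"))
  }
}
```

Under this assumed pattern, the greedy (.*) backtracks only as far as the last whitespace followed by "value", which lies inside the quoted string in the first case. In the second case the alternation matches "value" first and leaves the "s" to the final group.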

Error Exception

2023-08-08 19:42:48,305 ERROR com.streamxhub.streamx.flink.connector.clickhouse.internal.ClickHouseWriterTask [] - [StreamX] Error ClickHouseSink executing callback, params = 
{ user: default, password: ******, hosts: http://192.168.1.96:8123,http://192.168.1.97:8123,http://192.168.1.98:8123,http://192.168.1.99:8123,http://192.168.1.100:8123,http://192.168.1.101:8123 }
, StatusCode = 400 
2023-08-08 19:42:48,305 WARN  com.streamxhub.streamx.flink.connector.clickhouse.internal.ClickHouseWriterTask [] - [StreamX] Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response = NettyResponse {
        statusCode=400
        headers=
                Date: Tue, 08 Aug 2023 11:42:48 GMT
                Connection: Keep-Alive
                Content-Type: text/plain; charset=UTF-8
                X-ClickHouse-Server-Display-Name: core-1-2.c-17050c8955804b92.cn-shanghai.emr.aliyuncs.com
                Transfer-Encoding: chunked
                X-ClickHouse-Exception-Code: 27
                Keep-Alive: timeout=10
                X-ClickHouse-Summary: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}
        body=
Code: 27. DB::ParsingException: Cannot parse input: expected '(' before: '=12.6), CGM\' ), ( \'333678010@android\' , 333678010 , \'android\' , 1691468942268 , 1691494966100 , 1681222439167590400 , \'ZLZQBcdBirIDAAgaEmg7Hj78\' , {\'appVersion\'':  at row 215: While executing ValuesBlockInputFormat. (CANNOT_PARSE_INPUT_ASSERTION_FAILED) (version 22.3.8.28)

Screenshots

No response

caogaoshuai commented 1 year ago

Here is one possible fix that I have implemented: https://github.com/apache/incubator-streampark/pull/2932
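For readers who want a feel for what a more robust split could look like without opening the link, here is a minimal sketch. It is not taken from the PR and may differ from it entirely. The idea is to match the prefix reluctantly, require the whole keyword VALUE or VALUES, and start the captured tuple at the first opening parenthesis. The object name, method name, and pattern are all hypothetical.

```scala
import java.util.regex.Pattern

object InsertSplitSketch {
  // Hypothetical pattern, not StreamPark's:
  //   (.*?)     reluctant prefix, so the FIRST keyword wins instead of the last
  //   values?   VALUE or VALUES as one token, so no stray "s" is left behind
  //   \b\s*     keyword boundary, then optional whitespace
  //   (\(.*)    the tuple list, which must begin with "("
  val SplitRegexp: Pattern =
    Pattern.compile("^(.*?)\\s+(values?)\\b\\s*(\\(.*)", Pattern.CASE_INSENSITIVE)

  // Splits an INSERT statement into (prefix, values); None if it does not match.
  def split(sql: String): Option[(String, String)] = {
    val m = SplitRegexp.matcher(sql.trim)
    if (m.find()) Some((m.group(1), m.group(3))) else None
  }

  def main(args: Array[String]): Unit = {
    println(split("INSERT INTO table(a) value(' value of')"))
    println(split("INSERT INTO table(a) values('a')"))
  }
}
```

This handles both statements from the report, but it is still a regular expression rather than a SQL parser. A quoted identifier in the column list that itself contains " value(" would still be split in the wrong place.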

zhoulii commented 1 year ago

closed via #2932