ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

Support for Spark Streaming #315

Closed: ayvorobev closed this issue 4 months ago

ayvorobev commented 5 months ago

Do you plan to support Spark Streaming, either Discretized Streams (DStreams) or Structured Streaming?

For now, it throws an exception after several micro-batches:

Caused by: java.io.IOException: Error writing request body to server
    at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3846)
    at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3824)
    at com.clickhouse.data.stream.Lz4OutputStream.flushBuffer(Lz4OutputStream.java:51)
    at com.clickhouse.data.stream.AbstractByteArrayOutputStream.writeBuffer(AbstractByteArrayOutputStream.java:80)

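My code looks like this:

import org.apache.spark.sql.SparkSession

kafkaInputStream.foreachRDD { rdd =>
  // Reuse (or lazily create) the SparkSession from the RDD's SparkContext configuration
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Append this micro-batch to the ClickHouse table through the connector's catalog
  rdd.toDS.writeTo("catalogName.db.table").append()
}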

It seems that reusing the catalog across micro-batches leaves the ClickHouse client inside the connector in an error state. Maybe it needs some additional configuration?

camper42 commented 5 months ago

https://github.com/housepower/spark-clickhouse-connector/issues/257#issuecomment-1691107726

Maybe try this?

ayvorobev commented 5 months ago

Thanks for the advice, but I'm still getting this error. I also tried async=false and http_connection_provider=HTTP_CLIENT. All configs are set in code on the SparkConf, not in spark-defaults.conf.

Caused by: com.clickhouse.client.ClickHouseException: Error writing request body to server, server ClickHouseNode [uri=http://ipAddress:8123/databaseName, options={async=false,http_connection_provider=HTTP_CLIENT,custom_http_params=async_insert=1,wait_for_async_insert=1}]@-1819139033
        at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.sendOnce(ClickHouseClientBuilder.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.send(ClickHouseClientBuilder.java:294)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.execute(ClickHouseClientBuilder.java:349)
        at com.clickhouse.client.ClickHouseClient.executeAndWait(ClickHouseClient.java:1056)
        at com.clickhouse.client.ClickHouseRequest.executeAndWait(ClickHouseRequest.java:2154)
        at xenon.clickhouse.client.NodeClient.$anonfun$syncInsert$2(NodeClient.scala:118)
        at scala.util.Try$.apply(Try.scala:210)
        at xenon.clickhouse.client.NodeClient.syncInsert(NodeClient.scala:118)
        ... 23 more
Caused by: java.io.IOException: Error writing request body to server
        at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3846)
        at java.base/sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3824)
        at com.clickhouse.data.stream.Lz4OutputStream.flushBuffer(Lz4OutputStream.java:51)
        at com.clickhouse.data.stream.AbstractByteArrayOutputStream.writeBuffer(AbstractByteArrayOutputStream.java:80)
        at com.clickhouse.data.ClickHouseInputStream.pipe(ClickHouseInputStream.java:889)
        at com.clickhouse.data.ClickHouseInputStream.pipe(ClickHouseInputStream.java:504)
        at com.clickhouse.client.http.ClickHouseHttpConnection.postData(ClickHouseHttpConnection.java:318)
        at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:225)
        at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:280)
        ... 31 more
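
The options tried above can be collected in code before they reach the SparkConf. The sketch below is pure Scala so it runs without Spark on the classpath. It assumes a hypothetical catalog named `clickhouse` registered with the connector class `xenon.clickhouse.ClickHouseCatalog`, catalog keys of the form `spark.sql.catalog.<name>.host` etc., and that ClickHouse Java client options are forwarded through a `spark.sql.catalog.<name>.option.<key>` prefix; verify these key names against the connector version you actually run.

```scala
object ClickHouseCatalogConf {
  // Hypothetical catalog name; tables are then addressed as "clickhouse.db.table"
  val CatalogName: String = "clickhouse"

  // Builds catalog settings as key/value pairs suitable for SparkConf.setAll.
  // Assumption: keys follow a spark.sql.catalog.<name>.* convention and
  // ClickHouse Java client options are forwarded via the ".option." prefix.
  def settings(host: String, database: String): Map[String, String] = {
    val prefix = s"spark.sql.catalog.$CatalogName"
    Map(
      prefix -> "xenon.clickhouse.ClickHouseCatalog",
      s"$prefix.host" -> host,
      s"$prefix.protocol" -> "http",
      s"$prefix.http_port" -> "8123",
      s"$prefix.database" -> database,
      // Client options tried in this thread
      s"$prefix.option.async" -> "false",
      s"$prefix.option.http_connection_provider" -> "HTTP_CLIENT",
      s"$prefix.option.custom_http_params" -> "async_insert=1,wait_for_async_insert=1"
    )
  }

  def main(args: Array[String]): Unit =
    // Print the settings in a stable (sorted) order for inspection
    settings("ipAddress", "databaseName").toSeq.sorted.foreach { case (k, v) =>
      println(s"$k=$v")
    }
}
```

With Spark available, the pairs could then be applied via `new SparkConf().setAll(ClickHouseCatalogConf.settings("ipAddress", "databaseName"))`. Setting `http_connection_provider=HTTP_CLIENT` alone is not enough if Apache HttpClient 5 is missing from the classpath, as the end of this thread shows.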

camper42 commented 5 months ago

What are your connector, clickhouse-jdbc, and ClickHouse server versions?

ayvorobev commented 5 months ago

ClickHouse server: 23.8.11
connector: 0.7.3
clickhouse-jdbc: 0.4.6

camper42 commented 5 months ago

Try a lower spark.clickhouse.write.batchSize?

I don't know ClickHouse very well, but in my experience this problem is more likely when a table has columns with long strings or is not properly partitioned.

If this doesn't help, I'm out of ideas.
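
One way to try the batch-size suggestion is to override `spark.clickhouse.write.batchSize` on top of existing settings. This pure-Scala sketch keeps the key name from the thread; the helper name `withBatchSize` and the value 5000 are illustrative assumptions, not recommended settings.

```scala
object WriteTuning {
  // Connector write option suggested in the thread
  val BatchSizeKey: String = "spark.clickhouse.write.batchSize"

  // Hypothetical helper: returns a copy of the settings with a smaller write batch size.
  // The right value depends on row width and table layout; it must be positive.
  def withBatchSize(base: Map[String, String], rows: Int): Map[String, String] = {
    require(rows > 0, s"batch size must be positive, got $rows")
    base + (BatchSizeKey -> rows.toString)
  }

  def main(args: Array[String]): Unit = {
    val tuned = withBatchSize(Map.empty, 5000)
    println(tuned(BatchSizeKey))
  }
}
```

Inside a running job, the same key could instead be set per session with `spark.conf.set("spark.clickhouse.write.batchSize", "5000")`, assuming the connector reads it from the session SQL configuration when each micro-batch is written.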

ayvorobev commented 5 months ago

I figured it out. I had forgotten to add the dependency required by the http_connection_provider=HTTP_CLIENT parameter (org.apache.httpcomponents.client5), so the client fell back to HTTP_URL_CONNECTION. Now it works fine.

HTTP_URL_CONNECTION doesn't work for streaming.
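
For an sbt build, the missing dependency from the fix above might be declared as in this fragment. `org.apache.httpcomponents.client5:httpclient5` is Apache HttpClient 5's artifact; the version number is an assumption, so pick one compatible with your clickhouse-java release. When submitting with `spark-submit`, the same coordinates can be passed through `--packages` instead.

```scala
// build.sbt (fragment)
libraryDependencies ++= Seq(
  // Apache HttpClient 5, needed for http_connection_provider=HTTP_CLIENT;
  // without it on the classpath the client falls back to HTTP_URL_CONNECTION.
  // The version is an assumption: align it with your clickhouse-java release.
  "org.apache.httpcomponents.client5" % "httpclient5" % "5.2.1"
)
```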