jcustenborder / kafka-connect-spooldir

Kafka Connect connector for reading CSV files into Kafka.
Apache License 2.0

CSV Task Rebalancing Issues #141

Open corybreed opened 4 years ago

corybreed commented 4 years ago

Scenario: I am using the CSV connector in a data pipeline to import a large CSV data set from AWS EFS into Kafka. This pipeline configures a lot of CSV connectors at the beginning of its run, and routinely encounters CSV files in excess of 300 MB.

What I am experiencing: When a rebalance happens (due to new connectors getting configured, etc.), my large CSV tasks are interrupted by the Connect process. In my logs, the following exception is thrown right after the log line showing that a rebalance has started:

ERROR Exception encountered processing line 228238 of /incoming/**************.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask)

This is expected behavior, as connect needs to stop some of the tasks and restart them on a different worker.

What I’d like to happen: Ideally, I’d like the task to recover from the rebalancing interruption automatically and continue processing the CSV file where it left off. Unfortunately, it seems the only way to do that is to manually move the file back into the incoming.path directory after the rebalance finishes, since the exception thrown above causes the CSV file to be moved to the error.path directory.
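A minimal sketch of that manual recovery step, using throwaway placeholder directories (the real paths in this issue are masked, so the names below are purely illustrative):

```shell
# Hypothetical recovery helper, not part of the connector: after rebalancing
# settles, move any files the connector quarantined in error.path back into
# incoming.path so a task will reprocess them from the start.
recover_errored_csvs() {
  error_path="$1"; incoming_path="$2"
  for f in "$error_path"/*.csv; do
    [ -e "$f" ] || continue        # glob matched nothing: no errored files
    mv "$f" "$incoming_path/"      # connector will pick the file up again
  done
}

# Demo against temporary directories standing in for error.path/incoming.path.
demo=$(mktemp -d)
mkdir -p "$demo/error" "$demo/incoming"
touch "$demo/error/2020-07-24-pat_lab3.csv"
recover_errored_csvs "$demo/error" "$demo/incoming"
```

Note the connector reprocesses the whole file, so this only works if downstream consumers tolerate duplicate records.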

What I’ve tried: I’m in the process of tweaking the graceful shutdown period for my tasks, to see if I can prevent Connect from forcefully killing my CSV tasks when it decides it needs to rebalance its workload. I expect this will reduce the number of exceptions I see during rebalances, but I don’t think it can guarantee eliminating the problem, because the imported CSV files can be quite large: there is always a chance a task encounters a file that takes longer than the graceful shutdown period to finish processing.
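For reference, the graceful shutdown period is the worker-level `task.shutdown.graceful.timeout.ms` setting (default 5000 ms); the value below is only an illustrative example, not a recommendation:

```properties
# connect-distributed.properties (worker config)
# How long the worker waits for a task to shut down gracefully before
# forcibly stopping it, e.g. during a rebalance.
task.shutdown.graceful.timeout.ms=120000
```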

Do you have any suggestions for how to handle this?

jcustenborder commented 4 years ago

Hey @corybreed!

Thanks for the issue. Can you give me the full stack trace that you are seeing? There was an issue with rebalancing and this connector a long time ago, but I thought we fixed it. Please take a look at your worker logs; there might be another stack trace there that points at the problem. Also, which version of Connect are you using?

J

corybreed commented 4 years ago

Thanks for the quick response. I'll try to get the logs together by Monday. We're using version 5.4.2 of Connect.

corybreed commented 4 years ago

@jcustenborder

Here is a snapshot of the logs plus the stack trace from when our CSV task gets shut down by Connect:

[2020-07-29 17:57:47,347] INFO Creating processing flag /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-07-29 17:57:47,353] INFO Opening /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask)
[2020-07-29 17:57:48,695] INFO Processed 20000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:49,444] INFO Processed 40000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:50,227] INFO Processed 60000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:50,972] INFO Processed 80000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:51,713] INFO Processed 100000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:52,452] INFO Processed 120000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:53,179] INFO Processed 140000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:53,963] INFO Processed 160000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:54,735] INFO Processed 180000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:55,638] INFO Processed 200000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:56,501] INFO Processed 220000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:57,211] INFO Processed 240000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:57,926] INFO Processed 260000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:58,628] INFO Processed 280000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:57:59,333] INFO Processed 300000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:00,031] INFO Processed 320000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:00,798] INFO Processed 340000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:01,539] INFO Processed 360000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:02,312] INFO Processed 380000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:03,085] INFO Processed 400000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:03,924] INFO Processed 420000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:04,793] INFO Processed 440000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:05,661] INFO Processed 460000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:06,555] INFO Processed 480000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:07,344] INFO Processed 500000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:08,122] INFO Processed 520000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:08,905] INFO Processed 540000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:09,771] INFO Processed 560000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:10,727] INFO Processed 580000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:11,634] INFO Processed 600000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:12,517] INFO Processed 620000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:13,407] INFO Processed 640000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:14,307] INFO Processed 660000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:15,201] INFO Processed 680000 lines of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask)
[2020-07-29 17:58:15,597] INFO Closing /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-07-29 17:58:15,598] ERROR Exception encountered processing line 688366 of /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask)
[2020-07-29 17:58:15,598] ERROR Error during processing, moving /**********/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv to /**********/pat_lab/error. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy)
[2020-07-29 17:58:15,600] INFO Moving /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv to /**********/pat_lab/error/2020-07-24T04-12-19.807735Z-pat_lab3.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-07-29 17:58:15,601] INFO Removing processing flag /**********/pat_lab/incoming/2020-07-24T04-12-19.807735Z-pat_lab3.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile)
[2020-07-29 17:58:15,629] INFO [Worker clientId=connect-1, groupId=pd.system.connect.pd.pd-master] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-07-29 17:58:15,640] INFO WorkerSourceTask{id=pat_lab.poll-0} Finished commitOffsets successfully in 32 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-29 17:58:15,640] ERROR WorkerSourceTask{id=pat_lab.poll-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
    org.apache.kafka.connect.errors.ConnectException: java.io.IOException: Stream closed
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:268)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:149)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:336)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.opencsv.stream.reader.LineReader.readLine(LineReader.java:41)
    at com.opencsv.CSVReader.getNextLine(CSVReader.java:434)
    at com.opencsv.CSVReader.readNext(CSVReader.java:349)
    at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:105)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:256)
    ... 10 more
[2020-07-29 17:58:15,640] ERROR WorkerSourceTask{id=pat_lab.poll-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-29 17:58:15,640] INFO [Producer clientId=connector-producer-pat_lab.poll-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
jcustenborder commented 4 years ago

Thanks @corybreed! Can you reliably reproduce this? I'm wondering if we're getting an additional call to poll() after we've closed the stream.

corybreed commented 4 years ago

Yes, it's very reliably reproduced on our Connect cluster. We've done a lot of testing to try to narrow down when the connector crashes, and it always crashes when we have more than one active Connect worker. I also think the problem is magnified in our use case: at the beginning of our ETL pipeline run we configure 100+ CSV connectors all at once, which triggers lots of rebalances, which in turn lets us reproduce the crashes pretty reliably.

jcustenborder commented 4 years ago

@corybreed Which version of the connect framework are you using?

corybreed commented 4 years ago

We're running from the Confluent Platform Dockerfile, v5.4.2.

mark-patrick-bank commented 4 years ago

@jcustenborder Hey, I'm a workmate of @corybreed's. We captured a worker's log statements from one of these situations, in case this sheds any more light on things: https://gist.github.com/mark-patrick-bank/23ad6192ac6f1ae0fd5c1827802c4e7d

We've since taken to waiting, after we configure all 4200 connectors, for the worker logs to quiet down about rebalancing before we move data into the incoming dirs, but we still hit the same issues. Depending on the next couple of weeks, I may be able to provide some better information.

mark-patrick-bank commented 4 years ago

Hey Jeremy, just wanted to clarify: after letting rebalances subside, we don't have any issues processing files with the spool dir connector. We haven't retried having it process data during rebalances since we figured that out. We just configure all our connectors, wait a couple of hours for them to sort themselves out, and then move data into position.