Closed: sboettcher closed this issue 6 years ago.
Got this again, although in a slightly different form. It seems to happen when there is too much data to process. The connector should still process the data that is currently being streamed into Kafka.
radar-mongodb-connector_1 | 2017-08-10T11:13:08.230710083Z [2017-08-10 11:13:08,230] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)
radar-mongodb-connector_1 | 2017-08-10T11:13:08.230869379Z [2017-08-10 11:13:08,230] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)
radar-mongodb-connector_1 | 2017-08-10T11:13:08.238824479Z [2017-08-10 11:13:08,237] INFO Created connector radar-connector-mongodb-sink (org.apache.kafka.connect.cli.ConnectStandalone)
radar-mongodb-connector_1 | 2017-08-10T11:13:08.247544274Z [2017-08-10 11:13:08,246] INFO 0 have been processed (org.radarcns.mongodb.MongoDbSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:13:08.935678889Z [2017-08-10 11:13:08,935] INFO Cluster created with settings {hosts=[hotstorage:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster)
radar-mongodb-connector_1 | 2017-08-10T11:13:09.073556275Z [2017-08-10 11:13:09,072] INFO 0 have been written in MongoDB 0 records need to be processed. (org.radarcns.mongodb.MongoDbWriter)
radar-mongodb-connector_1 | 2017-08-10T11:13:09.280384654Z [2017-08-10 11:13:09,279] INFO No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=hotstorage:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster)
radar-mongodb-connector_1 | 2017-08-10T11:13:09.878461849Z [2017-08-10 11:13:09,877] INFO Opened connection [connectionId{localValue:1, serverValue:10}] to hotstorage:27017 (org.mongodb.driver.connection)
radar-mongodb-connector_1 | 2017-08-10T11:13:09.882903339Z [2017-08-10 11:13:09,881] INFO Monitor thread successfully connected to server with description ServerDescription{address=hotstorage:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 10]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=1038826} (org.mongodb.driver.cluster)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.101942548Z [2017-08-10 11:13:10,100] INFO Opened connection [connectionId{localValue:2, serverValue:11}] to hotstorage:27017 (org.mongodb.driver.connection)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.188789796Z [2017-08-10 11:13:10,188] INFO Sink task WorkerSinkTask{id=radar-connector-mongodb-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.191275219Z [2017-08-10 11:13:10,188] INFO Started MongoDbWriter (org.radarcns.mongodb.MongoDbWriter)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.316149157Z [2017-08-10 11:13:10,315] INFO Discovered coordinator kafka-1:9092 (id: 2147483646 rack: null) for group connect-radar-connector-mongodb-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.319903046Z [2017-08-10 11:13:10,319] INFO Revoking previously assigned partitions [] for group connect-radar-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.321077238Z [2017-08-10 11:13:10,320] INFO (Re-)joining group connect-radar-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.343838400Z [2017-08-10 11:13:10,343] INFO Successfully joined group connect-radar-connector-mongodb-sink with generation 22 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
radar-mongodb-connector_1 | 2017-08-10T11:13:10.390329306Z [2017-08-10 11:13:10,389] INFO Setting newly assigned partitions [application_server_status-2, android_empatica_e4_electrodermal_activity_output-1, application_uptime-1, android_empatica_e4_acceleration_output-1, application_external_time-0, application_external_time-2, android_empatica_e4_battery_level_output-1, android_empatica_e4_temperature_output-1, android_empatica_e4_heartrate-1, application_record_counts-1, android_empatica_e4_inter_beat_interval_output-2, android_empatica_e4_inter_beat_interval_output-0, android_empatica_e4_blood_volume_pulse_output-1, android_empatica_e4_sensor_status_output-1, android_empatica_e4_acceleration_output-0, application_server_status-1, android_empatica_e4_electrodermal_activity_output-0, android_empatica_e4_acceleration_output-2, application_uptime-2, application_uptime-0, application_external_time-1, android_empatica_e4_battery_level_output-0, android_empatica_e4_electrodermal_activity_output-2, android_empatica_e4_battery_level_output-2, android_empatica_e4_temperature_output-2, android_empatica_e4_temperature_output-0, android_empatica_e4_heartrate-0, application_record_counts-2, android_empatica_e4_heartrate-2, application_record_counts-0, android_empatica_e4_blood_volume_pulse_output-2, android_empatica_e4_sensor_status_output-2, application_server_status-0, android_empatica_e4_blood_volume_pulse_output-0, android_empatica_e4_sensor_status_output-0, android_empatica_e4_inter_beat_interval_output-1] for group connect-radar-connector-mongodb-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.467974614Z [2017-08-10 11:13:21,458] ERROR Task radar-connector-mongodb-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468140255Z java.lang.IllegalStateException: Queue full
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468151847Z at java.util.AbstractQueue.add(AbstractQueue.java:98)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468160127Z at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468167600Z at org.radarcns.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:111)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468174347Z at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468183248Z at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468192869Z at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468200354Z at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468206786Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468213311Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468219491Z at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468226115Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468293242Z at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468300700Z at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468307384Z at java.lang.Thread.run(Thread.java:745)
radar-mongodb-connector_1 | 2017-08-10T11:13:21.468315106Z [2017-08-10 11:13:21,467] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:13:38.276770505Z [2017-08-10 11:13:38,271] INFO 21771 have been processed (org.radarcns.mongodb.MongoDbSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:13:39.072378217Z [2017-08-10 11:13:39,071] INFO 21771 have been written in MongoDB 0 records need to be processed. (org.radarcns.mongodb.MongoDbWriter)
radar-mongodb-connector_1 | 2017-08-10T11:13:39.242804539Z [2017-08-10 11:13:39,242] INFO Reflections took 39055 ms to scan 265 urls, producing 13109 keys and 86311 values (org.reflections.Reflections)
radar-mongodb-connector_1 | 2017-08-10T11:14:08.270713053Z [2017-08-10 11:14:08,270] INFO 0 have been processed (org.radarcns.mongodb.MongoDbSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:14:09.072629905Z [2017-08-10 11:14:09,071] INFO 0 have been written in MongoDB 0 records need to be processed. (org.radarcns.mongodb.MongoDbWriter)
radar-mongodb-connector_1 | 2017-08-10T11:14:38.271330166Z [2017-08-10 11:14:38,270] INFO 0 have been processed (org.radarcns.mongodb.MongoDbSinkTask)
radar-mongodb-connector_1 | 2017-08-10T11:14:39.071754352Z [2017-08-10 11:14:39,071] INFO 0 have been written in MongoDB 0 records need to be processed. (org.radarcns.mongodb.MongoDbWriter)
@sboettcher I got a similar error, though it didn't kill the connector. There is a `buffer.capacity` property that can be configured for the connector. By default, Radar-Docker doesn't set this property, so it takes the default value of 20000 records. Setting a higher value for `buffer.capacity` in `sink-mongo.properties` resolved this issue for me.
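For reference, a minimal `sink-mongo.properties` sketch. Only `buffer.capacity` is the point here; the connector class and topic names are illustrative guesses based on the package and topic names in the logs above, and the MongoDB connection settings are omitted:

```properties
# Kafka Connect standalone sink config (illustrative values)
name=radar-connector-mongodb-sink
connector.class=org.radarcns.mongodb.MongoDbSinkConnector
topics=android_empatica_e4_heartrate,application_server_status

# Internal record buffer between the sink task and the MongoDB writer.
# The default of 20000 records overflows when the writer falls behind.
buffer.capacity=50000
```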
Thank you @nivemaham! Closing as this fixed the exception.
I am reopening this because it should perhaps be handled in the code: the connector should only consume a number of records less than or equal to `buffer.capacity`. I set `buffer.capacity` to 50000 and it worked for a while. Now I started the MongoDB connector again after a few days, so a large number of records had accumulated, and I am getting this error again. I could increase the buffer capacity once more, but is there any other way to handle this?
How would you expect this to be handled otherwise? For the code to handle it, the task would need to know how many records it is about to consume, which I think would already have been implemented if there were a way to know. Maybe we could check the latest available offset and the last consumed offset of a consumer and take the difference; I'm not sure whether there are APIs to do so.
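For what it's worth, the consumer API does expose this: `KafkaConsumer#endOffsets` (added in Kafka 0.10.1, the version in the logs above) returns the latest offsets, and `position` returns the next offset to consume. Below is a minimal sketch of just the lag arithmetic, with the broker-dependent calls replaced by plain maps so it stays self-contained; the partition names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class BacklogEstimate {

    /**
     * Total backlog = sum over partitions of (endOffset - consumerPosition).
     * In a live connector the two maps would come from
     * KafkaConsumer#endOffsets(partitions) and KafkaConsumer#position(tp);
     * here they are plain maps so the arithmetic is self-contained.
     */
    public static long totalLag(Map<String, Long> endOffsets,
                                Map<String, Long> positions) {
        long lag = 0;
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            long position = positions.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, entry.getValue() - position);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("android_empatica_e4_heartrate-0", 71771L);
        end.put("android_empatica_e4_heartrate-1", 50000L);
        Map<String, Long> pos = new HashMap<>();
        pos.put("android_empatica_e4_heartrate-0", 50000L);
        pos.put("android_empatica_e4_heartrate-1", 50000L);
        // If this exceeds buffer.capacity, the task could back off
        // before polling more records.
        System.out.println(totalLag(end, pos)); // 21771
    }
}
```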
I don't know, but this is not ideal. I set `buffer.capacity=100000` and I still get this error for some partitions. If I increase it further, say to `buffer.capacity=150000`, I get an OutOfMemory error because of the max Java heap size (`-Xmx`).
OK, then we should implement micro-batch processing for that.
Okay, I agree. The current implementation is not very scalable.
For now, I increased the Java max heap size to 64G (since we have 256G on rosalind) using `KAFKA_HEAP_OPTS`, and also increased `buffer.capacity=1000000`. This seems to have fixed it, but setting it this high is not ideal and should be addressed properly.
Also, this only causes a problem if the MongoDB connector was not running for a long time and a lot of data has accumulated that needs to be processed.
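As an illustration of that workaround: the standard Kafka start scripts pick the JVM heap up from the `KAFKA_HEAP_OPTS` environment variable, so in a Docker Compose setup it can be set roughly like this (service name and sizes illustrative):

```yaml
radar-mongodb-connector:
  environment:
    # Passed through to the Connect JVM by the Kafka start scripts
    KAFKA_HEAP_OPTS: "-Xms2g -Xmx64g"
```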
The main reason for this is that MongoDB cannot process the requests at a fast enough rate. Increasing the buffer only hides this problem for longer, because at some point it will fill up if more records are added than removed. Longer timeouts or larger batches will not fix that: if the connector indicates that it cannot process the records in a timely fashion, it will just be killed by Kafka, and we will restart it. Possible mitigations:

1. …
2. Write multiple records to MongoDB in a single bulk request, instead of one request per record.
3. With `max.tasks`, distribute the topics between multiple tasks. This will only work if MongoDB has enough I/O and processing capacity to handle the number of tasks efficiently.

For mitigation number 2, doesn't increasing the buffer capacity in the current implementation mean the same thing? For number 3, I don't think it will be very helpful, since the problem is the slow processing (writing) of messages by MongoDB. Increasing the max tasks will mean faster processing (consumption) of messages by the connector, while MongoDB may not be able to write at that pace. Number 1 seems good to me. Could we also combine number 1 and number 3?
For 2, MongoDB supports multiple operations per request; that is what #28 implements. The total buffer size is not used for this. This should speed up per-record handling considerably. I would start with #28, then experiment with the flush sizes and times, perhaps check whether number 3 makes a difference, and only if that is not satisfactory, go to number 1.
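The micro-batched writing discussed here can be sketched with plain `java.util.concurrent` types. Everything below (the batch size, the `String` record type, the `bulkWrite` stand-in) is illustrative and not the actual #28 implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MicroBatchWriter {
    private static final int MAX_BATCH = 500; // illustrative flush size

    private final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(20000);

    /** Drain up to MAX_BATCH buffered records and write them in one request. */
    public int flushOnce() {
        List<String> batch = new ArrayList<>(MAX_BATCH);
        // drainTo moves up to MAX_BATCH queued records without blocking
        buffer.drainTo(batch, MAX_BATCH);
        if (!batch.isEmpty()) {
            bulkWrite(batch);
        }
        return batch.size();
    }

    /** Stand-in for one MongoDB bulk request covering the whole batch. */
    private void bulkWrite(List<String> batch) {
        // e.g. collection.bulkWrite(...) in a real connector
    }

    public boolean offer(String record) {
        return buffer.offer(record);
    }

    public static void main(String[] args) {
        MicroBatchWriter writer = new MicroBatchWriter();
        for (int i = 0; i < 700; i++) {
            writer.offer("record-" + i);
        }
        System.out.println(writer.flushOnce()); // 500
        System.out.println(writer.flushOnce()); // 200
    }
}
```

One round trip per batch, rather than per record, is what makes the per-record cost drop.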
Okay, thanks. Reviewing now.
Not getting this anymore, but still using a high `buffer.capacity`. Closing this for now; please reopen if anyone experiences this again.
I am getting a Queue full exception which kills the connector; it is thrown from `MongoDbSinkTask.put` (`MongoDbSinkTask.java:111` in the trace above). As far as I can tell this would only happen when the buffer is full and not being emptied, but I have no idea how that could happen.
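The exception itself comes from `java.util.AbstractQueue.add`, which throws `IllegalStateException("Queue full")` when a bounded queue has no free capacity, unlike `offer` (which returns `false`) or the blocking `put`. A small demonstration:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueFullDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        queue.add(1);
        queue.add(2);

        // offer() signals a full buffer without throwing
        System.out.println(queue.offer(3)); // false

        try {
            // add() is what MongoDbSinkTask.put uses per the stack trace;
            // on a full queue it throws instead of applying backpressure
            queue.add(3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Queue full
        }
    }
}
```

Checking `offer` (or blocking with `put` for a bounded time) would let the task back off instead of dying, though blocking too long would still get the task killed by Kafka, as noted above.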