confluentinc / librdkafka

The Apache Kafka C/C++ library

How To Remove messages after a particular time #808

Closed abhit011 closed 7 years ago

abhit011 commented 7 years ago

Description

Is there any setting that deletes all messages, across all partitions/replicas, after a particular time, say after 20 or 30 minutes? And what is the maximum size each log directory (e.g. kafka_2.11-0.10.0.1\kafka-logs) can grow to? Say each Kafka message object is 88 bytes and I have 3 brokers with a replication factor of 3; then at least 88 * 3 = 264 bytes are reserved on disk for each message object. Now if I have traffic of around 1 million messages per second, that is 1,000,000 x 264 = 264,000,000 bytes, roughly 0.26 GB, every second. Within a minute that is about 15.8 GB, and within an hour about 950 GB.
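A quick check of the arithmetic (decimal units throughout; the per-message size, replication factor, and message rate are the figures from the question):

```python
# Back-of-the-envelope disk usage for the scenario described above.
msg_size = 88           # bytes per message
replication = 3         # copies kept across the cluster
rate = 1_000_000        # messages per second
GB = 1_000_000_000      # decimal gigabyte

bytes_per_sec = msg_size * replication * rate
print(bytes_per_sec)                          # 264000000 bytes/s (~0.26 GB/s)
print(round(bytes_per_sec * 60 / GB, 1))      # 15.8  (GB per minute)
print(round(bytes_per_sec * 3600 / GB))       # 950   (GB per hour)
```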

Abhi

abhit011 commented 7 years ago

Most of the time, I get errors like the ones below in my Kafka consumer logs:

```
DEBUG Consume failed: Broker: Unknown member
LOG-5-FAIL: 127.0.0.1:9094/bootstrap: Connection closed
LOG-5-FAIL: 127.0.0.1:9093/bootstrap: Connection closed
LOG-5-FAIL: 127.0.0.1:9095/bootstrap: Connection closed
```

When the connection is closed with the above error, I need to restart the Kafka consumer :(

Kafka broker log (showing why there are exceptions):

```
[2016-09-28 13:26:44,895] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-28 13:27:14,684] INFO Scheduling log segment 0 for log 06-0 for deletion. (kafka.log.Log)
[2016-09-28 13:27:14,686] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 0
	at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:263)
	at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:265)
	at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:832)
	at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:823)
	at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
	at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at kafka.log.Log.deleteOldSegments(Log.scala:579)
	at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:427)
	at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:458)
	at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:456)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at kafka.log.LogManager.cleanupLogs(LogManager.scala:456)
	at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:192)
	at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.file.FileSystemException: C:\kafka_2.11-0.10.0.1\kafka-logs1\06-0\00000000000000000000.log -> C:\kafka_2.11-0.10.0.1\kafka-logs1\06-0\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.
```

edenhill commented 7 years ago

These are general Kafka questions and not directly related to librdkafka.

Look for the log.retention settings here: http://kafka.apache.org/documentation.html#brokerconfigs
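For example, time-based retention can be configured broker-wide in server.properties; the values below are illustrative, not recommendations:

```properties
# server.properties (illustrative values)
log.retention.minutes=30                 # delete log segments older than 30 minutes
log.retention.check.interval.ms=300000   # how often the retention cleaner runs
log.retention.bytes=1073741824           # optional: also cap each partition's log at 1 GiB
```

Retention is enforced per log segment, so messages are only deleted once the segment they live in has rolled and aged out.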

And regarding the broker crash it seems like two processes are accessing the same file:

```
Caused by: java.nio.file.FileSystemException: C:\kafka_2.11-0.10.0.1\kafka-logs1\06-0\00000000000000000000.log -> C:\kafka_2.11-0.10.0.1\kafka-logs1\06-0\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.
```

abhit011 commented 7 years ago

Oh yes, it's log.retention.hours and log.retention.minutes.

But how do I deal with two processes accessing the same file?

edenhill commented 7 years ago

Make sure you specify a different log.dirs for each Kafka broker.
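On a single machine that means each broker's properties file points at its own directory, for example (paths and broker ids are illustrative):

```properties
# broker 0: config/server.properties
broker.id=0
log.dirs=C:/kafka_2.11-0.10.0.1/kafka-logs

# broker 1: config/server-1.properties
broker.id=1
log.dirs=C:/kafka_2.11-0.10.0.1/kafka-logs1

# broker 2: config/server-2.properties
broker.id=2
log.dirs=C:/kafka_2.11-0.10.0.1/kafka-logs2
```

If two broker processes (or a stale, not-fully-stopped broker) share a log.dirs path, segment renames will fail with exactly the "being used by another process" error shown above.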

abhit011 commented 7 years ago

They are already different; I have three brokers and their log directories are C:\kafka_2.11-0.10.0.1\kafka-logs, C:\kafka_2.11-0.10.0.1\kafka-logs1 and C:\kafka_2.11-0.10.0.1\kafka-logs2.

edenhill commented 7 years ago

Okay, I'm sorry but troubleshooting Kafka broker operations is outside the scope of librdkafka open source support.

abhit011 commented 7 years ago

Is this librdkafka error caused by the broker?

```
%5|1475052645.108|FAIL|rdkafka#producer-1| 127.0.0.1:9093/bootstrap: Connection closed
%3|1475052645.108|ERROR|rdkafka#producer-1| 127.0.0.1:9093/bootstrap: Connection closed
%5|1475052645.208|FAIL|rdkafka#producer-1| 127.0.0.1:9094/bootstrap: Connection closed
%3|1475052645.208|ERROR|rdkafka#producer-1| 127.0.0.1:9094/bootstrap: Connection closed
%5|1475052944.207|FAIL|rdkafka#producer-1| machine:9094/1: Connection closed
%3|1475052944.207|ERROR|rdkafka#producer-1| machine:9094/1: Connection closed
%5|1475053101.840|FAIL|rdkafka#producer-1| machine:9093/0: Connection closed
%3|1475053101.871|ERROR|rdkafka#producer-1| machine:9093/0: Connection closed
%5|1475053101.841|FAIL|rdkafka#producer-1| machine:9095/2: Connection closed
%3|1475053101.871|ERROR|rdkafka#producer-1| machine:9095/2: Connection closed
%5|1475053101.841|FAIL|rdkafka#producer-1| machine:9094/1: Connection closed
%3|1475053101.898|ERROR|rdkafka#producer-1| machine:9094/1: Connection closed
%3|1475053102.007|FAIL|rdkafka#producer-1| machine:9094/1: Failed to resolve 'machine:9094': The requested name is valid, but no data of the requested type was found.
%3|1475053102.007|FAIL|rdkafka#producer-1| machine:9095/2: Failed to resolve 'machine:9095': The requested name is valid, but no data of the requested type was found.
%3|1475053102.008|FAIL|rdkafka#producer-1| machine:9093/0: Failed to resolve 'machine:9093': The requested name is valid, but no data of the requested type was found.
%3|1475053102.008|ERROR|rdkafka#producer-1| machine:9094/1: Failed to resolve 'machine:9094': The requested name is valid, but no data of the requested type was found.
%3|1475053102.008|ERROR|rdkafka#producer-1| machine:9095/2: Failed to resolve 'machine:9095': The requested name is valid, but no data of the requested type was found.
%3|1475053102.008|ERROR|rdkafka#producer-1| machine:9093/0: Failed to resolve 'machine:9093': The requested name is valid, but no data of the requested type was found.
%5|1475053123.346|FAIL|rdkafka#producer-1| machine:9093/0: Connection closed
%5|1475053123.346|FAIL|rdkafka#producer-1| machine:9094/1: Connection closed
%3|1475053123.365|ERROR|rdkafka#producer-1| machine:9093/0: Connection closed
%3|1475053123.366|ERROR|rdkafka#producer-1| machine:9094/1: Connection closed
%5|1475053702.036|FAIL|rdkafka#producer-1| 127.0.0.1:9095/bootstrap: Connection closed
%3|1475053702.036|ERROR|rdkafka#producer-1| 127.0.0.1:9095/bootstrap: Connection closed
%5|1475053723.469|FAIL|rdkafka#producer-1| 127.0.0.1:9093/bootstrap: Connection closed
%3|1475053723.469|ERROR|rdkafka#producer-1| 127.0.0.1:9093/bootstrap: Connection closed
%5|1475053723.482|FAIL|rdkafka#producer-1| machine:9094/1: Connection closed
%3|1475053723.482|ERROR|rdkafka#producer-1| machine:9094/1: Connection closed
%5|1475053723.507|FAIL|rdkafka#producer-1| machine:9093/0: Connection closed
%3|1475053723.507|ERROR|rdkafka#producer-1| machine:9093/0: Connection closed
%5|1475054323.717|FAIL|rdkafka#producer-1| 127.0.0.1:9094/bootstrap: Connection closed
```

edenhill commented 7 years ago

Most likely, yes. Check the broker logs to see why it disconnects the clients.

abhit011 commented 7 years ago

I deleted all the logs and started fresh again: 3 nodes, 3 partitions, replication factor 2, and it gives the same error again.

I checked server.log:

```
kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 0
	at kafka.log.LogSegment.kafkaStorageException$1(LogSegment.scala:263)
	at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:265)
	at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:832)
	at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:823)
	at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
	at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:579)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at kafka.log.Log.deleteOldSegments(Log.scala:579)
	at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:427)
	at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:458)
	at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:456)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at kafka.log.LogManager.cleanupLogs(LogManager.scala:456)
	at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:192)
	at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.file.FileSystemException: C:\kafka_2.11-0.10.0.1\kafka-logs2\07-2\00000000000000000000.log -> C:\kafka_2.11-0.10.0.1\kafka-logs2\07-2\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.
	at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
	at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
	at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
	at sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
	at java.nio.file.Files.move(Files.java:1395)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:670)
	at kafka.log.FileMessageSet.renameTo(FileMessageSet.scala:370)
	... 27 more
	Suppressed: java.nio.file.FileSystemException: C:\kafka_2.11-0.10.0.1\kafka-logs2\07-2\00000000000000000000.log -> C:\kafka_2.11-0.10.0.1\kafka-logs2\07-2\00000000000000000000.log.deleted: The process cannot access the file because it is being used by another process.
```
edenhill commented 7 years ago

I'm sorry but I'm not able to provide Kafka broker support.

abhit011 commented 7 years ago

But where should I ask then?

edenhill commented 7 years ago

Try the kafka-users mailing list.

abhit011 commented 7 years ago

Do I need to keep polling on the consumer side? I mean, for heartbeats?

abhit011 commented 7 years ago

I still can't figure out why I intermittently get this message in my Kafka consumer:

```
[2016-09-30 17:40:44.955] EventCb CRIT LOG-5-FAIL : 127.0.0.1:9094/bootstrap: Connection closed
[2016-09-30 17:40:44.957] EventCb NOTE ERROR (Local: Broker transport failure): 127.0.0.1:9094/bootstrap: Connection closed
```

whereas both the Kafka producer and consumer keep producing and consuming fine.

abhit011 commented 7 years ago

A lot of people using Kafka on Windows are facing this issue:

https://issues.apache.org/jira/browse/KAFKA-1194 https://issues.apache.org/jira/browse/KAFKA-2170

edenhill commented 7 years ago

You don't need to poll to maintain heartbeats, librdkafka does that in the background.
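The group-related client settings involved are session.timeout.ms and heartbeat.interval.ms, which librdkafka's background thread honours on its own; the values below are illustrative, not the library's defaults:

```properties
# librdkafka consumer configuration (illustrative values)
group.id=mygroup
session.timeout.ms=30000      # the group coordinator evicts the member if no heartbeat arrives within this
heartbeat.interval.ms=3000    # heartbeats are sent automatically by librdkafka's background thread
```

So a slow application poll loop will not, by itself, cause the member to be kicked out of the group in this librdkafka version.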

edenhill commented 7 years ago

Check the broker logs to see why the connection was closed; it is probably just the idle connection reaper.
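If it is the idle connection reaper, the disconnects are benign. The broker-side knob is connections.max.idle.ms, and newer librdkafka versions can suppress logging of these disconnects; values below are illustrative:

```properties
# Broker (server.properties): idle connections are closed after this long (default 10 minutes)
connections.max.idle.ms=600000

# librdkafka client config: stop logging broker-initiated connection closes
# (the log.connection.close property is only available in newer librdkafka versions)
log.connection.close=false
```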

abhit011 commented 7 years ago

controller.log or server.log, inside C:\kafka_2.11-0.10.0.1\logs?