cybercongress / cyber-search

🚀 Toolchain for transactions parsing and processing
https://cybercongress.github.io/cyber-search/

Error sending message to kafka #32

Closed hleb-albau closed 6 years ago

hleb-albau commented 6 years ago
10:59:06.906 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.ChainPump - Processing ETHEREUM_CLASSIC 2811165 block
10:59:06.984 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.kafka.KafkaStorage - Initializing kafka storage producer
10:59:07.510 [RxCachedThreadScheduler-1] INFO fund.cyber.pump.kafka.KafkaStorage - Initializing kafka storage producer completed
10:59:08.765 [RxCachedThreadScheduler-1] ERROR fund.cyber.pump.ChainPump - Error during processing ETHEREUM_CLASSIC stream
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at fund.cyber.pump.kafka.KafkaStorageAction.store(KafkaStorageActionTemplate.kt:25) ~[pumps.jar:na]
    at fund.cyber.pump.ChainPump$initializeStreamProcessing$3.accept(ChainPump.kt:81) ~[pumps.jar:na]
    at fund.cyber.pump.ChainPump$initializeStreamProcessing$3.accept(ChainPump.kt:11) ~[pumps.jar:na]
    at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65) ~[rxjava-2.1.6.jar:na]
    at io.reactivex.internal.operators.flowable.FlowableSkipWhile$SkipWhileSubscriber.onNext(FlowableSkipWhile.java:71) [rxjava-2.1.6.jar:na]
    at io.reactivex.internal.operators.flowable.FlowableScan$ScanSubscriber.onNext(FlowableScan.java:83) [rxjava-2.1.6.jar:na]
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:400) [rxjava-2.1.6.jar:na]
    at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176) [rxjava-2.1.6.jar:na]
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) [rxjava-2.1.6.jar:na]
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) [rxjava-2.1.6.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_131]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476) ~[guava-19.0.jar:na]
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:435) ~[guava-19.0.jar:na]
    at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:79) ~[guava-19.0.jar:na]
    at fund.cyber.node.common.ConcurrencyKt.awaitAll(Concurrency.kt:17) ~[common.jar:na]
    at fund.cyber.pump.kafka.KafkaStorageAction.store(KafkaStorageActionTemplate.kt:21) ~[pumps.jar:na]
    ... 15 common frames omitted
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049170 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 

Right now we store three entities to Kafka. Which entity serializes to 1 MB? Maybe we should add a transient field for some field collection?

abitrolly commented 6 years ago

The entities are block, transaction, and uncle. We need to calculate the size of these lists for each block, as sketched below.
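
A minimal Kotlin sketch of such a size check, assuming the entities are Jackson-serializable (the pump's actual serializer may differ; `logEntitySizes` and its parameters are hypothetical stand-ins):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

private val mapper = ObjectMapper()

// Size in bytes of an entity once serialized to JSON.
fun serializedSize(entity: Any): Int = mapper.writeValueAsBytes(entity).size

// Hypothetical stand-ins for the real pump entities: log how many bytes
// each of the three lists contributes for a given block.
fun logEntitySizes(block: Any, transactions: List<Any>, uncles: List<Any>) {
    println("block: ${serializedSize(block)} bytes")
    println("transactions: ${serializedSize(transactions)} bytes")
    println("uncles: ${serializedSize(uncles)} bytes")
}
```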

abitrolly commented 6 years ago

The problem is that we're getting a message from the chain pump that is bigger than 1 MB, and the Kafka recommendation is that messages should not be that big.

The chain pump parses an ETH block and sends it to Kafka.

cyborgshead commented 6 years ago

@YodaMike Please provide a general description of this task and a way to fix it.

hleb-albau commented 6 years ago

Some transactions carrying smart contract code are larger than 1 MB, so all we can do is adjust the Kafka max message size to more than 1 MB.
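
The limit named in the stack trace is the producer's `max.request.size`. A minimal sketch of raising it, assuming the producer is built from plain `Properties` (the pump's actual wiring may differ):

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

fun buildProducer(): KafkaProducer<String, ByteArray> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer")
        // Raise the per-request limit above the ~1 MB default.
        put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15_728_640) // 15 MB
    }
    return KafkaProducer(props)
}
```

Note that the broker enforces its own limit (`message.max.bytes`), so the producer setting alone is not enough.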

abitrolly commented 6 years ago

What query can I run over the standard Ethereum API to find all transactions larger than 1 MB?
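
As far as I know the standard JSON-RPC API has no direct query for this; you would scan blocks and measure each transaction's `input` payload. A minimal Kotlin sketch, assuming a node serving JSON-RPC at `localhost:8545` (a hypothetical endpoint):

```kotlin
import java.net.HttpURLConnection
import java.net.URL

// POST a JSON-RPC request to the node and return the raw response body.
fun rpc(body: String): String {
    val conn = URL("http://localhost:8545").openConnection() as HttpURLConnection
    conn.requestMethod = "POST"
    conn.doOutput = true
    conn.setRequestProperty("Content-Type", "application/json")
    conn.outputStream.use { it.write(body.toByteArray()) }
    return conn.inputStream.bufferedReader().use { it.readText() }
}

// Fetch one block with full transaction objects. A transaction carrying
// ~1 MB of contract code has an `input` field of roughly two million hex
// characters (two characters per byte).
fun getBlockJson(number: Long): String = rpc(
    """{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x${number.toString(16)}",true],"id":1}"""
)
```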

hleb-albau commented 6 years ago

@abitrolly check it here http://cyberfund.io/cybernode/chains/ethereum/

hleb-albau commented 6 years ago

Set the max message size to 15 MB. Config example: https://stackoverflow.com/questions/21020347/how-can-i-send-large-messages-with-kafka-over-15mb
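
Per the linked example, the 15 MB limit has to be raised on all three sides, not just the producer. A sketch of the matching consumer setting, with the broker-side keys noted in comments (values are assumptions, mirroring the Stack Overflow answer):

```kotlin
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer

// Broker side (server.properties):
//   message.max.bytes=15728640
//   replica.fetch.max.bytes=15728640
// Producer side: max.request.size=15728640 (see the sketch above).
fun buildConsumer(): KafkaConsumer<String, ByteArray> {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "pump-readers")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        // Allow fetching records of up to 15 MB from a single partition.
        put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 15_728_640)
    }
    return KafkaConsumer(props)
}
```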