onyx-platform / onyx

Distributed, masterless, high performance, fault tolerant data processing
http://www.onyxplatform.org
Eclipse Public License 1.0

Correlation ID Alignment Error on Insufficient Segment Term Buffer Size #831

Open alexanderkiel opened 6 years ago

alexanderkiel commented 6 years ago

I have a job that transfers quite big segments (I can't easily say how big). While upgrading from v0.9.15 to v0.12.0-rc1, I got an exception with the message "Lost and regained image with the same session-id and different correlation-id." The exception handler registered in the lifecycle showed that this exception was thrown during the read-batch lifecycle of a task receiving big segments.

Adding :onyx.messaging/term-buffer-size.segment 16777216 to the peer config (as suggested in the changelog) solved my issue.
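For reference, here is a minimal sketch of where that key sits in a peer-config map. Only `:onyx.messaging/term-buffer-size.segment` and its value come from this thread. The other keys are common Onyx peer-config entries with placeholder values, and `:onyx.messaging.aeron/embedded-driver? false` is included on the assumption that it matches the separate-JVM media driver described later in this issue. Adjust all of them to your own deployment.

```clojure
;; Peer config sketch. Everything except the term buffer size is a placeholder.
(def peer-config
  {:onyx/tenancy-id "my-tenancy"                          ; placeholder
   :zookeeper/address "127.0.0.1:2181"                    ; placeholder
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/bind-addr "localhost"                  ; placeholder
   :onyx.messaging/peer-port 40200                        ; placeholder
   ;; Assumption: the media driver runs in its own JVM, as in this report.
   :onyx.messaging.aeron/embedded-driver? false
   ;; 16 MiB (16 * 1024 * 1024) segment term buffer, the value that fixed the issue.
   :onyx.messaging/term-buffer-size.segment 16777216})
```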

I would like to see a more specific error message for an insufficient segment term buffer size.

lbradstreet commented 6 years ago

Thank you for the report. I'm surprised that this is the error you saw in this case, as we do have a more specific error that reports when a segment is too big. We also need to improve the error messages around Aeron exception handling in other respects soon.

I'll have some improvements here soon, hopefully before 0.12 final is released.

alexanderkiel commented 6 years ago

Thanks, I'll test the next RCs when they are available. I also use onyx-peer-http-query, and I don't see a metric for segment sizes. Having such a metric would help investigate issues like this.

lbradstreet commented 6 years ago

I agree. We actually track the cumulative bytes transferred, so it would not be hard to do this as well.

lbradstreet commented 6 years ago

Do you have the full onyx.log for this issue? I'm having a little trouble tracking it down, as I see this exception on the publishing peer rather than the subscribing peer.

                    onyx.peer.task-lifecycle/write-batch       task_lifecycle.clj:  160
onyx.plugin.messaging-output.MessengerOutput/write-batch     messaging_output.clj:  100
                                    clojure.core/ex-info                 core.clj: 4617
clojure.lang.ExceptionInfo: Aeron buffer size is not big enough to contain the segment.
                            Please increase the segments term buffer length.
                            This can be performed via the peer-config `:onyx.messaging/term-buffer-size.segment`. Term buffer size must be at least size = max-segment-size * 8.
clojure.lang.ExceptionInfo: Caught exception inside task lifecycle :lifecycle/write-batch. Rebooting the task. -> Exception type: clojure.lang.ExceptionInfo. Exception message: Aeron buffer size is not big enough to contain the segment.
                            Please increase the segments term buffer length.
                            This can be performed via the peer-config `:onyx.messaging/term-buffer-size.segment`. Term buffer size must be at least size = max-segment-size * 8.

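The rule in that error message (term buffer size at least max-segment-size * 8) can be turned into a concrete config value. The sketch below is hypothetical and not part of Onyx; `next-power-of-two` and `required-term-buffer-size` are illustrative names. It assumes the configured value is handed to Aeron as a term length, which Aeron requires to be a power of two between 64 KiB and 1 GiB, so the result is rounded up and clamped to that range.

```clojure
;; Hypothetical helper, not part of Onyx or Aeron.
;; Assumption: the value becomes an Aeron term length, which must be a
;; power of two between 64 KiB and 1 GiB.
(def min-term-length (* 64 1024))          ; Aeron minimum term length: 64 KiB
(def max-term-length (* 1024 1024 1024))   ; Aeron maximum term length: 1 GiB

(defn next-power-of-two
  "Returns the smallest power of two that is >= n, for positive n."
  [n]
  (loop [p 1]
    (if (>= p n)
      p
      (recur (bit-shift-left p 1)))))

(defn required-term-buffer-size
  "Returns a candidate value for :onyx.messaging/term-buffer-size.segment:
  at least max-segment-bytes * 8 (the rule from the error message), rounded
  up to a power of two and raised to Aeron's minimum term length. Throws if
  no allowed term length is large enough."
  [max-segment-bytes]
  (let [at-least (* 8 max-segment-bytes)
        size     (max min-term-length (next-power-of-two at-least))]
    (when (> size max-term-length)
      (throw (ex-info "Segment is too large for any Aeron term buffer."
                      {:max-segment-bytes max-segment-bytes
                       :required-bytes    at-least})))
    size))

(required-term-buffer-size (* 2 1024 1024)) ;; => 16777216
```

Read in reverse, the 16777216 (16 MiB) value that fixed this issue accommodates segments of up to 2 MiB under the same rule.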
alexanderkiel commented 6 years ago

Here is the full exception:

#error {
 :cause "Lost and regained image with the same session-id and different correlation-id."
 :data {:correlation-id 37, :original-exception :clojure.lang.ExceptionInfo}
 :via
 [{:type clojure.lang.ExceptionInfo
   :message "Lost and regained image with the same session-id and different correlation-id."
   :data {:correlation-id 37, :original-exception :clojure.lang.ExceptionInfo}
   :at [clojure.core$ex_info invokeStatic "core.clj" 4725]}]
 :trace
 [[clojure.core$ex_info invokeStatic "core.clj" 4725]
  [clojure.core$ex_info invoke "core.clj" 4725]
  [onyx.messaging.aeron.subscriber$check_correlation_id_alignment invokeStatic "subscriber.clj" 77]
  [onyx.messaging.aeron.subscriber$check_correlation_id_alignment invoke "subscriber.clj" 75]
  [onyx.messaging.aeron.subscriber.Subscriber onFragment "subscriber.clj" 265]
  [io.aeron.ControlledFragmentAssembler onFragment "ControlledFragmentAssembler.java" 141]
  [io.aeron.Image controlledPoll "Image.java" 332]
  [io.aeron.Subscription controlledPoll "Subscription.java" 250]
  [onyx.messaging.aeron.subscriber.Subscriber poll_BANG_ "subscriber.clj" 207]
  [onyx.messaging.aeron.messenger.AeronMessenger poll "messenger.clj" 152]
  [onyx.peer.read_batch$read_function_batch invokeStatic "read_batch.clj" 20]
  [onyx.peer.read_batch$read_function_batch invokePrim "read_batch.clj" -1]
  [onyx.peer.task_lifecycle$build_read_batch$fn__28544 invoke "task_lifecycle.clj" 640]
  [onyx.peer.task_lifecycle$wrap_lifecycle_metrics$fn__28676 invoke "task_lifecycle.clj" 1076]
  [onyx.peer.task_lifecycle.TaskStateMachine exec "task_lifecycle.clj" 1049]
  [onyx.peer.task_lifecycle$iteration invokeStatic "task_lifecycle.clj" 524]
  [onyx.peer.task_lifecycle$iteration invoke "task_lifecycle.clj" 521]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invokeStatic "task_lifecycle.clj" 542]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invoke "task_lifecycle.clj" 532]
  [onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__28698 invoke "task_lifecycle.clj" 1142]
  [clojure.core.async$thread_call$fn__7640 invoke "async.clj" 442]
  [clojure.lang.AFn run "AFn.java" 22]
  [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1149]
  [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 624]
  [java.lang.Thread run "Thread.java" 748]]}

Both the sending and the receiving peers run with the standard buffer settings from v0.12.0-rc1. I get the error on the receiving side. I run 2 sending peers and 17 receiving peers.

Sending catalog entry:

{:onyx/fn :...,
 :onyx/type :function,
 :onyx/name :...,
 :onyx/n-peers 2,
 :onyx/required-tags [:...]
 :onyx/batch-size 1}

Receiving catalog entry:

{:onyx/name :...
 :onyx/fn :...
 :onyx/plugin :onyx.peer.function/function
 :onyx/medium :function
 :onyx/type :output
 :onyx/group-by-fn :...
 :onyx/required-tags [:...]
 :onyx/min-peers 2
 :onyx/flux-policy :kill
 :onyx/batch-size 10
 :onyx/batch-timeout 1000}

I run the media driver in a separate JVM, with the matching version 1.6.0.