stratosphere / stratosphere

Stratosphere is now Apache Flink.
https://github.com/apache/incubator-flink
Apache License 2.0

Nephele "loses" data packets #256

Open matpeters opened 10 years ago

matpeters commented 10 years ago

A PACT job dies with the following message: "An error occurred in the channel: Expected data packet 25 but received 27"

The error occurred both in version 0.21 and in the current 0.4-snapshot, apparently only with larger inputs: 15 GB of input data worked fine, but 22 GB crashed. I sent the job that caused the error to Stephan Ewen.

Full stack trace:

15:32:11,639 ERROR eu.stratosphere.pact.runtime.task.RegularPactTask - Error in PACT code: Join tweets and dates (2/4)
15:32:11,640 ERROR eu.stratosphere.pact.runtime.task.RegularPactTask - java.io.IOException: An error occurred in the channel: Expected data packet 25 but received 27
java.io.IOException: An error occurred in the channel: Expected data packet 25 but received 27
        at eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel.isClosed(AbstractByteBufferedInputChannel.java:144)
        at eu.stratosphere.nephele.io.RuntimeInputGate.isClosed(RuntimeInputGate.java:261)
        at eu.stratosphere.nephele.io.RuntimeInputGate.readRecord(RuntimeInputGate.java:182)
        at eu.stratosphere.nephele.io.MutableRecordReader.next(MutableRecordReader.java:80)
        at eu.stratosphere.pact.runtime.task.util.PactRecordNepheleReaderIterator.next(PactRecordNepheleReaderIterator.java:62)
        at eu.stratosphere.pact.runtime.task.util.PactRecordNepheleReaderIterator.next(PactRecordNepheleReaderIterator.java:27)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1525)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable.processProbeIter(MutableHashTable.java:450)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable.nextRecord(MutableHashTable.java:536)
        at eu.stratosphere.pact.runtime.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:116)
        at eu.stratosphere.pact.runtime.task.MatchDriver.run(MatchDriver.java:164)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.run(RegularPactTask.java:372)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.invoke(RegularPactTask.java:291)
        at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:344)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Expected data packet 25 but received 27
        at eu.stratosphere.nephele.taskmanager.runtime.RuntimeInputChannelContext.queueTransferEnvelope(RuntimeInputChannelContext.java:148)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeWithBuffer(ByteBufferedChannelManager.java:365)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelope(ByteBufferedChannelManager.java:331)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeFromNetwork(ByteBufferedChannelManager.java:644)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnection.read(IncomingConnection.java:100)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.doRead(IncomingConnectionThread.java:187)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.run(IncomingConnectionThread.java:126)
15:32:12,362 WARN eu.stratosphere.pact.runtime.task.RegularPactTask - Cancelling PACT code: Join tweets and dates (2/4)
15:32:12,362 INFO eu.stratosphere.nephele.execution.ExecutionStateTransition - TM: ExecutionState set from RUNNING to FAILED for task Join tweets and dates (2/4)
15:32:12,362 ERROR eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask - java.io.IOException: An error occurred in the channel: Expected data packet 25 but received 27
        at eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel.isClosed(AbstractByteBufferedInputChannel.java:144)
        at eu.stratosphere.nephele.io.RuntimeInputGate.isClosed(RuntimeInputGate.java:261)
        at eu.stratosphere.nephele.io.RuntimeInputGate.readRecord(RuntimeInputGate.java:182)
        at eu.stratosphere.nephele.io.MutableRecordReader.next(MutableRecordReader.java:80)
        at eu.stratosphere.pact.runtime.task.util.PactRecordNepheleReaderIterator.next(PactRecordNepheleReaderIterator.java:62)
        at eu.stratosphere.pact.runtime.task.util.PactRecordNepheleReaderIterator.next(PactRecordNepheleReaderIterator.java:27)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1525)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable.processProbeIter(MutableHashTable.java:450)
        at eu.stratosphere.pact.runtime.hash.MutableHashTable.nextRecord(MutableHashTable.java:536)
        at eu.stratosphere.pact.runtime.hash.BuildFirstHashMatchIterator.callWithNextKey(BuildFirstHashMatchIterator.java:116)
        at eu.stratosphere.pact.runtime.task.MatchDriver.run(MatchDriver.java:164)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.run(RegularPactTask.java:372)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.invoke(RegularPactTask.java:291)
        at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:344)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Expected data packet 25 but received 27
        at eu.stratosphere.nephele.taskmanager.runtime.RuntimeInputChannelContext.queueTransferEnvelope(RuntimeInputChannelContext.java:148)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeWithBuffer(ByteBufferedChannelManager.java:365)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelope(ByteBufferedChannelManager.java:331)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeFromNetwork(ByteBufferedChannelManager.java:644)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnection.read(IncomingConnection.java:100)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.doRead(IncomingConnectionThread.java:187)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.run(IncomingConnectionThread.java:126)

15:32:12,394 INFO eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask - Canceling Hashtag Polarity Match (2/4)
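The trace places the failure in RuntimeInputChannelContext.queueTransferEnvelope, and the message suggests that the receiving side numbers each incoming envelope and rejects any envelope that is not the next one it expects. The Java sketch below is a hypothetical model of such a check, not the Nephele code: the class SequenceCheckingChannel, its nested Envelope type, the receive method, and the assumption that numbering starts at 0 are all illustrative. It shows why "Expected data packet 25 but received 27" would mean that envelopes 25 and 26 were lost (or overtaken) somewhere between sender and receiver.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical model of a strict in-order receiver. This is NOT the Nephele
 * RuntimeInputChannelContext implementation; names and numbering are assumptions.
 */
final class SequenceCheckingChannel {

    /** Hypothetical stand-in for a transfer envelope carrying a sequence number. */
    static final class Envelope {
        final int sequenceNumber;

        Envelope(int sequenceNumber) {
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Envelopes accepted so far, in arrival order.
    private final Queue<Envelope> accepted = new ArrayDeque<Envelope>();

    // Assumption: the sender numbers envelopes consecutively starting at 0.
    private int expectedSequenceNumber = 0;

    /** Accepts the envelope only if it is exactly the next one expected. */
    void receive(Envelope envelope) throws IOException {
        if (envelope.sequenceNumber != expectedSequenceNumber) {
            // A gap (lost envelope) or reordering is reported and nothing is queued.
            throw new IOException("Expected data packet " + expectedSequenceNumber
                    + " but received " + envelope.sequenceNumber);
        }
        accepted.add(envelope);
        expectedSequenceNumber++;
    }

    int acceptedCount() {
        return accepted.size();
    }

    public static void main(String[] args) {
        SequenceCheckingChannel channel = new SequenceCheckingChannel();
        try {
            // Envelopes 0..24 arrive in order.
            for (int seq = 0; seq < 25; seq++) {
                channel.receive(new Envelope(seq));
            }
            // Envelopes 25 and 26 never arrive; 27 is the next one delivered.
            channel.receive(new Envelope(27));
        } catch (IOException e) {
            System.out.println(e.getMessage()); // Expected data packet 25 but received 27
        }
    }
}
```

A check of this kind can only detect the gap; it cannot tell whether the missing envelopes were dropped by the sender, discarded during channel tear-down, or never written at all.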

sscdotopen commented 10 years ago

I might have seen a similar error message some time ago. Could it be the erroneous multicast in Nephele?

StephanEwen commented 10 years ago

Nephele multicast is disabled by default in the current version.

I suspect faulty logic in the channel tear-down (closing).

On Fri, Nov 8, 2013 at 2:03 PM, sscdotopen notifications@github.com wrote:

I might have seen a similar error message some time ago, could it be the erroneous multicast in Nephele?


AHeise commented 10 years ago

I have a similar issue when increasing the input size of a self-cross. With 50k input records it runs fine, but almost anything larger crashes. The failure is reproducible: the same input size always yields the same packet numbers.

The job was not successfully executed: eu.stratosphere.nephele.client.JobExecutionException: java.io.IOException: An error occurred in the channel: Expected data packet 7 but received 8
        at eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel.isClosed(AbstractByteBufferedInputChannel.java:144)
        at eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel.readRecord(AbstractByteBufferedInputChannel.java:94)
        at eu.stratosphere.nephele.io.RuntimeInputGate.readRecord(RuntimeInputGate.java:193)
        at eu.stratosphere.nephele.io.MutableRecordReader.next(MutableRecordReader.java:80)
        at eu.stratosphere.pact.runtime.task.util.NepheleReaderIterator.next(NepheleReaderIterator.java:72)
        at eu.stratosphere.pact.runtime.resettable.SpillingResettableMutableObjectIterator.next(SpillingResettableMutableObjectIterator.java:152)
        at eu.stratosphere.pact.runtime.task.CrossDriver.runBlockedOuterFirst(CrossDriver.java:235)
        at eu.stratosphere.pact.runtime.task.CrossDriver.run(CrossDriver.java:165)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.run(RegularPactTask.java:370)
        at eu.stratosphere.pact.runtime.task.RegularPactTask.invoke(RegularPactTask.java:291)
        at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:344)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Expected data packet 7 but received 8
        at eu.stratosphere.nephele.taskmanager.runtime.RuntimeInputChannelContext.queueTransferEnvelope(RuntimeInputChannelContext.java:148)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeWithBuffer(ByteBufferedChannelManager.java:365)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelope(ByteBufferedChannelManager.java:331)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.ByteBufferedChannelManager.processEnvelopeFromNetwork(ByteBufferedChannelManager.java:644)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnection.read(IncomingConnection.java:100)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.doRead(IncomingConnectionThread.java:187)
        at eu.stratosphere.nephele.taskmanager.bytebuffered.IncomingConnectionThread.run(IncomingConnectionThread.java:126)

        at eu.stratosphere.nephele.client.JobClient.submitJobAndWait(JobClient.java:353)
        at eu.stratosphere.sopremo.server.SopremoExecutionThread.executePlan(SopremoExecutionThread.java:152)
        at eu.stratosphere.sopremo.server.SopremoExecutionThread.processPlan(SopremoExecutionThread.java:71)
        at eu.stratosphere.sopremo.server.SopremoExecutionThread.run(SopremoExecutionThread.java:64)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

StephanEwen commented 10 years ago

Good that it is reproducible. Can you send me the job, the data, and the other parameters (DOP, etc.)? I will make this a priority issue.

mathiaspet commented 10 years ago

Hi,

Is there a fix for this issue? I just hit the bug on a small data set with the current release.

StephanEwen commented 10 years ago

We are in the process of reworking the network stack for fault tolerance. I expect this will be fixed as part of that work.

Can you provide a test that reproduces the problem fairly reliably?

mathiaspet commented 10 years ago

Hi,

Increasing the number of network buffers helped; the job finished. The only test program I have is from one of the PhD students in Trento, Matteo Lissandrini. It is located here: https://github.com/kuzeko/Tweets-Analyser.

I can package the data set he used and share a link to it. In general, this seems to happen when many task state change messages are sent.