daroczig / AWR.Kinesis

Amazon Kinesis Consumer Application from R for Stream Processing
GNU Affero General Public License v3.0

Corrupted protocol when using consumer and producer in one session #1

Open dselivanov opened 6 years ago

dselivanov commented 6 years ago

It seems that 'Execution halted' on R's stderr stream corrupts the KCL protocol when a consumer and a producer are used in one session.

I'm trying to implement a read_stream->transform->write_stream pattern with AWR.Kinesis.

library(futile.logger)
library(AWR.Kinesis)
flog.threshold(INFO)

kinesis_consumer(
  processRecords = function(records) {
    flog.info('Received %d records from Kinesis', nrow(records))
    for(record in records$data) 
      kinesis_put_record("my-stream", record, "partition-1")
  },
  logfile = "log.txt"
)

At some point I see:

Jun 26, 2018 6:16:09 PM com.amazonaws.services.kinesis.multilang.DrainChildSTDERRTask handleLine
SEVERE: Received error line from subprocess [Execution halted] for shard shardId-000000000000
Execution halted

Followed by:

Jun 26, 2018 6:16:09 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading STDERR for shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage
INFO: Writing ProcessRecordsMessage to child process for shard shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Reading next message from STDIN for shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Reading next message from STDIN for shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.MultiLangProtocol waitForStatusMessage
SEVERE: Failed to get status message for processRecords action for shard shardId-000000000000
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:152)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.waitForStatusMessage(MultiLangProtocol.java:120)
    at com.amazonaws.services.kinesis.multilang.MultiLangProtocol.processRecords(MultiLangProtocol.java:86)
    at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:100)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Reached end of STDIN of child process for shard shardId-000000000000 so won't be able to return a message.
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:84)
    at com.amazonaws.services.kinesis.multilang.GetNextMessageTask.returnAfterEndOfInput(GetNextMessageTask.java:31)
    at com.amazonaws.services.kinesis.multilang.LineReaderTask.call(LineReaderTask.java:70)
    ... 4 more
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor stopProcessing
SEVERE: Encountered an error while trying to process records
java.lang.RuntimeException: Child process failed to process records
    at com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor.processRecords(MultiLangRecordProcessor.java:101)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Starting: Draining STDOUT for shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.LineReaderTask call
INFO: Stopping: Draining STDOUT for shardId-000000000000
Jun 26, 2018 6:16:27 PM com.amazonaws.services.kinesis.multilang.MultiLangRecordProcessor childProcessShutdownSequence
INFO: Child process exited with value: 1

It looks like kinesis_put_record corrupts the protocol (perhaps by creating another Kinesis client in the same session?). Would it make sense to export a more low-level interface to Kinesis? I can send a PR.
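
"Execution halted" is what a non-interactive R process prints when an uncaught error stops it, so one way to narrow this down is to catch R-level errors around each put call and record the message. The following is only a minimal sketch: `put_record_stub` and `forward_records` are hypothetical names, the stub stands in for the real put call so the example runs without AWS access, and `tryCatch` cannot intercept a crash of the process itself.

```r
# Hypothetical stand-in for the real put call, so the sketch runs without AWS
# credentials; it fails on empty payloads to simulate an R-level error.
put_record_stub <- function(stream, data, partition_key) {
  if (!nzchar(data)) stop("empty payload")
  invisible(TRUE)
}

# Forward each record and return one entry per record:
# NA on success, otherwise the error message that was caught.
forward_records <- function(records, put = put_record_stub) {
  vapply(records, function(record) {
    tryCatch({
      put("my-stream", record, "partition-1")
      NA_character_
    }, error = function(e) {
      conditionMessage(e)
    })
  }, character(1), USE.NAMES = FALSE)
}

errs <- forward_records(c('{"a":1}', "", '{"b":2}'))
```

Inside processRecords, the non-NA entries of `errs` could then be written to the log file with flog.error instead of letting the error terminate the consumer.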

daroczig commented 6 years ago

Did you try converting the record to e.g. JSON before passing it to the Kinesis put function?

dselivanov commented 6 years ago

Yes, I'm passing a string (converted to JSON). The app works for some time (receiving and sending events), but eventually crashes.


dselivanov commented 6 years ago

This seems wrong: according to the rJava docs, .jbyte should be used for a single scalar byte. https://github.com/daroczig/AWR.Kinesis/blob/13fce8fea36d7e295adaaa72a98ac83b3575fadd/R/put.R#L18

daroczig commented 6 years ago

I think that's fine -- converting the string to raw first:

> .jbyte(charToRaw('foobar'))
An object of class "jbyte"
[1] 102 111 111  98  97 114
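
The byte values in that output can be cross-checked in base R without rJava. This is a small sketch that only covers the string-to-raw step; it does not show how rJava passes the vector on to Java, and the variable names are illustrative.

```r
# Base R only: charToRaw() turns a string into a raw (byte) vector.
payload <- charToRaw("foobar")

# The integer view of the bytes: 102 111 111 98 97 114
bytes <- as.integer(payload)

# Converting back recovers the original string.
roundtrip <- rawToChar(payload)
```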

dselivanov commented 6 years ago

I've spent quite a lot of time investigating what causes the segfaults in the "put records" API call. It involves rJava and the Kinesis client, but I can't figure out exactly what triggers it or when it happens. I will keep you updated.