aws / aws-greengrass-core-sdk-js

Greengrass Nodejs SDK
https://aws.github.io/aws-greengrass-core-sdk-js/
Apache License 2.0
19 stars 18 forks source link

StreamManagerClient.readMessages always returns empty Message #12

Closed MedericPixium closed 4 years ago

MedericPixium commented 4 years ago

We have been deploying Lambda functions in JavaScript (Node.js 12.x). Writing gives us no issue, as we get the OK result when appending to the stream.

However, when trying to read from the stream (with the JS code below), we get only empty messages. We have tried the Python SDK and there is no issue. However, we would like to use JavaScript, as the rest of our functions use JavaScript as well.

Extract from Read function log:

[2020-09-16T16:27:30.744+08:00][INFO]-Received ReadMessagesResponse from server
[2020-09-16T16:27:30.744+08:00][INFO]-Message {}
[2020-09-16T16:27:30.744+08:00][INFO]-Message {}
[2020-09-16T16:27:30.744+08:00][INFO]-Message {}
[2020-09-16T16:27:30.744+08:00][INFO]-Message {}
[2020-09-16T16:27:30.744+08:00][INFO]-Message {}

Another issue we get (not sure if it's linked) is:

[2020-09-16T16:21:12.113+08:00][ERROR]-(node:1) UnhandledPromiseRejectionWarning: Error: connect ECONNREFUSED 127.0.0.1:8088
[2020-09-16T16:21:12.113+08:00][ERROR]-    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1141:16)
[2020-09-16T16:21:12.113+08:00][ERROR]-(node:1) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
[2020-09-16T16:21:12.113+08:00][ERROR]-(node:1) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
[2020-09-16T16:21:12.114+08:00][INFO]-Error: connect ECONNREFUSED 127.0.0.1:8088
[2020-09-16T16:21:12.114+08:00][INFO]-    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1141:16) {
[2020-09-16T16:21:12.114+08:00][INFO]-  errno: 'ECONNREFUSED',
[2020-09-16T16:21:12.114+08:00][INFO]-  code: 'ECONNREFUSED',
[2020-09-16T16:21:12.114+08:00][INFO]-  syscall: 'connect',
[2020-09-16T16:21:12.114+08:00][INFO]-  address: '127.0.0.1',
[2020-09-16T16:21:12.114+08:00][INFO]-  port: 8088
[2020-09-16T16:21:12.114+08:00][INFO]-}

This happens with every function that tries to read or write to the stream before it has finished initialization.

GreenGrass Version: 1.10.2 GreenGrass JS SDK Version: 1.6.1

This code is taken from the following tutorial:

https://docs.aws.amazon.com/greengrass/latest/developerguide/work-with-streams.html

We have also done the following test:

Functions:

This works with no issue.

Functions:

The JavaScript reads empty objects per the referred issue above.

Non Functional JS Code

Create Stream

const {
    StreamManagerClient,
    MessageStreamDefinition,
    StrategyOnFull,
    ResourceNotFoundException,
} = require('aws-greengrass-core-sdk').StreamManager;

const STREAM_NAME = 'SomeStream';

const c = new StreamManagerClient();

/**
 * Once the client is connected, recreate STREAM_NAME from scratch:
 * delete it if it exists, then create it with the OverwriteOldestData
 * strategy.
 */
c.onConnected(async() => {
    try {
        await c.deleteMessageStream(STREAM_NAME);
    }
    catch (e) {
        // Rethrow the error if it wasn't the expected error
        // NOTE(review): rethrowing here rejects the promise returned by this
        // async callback; confirm the SDK handles that rejection.
        if (!(e instanceof ResourceNotFoundException)) {
            throw e;
        }
    }
    try {
        await c.createMessageStream(
            new MessageStreamDefinition()
            .withName(STREAM_NAME)
            .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)
        );
    }
    catch (e) {
        // Include the caught error so the cause of the failure is not lost.
        console.log("error creating message stream", e);
    }
});

// Log client-level errors reported by the stream manager client.
c.onError((err) => {
    console.log(err);
});

/**
 * Exported handler; does no work and returns an empty string.
 * All stream setup happens in the onConnected callback above.
 *
 * @returns {string} Always ''.
 */
module.exports.handler = function handler() {
    return '';
};

Write to Stream

const {
    StreamManagerClient,
    ReadMessagesOptions,
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    StrategyOnFull,
    ResourceNotFoundException,
} = require('aws-greengrass-core-sdk').StreamManager;
const util = require("util");
const STREAM_NAME = 'SomeStream';
const c = new StreamManagerClient();

/**
 * Once the client is connected, append a fixed "Hello World" payload to
 * STREAM_NAME once per second until the first failure.
 */
c.onConnected(async() => {
    const interval = setInterval(async() => {
        try {
            console.log("writing to stream");
            // Buffer.from encodes the string directly; no TextEncoder
            // round trip is needed to obtain the UTF-8 bytes.
            const message = Buffer.from("Hello World", 'utf8');
            const sequenceNumber = await c.appendMessage(STREAM_NAME, message);
            console.log(`message sequence number ${sequenceNumber}`);
        }
        catch (e) {
            // Stop writing and release the client on the first failure.
            clearInterval(interval);
            c.close();
            // Log instead of rethrowing: nothing awaits the promise returned
            // by this timer callback, so a rethrow would only surface as an
            // unhandled promise rejection.
            console.log(e);
        }
    }, 1000);

});

// Log client-level errors reported by the stream manager client.
c.onError((err) => {
    console.log(err);
});

/**
 * Exported handler; does no work and returns an empty string.
 * All writing happens in the onConnected callback above.
 *
 * @returns {string} Always "".
 */
module.exports.handler = function handler() {
    return "";
};

Read from Stream

const {
    StreamManagerClient,
    ReadMessagesOptions,
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    StrategyOnFull,
    ResourceNotFoundException,
} = require('aws-greengrass-core-sdk').StreamManager;

const STREAM_NAME = 'SomeStream';
const client = new StreamManagerClient();

/**
 * Once the client is connected, poll STREAM_NAME forever: wait five seconds,
 * request a batch of messages, and dump each returned Message object.
 */
client.onConnected(async() => {
    // Resolves after the given number of milliseconds.
    const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    for (;;) {
        await pause(5000);
        try {
            // Require at least 11 messages and accept up to 100 (the default
            // maximum is 1). No read timeout is set, so the default of 0
            // applies, which immediately returns the messages or an
            // exception; chain .withReadTimeoutMillis(5 * 1000) to wait up
            // to 5 seconds for the minimum count instead.
            const readOptions = new ReadMessagesOptions()
                .withMinMessageCount(11)
                .withMaxMessageCount(100);
            const messages = await client.readMessages(STREAM_NAME, readOptions);
            if (!messages || messages.length === 0) {
                continue;
            }
            // NOTE(review): this dumps each Message object as a whole; the
            // actual data is reportedly exposed as the `payload` property —
            // confirm against the SDK's Message documentation.
            for (const entry of messages) {
                console.dir(entry);
            }
        }
        catch (readError) {
            console.log(readError);
        }
    }

});

// Log client-level errors reported by the stream manager client.
client.onError((clientError) => {
    console.log(clientError);
});

/**
 * Exported handler; does no work and returns an empty string.
 * All reading happens in the onConnected callback above.
 *
 * @returns {string} Always ''.
 */
module.exports.handler = function handler() {
    return '';
};

Functional Python Code

Create Stream

import asyncio
import logging
import random
import time

from greengrasssdk.stream_manager import (
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException
)

def main(logger):
    """Recreate the ``StreamFromPython`` message stream.

    Deletes the stream if it already exists, then creates it again with the
    overwrite-oldest strategy. The exception types handled below are printed
    rather than raised.

    Args:
        logger: Accepted from the caller but not used; output goes to print().
    """
    client = StreamManagerClient()
    stream_name = "StreamFromPython"

    # Try deleting the stream (if it exists) so that we have a fresh start
    try:
        client.delete_message_stream(stream_name=stream_name)
    except ResourceNotFoundException:
        print("Resource not found")

    try:
        client.create_message_stream(MessageStreamDefinition(
            name=stream_name,  # Required.
            max_size=268435456,  # Default is 256 MB.
            stream_segment_size=16777216,  # Default is 16 MB.
            time_to_live_millis=None,  # By default, no TTL is enabled.
            strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
            flush_on_write=False,  # Default is false.
        ))
    except StreamManagerException as smeError:
        print(smeError)
    except (ConnectionError, asyncio.TimeoutError) as cError:
        # A tuple is required to catch both types: `A or B` evaluates to just
        # `A`, which would leave asyncio.TimeoutError uncaught.
        print(cError)

def function_handler(event, context):
    """Handler entry point; ignores both arguments and returns ``None``."""
    return None


# Configure logging at INFO level before the stream setup runs.
logging.basicConfig(level=logging.INFO)
# Starting main: executed at module level, i.e. when this file is loaded.
root_logger = logging.getLogger()
main(logger=root_logger)

Write to Stream

import asyncio
import logging
import random
import time

from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException
)

def main(logger):
    """Append a fixed message to ``StreamFromPython`` every two seconds.

    Loops forever. The exception types handled below are printed and the loop
    continues with the next attempt.

    Args:
        logger: Accepted from the caller but not used; output goes to print().
    """
    client = StreamManagerClient()
    my_stream_name = "StreamFromPython"
    # Same bytes as mapping ord() over the ASCII text, kept as a bytearray.
    payload = bytearray('Hello World!', 'utf-8')

    while True:
        try:
            sequence_number = client.append_message(stream_name=my_stream_name, data=payload)
            print(sequence_number)
            print(type(sequence_number))
            print("Message has been appended with sequence number" + str(sequence_number))
        except StreamManagerException as smeError:
            print("getting stream manager exception")
            print(smeError)
        except (ConnectionError, asyncio.TimeoutError) as cError:
            # A tuple is required to catch both types: `A or B` evaluates to
            # just `A`, which would leave asyncio.TimeoutError uncaught.
            print("getting connection error")
            print(cError)

        time.sleep(2)

def function_handler(event, context):
    """Handler entry point; ignores both arguments and returns ``None``."""
    return None


# Configure logging at INFO level before the write loop runs.
logging.basicConfig(level=logging.INFO)
# Starting main: executed at module level, i.e. when this file is loaded.
root_logger = logging.getLogger()
main(logger=root_logger)

Read from Stream

import asyncio
import logging
import random
import time

from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException
)

def main(logger):
    """Read from ``StreamFromPython`` every five seconds and print the result.

    Loops forever. Each read requests up to 100 messages with a 5000 ms read
    timeout. The exception types handled below are printed and the loop
    continues with the next attempt.

    Args:
        logger: Accepted from the caller but not used; output goes to print().
    """
    client = StreamManagerClient()
    MY_STREAM_NAME = "StreamFromPython"

    while True:
        try:
            message_list = client.read_messages(
                stream_name=MY_STREAM_NAME,
                options=ReadMessagesOptions(
                    max_message_count=100,
                    read_timeout_millis=5000
                    )
            )
            print("printing messsage list")
            print(message_list)

        except StreamManagerException as smeError:
            print("getting stream manager exception")
            print(smeError)
        except (ConnectionError, asyncio.TimeoutError) as cError:
            # A tuple is required to catch both types: `A or B` evaluates to
            # just `A`, which would leave asyncio.TimeoutError uncaught.
            print("getting connection error")
            print(cError)

        time.sleep(5)

def function_handler(event, context):
    """Handler entry point; ignores both arguments and returns ``None``."""
    return None


# Configure logging at INFO level before the read loop runs.
logging.basicConfig(level=logging.INFO)
# Starting main: executed at module level, i.e. when this file is loaded.
root_logger = logging.getLogger()
main(logger=root_logger)
MedericPixium commented 4 years ago

We have also done the following test:

Functions:

This works with no issue.

Functions:

The JavaScript reads empty objects per the referred issue above.

This helped us pinpoint the issue is coming from the JavaScript reading. However we have still not managed to make the reading work.

massi-ang commented 4 years ago

readMessages() returns an array of Message object. The actual payload can be printed with messages[i].payload.

Refer to https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html and https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.Message.html

MedericPixium commented 4 years ago

readMessages() returns an array of Message object. The actual payload can be printed with messages[i].payload.

Refer to https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html and https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.Message.html

Thank you. Compared to Python, for some reason we are unable to log the whole Message object, and this got us off track.

Furthermore, @massi-ang, would you have any insight on the other issue with the unhandled promise rejection?

massi-ang commented 4 years ago

You can add an unhandledRejection callback. See https://nodejs.org/api/process.html#process_event_unhandledrejection

MedericPixium commented 4 years ago

We managed to make everything work! thanks a lot!

kaku0 commented 4 years ago

Hi @MedericPixium, Could you please share some details regarding your solution of the connection refused error? I'm currently struggling with the same for several hours. I'm using Python, but tried with NodeJS as well - got same error message as you. Apparently in both cases it ends up with connection failure.

To be more specific, I'm sharing logs:

[2020-10-09T11:18:50.892Z][ERROR]-streammanagerclient.py:134,Connection  error while connecting to server: [Errno 111] Connect call failed  ('127.0.0.1', 8088)
[2020-10-09T11:18:50.892Z][FATAL]-lambda_runtime.py:142,Failed to import handler function "index.lambda_handler" due to exception: [Errno 111] Connect call failed ('127.0.0.1', 8088)

My situation is almost identical. I found my SM Client probably can't access SM service running on a local host. However, port 8088 on localhost is opened, SM authentication is disabled, tried to run lambda with and without GG container - no results. What's worth mentioning is that SM service gets initialized correctly - confirmed by logs, I'm also able to connect via nc -v 127.0.0.1 8088.

Of course, all the prerequisites and requirements for SM are met. Thanks in advance!

EDIT: Just tried to move the lambda function with all its dependencies to the temp dir under root and execute the .py file manually. It results in a "Broken bit parity" error at the "create_message_stream" line. Looks like this way I can establish a connection with 127.0.0.1:8088, but I'm facing some other issue. At the same time, the lambda deployed via GG still complains about connection failure.

massi-ang commented 4 years ago

@kaku0 can you verify if you are using the latest SDK and the latest version of Greengrass?

kaku0 commented 4 years ago

@massi-ang Sure!

Those were versions I've used: GGC Version: v1.11 GG Python SDK Version: v1.5

According to the compatibility section, it should all be fine. But just in case, I just updated the Python SDK to v1.6. Unfortunately, still the same. Logs seem to be a little different, but the final effect is the same:

[2020-10-09T15:14:52.91Z][ERROR]-streammanagerclient.py:188,Unable to read from socket, likely socket is closed or server died
[2020-10-09T15:14:52.91Z][ERROR]-streammanagerclient.py:145,Connection error while connecting to server: [Errno 111] Connect call failed ('127.0.0.1', 8088)
[2020-10-09T15:15:05.354Z][ERROR]-streammanagerclient.py:188,Unable to read from socket, likely socket is closed or server died
[2020-10-09T15:15:05.354Z][ERROR]-streammanagerclient.py:145,Connection error while connecting to server: [Errno 111] Connect call failed ('127.0.0.1', 8088)

Also, I tried again to move all the lambda files to the root and run .py manually as a root. Still the same - "Broken bit parity". Found something related to this in GGStreamManager.log file:

[2020-10-09T15:16:33.89Z][ERROR]- (nioEventLoopGroup-3-1) com.amazonaws.iot.greengrass.streammanager.server.handlers.CreateMessageStreamRequestHandler: Encountered unknown exception while creating message stream python_stream_manager_test
[2020-10-09T15:16:33.89Z][ERROR]-org.mapdb.DBException$PointerChecksumBroken: Broken bit parity
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.DataIO.parity4Get(DataIO.java:476) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.StoreWAL.longStackLoadChunk(StoreWAL.kt:732) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.StoreWAL.longStackTake(StoreWAL.kt:831) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.StoreDirectAbstract.allocateRecid(StoreDirectAbstract.kt:255) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.StoreWAL.put(StoreWAL.kt:383) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.HTreeMap.valueWrap(HTreeMap.kt:1208) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.HTreeMap.putprotected(HTreeMap.kt:344) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at org.mapdb.HTreeMap.put(HTreeMap.kt:324) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at com.amazonaws.iot.greengrass.streammanager.dao.MapDbMetadataDao.put(MapDbMetadataDao.java:73) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at com.amazonaws.iot.greengrass.streammanager.store.log.LogStore.createMessageStream(LogStore.java:113) ~[AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at com.amazonaws.iot.greengrass.streammanager.server.handlers.CreateMessageStreamRequestHandler.handle(CreateMessageStreamRequestHandler.java:41) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at com.amazonaws.iot.greengrass.streammanager.server.handlers.MessageStreamHandler.channelRead(MessageStreamHandler.java:130) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [AWSGreengrassStreamManager.jar:?]
[2020-10-09T15:16:33.89Z][ERROR]-       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
[2020-10-09T15:16:33.892Z][DEBUG]- (nioEventLoopGroup-3-1) com.amazonaws.iot.greengrass.streammanager.server.protocol.ResponseHelper: Respond request f95fa1f3-712e-4fc4-b419-98fadc14a1d9 with status UnknownFailure