aws-greengrass / aws-greengrass-nucleus

The Greengrass nucleus component provides device-side orchestration of deployments and lifecycle management for Greengrass components and applications. This includes starting, stopping, and monitoring components and apps, an interprocess communication (IPC) server for communication between components, and component installation and configuration management.
Apache License 2.0

(greengrasscoreipc): iot_core_publish memory leak? #848

Closed kris-sum closed 3 years ago

kris-sum commented 3 years ago

Describe the bug

I'm sending thousands of messages per minute into IoT Core using IPC.new_publish_to_iot_core(), and there seems to be a tiny memory leak in the IPC client, which adds up over time. Over the course of a few days, my script consumes hundreds of megabytes.

To Reproduce

import os
from awscrt.io import (
    ClientBootstrap,
    DefaultHostResolver,
    EventLoopGroup,
    SocketDomain,
    SocketOptions,
)
from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)

class IPCUtils:
    def connect(self):
        elg = EventLoopGroup()
        resolver = DefaultHostResolver(elg)
        bootstrap = ClientBootstrap(elg, resolver)
        socket_options = SocketOptions()
        socket_options.domain = SocketDomain.Local
        amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID"))
        hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT")
        connection = Connection(
            host_name=hostname,
            port=8033,
            bootstrap=bootstrap,
            socket_options=socket_options,
            connect_message_amender=amender,
        )
        self.lifecycle_handler = LifecycleHandler()
        connect_future = connection.connect(self.lifecycle_handler)
        TIMEOUT = 10
        connect_future.result(timeout=TIMEOUT)
        return connection

class IPCService():
    def __init__(self, log):
        self.log = log
        ipc_utils = IPCUtils()
        connection = ipc_utils.connect()
        self.ipc_client = client.GreengrassCoreIPCClient(connection)

    def sendMessage(self, topic, message):
        request = PublishToIoTCoreRequest()
        request.topic_name = topic
        request.payload = message
        request.qos = QOS.AT_LEAST_ONCE
        operation = self.ipc_client.new_publish_to_iot_core()
        try:
            operation.activate(request)
        except Exception as err:
            self.log.error(f'IPC error {err}')

if __name__ == '__main__':
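    import logging
    import time

    logging.basicConfig(level=logging.INFO)
    # IPCService calls log.error(), so pass a real logger rather than a dict.
    serv = IPCService(logging.getLogger(__name__))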
    while True: 
        for x in range (0, 20): 
            serv.sendMessage("TestThing/test/topic/{}".format(x), b'{"somejson":"value", "largeval": "0e 85 f0 a0 f3 55 39 a9 44 df fa 30 57 3b 14 8d 88 23 a5 04 80 de 18 b2 c9 cc 59 65 45 f6 89 bb 92 8f 12 b0 85 a8 1a 31 a2 38 9e 1d a1 d2 f8 16 8b 4f 31 8e b9 3b 7c 5d 93 71 e9 5b 0a c8 80 57 1e 1d f9 3b 38 07 47 15 f7 c7 15 63 66 27 ba e3 62 a5 60 dd e4 16 b4 c7 0c 69 ff d1 e9 79 b3 59 42 6d c9 0e 44 a5 9d 7d 0e bb ef e6 fc 55 ee 40 d0 ec 04 26 17 09 66 63 28 aa 57 0c f4 34 82 37 9c 94 80 62 64 1c e3 cc 78 04 11 7d 0b 40 ea b9 e6 92 b8 bd e2 56 7d 92 21 71 12 e1 01 f4 a7 e5 64 a5 e7 98 31 d1 f8 31 c3 5b 53 9f b5 41 2a e5 51 09 a9 4a 71 dd d7 46 d1 f1 55 4e 3e 8d 63 17 7e f5 4e 32 94 f8 a3 02 38 e7 82 30 a2 ae ea d5 09 12 12 d7 68 7c 5d 43 7b f5 e2 a3 42 77 90 13 5c e6 b8 92 b0 df 07 93 e1 27 42 80 75 06 bc 22 8d 01 02 ff 31 e2 66 fe 3c 42 d6 61 f4 ab 1e b9 73 8d db 3d 46 19 64 3a c8 71 d2 dc 56 9e d4 c8 8a 6a 6a d8 c0 1c fa 00 5e f2 68 3e 2f 17 8e 44 6d f7 55 19 ab 8f 43 35 e5 8c 1e 3d 7a 5e 1b 63 0d 83 66 3b 40 c2 a1 06 0c 1b f3 bc fc 49 1c 02 4a 6b 27 bb 24 11 04 9f 30 18 4c d5 4f f1 e3 12 df c9 6c 16 d2 35 ff 5a 60 dc d4 2a 3d fa 8b 70 51 b6 ca 4f 89 50 b9 e2 c0 c3 40 94 46 eb 59 43 31 38 6b ae 47 1a ee 56 f6 36 e9 1d f3 03 49 ec 9a 16 2e 7f 63 45 00 03 67 d1 2d bc 9f 69 21 7d 8f ac cb 6f b3 89 d1 3f ce 89 f7 17 73 d3 af 45 b7 3d 40 f8 be c8 b5 47 60 01 6e 62 5a f1 87 e5 f4 53 52 86 66 08 b6 80 ec 5a 88 cc 13 8c 45 7f e3 d4 8b 97 03 7b 03 ec f5 5d 95 30 4b d6 86 64 08 a8 ef 3c d6 d8 fb 58 61 b2 78 86 f3 a3 22 70 63 a2 ea 89 52 f3 4a 6d 7a 85 7e b6 02 3c e6 9e 6b 31 4a b9 9f 2f a8 ef 9e 48 03 da 21 a4 a8 e6 ea d7 cb ca 3b c9 03 e2 71 ac f3 9f 0a 22 e2 0e 6c 5f 47 6f ad b2 a8 a8 7c 58 05 97 22 35 e3 5d 32 a8 0c 26 98 63 40 03 90 65 6e db 9c b8 3b 56 2f a6 1b b2 9d f0 25 59 93 a7 ac 3c f6 f3 54 b1 b3 52 dd cd 30 8a ab e2 b8 fe 6f ec 14 9f 28 be 96 dc 75 90 ef 19 22 7e b1 fc 7f 0f a0 fe 61 aa 6e de 57 c1 24 74 22 ed e4 96 74 64 b6 5d 4c 98 3f 9c 6a e7 59 ec f8 5e 83 62 9c f0 fc 3e 3f d1 e7 28 52 92 fd ae 9d 4c d4 6c f0 97 04 fb 33 9a 69 68 64 94 3f 5a 7a 86 20 09 d8 5a dd 10 e2 d4 f5 59 48 52 45 8c 3c fc 32 0f bc 09 98 e8 2c a2 40 10 09 a9 8b b6 e1 58 e6 f0 cf f0 1a c5 d1 1b be f1 fc c5 30 a6 cf 41 56 51 ab 2e 32 3f 79 68 8a 53 a2 fc a3 69 79 1c cd 34 4b 55 46 e6 cb cb dd fd 59 e6 b5 7c 55 44 98 6b f9 21 89 cb b8 8a "}')
        time.sleep(1)

Expected behavior

The Python script should run without its memory usage growing over time.

Actual behavior

Memory usage increases as the number of messages sent increases.
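One way to put numbers on this growth is to sample the process's peak resident set size around a batch of sends. The following is only a minimal sketch using the standard-library `resource` module (Unix only); the helper names `peak_rss_bytes` and `rss_growth` are hypothetical and not part of the original script or the SDK.

```python
import resource
import sys


def peak_rss_bytes():
    """Return the peak resident set size of this process in bytes (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    return peak if sys.platform == "darwin" else peak * 1024


def rss_growth(workload, iterations):
    """Run workload() `iterations` times and return the growth in peak RSS (bytes)."""
    start = peak_rss_bytes()
    for _ in range(iterations):
        workload()
    return peak_rss_bytes() - start


if __name__ == "__main__":
    # Hypothetical stand-in workload; in the script above this would be a
    # call such as serv.sendMessage(topic, payload).
    print(rss_growth(lambda: None, 1000))
```

Because this measures the whole process, it also reflects memory held by native extension code, which Python-level tracing tools may not see.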

Environment

Additional context

The amount of memory leaked does not appear to be proportional to the request payload size. My attempts to use tracemalloc to diagnose the leak further haven't turned up anything obviously wrong (but I'm not an expert in Python).
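For reference, a tracemalloc comparison of the kind mentioned above could look roughly like the sketch below. It takes a snapshot before and after a batch of calls and lists the source lines whose allocations grew the most. The names `top_growth` and `leaky` are hypothetical, and `leaky` is a deliberately leaking stand-in for the real publish call.

```python
import tracemalloc


def top_growth(workload, iterations=1000, limit=5):
    """Run workload() repeatedly and return the source lines whose
    traced allocations grew the most between two snapshots."""
    tracemalloc.start(10)  # keep up to 10 frames per allocation
    workload()  # warm-up call so one-time allocations are not counted
    before = tracemalloc.take_snapshot()
    for _ in range(iterations):
        workload()
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()
    # compare_to() returns StatisticDiff entries, largest change first.
    return after.compare_to(before, "lineno")[:limit]


# Hypothetical stand-in workload that leaks about 1 KiB per call.
_leak = []


def leaky():
    _leak.append(bytearray(1024))


if __name__ == "__main__":
    for stat in top_growth(leaky, iterations=200):
        print(stat)
```

Note that tracemalloc only traces memory allocated through Python's own allocators. Memory allocated directly by native code inside an extension module is not tracked, which may be one reason a leak does not show up in these snapshots.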

MikeDombo commented 3 years ago

Hi Kris, This memory leak is only in your Python application and not the Greengrass nucleus, right? If that's the case, can you please open an issue in the Python SDK repository instead? I know it shouldn't matter to you, but we're actually two teams and they are much better situated to help with this problem.

kris-sum commented 3 years ago

Doh! My mistake, will re-post.