aws / aws-iot-device-sdk-python-v2

Next generation AWS IoT Client SDK for Python using the AWS Common Runtime
Apache License 2.0
410 stars 212 forks source link

IPC iot_core_publish memory leak? #149

Closed kris-sum closed 3 years ago

kris-sum commented 3 years ago

Confirm by changing [ ] to [x] below to ensure that it's a bug:

Describe the bug I'm sending thousands of messages per minute into IoT Core using IPC.new_publish_to_iot_core(), and there seems to be a tiny memory leak in the IPC client, which stacks up over time. Over the course of a few days, my script consumes hundreds of megabytes.

To Reproduce

import os
from awscrt.io import (
    ClientBootstrap,
    DefaultHostResolver,
    EventLoopGroup,
    SocketDomain,
    SocketOptions,
)
from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)

class IPCUtils:
    def connect(self):
        elg = EventLoopGroup()
        resolver = DefaultHostResolver(elg)
        bootstrap = ClientBootstrap(elg, resolver)
        socket_options = SocketOptions()
        socket_options.domain = SocketDomain.Local
        amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID"))
        hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT")
        connection = Connection(
            host_name=hostname,
            port=8033,
            bootstrap=bootstrap,
            socket_options=socket_options,
            connect_message_amender=amender,
        )
        self.lifecycle_handler = LifecycleHandler()
        connect_future = connection.connect(self.lifecycle_handler)
        TIMEOUT = 10
        connect_future.result(timeout=TIMEOUT)
        return connection

class IPCService():
    def __init__(self, log):
        self.log = log
        ipc_utils = IPCUtils()
        connection = ipc_utils.connect()
        self.ipc_client = client.GreengrassCoreIPCClient(connection)

    def sendMessage(self, topic, message):
        request = PublishToIoTCoreRequest()
        request.topic_name = topic
        request.payload = message
        request.qos = QOS.AT_LEAST_ONCE
        operation = self.ipc_client.new_publish_to_iot_core()
        try:
            operation.activate(request)
        except Exception as err:
            self.log.error(f'IPC error {err}')

if __name__ == '__main__':
    import time

    serv = IPCService({})
    while True: 
        for x in range (0, 20): 
            serv.sendMessage("TestThing/test/topic/{}".format(x), b'{"somejson":"value", "largeval": "0e 85 f0 a0 f3 55 39 a9 44 df fa 30 57 3b 14 8d 88 23 a5 04 80 de 18 b2 c9 cc 59 65 45 f6 89 bb 92 8f 12 b0 85 a8 1a 31 a2 38 9e 1d a1 d2 f8 16 8b 4f 31 8e b9 3b 7c 5d 93 71 e9 5b 0a c8 80 57 1e 1d f9 3b 38 07 47 15 f7 c7 15 63 66 27 ba e3 62 a5 60 dd e4 16 b4 c7 0c 69 ff d1 e9 79 b3 59 42 6d c9 0e 44 a5 9d 7d 0e bb ef e6 fc 55 ee 40 d0 ec 04 26 17 09 66 63 28 aa 57 0c f4 34 82 37 9c 94 80 62 64 1c e3 cc 78 04 11 7d 0b 40 ea b9 e6 92 b8 bd e2 56 7d 92 21 71 12 e1 01 f4 a7 e5 64 a5 e7 98 31 d1 f8 31 c3 5b 53 9f b5 41 2a e5 51 09 a9 4a 71 dd d7 46 d1 f1 55 4e 3e 8d 63 17 7e f5 4e 32 94 f8 a3 02 38 e7 82 30 a2 ae ea d5 09 12 12 d7 68 7c 5d 43 7b f5 e2 a3 42 77 90 13 5c e6 b8 92 b0 df 07 93 e1 27 42 80 75 06 bc 22 8d 01 02 ff 31 e2 66 fe 3c 42 d6 61 f4 ab 1e b9 73 8d db 3d 46 19 64 3a c8 71 d2 dc 56 9e d4 c8 8a 6a 6a d8 c0 1c fa 00 5e f2 68 3e 2f 17 8e 44 6d f7 55 19 ab 8f 43 35 e5 8c 1e 3d 7a 5e 1b 63 0d 83 66 3b 40 c2 a1 06 0c 1b f3 bc fc 49 1c 02 4a 6b 27 bb 24 11 04 9f 30 18 4c d5 4f f1 e3 12 df c9 6c 16 d2 35 ff 5a 60 dc d4 2a 3d fa 8b 70 51 b6 ca 4f 89 50 b9 e2 c0 c3 40 94 46 eb 59 43 31 38 6b ae 47 1a ee 56 f6 36 e9 1d f3 03 49 ec 9a 16 2e 7f 63 45 00 03 67 d1 2d bc 9f 69 21 7d 8f ac cb 6f b3 89 d1 3f ce 89 f7 17 73 d3 af 45 b7 3d 40 f8 be c8 b5 47 60 01 6e 62 5a f1 87 e5 f4 53 52 86 66 08 b6 80 ec 5a 88 cc 13 8c 45 7f e3 d4 8b 97 03 7b 03 ec f5 5d 95 30 4b d6 86 64 08 a8 ef 3c d6 d8 fb 58 61 b2 78 86 f3 a3 22 70 63 a2 ea 89 52 f3 4a 6d 7a 85 7e b6 02 3c e6 9e 6b 31 4a b9 9f 2f a8 ef 9e 48 03 da 21 a4 a8 e6 ea d7 cb ca 3b c9 03 e2 71 ac f3 9f 0a 22 e2 0e 6c 5f 47 6f ad b2 a8 a8 7c 58 05 97 22 35 e3 5d 32 a8 0c 26 98 63 40 03 90 65 6e db 9c b8 3b 56 2f a6 1b b2 9d f0 25 59 93 a7 ac 3c f6 f3 54 b1 b3 52 dd cd 30 8a ab e2 b8 fe 6f ec 14 9f 28 be 96 dc 75 90 ef 19 22 7e b1 fc 7f 0f a0 fe 61 aa 6e de 57 c1 24 74 22 ed e4 96 74 64 b6 5d 4c 98 3f 9c 6a e7 59 ec f8 5e 83 62 9c f0 fc 3e 3f d1 e7 28 52 92 fd ae 9d 4c d4 6c f0 97 04 fb 33 9a 69 68 64 94 3f 5a 7a 86 20 09 d8 5a dd 10 e2 d4 f5 59 48 52 45 8c 3c fc 32 0f bc 09 98 e8 2c a2 40 10 09 a9 8b b6 e1 58 e6 f0 cf f0 1a c5 d1 1b be f1 fc c5 30 a6 cf 41 56 51 ab 2e 32 3f 79 68 8a 53 a2 fc a3 69 79 1c cd 34 4b 55 46 e6 cb cb dd fd 59 e6 b5 7c 55 44 98 6b f9 21 89 cb b8 8a "}')
        time.sleep(1)

Expected behavior Python script should run without growing memory resource usage.

Actual behavior Memory usage increases as quantity of messages increases.

Environment

Additional context The amount of memory leaked does not appear to be proportional to the request payload size. My attempts to use tracemalloc to further diagnose this leak haven't turned up anything I can immediately see wrong (but i'm not an expert in python)

graebm commented 3 years ago

Fixed in version 1.5.13

github-actions[bot] commented 3 years ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.