wechaty / python-wechaty

Python Wechaty is a Conversational RPA SDK for Chatbot Makers written in Python
https://wechaty.readthedocs.io/zh_CN/latest/
Apache License 2.0
1.56k stars 229 forks source link

🐛🐛 puppet_stub should not be none #285

Closed VEADoc closed 2 years ago

VEADoc commented 2 years ago

requirements

Describe your problem

脚本启动后能正常接收消息。

在 联系人find_all 加载全量联系人的时候 抛出grpclib.exceptions.StreamTerminatedError 错误

控制台打印的错误信息如下

Traceback (most recent call last):
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/client.py", line 469, in recv_trailing_metadata
    trailers = await self._stream.recv_trailers()
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/protocol.py", line 351, in recv_trailers
    await self.trailers_received.wait()
  File "/usr/local/Cellar/python@3.7/3.7.12_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 293, in wait
    await fut
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "script/bot/bot.py", line 140, in command_handlers
    await self.say_action(command = command)
  File "script/bot/bot.py", line 357, in say_action
    contact = await self.Contact.find(filter)
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 110, in find
    contact_list = await cls.find_all(query)
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 133, in find_all
    await asyncio.gather(*[contact.ready() for contact in contacts])
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 173, in ready
    self.contact_id)
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty_puppet_service/puppet.py", line 558, in contact_payload
    response = await self.puppet_stub.contact_payload(id=contact_id)
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty_grpc/wechaty/__init__.py", line 120, in contact_payload
    "/wechaty.Puppet/ContactPayload", request, puppet.ContactPayloadResponse
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/betterproto/__init__.py", line 1126, in _unary_unary
    return response
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/client.py", line 564, in __aexit__
    raise exc_val
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/client.py", line 554, in __aexit__
    await self._maybe_finish()
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/client.py", line 524, in _maybe_finish
    await self.recv_trailing_metadata()
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/client.py", line 485, in recv_trailing_metadata
    self._raise_for_grpc_status(status, message, details)
  File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/grpclib/utils.py", line 70, in __exit__
    raise self._error
grpclib.exceptions.StreamTerminatedError: Connection lost

随后wechaty自动重启 日志摘要如下

完整日志>> 完整日志

......省略.....
2021-12-19 19:00:12,724 - Contact - INFO - load contact <Contact <1688851070064210> <王潮生>>
2021-12-19 19:00:12,726 - Contact - INFO - load contact <Contact <1688851075246154> <林来康>>
2021-12-19 19:00:56,228 - Wechaty - INFO - restarting the bot ...
2021-12-19 19:00:56,228 - Wechaty - INFO - wechaty is stoping ...
2021-12-19 19:00:56,228 - Wechaty - INFO - stopping - stop puppet
2021-12-19 19:00:56,228 - HostiePuppet - INFO - stop()
2021-12-19 19:00:57,081 - Wechaty - INFO - stopping - unset puppet
2021-12-19 19:00:57,081 - Wechaty - INFO - wechaty has been stopped gracefully!
2021-12-19 19:00:57,082 - Wechaty - INFO - init_puppet_event_bridge() <<wechaty_puppet_service.puppet.PuppetService object at 0x11a10d150>>
2021-12-19 19:00:57,082 - Wechaty - INFO - initPuppetEventBridge() puppet.on(dong) (listenerCount:1) registering...
2021-12-19 19:00:57,082 - Wechaty - INFO - initPuppetEventBridge() puppet.on(error) (listenerCount:1) registering...
2021-12-19 19:00:57,082 - Wechaty - INFO - initPuppetEventBridge() puppet.on(heart-beat) (listenerCount:1) registering...
2021-12-19 19:00:57,082 - Wechaty - INFO - initPuppetEventBridge() puppet.on(ready) (listenerCount:1) registering...
2021-12-19 19:00:57,082 - Wechaty - INFO - receive <reset> event <%s>
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(reset) (listenerCount:0) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(friendship) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(login) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(logout) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(message) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(room-invite) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(room-join) (listenerCount:1) registering...
2021-12-19 19:00:57,083 - Wechaty - INFO - initPuppetEventBridge() puppet.on(room-leave) (listenerCount:1) registering...
2021-12-19 19:00:57,084 - Wechaty - INFO - initPuppetEventBridge() puppet.on(room-topic) (listenerCount:1) registering...
2021-12-19 19:00:57,084 - Wechaty - INFO - initPuppetEventBridge() puppet.on(scan) (listenerCount:1) registering...
2021-12-19 19:00:57,084 - Wechaty - INFO - starting puppet ...
2021-12-19 19:00:57,084 - HostiePuppet - INFO - init puppet
2021-12-19 19:00:57,134 - HostiePuppet - INFO - starting the puppet ...
2021-12-19 19:00:57,935 - HostiePuppet - INFO - puppet has started ...
2021-12-19 19:00:57,935 - HostiePuppet - INFO - listening the event from the puppet ...

再接着就出现了错误 WechatyPuppetError('puppet_stub should not be none', None, None)

......省略.....
2021-12-19 19:01:52,034 - HostiePuppet - DEBUG - receive message info <{'messageId': '5098305'}>
2021-12-19 19:01:52,035 - Wechaty - DEBUG - receive <message> event <EventMessagePayload(message_id='5098305', type=None, from_id=None, filename=None, text=None, timestamp=None, room_id=None, to_id=None, mention_ids=None)>
2021-12-19 19:01:52,035 - Message - DEBUG - Message ready <Message <5098305> is not ready>
2021-12-19 19:01:52,091 - Wechaty - ERROR - internal error <WechatyPuppetError('puppet_stub should not be none', None, None)>
2021-12-19 19:02:01,595 - HostiePuppet - DEBUG - receive message info <{'messageId': '5098326'}>
2021-12-19 19:02:01,595 - Wechaty - DEBUG - receive <message> event <EventMessagePayload(message_id='5098326', type=None, from_id=None, filename=None, text=None, timestamp=None, room_id=None, to_id=None, mention_ids=None)>
2021-12-19 19:02:01,595 - Message - DEBUG - Message ready <Message <5098326> is not ready>
2021-12-19 19:02:01,661 - Wechaty - INFO - receive message <Message#message_type_text[🗣 Contact <1688853093056171> <陈定奉>@👥 Room <R:10696052140033673 - 8178|李工    时代公园4-106      14号对模型 16号对小图  >]  角度问题>
2021-12-19 19:02:01,661 - Wechaty - DEBUG - emit() event <['Message#message_type_text[🗣 Contact <1688853093056171> <陈定奉>@👥 Room <R:10696052140033673 - 8178|李工    时代公园4-106      14号对模型 16号对小图  >]\t角度问题']> <{}>
2021-12-19 19:02:01,662 - Message - DEBUG - Message ready <Message#message_type_text[🗣 Contact <1688853093056171> <陈定奉>@👥 Room <R:10696052140033673 - 8178|李工    时代公园4-106      14号对模型 16号对小图  >]   角度问题>
2021-12-19 19:02:02,755 - HostiePuppet - DEBUG - receive message info <{'messageId': '5098332'}>
2021-12-19 19:02:02,755 - HostiePuppet - DEBUG - receive message info <{'messageId': '5098317'}>
2021-12-19 19:02:02,755 - Wechaty - DEBUG - receive <message> event <EventMessagePayload(message_id='5098332', type=None, from_id=None, filename=None, text=None, timestamp=None, room_id=None, to_id=None, mention_ids=None)>
2021-12-19 19:02:02,755 - Message - DEBUG - Message ready <Message <5098332> is not ready>
2021-12-19 19:02:02,756 - Wechaty - DEBUG - receive <message> event <EventMessagePayload(message_id='5098317', type=None, from_id=None, filename=None, text=None, timestamp=None, room_id=None, to_id=None, mention_ids=None)>
2021-12-19 19:02:02,756 - Message - DEBUG - Message ready <Message <5098317> is not ready>
2021-12-19 19:02:02,819 - Wechaty - ERROR - internal error <WechatyPuppetError('puppet_stub should not be none', None, None)>
2021-12-19 19:02:02,819 - Wechaty - ERROR - internal error <WechatyPuppetError('puppet_stub should not be none', None, None)>

调用say 发送消息也会报错

控制台打印错误跟踪

Traceback (most recent call last):
File "script/bot/bot.py", line 140, in command_handlers
await self.say_action(command = command)
File "script/bot/bot.py", line 357, in say_action
contact = await self.Contact.find(filter)
File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 110, in find
contact_list = await cls.find_all(query)
File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 133, in find_all
await asyncio.gather(*[contact.ready() for contact in contacts])
File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty/user/contact.py", line 173, in ready
self.contact_id)
File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty_puppet_service/puppet.py", line 558, in contact_payload
response = await self.puppet_stub.contact_payload(id=contact_id)
File "/Users/veadoc/Documents/Code/JinGai/Compliance/api/venv/lib/python3.7/site-packages/wechaty_puppet_service/puppet.py", line 203, in puppet_stub
raise WechatyPuppetError('puppet_stub should not be none')
wechaty_puppet.exceptions.WechatyPuppetError: WechatyPuppetError('puppet_stub should not be none', None, None)

Reproduce your problem

say_action 相关代码


async def say_action(self,command:ControlCommand, **kwargs):
"""发送消息"""
    if not command.room_id and not command.contact_id:
        print( 'Room ID and Contact ID can not be empty together')
        fail_res = {'result':'FAIL','msg':'群聊ID 和 联系人ID 不可同时为空'}
        res = await self.write_result(fail_res,command.request_id)
        return

    if command.room_id:
        """群里发送"""

        filter = RoomQueryFilter(id=command.room_id)
        room = await self.Room.find(filter)

        if not room:
            fail_res = {'result':'FAIL','msg':'群聊不存在'}
            res = await self.write_result(fail_res,command.request_id)
            return

        mention_ids = []
        if command.contact_id:
            mention_ids = command.contact_id.split('|')

        await room.say(
            some_thing=command.payload,
            mention_ids=mention_ids
        )

    else:
        filter = ContactQueryFilter(id=command.contact_id)
        contact = await self.Contact.find(filter)
        if not contact:
            filter = ContactQueryFilter(weixin=command.contact_id)
            contact = await self.Contact.find(filter)

        if not contact:
            fail_res = {'result':'FAIL','msg':'联系人不存在'}
            print(fail_res)
            res = await self.write_result(fail_res,command.request_id)
            return

        await contact.say(
            command.payload
        )

    res = await self.write_result({'result':'SUCCESS'},command.request_id)

    return
VEADoc commented 2 years ago

和 grpc 请求频率是否有关系? 刚启动的时候 缓存里的 没有联系人信息。就需要 grpc 请求获取 短时间 请求过多 导致被服务器断连 ? 有没有这种可能的。🥶

VEADoc commented 2 years ago

我这边能稳定复现。

机器人加入的群聊有 2586 个

filter = RoomQueryFilter(id=room_id)
room = await self.Room.find(filter)

执行上述代码 百分百 得到一个错误 grpclib.exceptions.StreamTerminatedError: Connection lost

所以我觉得是 grpc 服务器那边短时间内接收到大量请求 ,把我连接给关了 😮‍💨

VEADoc commented 2 years ago

通过 on_login 事件 先 分批加载一下 通讯录和群聊 控制住并发度 测试过 [ 1000 ,800 ,500] 三个值, 在有限的测试下 初步认为 一次性加载 500 条数据 sleep 2秒最稳定

    async def on_login(self, contact: Contact):
        await super().on_login(contact)
        """全量加载一下通讯录和群聊"""

        """分批加载防止 Connect Lost"""
        step = 500

        """联系人"""
        contact_ids = await self.Contact.get_puppet().contact_list()
        contacts: List[Contact] = [self.Contact.load(contact_id) for contact_id in contact_ids]

        rich.print(f'[dark_orange] Load {contacts.__len__()} Contacts...')
        x = 0
        while True:
            _contacts = [x for x in contacts[x:x+step]]
            if not _contacts:
                break
            await asyncio.gather(*[contact.ready() for contact in _contacts])
            rich.print(f'[cyan] {_contacts.__len__()} Contacts Ready!')
            x += step
            await asyncio.sleep(2)

        """群聊"""
        room_ids = await self.Room.get_puppet().room_search()
        rooms = [self.Room.load(room_id) for room_id in room_ids]

        rich.print(f'[dark_orange] Load {rooms.__len__()} Rooms...')
        x = 0
        while True:
            _rooms = [x for x in rooms[x:x+step]]
            if not _rooms:
                break
            await asyncio.gather(*[room.ready() for room in _rooms])
            rich.print(f'[cyan] {_rooms.__len__()} Rooms Ready!')
            x += step
            await asyncio.sleep(2)

        rich.print(f'[green]Load Success!')

image

wj-Mcat commented 2 years ago

You can update wechaty to get new feature which has solved it.