akarneliuk / pygnmi

The pure Python implementation of the gNMI client.
https://training.karneliuk.com
BSD 3-Clause "New" or "Revised" License

handling grpc._channel._MultiThreadedRendezvous #104

Open brunoonovais opened 2 years ago

brunoonovais commented 2 years ago

Hi Folks,

On a subscribe_stream call, we may receive the grpc._channel._MultiThreadedRendezvous exception. For some reason, it is not caught by the script using pygnmi, but it can be caught inside the enqueue_updates call. Example:

            try:
                for update in subscription:
                    self._updates.put(update)
            except grpc.RpcError as e:
                print(f"{e = }")

I couldn't figure out why, so I'm opening this issue to see if we can fix this and close the channel when such an error is seen; otherwise it can't be handled. Here is an example of my code, whose except block is never reached:

    try:
        with gNMIclient(target=host, username=username, password=password, insecure=True, debug=True) as gc:
            telemetry_stream = gc.subscribe_stream(subscribe=subscriptionlist)
            for telemetry_entry in telemetry_stream:
                log.info(f"telemetry_entry={telemetry_entry}")
    except grpc.RpcError as rpc_error:
        print(f'RPC failed with code {rpc_error.code()}: {rpc_error}')

A simple way to hit this is by subscribing to a path that doesn't exist.

brunoonovais commented 2 years ago

It seems the issue is that subscribe_stream runs in its own thread. I wonder whether using the concurrent.futures module with a future instead would let us handle this better, by capturing the thread's result.
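
A minimal sketch of the concurrent.futures idea, not pygnmi's actual code. Here `fake_subscription` is a hypothetical stand-in for the gRPC response iterator and `StreamError` is a hypothetical stand-in for `grpc.RpcError`. The only point it illustrates is that `Future.result()` re-raises the worker's exception in the calling thread, where an ordinary try/except can see it.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class StreamError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def fake_subscription():
    """Hypothetical stand-in for the gRPC response iterator."""
    yield "update-1"
    raise StreamError("subscription terminated by peer")


def enqueue_updates(updates):
    # Runs in the worker thread; any exception is stored on the Future.
    for update in fake_subscription():
        updates.put(update)


def run():
    updates = queue.Queue()
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(enqueue_updates, updates)
        try:
            future.result()  # re-raises the worker's exception here
        except StreamError as error:
            return updates.get_nowait(), str(error)
    return None


result = run()
```

Since `future.result()` blocks until the worker finishes, a long-lived stream would more likely poll `future.done()` or register `future.add_done_callback(...)` while it consumes updates.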

akarneliuk commented 2 years ago

Hey @brunoonovais,

The idea of running subscribe_stream in a separate thread was for the purpose of scalability, to ensure that multiple threads can co-exist. We can look into whether there would be any benefit in changing the structure.

Best, Anton

brunoonovais commented 2 years ago

Sounds good, Anton! The point here is that if a subscription closes (e.g. a process restarts, a box reloads, or any other issue occurs), one can handle the condition if the thread result is passed down. If not, the main script won't catch any exception and basically gets stuck.
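
One way to "pass the thread result down" can be sketched as follows, assuming the worker and the consumer share a queue. This is not how pygnmi is implemented; `fake_subscription`, `StreamError`, `_DONE` and `iter_updates` are hypothetical names used for illustration. The worker forwards the exception object through the same queue as the updates, and the consumer re-raises it in its own thread instead of blocking forever.

```python
import queue
import threading


class StreamError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


_DONE = object()  # sentinel marking a clean end of stream


def fake_subscription():
    """Hypothetical stand-in for the gRPC response iterator."""
    yield "update-1"
    yield "update-2"
    raise StreamError("device reloaded")


def enqueue_updates(updates):
    # Forward updates; on failure, forward the exception object itself.
    try:
        for update in fake_subscription():
            updates.put(update)
        updates.put(_DONE)
    except StreamError as error:
        updates.put(error)


def iter_updates(updates):
    # Consumer side: re-raise a forwarded exception in the caller's thread.
    while True:
        item = updates.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


updates = queue.Queue()
threading.Thread(target=enqueue_updates, args=(updates,), daemon=True).start()

received, failure = [], None
try:
    for entry in iter_updates(updates):
        received.append(entry)
except StreamError as error:
    failure = str(error)
```

With this shape the main script sees both the updates that arrived before the failure and the failure itself, so it can close the channel or retry.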

theblues25 commented 1 year ago

I encountered a similar problem while using subscribe2. It generally works well, but when the session to the device has issues (network problems, a node reboot, or service interruptions), I am unable to receive errors through exceptions. The subscribe session stops sending updates and returns nothing, and I am unsure whether there is another way to handle this. It would be helpful to have a method to check whether the gRPC session to the device is still healthy while receiving updates from the subscription, or perhaps a way to reconnect the existing session.
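
Until the library surfaces such errors, a consumer-side idle timeout is one possible workaround. The sketch below assumes updates arrive through a queue and that the subscription is expected to send something at least every `idle_timeout` seconds (reasonable for sample mode, not for on_change). `StreamStalled`, `iter_with_watchdog` and `silent_producer` are hypothetical names, not part of pygnmi.

```python
import queue
import threading
import time


class StreamStalled(Exception):
    """Hypothetical error raised when no update arrives in time."""


def iter_with_watchdog(updates, idle_timeout):
    # Yield updates until none arrives within idle_timeout seconds.
    while True:
        try:
            item = updates.get(timeout=idle_timeout)
        except queue.Empty:
            raise StreamStalled(f"no update for {idle_timeout}s") from None
        yield item


def silent_producer(updates):
    # Hypothetical device: sends two updates, then goes quiet.
    for n in (1, 2):
        updates.put(f"update-{n}")
        time.sleep(0.01)


updates = queue.Queue()
threading.Thread(target=silent_producer, args=(updates,), daemon=True).start()

received, stalled = [], False
try:
    for entry in iter_with_watchdog(updates, idle_timeout=0.5):
        received.append(entry)
except StreamStalled:
    stalled = True
```

On `StreamStalled` the caller could tear down the client and open a new session, which approximates a reconnect without relying on any exception from the worker thread.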

sai1274 commented 1 year ago

I'm encountering a similar issue when I open more subscriptions than the device's limit allows:

    subscription_dict = {
        'subscription': [{'path': 'openconfig-interfaces:interfaces/', 'mode': 'sample'}],
        'mode': 'poll',
        'encoding': 'json',
    }
    subscription_ids = []
    for i in range(2):
        subscription_id = client.subscribe_poll(subscribe=subscription_dict)
        subscription_ids.append(subscription_id)
        time.sleep(2)

It throws the error below, but I'm unable to catch it:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 937, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 885, in run
    self._target(*self._args, **self._kwargs)
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/pygnmi/client.py", line 999, in enqueue_updates
    for update in subscription:
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 827, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"
        debug_error_string = "UNKNOWN:Error received from peer ipv4:xx.xx.xx.xx:57400 {created_time:"2023-11-17T17:28:33.376998151+05:30", grpc_status:8, grpc_message:"gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"}"
>

Is there any fix? We tried to catch the exception in enqueue_updates, but even then I'm unable to catch it:

        def enqueue_updates():
            stub = gNMIStub(channel)
            subscription = stub.Subscribe(_client_stream, metadata=metadata)
            for update in subscription:
                try:
                    self._updates.put(update)
                except Exception as error:
                    print(str(error))
                    raise Exception(error)
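
A likely reason the attempt above catches nothing: the traceback points at `for update in subscription:`, meaning the error is raised by the iterator's `__next__`, while the try block only wraps the loop body. The sketch below reproduces the difference with plain Python; `fake_subscription` and `StreamError` are hypothetical stand-ins for the gRPC iterator and `grpc.RpcError`.

```python
class StreamError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def fake_subscription():
    """Hypothetical stand-in for the gRPC response iterator."""
    yield "update-1"
    raise StreamError("RESOURCE_EXHAUSTED")


def try_inside_loop(sink):
    # The try only guards the loop body, so the iterator's error escapes.
    for update in fake_subscription():
        try:
            sink.append(update)
        except Exception as error:
            sink.append(f"caught: {error}")


def try_around_loop(sink):
    # The try guards the for statement itself, including each next() call.
    try:
        for update in fake_subscription():
            sink.append(update)
    except StreamError as error:
        sink.append(f"caught: {error}")


inside, around = [], []
try:
    try_inside_loop(inside)
    escaped = False
except StreamError:
    escaped = True

try_around_loop(around)
```

Wrapping the whole loop, as in the snippet from the opening post, catches the error inside the worker thread; it still has to be handed to the calling thread separately before the main script can react to it.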

jhanm12 commented 1 year ago

@akarneliuk Can you please help us with this? We are interested in capturing the error.
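
For capturing an uncaught exception from a thread without modifying the library, the standard library offers `threading.excepthook` (Python 3.8 and later, so not the Python 3.6 shown in the traceback). The sketch below is an assumption about how it could be applied here, not a pygnmi feature; `StreamError`, `enqueue_updates` and `record_thread_failure` are hypothetical names.

```python
import threading

captured = []


class StreamError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def record_thread_failure(args):
    # args carries exc_type, exc_value, exc_traceback and thread.
    captured.append((args.thread.name, args.exc_value))


def enqueue_updates():
    # Hypothetical worker that fails the way the traceback in this thread does.
    raise StreamError("exceeded the maximum number of streaming gRPCs")


previous_hook = threading.excepthook
threading.excepthook = record_thread_failure
try:
    worker = threading.Thread(target=enqueue_updates, name="subscriber")
    worker.start()
    worker.join()
finally:
    threading.excepthook = previous_hook  # restore the default behaviour
```

The hook runs in the failing thread, so it is best used to record the error or set a flag that the main loop checks, rather than to raise into the main script.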