grpc / grpc

The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
https://grpc.io
Apache License 2.0

Under what circumstances does the client side terminate a bidi stream? #25372

Closed ykhsu closed 3 years ago

ykhsu commented 3 years ago

What version of gRPC and what language are you using?

Python 3.8 gRPC 1.25 and 1.18 in CentOS 7, gRPC 1.35 and 1.18 in Windows 10

What operating system (Linux, Windows,...) and version?

Linux CentOS 7; Windows 10

What runtime / compiler are you using (e.g. python version or version of gcc)

Python 3.8 and 2.7.16 on Linux CentOS 7; Python 3.8.6 (64-bit) and 2.7.16 on Windows 10

What did you do?

This is the same issue as documented previously in https://github.com/grpc/grpc/issues/18924. The client code in Python is very simple: it creates a subscription request to a server and just loops through whatever is received:

notification_stream = stub.Subscribe(filter)
try:
    for notification in notification_stream:
        # do something with the received notification
        pass
except Exception as err:
    # handle problems
    pass

Under gRPC 1.18 with Python 2.7.16, the loop kept terminating after a few notifications (no, the server did not terminate it). After getting no answer, we fixed it by hacking line 230 in https://github.com/grpc/grpc/blob/v1.18.x/src/python/grpcio/grpc/_channel.py to:

                            if state.code is not None:

With that hack, we have been able to keep notifications coming with no issue since.

What did you expect to see?

Notifications should keep coming until the client decides to cancel the subscription stream.

What did you see instead?

We encountered the same issue again while migrating clients to Python 3.8 with newer versions of gRPC (1.25/1.35). But this time, https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_channel.py has changed too much for our hack, and we don't want to keep dealing with this problem going forward.

From what I can see, after receiving a few notifications from the server, self._state.code changes from None to grpc.StatusCode.OK at line 780 of https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_channel.py. This change then ends the iteration, so no subsequent notifications are delivered:

            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()

The key here, I think, is to understand what triggers grpc.StatusCode.OK even though the server continues sending notifications.

Anything else we should know about your project / environment?

The server is a piece of transport network equipment running CentOS 7 and supporting gNMI using gRPC C++ 1.18. The Python clients in question are for test automation and are primarily executed on CentOS 7. As stated, they are being migrated from Python 2 to 3.

gnossen commented 3 years ago

notification_stream = stub.Subscribe( filter )

Just to confirm, your RPC uses a bidirectional stream, yes?

We're going to need a little more information to figure out what's going on in the native code layer. Can you please capture a run of your application with the premature closure with a couple of environment variables set? GRPC_TRACE=all and GRPC_VERBOSITY=DEBUG should give you a textwall we can sift through to hopefully figure out what's going on under the hood.
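One way to capture the trace requested above is to set both variables from the client script itself. This is only a sketch: the helper name `enable_grpc_debug_logging` is hypothetical, and it assumes the variables should be in place before `grpc` is imported so that the native layer sees them when it initializes.

```python
import os


def enable_grpc_debug_logging(env=os.environ):
    """Set the two variables named above on the given environment mapping."""
    env["GRPC_VERBOSITY"] = "DEBUG"
    env["GRPC_TRACE"] = "all"
    return env


# Call this before importing grpc (assumption: safest ordering).
enable_grpc_debug_logging()
# import grpc  # then create the channel and run the Subscribe RPC as usual
```

Exporting the same two variables in the shell before launching the test has the same effect and avoids touching the script.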

It would also be helpful to get the server-side logs with these environment variables set, so we can correlate. That will let us confirm that the problem is entirely on the client side.

I see that there's a similar log attached to the previous issue, but the codebase has changed quite a bit since then, which will make it pretty hard to jump through the codebase to figure things out. Regardless, I'll try to sift through those old logs while waiting on the newer ones.

gnossen commented 3 years ago

Quoting from this log from the previous issue:

I0502 09:49:58.356834786   11050 tcp_posix.cc:419]           READ 0x7f7044006f70 (peer=ipv4:135.104.234.62:8980): 17 03 03 00 33 00 00 00 00 00 00 00 07 b9 85 e9 57 30 fa 9b 2f 32 74 46 a8 2f d4 b8 c2 62 d0 b0 c5 b6 8f e8 1f 89 3a 12 a2 00 59 1b 6b 33 d0 c6 ab 2c f3 3b ed f1 a4 ef '....3...........W0../2tF./...b........:...Y.k3...,.;....'
I0502 09:49:58.356913437   11050 secure_endpoint.cc:163]     READ 0x6e9f130: 00 00 01 01 05 00 00 00 03 be 00 00 08 06 00 00 00 00 00 00 00 00 00 00 00 00 02 '...........................'
I0502 09:49:58.356942801   11050 parsing.cc:661]             parsing trailing_metadata
I0502 09:49:58.356956336   11050 hpack_parser.cc:636]        Decode: 'grpc-status: 0', elem_interned=1 [3], k_interned=1, v_interned=1
I0502 09:49:58.356964556   11050 parsing.cc:503]             HTTP:3:TRL:CLI: grpc-status: 30 '0'

...

[2019.05.02-09:49:58.358.Thread-3]  trace.datamodel.log info: o subscription stream ended
[2019.05.02-09:49:58.358.Thread-3]  trace.datamodel.log info: subscriber finished

The native code layer seems to have received a trailer with a grpc-status of 0 (OK). So, if this is a purely client-side problem, then it's quite a low level one and would theoretically affect the C++ stack as well.
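The decrypted bytes on the secure_endpoint.cc line can be checked by hand against the standard HTTP/2 frame layout (3-byte length, 1-byte type, 1-byte flags, 4-byte stream id). The sketch below is a minimal parser written for this log only; `parse_frames` and the small type table are illustrative names, not part of gRPC.

```python
FRAME_TYPES = {0x0: "DATA", 0x1: "HEADERS", 0x6: "PING"}
END_STREAM = 0x1  # meaning of flag bit 0x1 on DATA and HEADERS frames


def parse_frames(data: bytes):
    """Split a byte string into HTTP/2 frames (9-byte header plus payload)."""
    frames = []
    offset = 0
    while offset + 9 <= len(data):
        length = int.from_bytes(data[offset:offset + 3], "big")
        type_code = data[offset + 3]
        flags = data[offset + 4]
        # The top bit of the stream id field is reserved, so mask it off.
        stream_id = int.from_bytes(data[offset + 5:offset + 9], "big") & 0x7FFFFFFF
        payload = data[offset + 9:offset + 9 + length]
        frames.append({
            "type": FRAME_TYPES.get(type_code, hex(type_code)),
            "flags": flags,
            "stream_id": stream_id,
            "end_stream": type_code in (0x0, 0x1) and bool(flags & END_STREAM),
            "payload": payload,
        })
        offset += 9 + length
    return frames


# Bytes copied from the secure_endpoint.cc READ line in the log above.
logged = bytes.fromhex(
    "00 00 01 01 05 00 00 00 03 be "
    "00 00 08 06 00 00 00 00 00 00 00 00 00 00 00 00 02"
)
for frame in parse_frames(logged):
    print(frame)
```

The first frame is a HEADERS frame on stream 3 with END_STREAM set, which is consistent with the "HTTP:3:TRL:CLI" trailer line; the second is a PING on stream 0.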

gnossen commented 3 years ago

In regard to the patch you mentioned existing in your codebase, here is an annotated version of the (canonical) code snippet that was modified. The cyclomatic complexity here is pretty staggering, so the annotation is almost mandatory to understand what's going on here:

                        # At this point, we have a serialized request ready to submit to the native code layer's
                        # submission queue.
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        # Enqueue the send message operation to the native code layer.
                        operating = call.operate(operations, event_handler)
                        if operating: # If we successfully enqueued the operation
                            # Record that we're expecting acknowledgement of our write operation from the native code layer.
                            state.due.add(cygrpc.OperationType.send_message)  
                        else:
                            # There's been an error, so we break out of this method entirely.
                            return
                        while True:
                            # Sleep until we receive some change in state from the native code layer.
                            state.condition.wait(condition_wait_timeout)
                            # Here be dragons. Very long explanation behind this call.
                            cygrpc.block_if_fork_in_progress(state)
                            if state.code is None:  # If core hasn't received a final status from the native code layer.
                                if cygrpc.OperationType.send_message not in state.due:   # And we've received acknowledgement of our write operation.
                                    break  # Break out of this loop and get another request from the user to serialize and send out.
                            else: # We *have* received a final status from the native code layer.
                                return

Your change to that snippet was:

if state.code is not None:

So, looking back at the annotation, the logic is now "Get another request from the user code if the RPC has received a final status and the native code layer has acknowledged our last write". This is pretty wrong. At first glance, it just ignores any final RPC statuses received and continues the RPC. On second glance though, the else clause is problematic as well. It will abort the RPC when a final status hasn't been received. The only reason I would expect this to have worked is that you have been receiving RPC closures (whether from the server or from some client-side bug) so quickly that this branch was never reached.
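To make the comparison concrete, the branch inside the `while True` loop can be reduced to a small pure function. This is a simplified model for illustration, not the code in `_channel.py`: `next_action`, its string results, and the `"OK"` stand-in for a status code are all hypothetical.

```python
def next_action(code, send_due, patched=False):
    """Model one pass through the wait loop.

    code      -- None until a final status arrives (any other value after)
    send_due  -- True while the write has not been acknowledged
    patched   -- False for `if state.code is None`, True for the hack
    """
    final_status_seen = code is not None
    inner_branch = final_status_seen if patched else not final_status_seen
    if inner_branch:
        # Either fetch the next request or keep waiting for the write ack.
        return "wait" if send_due else "next_request"
    # The else clause: return from the request-consuming function.
    return "stop"


for patched in (False, True):
    for code in (None, "OK"):
        for send_due in (True, False):
            print(patched, code, send_due, next_action(code, send_due, patched))
```

In this model the patched version stops as soon as it wakes up without a final status, and keeps asking for requests once a final status has arrived, which is the inversion described above.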

This doesn't really bring us any closer to solving the issue, but it certainly doesn't cast a positive light on the hack.

ykhsu commented 3 years ago

First, the server implements gNMI and the subscribe RPC is defined in https://github.com/openconfig/gnmi/blob/master/proto/gnmi/gnmi.proto. The client executes its subscribe RPC as a stream.

I can't comment intelligently on the hack, but hundreds of automated tests run daily by our users for years did not complain :-) Since taking logs from the server is not simple, let me attach a log from a simple test for this problem for now. In this test, after receiving some notifications from the subscribe RPC stream, the loop terminated within 30 seconds.

Note this test was executed under gRPC 1.25 and NOT 1.35 in a CentOS 7.7.

grpc_subscribe_stream.log

gnossen commented 3 years ago

Since this has tripped others using bidirectional streams up in the past -- your request iterator isn't terminating prematurely, is it? All I see in your snippet is filter. When the request iterator terminates, the stream will close.
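As a plain-Python illustration of what "terminating" means here (no gRPC involved, and `single_request` is a made-up name): a generator that yields once is exhausted on the very next read, and whatever consumes it as a request iterator sees the end of the request stream at that point.

```python
def single_request():
    # Yields one request and then falls off the end of the function.
    yield "subscribe-request"


requests = single_request()
first = next(requests)             # the one request that gets sent
exhausted = next(requests, None) is None  # nothing left: the iterator has terminated
print(first, exhausted)
```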

Also, is your code base open source? If so, can you link to it so I can take a look at the usage?

ykhsu commented 3 years ago

Unfortunately, our code is not open source and is a bit complicated. But, in this case, our API only supports a subscription with a single filter, which is a protobuf message encoded according to the given data model entity. So the iterator only yields a single message, and it is roughly defined as:

    def encode_subscriberequest( entity,  # data model object
                                 is_prefix_enabled=None # ...  a list of encoding options omitted
                                 ): 
          # code omitted: encoding entity to a subscription_list
          sub_req =  gnmi.SubscribeRequest(subscribe=subscription_list)
          yield sub_req

In the simple test I did, the encoded message yielded by encode_subscriberequest(), as shown in the log, is:

subscribe {
  prefix {
  }
  subscription {
    path {
      elem {
        name: "*"
      }
    }
  }
  updates_only: true
}

gnossen commented 3 years ago

@ykhsu If your request iterator only yields a single element then that's exactly the problem. You'll need to keep the iterator open in order to keep the stream open. Something like this:

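import threading
from typing import Iterable

class IndefiniteRequestIterator:
  def __init__(self, requests: Iterable, stop_event: threading.Event):
    self._raw_iterator = iter(requests)
    self._stop_event = stop_event

  def __iter__(self):
    # Needed so the object can be passed wherever an iterator is expected.
    return self

  def __next__(self):
    try:
      # If there are still requests, yield them.
      return next(self._raw_iterator)
    except StopIteration:
      # If there aren't, keep the RPC open until told otherwise,
      # then end the request stream instead of returning None.
      self._stop_event.wait()
      raise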

Or, to make something that resembles your snippet above a little more:

def encode_subscriberequest( entity,  # data model object
                                 stop_event: threading.Event,  # placed before the defaulted options so the signature is valid
                                 is_prefix_enabled=None # ...  a list of encoding options omitted
                                 ): 
          # code omitted: encoding entity to a subscription_list
          sub_req =  gnmi.SubscribeRequest(subscribe=subscription_list)
          yield sub_req
          stop_event.wait() # Set this event from a different thread in order to terminate the RPC.

Warning: Untested code. But it should get the message across.

I apologize if I've misunderstood you, but it seems like there's a decent chance this is the problem.
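The blocking behaviour can be tried out without a server. In the sketch below, `hold_open` and `consume` are hypothetical names, and `consume` is only a stand-in for whatever thread drains the request iterator on the client; the point is that the iterator does not finish until the event is set.

```python
import threading


def hold_open(requests, stop_event):
    """Yield every request, then block until stop_event is set."""
    yield from requests
    stop_event.wait()


def consume(request_iterator, sent, done):
    # Stand-in for the code that drains the request iterator.
    for request in request_iterator:
        sent.append(request)
    done.set()  # reached only once the iterator has terminated


stop_event = threading.Event()
sent, done = [], threading.Event()
worker = threading.Thread(
    target=consume,
    args=(hold_open(["subscribe-request"], stop_event), sent, done),
    daemon=True,
)
worker.start()

still_open = not done.wait(timeout=0.2)  # iterator is blocked, so not done yet
stop_event.set()                         # caller ends the subscription
closed = done.wait(timeout=2.0)
worker.join(timeout=2.0)
print(sent, still_open, closed)
```

Using an event rather than a sleep loop means the caller can end the subscription promptly from another thread.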

ykhsu commented 3 years ago

@gnossen Thanks for your suggestion! It sounds right and I'll confirm once I get the time to deal with this.

ykhsu commented 3 years ago

@gnossen You're right. I played with your suggestion and was able to keep subscription up for hours! Thank you very much for your explanation. This issue can be closed.