deephaven / deephaven-core

Deephaven Community Core

pydeephaven clients can get a RST_STREAM with CANCELLED error in some cases #5996

Open jmao-denver opened 2 months ago

jmao-denver commented 2 months ago

Discovered during #5990

    def test_publish_fetch(self):
        plugin_client = self.session.plugin_client(self.session.exportable_objects["plot3"])
        self.assertIsNotNone(plugin_client)

        with self.subTest("Plugin object"):
            # First fetch the Plugin object, then publish it
            export_plugin_client = self.session.fetch(plugin_client)
            shared_ticket = SharedTicket.random_ticket()
            self.session.publish(export_plugin_client, shared_ticket)

            # Another session to use the shared Plugin object
            sub_session = Session()
            server_obj = ServerObject(type="Figure", ticket=shared_ticket)
            sub_plugin_client = sub_session.plugin_client(server_obj)
            payload, refs = next(sub_plugin_client.resp_stream)
            self.assertGreater(len(payload), 0)
            self.assertGreater(len(refs), 0)
            ref = refs[0]
            self.assertEqual(ref.type, "Table")
            sub_plugin_client.close()
            sub_session.close()

        with self.subTest("Fetchable in the Plugin object"):
            payload, refs = next(plugin_client.resp_stream)
            self.assertGreater(len(payload), 0)
            self.assertGreater(len(refs), 0)
            ref = refs[0]
            self.assertEqual(ref.type, "Table")
            fetched = ref.fetch()
            self.assertIsNotNone(fetched)
            self.assertEqual(fetched.size, 30)

            # Publish the fetchable
            shared_ticket = SharedTicket.random_ticket()
            self.session.publish(ref, shared_ticket)

            # Another session to use the shared fetchable
            sub_session = Session()
            sub_table = sub_session.fetch_table(shared_ticket)
            self.assertIsNotNone(sub_table)
            self.assertEqual(sub_table.size, 30)
            sub_session.close()

        with self.subTest("released Plugin object"):
            sub_session = Session()
            server_obj = ServerObject(type="Figure", ticket=shared_ticket)
            sub_plugin_client = sub_session.plugin_client(server_obj)
            self.session.release(export_plugin_client)
            with self.assertRaises(Exception):
                payload, refs = next(sub_plugin_client.resp_stream)
            sub_session.close()

        plugin_client.close()

SubTest error: Traceback (most recent call last):
  File "/Users/jianfengmao/.pyenv/versions/3.8.18/lib/python3.8/unittest/case.py", line 60, in testPartExecutor
    yield
  File "/Users/jianfengmao/.pyenv/versions/3.8.18/lib/python3.8/unittest/case.py", line 582, in subTest
    yield
  File "/Users/jianfengmao/git/deephaven-core/py/client/tests/test_plugin_client.py", line 64, in test_publish_fetch
    payload, refs = next(plugin_client.resp_stream)
  File "/Users/jianfengmao/git/deephaven-core/py/client/pydeephaven/experimental/plugin_client.py", line 128, in __next__
    resp = next(self.stream_resp)
  File "/Users/jianfengmao/Playground/venv3.8/lib/python3.8/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/Users/jianfengmao/Playground/venv3.8/lib/python3.8/site-packages/grpc/_channel.py", line 952, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Received RST_STREAM with error code 8"
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:10000 {created_time:"2024-08-28T08:50:56.199878-06:00", grpc_status:1, grpc_message:"Received RST_STREAM with error code 8"}"
>

Note that this error code 8 is not gRPC status 8 (RESOURCE_EXHAUSTED), but HTTP/2 RST_STREAM error code 8 (CANCEL).
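To make the two numbering schemes concrete, here is a small sketch that parses the `details` string from the traceback and looks the number up in the HTTP/2 error code table rather than the gRPC status table. The enum and function names (`Http2ErrorCode`, `GrpcStatus`, `rst_stream_error`) are made up for illustration and are not part of grpc or pydeephaven; the numeric values follow the HTTP/2 and gRPC specifications.

```python
import re
from enum import IntEnum
from typing import Optional


class Http2ErrorCode(IntEnum):
    """HTTP/2 error codes, as carried in RST_STREAM frames."""
    NO_ERROR = 0x0
    PROTOCOL_ERROR = 0x1
    INTERNAL_ERROR = 0x2
    FLOW_CONTROL_ERROR = 0x3
    SETTINGS_TIMEOUT = 0x4
    STREAM_CLOSED = 0x5
    FRAME_SIZE_ERROR = 0x6
    REFUSED_STREAM = 0x7
    CANCEL = 0x8
    COMPRESSION_ERROR = 0x9
    CONNECT_ERROR = 0xA
    ENHANCE_YOUR_CALM = 0xB
    INADEQUATE_SECURITY = 0xC
    HTTP_1_1_REQUIRED = 0xD


class GrpcStatus(IntEnum):
    """A subset of gRPC status codes, enough to show the clash at 8."""
    OK = 0
    CANCELLED = 1
    UNKNOWN = 2
    RESOURCE_EXHAUSTED = 8


def rst_stream_error(details: str) -> Optional[Http2ErrorCode]:
    """Extract the HTTP/2 error code from a grpc 'details' message, if present."""
    match = re.search(r"RST_STREAM with error code (\d+)", details)
    if match is None:
        return None
    try:
        return Http2ErrorCode(int(match.group(1)))
    except ValueError:
        # Not a code defined in the table above.
        return None


# Values copied from the traceback in this issue.
details = "Received RST_STREAM with error code 8"
print(rst_stream_error(details).name)  # CANCEL
print(GrpcStatus(1).name)              # CANCELLED (the grpc_status:1 in debug_error_string)
print(GrpcStatus(8).name)              # RESOURCE_EXHAUSTED (not what happened here)
```

The point is only that the `8` in `details` and the `1` in `grpc_status` live in different tables: the client maps the HTTP/2 CANCEL onto gRPC CANCELLED.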

It turns out that the Jetty server implementation treats AsyncContext.complete() not as a signal that the stream finished correctly, but as a reason to send the CANCEL error code to the client. Likewise, the Python client is the only gRPC client we're aware of that reads an RST_STREAM arriving after the final end-stream message and allows it to change the meaning of that last message.

Changing the implementation of grpc-servlet upstream does not cause any integration test failures in grpc-java, which is no surprise since only the Python client sees this bug anyway. Our initial "fix", however, causes Tomcat to exhibit the same bug, so it is not an acceptable solution. We're going to open tickets with grpc-java, Jetty, and Tomcat, and hopefully get the servlet container providers on the same page before making a change to grpc-java itself.

In the meantime, for some cases of this, we could wrap Python client calls in try/except and treat the CANCEL case as success. We can also investigate changing the behavior of specific gRPC bidi streams so that this issue cannot happen, by forcing the client to half-close first.
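A rough sketch of the try/except workaround might look like the following. It is an assumption-laden illustration, not the pydeephaven implementation: `is_benign_cancel` and `iter_tolerating_cancel` are hypothetical helpers, and the check is duck-typed on `code()` and `details()` so the snippet runs without grpc installed. In real client code one would more likely catch `grpc.RpcError` and compare against `grpc.StatusCode.CANCELLED`. Whether ending the stream quietly is actually safe depends on the call, which is why this only fits "some cases".

```python
from typing import Any, Iterable, Iterator

# Detail text copied from the traceback in this issue.
RST_STREAM_CANCEL_DETAILS = "Received RST_STREAM with error code 8"


def is_benign_cancel(exc: BaseException) -> bool:
    """Return True if exc looks like the late CANCELLED described in this issue."""
    code_fn = getattr(exc, "code", None)
    details_fn = getattr(exc, "details", None)
    if not callable(code_fn) or not callable(details_fn):
        return False
    # grpc.StatusCode is an Enum, so its members expose a .name attribute.
    code_name = getattr(code_fn(), "name", None)
    details = details_fn() or ""
    return code_name == "CANCELLED" and RST_STREAM_CANCEL_DETAILS in details


def iter_tolerating_cancel(stream: Iterable[Any]) -> Iterator[Any]:
    """Yield items from stream, ending quietly on the benign CANCELLED case."""
    it = iter(stream)
    while True:
        try:
            item = next(it)
        except StopIteration:
            return
        except Exception as exc:
            if is_benign_cancel(exc):
                # Treat the late RST_STREAM as a normal end of stream.
                return
            raise
        yield item
```

A caller would then loop with `for resp in iter_tolerating_cancel(response_stream): ...`, where `response_stream` stands in for whatever response iterator the call returns. Any other failure, including a CANCELLED with different details, still propagates.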

rcaudy commented 1 month ago

I think we may want to consider two things here:

  1. Correct the tests to keep the publishing session open until the other session has exported the shared ticket.
  2. Make the error handling better.

niloc132 commented 2 days ago

The original fix for this seems to break our grpc-web adapter; it has something to do with trailers being sent as a data frame instead of the stream being flushed and closed with trailers. It appears to be on the server side though, not the client, so we should ensure that trailers are being correctly written to other kinds of clients too.

We've also discussed adding tests with TLS - ideally we add another gradle docker flag so that all client tests can validate that they work correctly. JS API tests are somewhat more important to test this way, since we change transport implementations.