basho / riak-python-client

The Riak client for Python.
Apache License 2.0

Stream keys on pbc issues, with early close of the stream [JIRA: CLIENTS-1051] #507

Open Guibod opened 7 years ago

Guibod commented 7 years ago

If you close a stream before it is finished, it still tries to drain the open socket of incoming data. Maybe you should close the resource instead of releasing it in such a case.

    def close(self):
        # We have to drain the socket to make sure that we don't get
        # weird responses when some other request comes after a
        # failed/prematurely-terminated one.
        try:
            while self.next():
                pass
        except StopIteration:
            pass
        self.resource.release()

Of course, you cannot do that through the client wrapper, which yields results through an iterator and is not closeable.
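
A minimal sketch of that suggestion, using stand-in classes rather than the real client. `FakeResource`, `FakeStream`, and `destroy()` are hypothetical names for illustration only, not part of riak-python-client. The idea is that `close()` releases the connection back to the pool only when the stream was fully consumed, and otherwise discards it so that no unread data can leak into a later request.

```python
class FakeResource:
    """Stand-in for a pooled connection (hypothetical, not the riak API)."""

    def __init__(self):
        self.state = "claimed"

    def release(self):
        # Return the connection to the pool for reuse.
        self.state = "released"

    def destroy(self):
        # Close the socket and drop the connection from the pool.
        self.state = "destroyed"


class FakeStream:
    """Stand-in for a streaming response bound to one connection."""

    def __init__(self, chunks, resource):
        self._chunks = iter(chunks)
        self.resource = resource
        self.finished = False
        self.chunks_read = 0

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._chunks)
        except StopIteration:
            self.finished = True
            raise
        self.chunks_read += 1
        return chunk

    def close(self):
        if self.finished:
            # Nothing is left on the wire, so the connection is safe to reuse.
            self.resource.release()
        else:
            # Unread data remains: discard the connection instead of draining.
            self.resource.destroy()


early = FakeStream([["a"], ["b"], ["c"]], FakeResource())
next(early)          # read one chunk, then stop early
early.close()

full = FakeStream([["a"], ["b"]], FakeResource())
for _ in full:       # consume everything
    pass
full.close()
```

In this sketch the early-closed stream never reads the remaining chunks, at the cost of opening a new connection later.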

Guibod commented 7 years ago

I also suggest updating the documentation, because the stream_keys() wrapper cannot be closed manually the way the documentation says. The wrapper already does the try/finally for us.
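
To illustrate what a try/finally wrapper implies, here is a rough simulation. `DrainingStream` and this `stream_keys` function are hypothetical stand-ins, and the real wrapper may differ in detail. It shows that when the caller stops early, the `finally` clause still runs `close()`, which reads every remaining chunk.

```python
class DrainingStream:
    """Hypothetical stand-in for a key stream whose close() drains the socket."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self.drained = 0     # chunks read only to empty the socket
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._chunks)

    def close(self):
        # Same idea as the drain loop in close(): read until exhausted.
        for _ in self._chunks:
            self.drained += 1
        self.closed = True


def stream_keys(stream):
    """Generator wrapper that closes the stream on behalf of the caller."""
    try:
        for chunk in stream:
            yield chunk
    finally:
        stream.close()


stream = DrainingStream([["k1"], ["k2"], ["k3"], ["k4"]])
keys = stream_keys(stream)
first = next(keys)   # the caller only wants the first chunk
keys.close()         # GeneratorExit is raised at the yield, so finally runs
```

With a large bucket, that drain is the expensive part: the caller asked for one chunk, but the rest of the stream is still read before the connection is released.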

Guibod commented 7 years ago

I've solved it by deleting my resource:

    # Method on a class where self.riak is the client and self.bucket is the bucket.
    # Assumes a module-level `logger` and `from riak.util import bytes_to_str`.
    # `timeout` is assumed to be supplied by the caller.
    def stream(self, limit=10, timeout=None):
        i = 0
        resource = self.riak._acquire()
        transport = resource.object
        stream = transport.stream_keys(self.bucket, timeout=timeout)
        stream.attach(resource)

        try:
            for keylist in stream:
                for key in keylist:
                    key = bytes_to_str(key)
                    i += 1
                    if limit and i > limit:
                        logger.debug('Stream limit reached (%d)', limit)
                        # Use `return` here: under PEP 479, `raise StopIteration`
                        # inside a generator becomes a RuntimeError.
                        return
                    yield self.bucket.get(key)
        finally:
            # Discard the connection instead of draining and releasing it.
            self.riak._choose_pool().delete_resource(stream.resource)

lukebakken commented 7 years ago

Hello ... I think I get what's going on, but it would be more helpful to have a complete example if possible. Can you provide one?