qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License

Stream already exists error #6

Closed wrobell closed 2 years ago

wrobell commented 2 years ago

I am using the publisher example from the readme file.

First run

Traceback (most recent call last):
  File "/home/wrobell/projects/snippets/rabbitmq/rabbitmq.py", line 11, in <module>
    asyncio.run(publish())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/wrobell/projects/snippets/rabbitmq/rabbitmq.py", line 9, in publish
    await producer.publish('mystream', f'msg: {i}'.encode())
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/producer.py", line 194, in publish
    publishing_ids = await self.publish_batch(
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/producer.py", line 156, in publish_batch
    publisher = await self._get_or_create_publisher(stream, publisher_name)
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/producer.py", line 110, in _get_or_create_publisher
    client = await self._get_or_create_client(stream)
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/producer.py", line 94, in _get_or_create_client
    self._clients[stream] = await self._pool.get((leader.host, leader.port))
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 481, in get
    self._clients[addr] = await self.new(addr)
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 495, in new
    await client.authenticate(
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 265, in authenticate
    await self.sync_request(
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 127, in sync_request
    resp.check_response_code()
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/schema.py", line 35, in check_response_code
    raise ServerError.from_code(code)
rstream.exceptions.SASLAuthenticationFailureLoopback

Note that the error above happens when publishing a message to the stream. The related queue was created:

# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
mystream        0

I can publish a message to the queue using Pika. I'm not sure whether it is a configuration issue on my side...

Anyway, running the same publisher example a second time gives:

Traceback (most recent call last):
  File "/home/wrobell/projects/snippets/rabbitmq/rabbitmq.py", line 11, in <module>
    asyncio.run(publish())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/wrobell/projects/snippets/rabbitmq/rabbitmq.py", line 6, in publish
    await producer.create_stream('mystream')
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/producer.py", line 228, in create_stream
    await self.default_client.create_stream(stream, arguments)
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 292, in create_stream
    await self.sync_request(
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/client.py", line 127, in sync_request
    resp.check_response_code()
  File "/home/wrobell/.local/lib/python3.9/site-packages/rstream/schema.py", line 35, in check_response_code
    raise ServerError.from_code(code)
rstream.exceptions.StreamAlreadyExists

I would expect the stream to be reused when calling Producer.create_stream.

Gsantomaggio commented 2 years ago

This behavior is defined by the stream protocol, which differs from AMQP here.

If you want to make it idempotent, you have to catch the exception.
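A minimal sketch of that workaround. It assumes a stand-in StreamAlreadyExists class (the real one is rstream.exceptions.StreamAlreadyExists, as the traceback above shows) and a hypothetical in-memory FakeProducer, so it runs without a broker; with the real client you would import the exception instead of defining it.

```python
import asyncio
import contextlib


class StreamAlreadyExists(Exception):
    """Stand-in for rstream.exceptions.StreamAlreadyExists."""


class FakeProducer:
    """Hypothetical in-memory stand-in for rstream's Producer (not the real class)."""

    def __init__(self):
        self.streams = set()

    async def create_stream(self, stream):
        # Mimic the server: creating a stream that already exists is an error.
        if stream in self.streams:
            raise StreamAlreadyExists(stream)
        self.streams.add(stream)


async def ensure_stream(producer, stream):
    # Swallow only the "already exists" error; any other error still propagates.
    with contextlib.suppress(StreamAlreadyExists):
        await producer.create_stream(stream)


async def main():
    producer = FakeProducer()
    await ensure_stream(producer, 'mystream')
    await ensure_stream(producer, 'mystream')  # second call no longer raises
    return producer


producer = asyncio.run(main())
```

Using contextlib.suppress keeps the intent explicit: only that one exception type is treated as success.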

We do the same in Java, Go and Rust. EDIT: this is no longer true; we changed the behaviour.

See: https://github.com/qweeze/rstream/issues/6#issuecomment-944347523

qweeze commented 2 years ago

I guess we could add something like an exist_ok: bool parameter to create_stream and hide the exception handling from the user to get a friendlier API. Pathlib does that.
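A sketch of what such a flag could look like, modelled on pathlib's Path.mkdir(exist_ok=...). The create_stream wrapper, StubClient and the stand-in StreamAlreadyExists are hypothetical names for illustration, not rstream's actual implementation; the point is that the default still raises and only exist_ok=True tolerates an existing stream.

```python
import asyncio
import tempfile
from pathlib import Path

# The pathlib precedent: mkdir(exist_ok=True) turns "already exists" into a no-op.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, 'mystream').mkdir()
    Path(tmp, 'mystream').mkdir(exist_ok=True)  # does not raise FileExistsError


class StreamAlreadyExists(Exception):
    """Stand-in for rstream.exceptions.StreamAlreadyExists."""


class StubClient:
    """Hypothetical low-level client that rejects duplicate names, as the server does."""

    def __init__(self):
        self.streams = {}

    async def create_stream(self, stream, arguments=None):
        if stream in self.streams:
            raise StreamAlreadyExists(stream)
        self.streams[stream] = arguments or {}


async def create_stream(client, stream, arguments=None, exist_ok=False):
    """Sketch of the proposed flag: re-raise unless exist_ok is True."""
    try:
        await client.create_stream(stream, arguments)
    except StreamAlreadyExists:
        if not exist_ok:
            raise


async def demo():
    client = StubClient()
    await create_stream(client, 'mystream')
    await create_stream(client, 'mystream', exist_ok=True)  # tolerated
    return client


client = asyncio.run(demo())
```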

Gsantomaggio commented 2 years ago

Sounds good to me !

wrobell commented 2 years ago

Looking at the Java client documentation, it says:

StreamCreator#create is idempotent: trying to re-create a stream with the same name and same properties (e.g. maximum size, see below) will not throw an exception. In other words, you can be sure the stream has been created once StreamCreator#create returns. Note it is not possible to create a stream with the same name as an existing stream but with different properties. Such a request will result in an exception.
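The semantics described in that quote can be sketched in Python with an in-memory registry. StreamRegistry and StreamPropertiesMismatch are hypothetical names, and 'max-length-bytes' is used only as an example argument. Note that this check needs the existing stream's properties, not just its name.

```python
class StreamPropertiesMismatch(Exception):
    """Hypothetical error: the stream exists, but with different properties."""


class StreamRegistry:
    """In-memory sketch of the documented StreamCreator#create semantics (not real client code)."""

    def __init__(self):
        self._streams = {}  # stream name -> arguments it was created with

    def create(self, name, arguments=None):
        arguments = dict(arguments or {})
        if name not in self._streams:
            self._streams[name] = arguments
        elif self._streams[name] != arguments:
            raise StreamPropertiesMismatch(
                f'{name!r} already exists with {self._streams[name]!r}'
            )
        # Same name and same arguments: nothing to do, so repeating the call is safe.


registry = StreamRegistry()
registry.create('mystream', {'max-length-bytes': 1_000_000})
registry.create('mystream', {'max-length-bytes': 1_000_000})  # no exception
```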

Gsantomaggio commented 2 years ago

Sorry @wrobell, you are right. We updated it recently.

BTW, @qweeze's idea looks good to me.

qweeze commented 2 years ago

I added an exist_ok parameter to create_stream on both the producer and the consumer, and also a missing_ok parameter to delete_stream.
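The missing_ok flag follows the same pattern as pathlib's Path.unlink(missing_ok=True). A minimal sketch, assuming a stand-in exception for the server's "stream does not exist" error (named StreamDoesNotExist here as an assumption) and a hypothetical StubClient; it is not rstream's actual code.

```python
import asyncio
import tempfile
from pathlib import Path

# The pathlib precedent: unlink(missing_ok=True) ignores a file that is already gone.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, 'gone.txt').unlink(missing_ok=True)  # no FileNotFoundError


class StreamDoesNotExist(Exception):
    """Hypothetical stand-in for the 'stream does not exist' server error."""


class StubClient:
    """Hypothetical client whose delete fails for unknown streams."""

    def __init__(self, streams=()):
        self.streams = set(streams)

    async def delete_stream(self, stream):
        if stream not in self.streams:
            raise StreamDoesNotExist(stream)
        self.streams.remove(stream)


async def delete_stream(client, stream, missing_ok=False):
    """Sketch of the missing_ok flag: ignore the error only when asked to."""
    try:
        await client.delete_stream(stream)
    except StreamDoesNotExist:
        if not missing_ok:
            raise


async def demo():
    client = StubClient({'mystream'})
    await delete_stream(client, 'mystream')
    await delete_stream(client, 'mystream', missing_ok=True)  # already gone: tolerated
    return client


client = asyncio.run(demo())
```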

Speaking of the idempotence of stream re-creation: as far as I can see, there is currently no way to retrieve an existing stream's properties from RabbitMQ. So we can only check whether a stream with the same name exists; we cannot check both name and properties.

Gsantomaggio commented 2 years ago

as far as I can see there's currently no way to retrieve existing stream's properties from RabbitMQ

Actually, there is a call to query the metadata, but in Go, Java and .NET we have a high-level client on top of the low-level TCP client, and the simplest approach is something like:

var response = await client.CreateStream(spec.Name, spec.Args);
if (response.ResponseCode is ResponseCode.Ok or ResponseCode.StreamAlreadyExists)
  return;
throw new CreateStreamException($"Failed to create stream, error code: {response.ResponseCode.ToString()}");
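A rough Python equivalent of the .NET snippet above, checking the response code instead of catching an exception. The numeric values (0x01 OK, 0x05 stream already exists, 0x11 precondition failed) are taken from the RabbitMQ stream protocol as I understand it and should be checked against the spec; check_create_stream and CreateStreamError are hypothetical names.

```python
from enum import IntEnum


class ResponseCode(IntEnum):
    # Assumed values from the RabbitMQ stream protocol; verify against the spec.
    OK = 0x01
    STREAM_ALREADY_EXISTS = 0x05
    PRECONDITION_FAILED = 0x11


class CreateStreamError(Exception):
    """Hypothetical error raised for any other create-stream response code."""


def check_create_stream(code: int) -> None:
    # OK and STREAM_ALREADY_EXISTS both mean "the stream is there", as in the .NET client.
    if code in (ResponseCode.OK, ResponseCode.STREAM_ALREADY_EXISTS):
        return
    raise CreateStreamError(f'Failed to create stream, error code: {int(code):#04x}')
```

Treating both codes as success is what makes the call idempotent by name; like the exist_ok approach, it does not compare properties.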

See here

Gsantomaggio commented 2 years ago

I think we can close this issue. Using:

await producer.create_stream('mystream', exist_ok=True)

it works.