omniti-labs / pg_amqp

AMQP Support for Postgres
https://labs.omniti.com/labs/pg_amqp

Messages received twice #38

Open kumy opened 1 year ago

kumy commented 1 year ago

When I send a test message using:

SELECT amqp.publish(1, 'test', '', '{"foo": "foobar"}'::text)

-- or even
-- SELECT amqp.publish(1, 'test', '', 'foobar');

I receive the message twice:

$ ./receive.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'{"foo": "foobar"}'
 [x] Received b'{"foo": "foobar"}'
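
The receive.py script itself is not included in this report. For anyone trying to reproduce the problem, a minimal consumer along these lines should print output in the same format. This is only a sketch under assumptions: it uses the third-party pika client, the names `format_message` and `consume` are hypothetical, and it assumes the exchange 'test' already exists and that binding a temporary queue with an empty routing key is enough to receive the message.

```python
def format_message(body: bytes) -> str:
    """Format a delivery the same way as the output shown above."""
    return f" [x] Received {body!r}"


def consume(host: str, user: str, password: str, exchange: str = "test") -> None:
    """Bind a temporary queue to `exchange` and print every delivery."""
    import pika  # third-party client (assumption): pip install pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=host,
            credentials=pika.PlainCredentials(user, password),
        )
    )
    channel = connection.channel()

    # Server-named, exclusive queue that is deleted when this consumer exits.
    queue_name = channel.queue_declare(queue="", exclusive=True).method.queue
    # Assumes the exchange already exists; uses the same empty routing key
    # as the amqp.publish() call.
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key="")

    def on_message(ch, method, properties, body):
        print(format_message(body))

    channel.basic_consume(
        queue=queue_name, on_message_callback=on_message, auto_ack=True
    )
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Calling `consume("10.0.3.19", "user1", "<password>")` would match the broker address and user seen in the logs; the password is a placeholder. With `auto_ack=True` the broker never redelivers, so a message printed twice was most likely published twice.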

This is also visible in the RabbitMQ logs, where two separate connections are opened for the single publish call:

2022-12-03 12:10:14.765749+01:00 [info] <0.12818.0> accepting AMQP connection <0.12818.0> (10.0.3.28:38712 -> 10.0.3.19:5672)
2022-12-03 12:10:14.766576+01:00 [info] <0.12818.0> connection <0.12818.0> (10.0.3.28:38712 -> 10.0.3.19:5672): user 'user1' authenticated and granted access to vhost '/'
2022-12-03 12:10:14.768661+01:00 [info] <0.12834.0> accepting AMQP connection <0.12834.0> (10.0.3.28:38724 -> 10.0.3.19:5672)
2022-12-03 12:10:14.769267+01:00 [info] <0.12834.0> connection <0.12834.0> (10.0.3.28:38724 -> 10.0.3.19:5672): user 'user1' authenticated and granted access to vhost '/'
2022-12-03 12:10:14.773221+01:00 [warning] <0.12834.0> closing AMQP connection <0.12834.0> (10.0.3.28:38724 -> 10.0.3.19:5672, vhost: '/', user: 'user1'):
2022-12-03 12:10:14.773221+01:00 [warning] <0.12834.0> client unexpectedly closed TCP connection
2022-12-03 12:10:14.773382+01:00 [warning] <0.12818.0> closing AMQP connection <0.12818.0> (10.0.3.28:38712 -> 10.0.3.19:5672, vhost: '/', user: 'user1'):
2022-12-03 12:10:14.773382+01:00 [warning] <0.12818.0> client unexpectedly closed TCP connection
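
To compare log excerpts like the one above without reading them line by line, the accepted connections can be counted with a short script. This is a rough sketch: the regular expression and the `accepted_clients` helper are my own and only assume the "accepting AMQP connection" line format shown in this report.

```python
import re

# Matches the "accepting AMQP connection" lines as they appear in this report, e.g.
# "... [info] <0.12818.0> accepting AMQP connection <0.12818.0> (10.0.3.28:38712 -> 10.0.3.19:5672)"
ACCEPT_RE = re.compile(
    r"accepting AMQP connection <[\d.]+> "
    r"\((?P<client>[\d.]+:\d+) -> (?P<broker>[\d.]+:\d+)\)"
)


def accepted_clients(log_text: str) -> list[str]:
    """Return the client 'ip:port' of every accepted connection, in log order."""
    return [match.group("client") for match in ACCEPT_RE.finditer(log_text)]


# The two "accepting" lines from the excerpt above (duplicate delivery case).
DOUBLE_LOG = """\
2022-12-03 12:10:14.765749+01:00 [info] <0.12818.0> accepting AMQP connection <0.12818.0> (10.0.3.28:38712 -> 10.0.3.19:5672)
2022-12-03 12:10:14.768661+01:00 [info] <0.12834.0> accepting AMQP connection <0.12834.0> (10.0.3.28:38724 -> 10.0.3.19:5672)
"""

if __name__ == "__main__":
    clients = accepted_clients(DOUBLE_LOG)
    print(f"{len(clients)} connection(s) accepted from: {', '.join(clients)}")
```

On the excerpt above this reports two client endpoints (ports 38712 and 38724) for a single `amqp.publish` call, which is consistent with the message being published once per connection.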

Interestingly, if I wrap the payload in CONCAT(), as in this query:

SELECT amqp.publish(1, 'test', '', CONCAT('{"foo": "foobar"}')::text)

the message is sent only once, over a single connection:

2022-12-03 13:20:51.300889+01:00 [info] <0.14980.0> accepting AMQP connection <0.14980.0> (10.0.3.28:35496 -> 10.0.3.19:5672)
2022-12-03 13:20:51.301731+01:00 [info] <0.14980.0> connection <0.14980.0> (10.0.3.28:35496 -> 10.0.3.19:5672): user 'user1' authenticated and granted access to vhost '/'
2022-12-03 13:20:51.306825+01:00 [warning] <0.14980.0> closing AMQP connection <0.14980.0> (10.0.3.28:35496 -> 10.0.3.19:5672, vhost: '/', user: 'user1'):
2022-12-03 13:20:51.306825+01:00 [warning] <0.14980.0> client unexpectedly closed TCP connection

Sometimes an error like this also appears in the RabbitMQ logs:

2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>     supervisor: {<0.16605.0>,rabbit_channel_sup}
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>     errorContext: shutdown_error
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>     reason: noproc
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>     offender: [{pid,<0.16608.0>},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                {id,channel},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                {mfargs,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                    {rabbit_channel,start_link,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                        [2,<0.16596.0>,<0.16606.0>,<0.16596.0>,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         <<"10.0.3.28:38456 -> 10.0.3.19:5672">>,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         rabbit_framing_amqp_0_9_1,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         {user,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                             <<"user1">>,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                             [administrator],
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                             [{rabbit_auth_backend_internal,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                                  #Fun<rabbit_auth_backend_internal.2.133116481>}]},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         <<"/">>,
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         [{<<"authentication_failure_close">>,bool,true},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                          {<<"exchange_exchange_bindings">>,bool,true}],
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                         <0.16597.0>,<0.16607.0>]}},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                {restart_type,intrinsic},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                {shutdown,70000},
2022-12-03 13:27:44.055767+01:00 [error] <0.16605.0>                {child_type,worker}]

My Dockerfile:

FROM postgres:12

COPY pgdg.preferences /etc/apt/preferences.d/pgdg

RUN apt-get update \
 && apt-get install -y --allow-downgrades \
      postgresql-12-postgis-3 \
      postgresql-12-postgis-3-scripts \
      postgis \
      postgresql-12-pgtap \
      make \
      gcc \
      postgresql-server-dev-12 \
      curl \
 && apt-get clean \
 && rm -r /var/lib/apt/lists/* \
 \
 && mkdir /tmp/pgxn \
 && cd /tmp/pgxn \
 \
# && curl -L https://github.com/pmwebster/pg_amqp/archive/librabbitmq-update.tar.gz|tar xzf - \
# && cd /tmp/pgxn/pg_amqp-librabbitmq-update \
# && sed -i 's/.*HAVE_POLL.*/PG_CFLAGS=-D HAVE_POLL\nPG_CPPFLAGS=-D HAVE_POLL/' Makefile \
# && make install \
# && cd .. \
# && rm -fr /tmp/pgxn/pg_amqp-librabbitmq-update \
 && curl -L https://github.com/omniti-labs/pg_amqp/archive/240d477d40c5e7a579b931c98eb29cef4edda164.tar.gz|tar xzf - \
 && cd /tmp/pgxn/pg_amqp-240d477d40c5e7a579b931c98eb29cef4edda164 \
 && make install \
 && cd .. \
 && rm -fr /tmp/pgxn/pg_amqp-240d477d40c5e7a579b931c98eb29cef4edda164 \
 \
 && apt-get remove --purge -y \
      make \
      gcc \
      postgresql-server-dev-12 \
      curl

I have tested with the main branch and with PR 28 (Makefile, pg_amqp.c, librabbitmq: updating librabbitmq-c to current v…), on both Postgres 12 and 14.

Does anyone have an idea of what could be going wrong?

Also, this blog post says:

Tries to be smart about bring up a connection to a broker on demand and leaving it connected to accelerate the next publication request.

From this I understood that the connection should stay up, but according to the logs it is closed after each message (or group of messages). Is that really the expected behavior?