Netflix / conductor-community

Apache License 2.0
AMQPObservableQueue 'ack' implementation is inconsistent with the interface #241

Open jcockrill-rq opened 1 year ago

jcockrill-rq commented 1 year ago

Describe the bug

The AMQPObservableQueue class incorrectly implements the List<String> ack(List<Message> messages) method of the ObservableQueue interface. The Javadoc of ObservableQueue#ack states:

@return the id of the ones which could not be ack'ed

AMQPObservableQueue instead returns the successfully acked messages. As a result, com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor in conductor-core appears to log an error for each successful ack, which produces a lot of log-error spam, because of this line:

https://github.com/Netflix/conductor/blob/631a04dd790f5d46e76691b47a748cad77e0e20d/core/src/main/java/com/netflix/conductor/core/events/queue/DefaultEventQueueProcessor.java#L155

List<String> failures = queue.ack(Collections.singletonList(msg));
if (!failures.isEmpty()) {
    LOGGER.error("Not able to ack the messages {}", failures);
}

The associated AMQPObservableQueueTest class has a testAck test that does not properly validate this behaviour: it only checks that the returned List<String> is not null, not that it is empty.
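To illustrate why a not-null check cannot catch this bug, here is a minimal, self-contained sketch. It does not use the real AMQPObservableQueue or its test class; the class AckTestSketch and all of its methods are hypothetical stand-ins, and the scenario assumes every ack succeeds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Conductor code: two ack variants and two checks.
final class AckTestSketch {

    // Mirrors the behaviour described in this issue: returns the ids that WERE acked.
    static List<String> ackReturningSuccesses(List<String> ids) {
        return new ArrayList<>(ids);
    }

    // Mirrors the documented contract: returns the ids that could NOT be acked.
    // In this sketch every ack succeeds, so nothing is returned.
    static List<String> ackReturningFailures(List<String> ids) {
        return Collections.emptyList();
    }

    // The kind of check the issue describes: only verifies the result is not null.
    static boolean weakCheck(List<String> result) {
        return result != null;
    }

    // A stricter check: when all acks succeed, the list of failures must be empty.
    static boolean strictCheck(List<String> result) {
        return result != null && result.isEmpty();
    }
}
```

Under these assumptions, weakCheck accepts both variants, while strictCheck accepts only the variant that follows the interface contract.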

Details

Conductor version: current version on main
Persistence implementation: N/A
Queue implementation: N/A
Lock: N/A
Workflow definition: N/A
Task definition: N/A
Event handler definition: AMQP

To Reproduce

See associated test-case.

Expected behavior

AMQPObservableQueue#ack should return only the ids of messages whose ack failed with an error.
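The expected behaviour could look roughly like the sketch below. This is not the fix from the PR and not the real AMQPObservableQueue code: SketchMessage and SketchChannel are hypothetical stand-ins for Conductor's Message type and the broker channel, and the sketch assumes a failed ack surfaces as an exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Conductor's Message; only the id matters here.
final class SketchMessage {
    private final String id;

    SketchMessage(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

// Hypothetical stand-in for the broker channel; assumed to throw when an ack fails.
interface SketchChannel {
    void ack(String messageId) throws Exception;
}

final class AckContractSketch {

    // Returns the ids that could NOT be acked, as the ObservableQueue#ack Javadoc describes.
    static List<String> ack(SketchChannel channel, List<SketchMessage> messages) {
        List<String> failedIds = new ArrayList<>();
        for (SketchMessage message : messages) {
            try {
                channel.ack(message.getId());
            } catch (Exception e) {
                // Only failures are collected; successful acks add nothing.
                failedIds.add(message.getId());
            }
        }
        return failedIds;
    }
}
```

With this shape, a caller such as the failures.isEmpty() check in DefaultEventQueueProcessor would see an empty list when every ack succeeds, and would log an error only for ids that actually failed.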

Screenshots

N/A

Additional context

N/A

jcockrill-rq commented 1 year ago

Please see PR #242 for the associated test-update and fix.