JuliaComputing / AMQPClient.jl

A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client.

Messages aren't sent synchronously #27

Closed · andyferris closed this 3 years ago

andyferris commented 3 years ago

We are seeing an issue where messages are sent to RabbitMQ via basic_publish throughout a series of extended, CPU-bound calculations, but they do not appear in our other services until after all the calculations are complete. These messages are intended to report incremental progress, result sets (as they are created), logging, etc., and the delay is proving to be a problem for us.

I'm not sure I understand which part of this process is asynchronous, but generally in Julia I haven't needed to worry about this behavior (unless I explicitly spin up a task, etc.). For example, the logging messages throughout the calculation are being emitted in real time. I have tried inserting yield() in various places without success, nor did I find a function like flush() to make sure everything gets emitted before proceeding.

Are there any known solutions to this problem?

pfitzseb commented 3 years ago

So afaict (correct me if I'm wrong, Tanmay), this happens because sending a message works by pushing said message into a Channel, which is then consumed by an async task. If the main task doesn't yield more often than messages are sent, then the send queue just grows up to a limit of (currently) 1024.

We had the same problem with JSONRPC.jl and then added an awful flush method, which yields until the send queue is empty.
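
For reference, here is a minimal, self-contained sketch of that pattern (the names are made up, not the package's internals): publishes go into a buffered Channel, a background task drains it, and nothing is written out until the publishing task hits a yield point.

```julia
const sendq = Channel{Any}(1024)                # stands in for the connection's send queue

write_frame(frame) = println("sent: ", frame)   # stands in for the actual socket write

# background writer task: it only runs when the publishing task yields to the scheduler
writer = @async for frame in sendq
    write_frame(frame)
end

publish(msg) = put!(sendq, msg)                 # returns immediately while the queue has room

# a CPU-bound loop with no yield points: all 100 messages pile up in the queue
for i in 1:100
    publish("progress $i")
    sum(rand(1000))                             # pure computation, no I/O, no yield
end

# only now does the writer get a chance to run and drain the queue
while isready(sendq)
    yield()
end
close(sendq)
```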

andyferris commented 3 years ago

Right, thanks.

How would you go about solving it? Is this a global ongoing task, or one per message? Would it be possible to return a task from basic_publish that I can await? Should we add an "awful" flush method?

andyferris commented 3 years ago

OK, I have three ideas.

The first is the "awful" flush method:

function Base.flush(conn::AMQPClient.Connection)
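    # spin, yielding to the connection's writer task, until the send queue has been drained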
    while isready(conn.sendq)
        yield()
    end
end
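
Usage would then be just a flush(conn) wherever the calculation needs its messages to have actually gone out, e.g. (the publish call here is schematic):

```julia
basic_publish(chan, msg; exchange=excg, routing_key=route)  # queue the frame(s)
flush(conn)   # yield until conn.sendq is empty, i.e. everything has been written
```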

The second is to make CONN_MAX_QUEUED globally configurable by turning it into const CONN_MAX_QUEUED = Ref(1024) and allowing users to override it before creating connections. If I set it to zero then put! becomes blocking until the message is sent.
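
A rough sketch of that (the Ref and the user-side override are the new bits; everything else stays as it is):

```julia
# in AMQPClient: a Ref instead of a plain constant, read when the send queue is created
const CONN_MAX_QUEUED = Ref(1024)
# sendq = Channel{TAMQPGenericFrame}(CONN_MAX_QUEUED[])

# user code, before opening any connections:
AMQPClient.CONN_MAX_QUEUED[] = 0   # unbuffered: put! blocks until the writer task takes each frame
```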

The third is to make the send queue size configurable in the Connection constructor (defaulting to CONN_MAX_QUEUED), which seems better than the above (and could be done in addition to flush).

function Connection(virtualhost::String="/", host::String="localhost", port::Int=AMQP_DEFAULT_PORT, send_queue_size::Int = CONN_MAX_QUEUED)
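    # size the send queue from the new argument rather than the global constant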
    sendq = Channel{TAMQPGenericFrame}(send_queue_size)
    sendlck = Channel{UInt8}(1)
    put!(sendlck, 1)
    new(virtualhost, host, port, nothing,
        Dict{Symbol,Any}(), Dict{String,Any}(), 0, 0, 0,
        CONN_STATE_CLOSED, sendq, sendlck, Dict{TAMQPChannel, AbstractChannel}(),
        nothing, nothing, nothing,
        0.0, 0.0)
end
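
A caller who cares about latency more than throughput could then ask for a small (or unbuffered) queue when the connection is created, assuming the argument is also threaded through whatever public entry point wraps this constructor:

```julia
# hypothetical call: with a zero-size queue each publish blocks until its frame is written
conn = AMQPClient.Connection("/", "localhost", AMQPClient.AMQP_DEFAULT_PORT, 0)
```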

tanmaykm commented 3 years ago

Yes. We could have the flush method and make the send queue size configurable as well.

c42f commented 3 years ago

Having a configurable queue size ensures you get some backpressure, which will eventually cause the computation to yield. So that's good. If the computation emits entries into the queue on a regular schedule, it will even ensure the results are timely, in a backhanded kind of way. The queue size is a bit of an annoying tuning parameter, but I don't think there's any great way to have a proper deadline. You could use a timer, but that's also liable to be blocked if there's heavy computation going on.
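
As a toy illustration of that backpressure, independent of AMQP: once the bounded Channel is full, put! itself becomes the yield point, so the consumer gets to run even though the producing loop never yields explicitly.

```julia
q = Channel{Int}(4)                 # small bound, analogous to a small send queue

consumer = @async for item in q
    println("delivered ", item)     # stands in for the socket write
end

for i in 1:20
    put!(q, i)       # blocks, and so yields to `consumer`, whenever the 4-slot buffer is full
    sum(rand(10^6))  # CPU-bound work with no other yield points
end
close(q)
wait(consumer)
```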