JuliaComputing / AMQPClient.jl

A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client.
Other
39 stars 21 forks source link

How to handle broker restart #24

Open pshashk opened 4 years ago

pshashk commented 4 years ago

Communication via fanout exchange works fine.

using AMQPClient

# Turn on debug-level log output for all modules.
ENV["JULIA_DEBUG"] = "all"

# Broker address and exchange name used by the `connection`, `exchange_declare`,
# `queue_bind` and `basic_publish` calls in the functions below.
const HOST = "localhost"
const PORT = 5672
const EXCHANGE = "exchange"
# Authentication parameters passed as `auth_params` when opening a connection.
const AUTH = Dict{String,Any}(
    "MECHANISM" => "AMQPLAIN",
    "LOGIN" => "guest",
    "PASSWORD" => "guest"
)

"""
    produce()

Open a connection and a channel to the broker, declare the fanout exchange
`EXCHANGE`, and return a zero-argument function.

Each call of the returned function publishes the current time (seconds since the
epoch, rounded to an `Int`) as a `text/plain` message to `EXCHANGE` with an empty
routing key, then logs the value that was sent.
"""
function produce()
    broker = connection(; virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    publish_channel = channel(broker, AMQPClient.UNUSED_CHANNEL, true)
    exchange_declare(publish_channel, EXCHANGE, EXCHANGE_TYPE_FANOUT)

    function publish_timestamp()
        now_seconds = round(Int, time())
        payload = Vector{UInt8}(string(now_seconds))
        outgoing = Message(payload, content_type="text/plain")
        basic_publish(publish_channel, outgoing; exchange=EXCHANGE, routing_key="")
        @info "sent" data=now_seconds
    end

    return publish_timestamp
end

"""
    consume()

Open a connection and a channel to the broker, declare a queue with an empty
name, bind the queue name returned by the broker to `EXCHANGE`, and register a
message handler on it.

The handler logs each message body parsed as an `Int` and acknowledges the
message on the same channel. Returns whatever `basic_consume` returns (the
success flag and the consumer tag).
"""
function consume()
    broker = connection(; virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
    receive_channel = channel(broker, AMQPClient.UNUSED_CHANNEL, true)
    _, bound_queue, _, _ = queue_declare(receive_channel, "")
    queue_bind(receive_channel, bound_queue, EXCHANGE, "")

    on_message = msg -> begin
        @info "recieved" data=parse(Int, String(msg.data))
        basic_ack(receive_channel, msg.delivery_tag)
    end

    return basic_consume(receive_channel, bound_queue, on_message)
end

# `consume()` registers the message handler and returns the `basic_consume` result;
# `produce()` returns the function that publishes one timestamp per call.
consumer = consume()
producer = produce()

# Publish a single message; the log output it produces is shown below.
producer()
┌ Info: sent
└   data = 1572692964
┌ Info: recieved
└   data = 1572692964

But when I restart RabbitMQ (docker restart ...) and call producer again, the Julia process hangs and consumes all available memory without reporting any error.

The RabbitMQ reliability guide says that in such cases it is necessary to create a new connection and channel. How can I do that with AMQPClient?

pshashk commented 4 years ago

On the producer side, it seems like a simple state check is enough.


"""
    create_connection()

Open and return a new connection to the broker at `HOST`:`PORT` on virtual host
`"/"`, authenticating with `AUTH`.
"""
function create_connection()
    return connection(; virtualhost="/", host=HOST, port=PORT, auth_params=AUTH)
end
"""
    create_channel(conn)

Open and return a channel on `conn`, passing `AMQPClient.UNUSED_CHANNEL` as the
channel id and `true` as the third argument to `channel`.
"""
function create_channel(conn)
    return channel(conn, AMQPClient.UNUSED_CHANNEL, true)
end

"""
    Producer()

Mutable holder for the connection (`conn`) and channel (`chnl`) used for
publishing. The fields are mutable so that they can be replaced later.

The constructor opens a connection with `create_connection`, opens a channel on
it with `create_channel`, and declares the fanout exchange `EXCHANGE` on that
channel before building the object.
"""
mutable struct Producer
    conn
    chnl
    function Producer()
        broker = create_connection()
        publish_channel = create_channel(broker)
        exchange_declare(publish_channel, EXCHANGE, EXCHANGE_TYPE_FANOUT)
        return new(broker, publish_channel)
    end
end

"""
    (p::Producer)()

Publish the current time (seconds since the epoch, rounded to an `Int`) as a
`text/plain` message to `EXCHANGE`, reconnecting first when needed.

If the connection is not in the open state, both the connection and the channel
are replaced; if only the channel is not open, just the channel is replaced.
After either replacement the exchange is declared again on the new channel,
mirroring what the `Producer` constructor does. Returns `nothing`.
"""
function (p::Producer)()
    reopened = false
    if p.conn.state != AMQPClient.CONN_STATE_OPEN
        p.conn = create_connection()
        p.chnl = create_channel(p.conn)
        reopened = true
    elseif p.chnl.state != AMQPClient.CONN_STATE_OPEN
        p.chnl = create_channel(p.conn)
        reopened = true
    end

    # The constructor declares the exchange without `durable=true`, so after a
    # broker restart it may no longer exist. Declare it again before publishing
    # on a fresh channel.
    if reopened
        exchange_declare(p.chnl, EXCHANGE, EXCHANGE_TYPE_FANOUT)
    end

    timestamp = round(Int, time())
    msg = Message(Vector{UInt8}(string(timestamp)), content_type="text/plain")
    basic_publish(p.chnl, msg; exchange=EXCHANGE, routing_key="")
    @info "sent" data=timestamp
    return nothing
end

But messages still don't make it to the consumer. So some connection handling is required there as well.

pshashk commented 4 years ago

I've managed to implement a consumer that can handle broker restarts. But I doubt that's the best way to do it.

"""
    consume()

Consume messages in an endless reconnect loop.

Each iteration opens a connection and a channel, declares the durable fanout
exchange, declares an exclusive auto-delete queue with an empty name, binds it,
registers a callback that logs and acknowledges every message, and then blocks
on the consumer's receiver task. When anything in that sequence throws, the
error is logged with its backtrace and the whole setup is retried after five
seconds.

An `InterruptException` is rethrown rather than retried, so the loop can be
stopped by interrupting it. The function never returns normally.

NOTE(review): `EXCHNG` and `ROUTE` are not defined in this snippet; they are
assumed to be defined by the surrounding code.
"""
function consume()
    while true
        try
            conn = connection()
            chnl = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
            exchange_declare(chnl, EXCHNG, EXCHANGE_TYPE_FANOUT, durable=true)
            _, queue_name, _, _ = queue_declare(
                chnl, ""; exclusive=true, durable=false, auto_delete=true
            )
            queue_bind(chnl, queue_name, EXCHNG, ROUTE)
            callback = (msg) -> begin
                @info "recieved" data=String(msg.data)
                basic_ack(chnl, msg.delivery_tag)
            end
            _, consumer_tag = basic_consume(chnl, queue_name, callback)
            # Block until the receiver task finishes; `fetch` rethrows its failure.
            fetch(chnl.consumers[consumer_tag].receiver)
        catch e
            # A user interrupt must end the loop instead of triggering a retry.
            e isa InterruptException && rethrow()
            # `@error` takes a message first; the exception and raw backtrace go
            # under the `exception` key so the logger can format them.
            @error "consumer failed, retrying in 5 seconds" exception=(
                e, catch_backtrace()
            )
            sleep(5)
        end
    end
end
nsslh commented 3 years ago

@pshashk This is excellent, thanks. My consumer service now restarts gracefully on connection error.