mbroadst / qamqp

AMQP 0.9.1 implementation for Qt

Initial stab at test case for Issue 23. #24

Closed sjlongland closed 9 years ago

sjlongland commented 9 years ago

This tries to declare a queue, consume it multiple times in quick succession, then sends a few messages to the queue and receives them back. If the bug is fixed, there should only be one consumer created, each message should be received once and the body should appear once.

mbroadst commented 9 years ago

@sjlongland I think the code over here solves your problem, and properly tests the potential issue: https://github.com/mbroadst/qamqp/commit/d2e76f062b0260f6fd2dca0012e6550579f5609d. I know it's not ideal, but it's far less code and doesn't immediately make any big changes.

I think the way to go in the future is to develop a sort of internal state machine, since we have a bunch of state here that isn't being tracked. Sure, this patch solves the multiple consumers, but what about multiple attempts to e.g. cancel? This problem is endemic.
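
To make the state machine idea concrete, here is a minimal sketch of how duplicate consume and cancel requests could be filtered. It is not QAMQP code: `ConsumerStateMachine`, its states and its method names are all hypothetical, and the frame sending itself is left out.

```cpp
// Hypothetical consumer lifecycle; none of these names exist in QAMQP.
enum class ConsumerState { Idle, Consuming, Active, Cancelling };

class ConsumerStateMachine {
public:
    // Returns true only when a consume request should actually be sent.
    bool requestConsume() {
        if (state_ != ConsumerState::Idle)
            return false;               // already consuming or in flight
        state_ = ConsumerState::Consuming;
        return true;
    }

    // The broker confirmed the consumer.
    void consumeOk() {
        if (state_ == ConsumerState::Consuming)
            state_ = ConsumerState::Active;
    }

    // Returns true only when a cancel request should actually be sent.
    bool requestCancel() {
        if (state_ != ConsumerState::Active)
            return false;               // nothing to cancel, or in flight
        state_ = ConsumerState::Cancelling;
        return true;
    }

    // The broker confirmed the cancellation.
    void cancelOk() {
        if (state_ == ConsumerState::Cancelling)
            state_ = ConsumerState::Idle;
    }

    ConsumerState state() const { return state_; }

private:
    ConsumerState state_ = ConsumerState::Idle;
};
```

With something like this in place, a second `requestConsume()` issued before the first is confirmed simply returns false, and the same guard covers repeated cancels.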

sjlongland commented 9 years ago

Hi, yes that's more or less what I did in https://github.com/sjlongland/qamqp/commit/51b4444835e4b2e16a63446224a5d34ae0ab5595; yours is certainly slightly cleaner.

What I was starting to do in my code was wrap the Client, Queue and Exchange objects in objects of my own, so I had the following code.

This code is part of a proprietary code base that's in pre-alpha stage, so it has copyright notices that state this; there is talk of doing something under an open-source license and I'm happy to contribute the ideas shown in this code to QAMQP if it helps.

AMQPConnectionHandler is just meant to re-start a connection if it fails. This is before I noticed the Auto-Connect feature of QAMQP. Basically it would do a re-connect itself then tell the queues and exchanges what was happening. The other thing it did was hand out references to the objects, acting as a factory, that way the reference-counting QSharedPointer class would take care of clean-up.
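
The reconnect bookkeeping can be summarised separately from Qt. The sketch below is a simplified model of it, assuming a fixed retry budget that is spent per attempt and restored on success; `ReconnectPolicy` and its members are hypothetical names, not part of the code that follows.

```cpp
// Hypothetical, Qt-free model of the retry bookkeeping.
class ReconnectPolicy {
public:
    explicit ReconnectPolicy(int maxRetries)
        : maxRetries_(maxRetries), retries_(maxRetries) {}

    // The caller wants the link up; each attempt costs one retry.
    void beginAttempt() { wantConnected_ = true; --retries_; }

    // A successful connect restores the full retry budget.
    void onConnected() { retries_ = maxRetries_; }

    // The caller asked for a disconnect, so a later drop is expected.
    void requestDisconnect() { wantConnected_ = false; }

    // Reconnect only if the drop was unintended and retries remain.
    bool shouldReconnect() const { return wantConnected_ && retries_ > 0; }

    int retriesLeft() const { return retries_; }

private:
    int maxRetries_;
    int retries_;
    bool wantConnected_ = false;
};
```

Keeping the intended state apart from the actual state is what lets the handler tell a requested disconnect from a dropped connection.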

It also de-duplicates objects: when an exchange or queue of a particular name is requested more than once, the same reference is handed out. The connection holds only a weak reference; if the object is asked for again, the weak reference is converted to a strong reference, and the object is only re-created if that reference turns out to be NULL.
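
The weak-reference lookup can be shown in isolation. This is a sketch only: it uses `std::weak_ptr` and `std::shared_ptr` as stand-ins for `QWeakPointer` and `QSharedPointer`, and `WeakRegistry` with its `getOrCreate` method is a hypothetical name that does not appear in the code below.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>

// Hypothetical get-or-create cache keyed by name.
template <typename T>
class WeakRegistry {
public:
    std::shared_ptr<T> getOrCreate(
            const std::string& name,
            const std::function<std::shared_ptr<T>()>& make)
    {
        // Promote the weak reference; the result is null if the object
        // was never created or has already been destroyed.
        std::shared_ptr<T> ref = entries_[name].lock();
        if (!ref) {
            ref = make();
            entries_[name] = ref;   // the registry keeps a weak reference only
        }
        return ref;
    }

private:
    std::map<std::string, std::weak_ptr<T>> entries_;
};
```

Because the registry never holds a strong reference, an object is destroyed as soon as its last user lets go of it, and the next request for that name builds a fresh one.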

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.  Published on Github
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#ifndef _AMQP_CONNECTION_HANDLER_H
#define _AMQP_CONNECTION_HANDLER_H

#include <qamqp/qamqpclient.h>
#include <qamqp/qamqpchannel.h>
#include <qamqp/qamqpexchange.h>
#include <qamqp/qamqpqueue.h>

#include <QObject>
#include <QAbstractSocket>
#include <QWeakPointer>
#include <QSharedPointer>
#include <QHash>
#include <QString>

#include <ostream>

#include "message/BaseMessage.h"
#include "util/SelfRef.h"
#include "util/logcontext.h"

namespace MeterMasterDAL {
    /* Forward declarations */
    class AMQPConnectionHandler;
    typedef QSharedPointer<AMQPConnectionHandler>
        AMQPConnectionRef;
    typedef QWeakPointer<AMQPConnectionHandler>
        AMQPConnectionWeakRef;

    class AMQPExchangeHandler;
    typedef QSharedPointer<AMQPExchangeHandler>
        AMQPExchangeRef;
    typedef QWeakPointer<AMQPExchangeHandler>
        AMQPExchangeWeakRef;

    class AMQPQueueHandler;
    typedef QSharedPointer<AMQPQueueHandler>
        AMQPQueueRef;
    typedef QWeakPointer<AMQPQueueHandler>
        AMQPQueueWeakRef;

    /*!
     * AMQPConnectionHandler is a class for managing the graceful
     * re-establishment of AMQP connections.
     */
    class AMQPConnectionHandler : public QObject,
            public SelfRef<AMQPConnectionHandler> {

        Q_OBJECT

        public:
            enum AMQPConnectionState {
                /*! Connection is presently disconnected */
                CONN_DISCONNECTED,
                /*! Connection is being made */
                CONN_CONNECTING,
                /*! Connection has been established */
                CONN_CONNECTED,
                /*! Connection has failed */
                CONN_FAILED,
                /*! Connection is being disconnected */
                CONN_DISCONNECTING,
            };

            static const int NUM_RETRIES;
            static AMQPConnectionRef create(
                    const QString& connection_uri,
                    bool auto_connect=true,
                    QObject* parent=0);
            virtual ~AMQPConnectionHandler();

            /*!
             * Change the URI for this connection.
             * This triggers a re-connection to the service.
             */
            void setConnectionUri(const QString& uri)
            {
                this->connection_uri = uri;
                this->disconnectClient();
                if (this->intended_state == CONN_CONNECTED)
                    this->connectToAMQP();
            }

            /*!
             * Get the current state of this connection.
             */
            AMQPConnectionState getState() const
            {
                return this->state;
            }

            /*!
             * See if the connection is in the given state.
             */
            bool isInState(AMQPConnectionState state) const
            {
                return this->getState() == state;
            }

            /*!
             * Get a reference to the default exchange.
             */
            virtual AMQPExchangeRef getExchange() const
            {
                return this->getExchange(QString());
            }

            /*!
             * Get or create a reference to the default
             * exchange.
             */
            virtual AMQPExchangeRef getExchange()
            {
                return this->getExchange(QString());
            }

            /*!
             * Get a reference to an existing exchange.
             */
            virtual AMQPExchangeRef getExchange(
                    const QString& name) const
            {
                return AMQPExchangeRef(
                    this->exchanges.value(name));
            }

            /*!
             * Get or create a reference to the specified exchange.
             */
            virtual AMQPExchangeRef getExchange(
                    const QString& name,
                    QAmqpExchange::ExchangeType
                        type=QAmqpExchange::Direct,
                    QAmqpExchange::ExchangeOption
                        options=QAmqpExchange::AutoDelete);

            /*! Create a new queue with given options */
            virtual AMQPQueueRef getQueue(
                    int options=QAmqpQueue::Exclusive)
            {
                return this->getQueue(QString(), options);
            }

            /*! Get a reference to an existing queue with a given name */
            virtual AMQPQueueRef getQueue(const QString& name) const
            {
                return AMQPQueueRef(
                    this->queues.value(name));
            }

            /*! Create a queue with a given name and options */
            virtual AMQPQueueRef getQueue(const QString& name,
                    int options=QAmqpQueue::Exclusive);

            /*!
             * Send a message via this connection.  This tries to
             * locate the exchange the message is destined for then
             * passes the message to it for sending.
             *
             * @retval  true    Message was sent
             * @retval  false   Message was not sent
             */
            virtual bool sendMessage(MessageRef message) const;

        public slots:
            /*!
             * Connect to AMQP.
             */
            virtual void connectToAMQP();

            /*!
             * Disconnect from AMQP
             */
            virtual void disconnectFromAMQP();

        signals:
            /*!
             * Connection error encountered: for human
             * consumption.
             *
             * @param   what        Cause of error
             *              condition
             */
            void connectionError(
                    AMQPConnectionRef connection,
                    const QString what);

            /*!
             * Connection was lost.
             * @param   reconnecting    True if
             *              re-connecting
             */
            void connectionLost(
                    AMQPConnectionRef connection,
                    bool reconnecting);

            /*!
             * Connection has changed state.
             */
            void changedState(
                    AMQPConnectionRef connection);

            /*! Connected to AMQP */
            void connected(
                    AMQPConnectionRef connection);

        protected:
            AMQPConnectionHandler(
                    const QString& connection_uri,
                    QObject* parent=0);

            /*! Client object */
            QAmqpClient*        client;

            /*! Connection state */
            AMQPConnectionState state;

            /*!
             * Intended state, for restoring the previous
             * state when a connection is dropped.
             */
            AMQPConnectionState intended_state;

            /*! Number of retries remaining on connect */
            int         retries;

            /*! Connection URI */
            QString         connection_uri;

            /*! Weak reference to exchanges */
            QHash<QString, AMQPExchangeWeakRef>
                        exchanges;

            /*! Weak reference to queues */
            QHash<QString, AMQPQueueWeakRef>
                        queues;

            /*! Change of state */
            virtual void newState(AMQPConnectionState state);

            /*! Replace connection instance */
            virtual void replaceInstance(QAmqpClient* client);

            /*! Disconnect instance */
            virtual void disconnectClient();

            /*!
             * Log category instance.
             */
            log4cpp::Category& log;

        protected slots:
            /*! Unnamed queue has been declared */
            virtual void queueDeclared(AMQPQueueRef queue);
            /*! Nameless queue has been dropped */
            virtual void namelessQueueDropped(
                    AMQPQueueRef queue);
            /*! Connection to AMQP established */
            virtual void connected();
            /*! Connection to AMQP lost */
            virtual void disconnected();
            /*! AMQP connection error */
            void error(QAMQP::Error error);
            /*! Connection to AMQP failed */
            void socketError(QAbstractSocket::SocketError e);
        private:

            friend class AMQPConnectionObject;
    };

    inline std::ostream& operator<<(std::ostream& out,
            AMQPConnectionHandler::AMQPConnectionState s)
    {
        switch(s) {
            case AMQPConnectionHandler::CONN_DISCONNECTED:
                out << "CONN_DISCONNECTED";
                break;
            case AMQPConnectionHandler::CONN_CONNECTING:
                out << "CONN_CONNECTING";
                break;
            case AMQPConnectionHandler::CONN_CONNECTED:
                out << "CONN_CONNECTED";
                break;
            case AMQPConnectionHandler::CONN_FAILED:
                out << "CONN_FAILED";
                break;
            case AMQPConnectionHandler::CONN_DISCONNECTING:
                out << "CONN_DISCONNECTING";
                break;
            default:
                out << "CONN_?";
        }
        return out;
    }
}

#endif

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */

#include "amqp/AMQPConnectionHandler.h"
#include "amqp/AMQPExchangeHandler.h"
#include "amqp/AMQPQueueHandler.h"

using namespace MeterMasterDAL;
const int AMQPConnectionHandler::NUM_RETRIES = 3;

AMQPConnectionRef AMQPConnectionHandler::create(
                    const QString& connection_uri,
                    bool auto_connect,
                    QObject* parent)
{
    log4cpp::Category& log(
            log4cpp::Category::getInstance(
                "AMQPConnectionHandler"));

    log.debugStream() << "Creating connection handler";
    AMQPConnectionRef ch = newRef(AMQPConnectionHandler,
            connection_uri, parent);
    log.debugStream() << "Created connection handler";
    if (auto_connect)
        ch->connectToAMQP();
    log.debugStream() << "Connection attempt started";
    return ch;
}

AMQPConnectionHandler::~AMQPConnectionHandler()
{
    this->log.debugStream()
        << "Destructor called.";
    this->replaceInstance(NULL);
}

/*!
 * Get or create a reference to the specified exchange.
 */
AMQPExchangeRef AMQPConnectionHandler::getExchange(
        const QString& name,
        QAmqpExchange::ExchangeType type,
        QAmqpExchange::ExchangeOption options)
{
    AMQPExchangeRef ex(((const AMQPConnectionHandler*)
                this)->getExchange(name));
    if (ex)
        return ex;

    /* No exchange of this name exists. */
    ex = newRef(AMQPExchangeHandler,
            this->ref(), name, type, options);
    this->exchanges[name] = ex.toWeakRef();
    connect(this,       SIGNAL(connected(
                    AMQPConnectionRef)),
        ex.data(),  SLOT(connected(
                    AMQPConnectionRef)));
    connect(this,       SIGNAL(connectionLost(
                    AMQPConnectionRef, bool)),
        ex.data(),  SLOT(connectionLost(
                    AMQPConnectionRef, bool)));
    return ex;
}

/*! Create a queue with a given name and options */
AMQPQueueRef AMQPConnectionHandler::getQueue(const QString& name,
        int options)
{
    AMQPQueueRef q;
    if (!(name.isEmpty() || name.isNull())) {
        q = AMQPQueueRef(this->queues.value(name));
        if (q)
            return q;
    }

    q = newRef(AMQPQueueHandler,
            this->ref(), name, options);
    connect(this,       SIGNAL(connected(
                    AMQPConnectionRef)),
        q.data(),   SLOT(connected(
                    AMQPConnectionRef)));
    connect(this,       SIGNAL(connectionLost(
                    AMQPConnectionRef, bool)),
        q.data(),   SLOT(connectionLost(
                    AMQPConnectionRef, bool)));
    if (!(name.isEmpty() || name.isNull()))
        this->queues[name] = q.toWeakRef();
    return q;
}

/*!
 * Send a message via this connection.  This tries to
 * locate the exchange the message is destined for then
 * passes the message to it for sending.
 *
 * @retval  true    Message was sent
 * @retval  false   Message was not sent
 */
bool AMQPConnectionHandler::sendMessage(MessageRef message) const
{
    /* Try to find the responsible exchange */
    AMQPExchangeRef ex(this->getExchange(message->exchange));
    if (ex)
        return ex->sendMessage(message);
    return false;
}

/*!
 * Connect to AMQP.
 */
void AMQPConnectionHandler::connectToAMQP()
{
    this->log.debugStream() << "Attempting connection to AMQP";
    this->intended_state = CONN_CONNECTED;
    this->retries--;

    this->replaceInstance(new QAmqpClient(this));
    this->newState(CONN_CONNECTING);
    this->log.debugStream() << "Beginning connection attempt";
    this->client->connectToHost(this->connection_uri);
    this->log.debugStream() << "Waiting for connected signal";
}

/*!
 * Disconnect from AMQP
 */
void AMQPConnectionHandler::disconnectFromAMQP()
{
    this->intended_state = CONN_DISCONNECTED;
    this->newState(CONN_DISCONNECTING);
    this->disconnectClient();
}

void AMQPConnectionHandler::disconnectClient()
{
    if ((this->client != NULL) && (this->client->isConnected()))
        this->client->disconnectFromHost();
}

AMQPConnectionHandler::AMQPConnectionHandler(
                    const QString& connection_uri,
                    QObject* parent)
:   QObject(parent),
    client(NULL),
    state(CONN_DISCONNECTED),
    intended_state(CONN_DISCONNECTED),
    retries(AMQPConnectionHandler::NUM_RETRIES),
    connection_uri(connection_uri),
    log(log4cpp::Category::getInstance("AMQPConnectionHandler"))
{
    this->log.debugStream() << "New connection handler for "
                << connection_uri;
}

/*! Unnamed queue has been declared */
void AMQPConnectionHandler::queueDeclared(AMQPQueueRef queue)
{
    if (!this->queues.contains(queue->getName()))
        this->queues[queue->getName()] = queue.toWeakRef();
}

/*! Nameless queue has been dropped */
void AMQPConnectionHandler::namelessQueueDropped(
        AMQPQueueRef queue)
{
    if (this->queues.value(queue->getName()) == queue)
        this->queues.remove(queue->getName());
}
/*! Connection to AMQP established */
void AMQPConnectionHandler::connected()
{
    this->retries = AMQPConnectionHandler::NUM_RETRIES;
    this->newState(CONN_CONNECTED);
    emit connected(this->ref());
}

/*! Connection to AMQP lost */
void AMQPConnectionHandler::disconnected()
{
    bool reconnecting = (this->intended_state != CONN_DISCONNECTED)
            && (this->retries > 0);
    this->log.debugStream() << "Received disconnected signal";
    emit connectionLost(this->ref(), reconnecting);
    this->newState(CONN_DISCONNECTED);
    if (reconnecting) {
        this->log.debugStream() << "Re-trying connection";
        this->connectToAMQP();
    } else {
        this->log.debugStream() << "Giving up.";
        this->replaceInstance(NULL);
    }
}

/*! Change of state */
void AMQPConnectionHandler::newState(AMQPConnectionState state)
{
    if (this->state == state)
        return;
    this->log.debugStream() << "Entering state " << state;
    this->state = state;
    emit changedState(this->ref());
}
/*! AMQP connection error */
void AMQPConnectionHandler::error(QAMQP::Error error)
{
    this->newState(CONN_FAILED);
    this->log.errorStream()
        << "Connection error: "
        << error;

    emit connectionError(this->ref(),
            this->client->errorString());
    if (this->client->isConnected())
        return;

    this->disconnected();
}

inline std::ostream& operator<<(std::ostream& out,
        QAbstractSocket::SocketError e)
{
    switch(e) {
        case QAbstractSocket::ConnectionRefusedError:
            out << "ConnectionRefusedError";
            break;
        case QAbstractSocket::RemoteHostClosedError:
            out << "RemoteHostClosedError";
            break;
        case QAbstractSocket::HostNotFoundError:
            out << "HostNotFoundError";
            break;
        case QAbstractSocket::SocketAccessError:
            out << "SocketAccessError";
            break;
        case QAbstractSocket::SocketResourceError:
            out << "SocketResourceError";
            break;
        case QAbstractSocket::SocketTimeoutError:
            out << "SocketTimeoutError";
            break;
        case QAbstractSocket::DatagramTooLargeError:
            out << "DatagramTooLargeError";
            break;
        case QAbstractSocket::NetworkError:
            out << "NetworkError";
            break;
        case QAbstractSocket::AddressInUseError:
            out << "AddressInUseError";
            break;
        case QAbstractSocket::SocketAddressNotAvailableError:
            out << "SocketAddressNotAvailableError";
            break;
        case QAbstractSocket::UnsupportedSocketOperationError:
            out << "UnsupportedSocketOperationError";
            break;
        case QAbstractSocket::ProxyAuthenticationRequiredError:
            out << "ProxyAuthenticationRequiredError";
            break;
        case QAbstractSocket::SslHandshakeFailedError:
            out << "SslHandshakeFailedError";
            break;
        case QAbstractSocket::UnfinishedSocketOperationError:
            out << "UnfinishedSocketOperationError";
            break;
        case QAbstractSocket::ProxyConnectionRefusedError:
            out << "ProxyConnectionRefusedError";
            break;
        case QAbstractSocket::ProxyConnectionClosedError:
            out << "ProxyConnectionClosedError";
            break;
        case QAbstractSocket::ProxyConnectionTimeoutError:
            out << "ProxyConnectionTimeoutError";
            break;
        case QAbstractSocket::ProxyNotFoundError:
            out << "ProxyNotFoundError";
            break;
        case QAbstractSocket::ProxyProtocolError:
            out << "ProxyProtocolError";
            break;
        default:
            out << "UnknownSocketError";
            break;
    }
    return out;
}

/*! Connection to AMQP failed */
void AMQPConnectionHandler::socketError(
    QAbstractSocket::SocketError error)
{
    this->newState(CONN_FAILED);
    emit connectionError(this->ref(),
            this->client->errorString());
    this->log.debugStream()
        << "Socket error: "
        << (QAbstractSocket::SocketError)error
        << ": "
        << this->client->errorString();
    this->disconnected();
}

/*! Replace connection instance */
void AMQPConnectionHandler::replaceInstance(QAmqpClient* client)
{
    this->log.debugStream()     << "Replacing QAmqpClient instance at "
                << (void*)this->client
                << " with instance at "
                << (void*)client;
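    if (this->client != NULL) {
        this->log.debugStream() << "Shutting down old instance.";
        if (this->client->isConnected())
            this->client->disconnectFromHost();
        /* Detach the old client's signals from this handler, whether
         * or not it was connected; disconnect(this->client) would
         * instead detach this handler's signals from the client. */
        disconnect(this->client, 0, this, 0);
        this->client->deleteLater();
        this->log.debugStream() << "Old instance scheduled for deletion.";
    }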

    this->log.debugStream() << "Switching instance.";
    this->client = client;
    if (client == NULL)
        return;

    this->log.debugStream() << "Connecting signals to new instance.";
    connect(this->client,   SIGNAL(connected()),
        this,       SLOT(connected()));
    connect(this->client,   SIGNAL(disconnected()),
        this,       SLOT(disconnected()));
    connect(this->client,   SIGNAL(error(QAMQP::Error)),
        this,       SLOT(error(QAMQP::Error)));
    connect(this->client,   SIGNAL(socketError(QAbstractSocket::SocketError)),
        this,       SLOT(socketError(QAbstractSocket::SocketError)));
}

AMQPExchangeHandler and AMQPQueueHandler are subclasses of AMQPConnectionObject for convenience:

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#ifndef _AMQP_OBJECT_HANDLER_H
#define _AMQP_OBJECT_HANDLER_H

#include "util/SelfRef.h"
#include "amqp/AMQPConnectionHandler.h"

namespace MeterMasterDAL {
    /*!
     * AMQPConnectionObject is a child object of AMQPConnectionHandler
     * that manages a reference to the parent.
     */
    class AMQPConnectionObject {
        public:
            virtual ~AMQPConnectionObject();

        protected:
            AMQPConnectionObject(
                    AMQPConnectionRef connection);

            AMQPConnectionRef   connection;
            QAmqpClient*&       client;
        protected slots:
        private:
    };
}

#endif
/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#include "amqp/AMQPConnectionObject.h"
#include "amqp/AMQPExchangeHandler.h"
#include "amqp/AMQPQueueHandler.h"

using namespace MeterMasterDAL;
AMQPConnectionObject::~AMQPConnectionObject()
{
}

AMQPConnectionObject::AMQPConnectionObject(AMQPConnectionRef connection)
: connection(connection), client(connection->client)
{
}

AMQPExchangeHandler had a bit more state information to it: when a disconnect was received it would drop its QAmqpExchange pointer, and it would pick up a new one when a connect was received.
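
Stripped of Qt and logging, the drop-and-reacquire behaviour looks roughly like the sketch below. `FakeExchange` is a hypothetical stand-in for `QAmqpExchange` and `ExchangeHandle` is an illustrative name; unlike the real handler, which leaves ownership of the exchange to QAMQP, this model owns its stand-in object directly.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the library's exchange object.
struct FakeExchange {
    std::string name;
};

// Hypothetical handle that survives reconnects by swapping its inner object.
class ExchangeHandle {
public:
    explicit ExchangeHandle(std::string name) : name_(std::move(name)) {}

    // Connection came up: obtain a fresh exchange object.
    void onConnected() { exchange_.reset(new FakeExchange{name_}); }

    // Connection dropped: forget the stale object.
    void onConnectionLost() { exchange_.reset(); }

    // Publishing is refused while no live exchange is held.
    bool publish(const std::string& body) {
        if (!exchange_)
            return false;
        ++sentCount_;           // a real handler would forward the body here
        (void)body;
        return true;
    }

    int sentCount() const { return sentCount_; }

private:
    std::string name_;
    std::unique_ptr<FakeExchange> exchange_;
    int sentCount_ = 0;
};
```

Callers keep the same handle across reconnects and only see `publish()` return false while the connection is down.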

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#ifndef _AMQP_EXCHANGE_HANDLER_H
#define _AMQP_EXCHANGE_HANDLER_H

#include <qamqp/qamqpexchange.h>

#include <QObject>
#include <QWeakPointer>
#include <QSharedPointer>
#include <QHash>
#include <QString>

#include "message/BaseMessage.h"
#include "amqp/AMQPConnectionObject.h"
#include "util/logcontext.h"

namespace MeterMasterDAL {
    /*!
     * AMQPExchangeHandler wraps a QAmqpExchange, dropping it when the
     * AMQP connection is lost and re-declaring it on re-connection.
     */
    class AMQPExchangeHandler
    :   public QObject,
        public AMQPConnectionObject,
        public SelfRef<AMQPExchangeHandler> {

        Q_OBJECT

        public:
            enum AMQPExchangeState {
                /*! Connection is presently disconnected */
                EXC_DISCONNECTED,
                /*! Exchange is being declared */
                EXC_DECLARING,
                /*! Exchange has been declared */
                EXC_DECLARED,
                /*! Exchange could not be declared */
                EXC_FAILED,
            };

            virtual ~AMQPExchangeHandler();

            /*!
             * Get the name of this exchange.
             */
            const QString& getName() const
            {
                return this->name;
            }

            /*!
             * Get the current state of this exchange.
             */
            AMQPExchangeState getState() const
            {
                return this->state;
            }

            /*!
             * See if the exchange is in the given state.
             */
            bool isInState(AMQPExchangeState state) const
            {
                return this->getState() == state;
            }

            /*!
             * Send a message via this exchange.
             *
             * @retval  true    Message was sent
             * @retval  false   Message was not sent
             */
            virtual bool sendMessage(MessageRef message);

        public slots:
        signals:
            /*! Exchange has been declared */
            void declared(AMQPExchangeRef ex);

            /*! Exchange failed to be declared */
            void declareFailed(
                    AMQPExchangeRef ex,
                    QAMQP::Error error);

        protected:
            AMQPExchangeHandler(
                    AMQPConnectionRef connection,
                    const QString& name,
                    QAmqpExchange::ExchangeType type,
                    QAmqpExchange::ExchangeOption options);

            /*!
             * Declare this exchange.
             */
            virtual void declare();

            /*!
             * Replace this exchange instance.
             */
            virtual void replaceInstance(
                    QAmqpExchange* instance);

            /*! Exchange state */
            AMQPExchangeState       state;

            /*! Exchange name */
            QString             name;

            /*! Exchange type */
            QAmqpExchange::ExchangeType type;

            /*! Exchange Options */
            QAmqpExchange::ExchangeOption   options;

            /*! Exchange object */
            QAmqpExchange*          exchange;

            /*!
             * Log category instance.
             */
            log4cpp::Category& log;

            /*! Change of state */
            virtual void newState(AMQPExchangeState state);

        protected slots:
            /*! Connection established to AMQP */
            virtual void connected(
                    AMQPConnectionRef connection);

            /*! Connection lost to AMQP */
            virtual void connectionLost(
                    AMQPConnectionRef connection,
                    bool reconnect);

            /*! Failed to declare exchange */
            virtual void error(QAMQP::Error error);

            /*! Exchange declared */
            virtual void declared();
        private:
        friend class AMQPConnectionHandler;
    };

    inline std::ostream& operator<<(std::ostream& out,
            AMQPExchangeHandler::AMQPExchangeState s)
    {
        switch(s) {
            case AMQPExchangeHandler::EXC_DISCONNECTED:
                out << "EXC_DISCONNECTED";
                break;
            case AMQPExchangeHandler::EXC_DECLARING:
                out << "EXC_DECLARING";
                break;
            case AMQPExchangeHandler::EXC_DECLARED:
                out << "EXC_DECLARED";
                break;
            case AMQPExchangeHandler::EXC_FAILED:
                out << "EXC_FAILED";
                break;
            default:
                out << "EXC_?";
        }
        return out;
    }
}

#endif

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */

#include <QDebug>

#include "amqp/AMQPConnectionHandler.h"
#include "amqp/AMQPExchangeHandler.h"
#include "message/types.h"

using namespace MeterMasterDAL;

AMQPExchangeHandler::~AMQPExchangeHandler()
{
    /*
     * Note: do NOT delete the exchange pointer.  QAMQP takes care
     * of it.
     */
    this->log.debugStream() << "Destructor called";
}

/*!
 * Send a message via this exchange.
 *
 * @retval  true    Message was sent
 * @retval  false   Message was not sent
 */
bool AMQPExchangeHandler::sendMessage(MessageRef message)
{
    LogContext lc(this->log, QStringList("sendMessage")
                << message->correlation_id.toString());

    if (!this->isInState(EXC_DECLARED)) {
        lc.log.debugStream()    << "Exchange is in state "
                    << this->getState()
                    << "; message dropped.";
        return false;
    }

    lc.log.debugStream()    << "Attempting delivery.";
    try {
        QAmqpMessage::PropertyHash properties(
                message->getProperties());
        QByteArray body(message->dumpBody());

        lc.log.debugStream()    << "To Exchange: "
                    << this->getName()
                    << " routing key: "
                    << message->routing_key
                    << " correlation ID: "
                    << message->correlation_id.toString()
                    << " message ID: "
                    << message->message_id.toString()
                    << " body:\n---\n"
                    << body.data()
                    << "\n---";
        this->exchange->publish(body, message->routing_key,
                properties);
        return true;
    } catch (std::exception& e) {
        this->log.errorStream() << "FAILED to send message: "
            << e.what()
            << " Message ID: "
            << message->message_id
            << " Correlation ID: "
            << message->correlation_id
            << " Expiry: "
            << message->expires.toString();
        return false;
    }
}

AMQPExchangeHandler::AMQPExchangeHandler(
        AMQPConnectionRef connection,
        const QString& name,
        QAmqpExchange::ExchangeType type,
        QAmqpExchange::ExchangeOption options)
:   QObject(connection.data()),
    AMQPConnectionObject(connection),
    state(EXC_DISCONNECTED),
    name(name),
    type(type),
    options(options),
    exchange(NULL),
    log(log4cpp::Category::getInstance(
        QString("AMQPExchangeHandler.%1").arg(
            (name.isEmpty() || name.isNull())
            ? QString("(default)") : name).toStdString()))
{
    this->log.debugStream() << "New exchange handler";
    if (this->connection->isInState(AMQPConnectionHandler::CONN_CONNECTED))
        this->declare();
    else
        this->log.debugStream() << "Waiting for \"connected\" signal.";
}

/*!
 * Declare this exchange.
 */
void AMQPExchangeHandler::declare()
{
    if (this->name.isEmpty() || this->name.isNull()) {
        /* The "nameless" exchange is an AMQP standard */
        this->log.debugStream()
            << "Local instance of \"nameless\" exchange";
        this->replaceInstance(this->client->createExchange());
        this->declared();
    } else {
        /* New exchange, declare it */
        this->log.debugStream()
            << "Instance of exchange " << this->name;
        this->replaceInstance(this->client->createExchange(
                    this->name));
        this->newState(EXC_DECLARING);
        this->exchange->declare(
                this->type, this->options);
    }
}

/*! Connection established to AMQP */
void AMQPExchangeHandler::connected(
        AMQPConnectionRef connection)
{
    this->declare();
}

/*! Connection lost to AMQP */
void AMQPExchangeHandler::connectionLost(
        AMQPConnectionRef connection,
        bool reconnect)
{
    this->replaceInstance(NULL);
    this->newState(EXC_DISCONNECTED);
}

/*! Failed to declare exchange */
void AMQPExchangeHandler::error(QAMQP::Error error)
{
    this->log.debugStream()
        << "Received error "
        << this->exchange->errorString();
    this->newState(EXC_FAILED);
    emit declareFailed(this->ref(), error);
}

/*! Exchange declared */
void AMQPExchangeHandler::declared()
{
    this->log.debugStream()
        << "Received \"declared\" signal.";
    this->newState(EXC_DECLARED);
    emit declared(this->ref());
}

/*! Replace the exchange instance */
void AMQPExchangeHandler::replaceInstance(QAmqpExchange* exchange)
{
    if (this->exchange != NULL) {
        this->log.debugStream()
            << "Dropping old instance at "
            << (void*)this->exchange;
        /* Disconnect the old instance's signals from our slots */
        disconnect(this->exchange, 0, this, 0);
    }
    this->exchange = exchange;
    this->log.debugStream()
        << "Using new instance at "
        << (void*)exchange;
    if (exchange != NULL) {
        this->log.debugStream()
            << "Connecting signals.";
        connect(this->exchange, SIGNAL(declared()),
                this,   SLOT(declared()));
        connect(this->exchange, SIGNAL(error(QAMQP::Error)),
                this,   SLOT(error(QAMQP::Error)));
    }
}

void AMQPExchangeHandler::newState(AMQPExchangeState state)
{
    if (!this->isInState(state)) {
        this->log.debugStream()
            << "Entering state "
            << state
            << " (old state "
            << this->state
            << ")";
        this->state = state;
    }
}

AMQPQueueHandler is where most of the action takes place. It keeps track of what exchanges it is bound to so that it can re-instate those when the connection comes back.

/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#ifndef _AMQP_QUEUE_HANDLER_H
#define _AMQP_QUEUE_HANDLER_H

#include <qamqp/qamqpqueue.h>

#include <QObject>
#include <QWeakPointer>
#include <QSharedPointer>
#include <QHash>
#include <QSet>
#include <QString>

#include "amqp/AMQPConnectionHandler.h"
#include "amqp/AMQPConnectionObject.h"
#include "message/BaseMessage.h"
#include "util/logcontext.h"

namespace MeterMasterDAL {
    /* Forward declarations */
    class AMQPConnectionHandler;
    typedef QSharedPointer<AMQPConnectionHandler>
        AMQPConnectionRef;
    typedef QWeakPointer<AMQPConnectionHandler>
        AMQPConnectionWeakRef;

    class AMQPExchangeHandler;
    typedef QWeakPointer<AMQPExchangeHandler>
        AMQPExchangeWeakRef;

    /*!
     * Exchange subscription list.
     */
    class AMQPQueueExchangeSubscription {
        public:
            /*! The exchange reference to subscribe/unsubscribe */
            AMQPExchangeWeakRef exchange;

            /*! List of routing keys presently subscribed */
            QSet<QString>       subscribed;

            /*! List of routing keys to subscribe to */
            QSet<QString>       to_subscribe;

            /*! List of routing keys to unsubscribe from */
            QSet<QString>       to_unsubscribe;

            /*! Is this subscription list up-to-date? */
            bool isUpToDate() const
            {
                return this->to_subscribe.empty()
                    && this->to_unsubscribe.empty();
            }

            /*! Is this subscription list empty? */
            bool isEmpty() const
            {
                return this->isUpToDate()
                    && this->subscribed.empty();
            }

            /*! Mark a key as to_subscribe. */
            void markToSubscribe(const QString& key)
            {
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                this->subscribed.remove(key);
                this->to_unsubscribe.remove(key);
                this->to_subscribe << key;
            }

            /*! Mark a key as to_unsubscribe. */
            void markToUnsubscribe(const QString& key)
            {
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                this->to_subscribe.remove(key);
                this->to_unsubscribe << key;
            }

            /*! Mark a key as subscribed. */
            void markSubscribed(const QString& key)
            {
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                this->subscribed << key;
                this->to_unsubscribe.remove(key);
                this->to_subscribe.remove(key);
            }

            /*! Mark a key as unsubscribed. */
            void markUnsubscribed(const QString& key)
            {
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                this->subscribed.remove(key);
                this->to_unsubscribe.remove(key);
                this->to_subscribe.remove(key);
            }

            /*! Mark currently subscribed keys as to_subscribe. */
            void markAllToSubscribe()
            {
                this->to_subscribe.unite(this->subscribed);
                this->subscribed.clear();
            }

            /*! Remove to_unsubscribe keys from subscribed. */
            void removeAllToUnsubscribe()
            {
                this->subscribed.subtract(this->to_unsubscribe);
                this->to_unsubscribe.clear();
            }
    };

    /*!
     * AMQPQueueHandler is a class for managing a queue, its bindings
     * and its consumer across re-establishment of AMQP connections.
     */
    class AMQPQueueHandler
    :   public QObject,
        public AMQPConnectionObject,
        public SelfRef<AMQPQueueHandler> {

        Q_OBJECT
        public:

            enum AMQPQueueState {
                /*! Connection is presently disconnected */
                Q_DISCONNECTED,
                /*! Queue is being declared */
                Q_DECLARING,
                /*! Queue has been declared */
                Q_DECLARED,
                /*! Queue is consuming */
                Q_CONSUMING,
                /*! Queue is binding to an exchange */
                Q_BINDING,
                /*! Queue is unbinding from an exchange */
                Q_UNBINDING,
                /*! QOS settings are being defined. */
                Q_QOS,
            };

            virtual ~AMQPQueueHandler();

            /*! Return the name of this queue */
            virtual const QString& getName() const
            {
                if (this->declared_name.isEmpty()
                        || this->declared_name.isNull())
                    return this->name;
                else
                    return this->declared_name;
            }

            /*! Return the consumer tag for this queue */
            virtual const QString& getConsumerTag() const
            {
                return this->consumer_tag;
            }

            /*! Return the state of the queue */
            virtual AMQPQueueState getState() const
            {
                return this->state;
            }

            /*! See if the queue is in the given state */
            virtual bool isInState(AMQPQueueState state) const
            {
                return this->getState() == state;
            }

            /*! Bind to an exchange */
            virtual void bind(AMQPExchangeRef exchange,
                    const QString& key);

            /*! Bind to an exchange, passively declaring it */
            virtual void bind(const QString& exchange,
                    const QString& key);

            /*! Unbind from an exchange */
            virtual void unbind(AMQPExchangeRef exchange,
                    const QString& key);

            /*! Unbind from an exchange */
            virtual void unbind(const QString& exchange,
                    const QString& key);

            /*! Set the QOS for the queue */
            virtual void setQOS(qint16 prefetch_count,
                    qint32 prefetch_size=0);

            /*! Start consuming */
            virtual void consume(int consume_options=-1);

        public slots:
        signals:
            /*! The queue has been declared */
            void declared(AMQPQueueRef queue);

            /*! The queue is now consuming messages */
            void consuming(AMQPQueueRef queue);

            /*! The queue has changed state */
            void changedState(AMQPQueueRef queue);

            /*! The queue has been bound to an exchange */
            void bound( AMQPQueueRef queue,
                    AMQPExchangeRef exchange,
                    const QString key);

            /*! The queue has been unbound from an exchange */
            void unbound(   AMQPQueueRef queue,
                    AMQPExchangeRef exchange,
                    const QString key);

            /*! Message was received */
            void messageReceived(
                    AMQPQueueRef queue,
                    MessageRef message);

        protected:
            AMQPQueueHandler(
                    AMQPConnectionRef connection,
                    const QString& name,
                    int options);
            /*! Connection state */
            AMQPQueueState      state;

            /*! Target state */
            AMQPQueueState      intended_state;

            /*! Queue name */
            QString         name;

            /*! Declared queue name */
            QString         declared_name;

            /*! Consumer tag */
            QString         consumer_tag;

            /*! Queue Options */
            int         options;

            /*! Consume Options */
            int         c_options;

            /*! QOS set */
            bool            qos_set;

            /*! QOS prefetch count setting */
            qint16          qos_prefetch_count;

            /*! QOS prefetch size setting */
            qint32          qos_prefetch_size;

            /*! Queue object */
            QAmqpQueue*     queue;

            /*!
             * Queue subscriptions.
             */
            QHash<QString, AMQPQueueExchangeSubscription>
                        subscriptions;

            /*!
             * Subscriptions to process (exchange names).
             */
            QSet<QString>       to_subscribe;

            /*!
             * Unsubscriptions to process (exchange names).
             */
            QSet<QString>       to_unsubscribe;

            /*!
             * What exchange is being re-bound now?
             */
            AMQPExchangeRef     rebind_exchange;

            /*!
             * What key is being re-bound now?
             */
            QString         rebind_key;

            /*! Are we processing the state */
            bool            process_state;

            /*! Mark a key as to_subscribe. */
            void markToSubscribe(const QString& exchange,
                    const QString& key)
            {
                if (exchange.isNull())  return;
                if (exchange.isEmpty()) return;
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                this->subscriptions[exchange].markToSubscribe(key);
            }

            /*! Mark a key as to_unsubscribe. */
            void markToUnsubscribe(const QString& exchange,
                    const QString& key)
            {
                if (exchange.isNull())  return;
                if (exchange.isEmpty()) return;
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                if (this->subscriptions.contains(exchange))
                    this->subscriptions[exchange].markToUnsubscribe(key);
            }

            /*! Mark a key as subscribed. */
            void markSubscribed(const QString& exchange,
                    const QString& key)
            {
                if (exchange.isNull())  return;
                if (exchange.isEmpty()) return;
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                if (this->subscriptions.contains(exchange))
                    this->subscriptions[exchange].markSubscribed(key);
            }

            /*! Mark a key as unsubscribed. */
            void markUnsubscribed(const QString& exchange,
                    const QString& key)
            {
                if (exchange.isNull())  return;
                if (exchange.isEmpty()) return;
                if (key.isNull())   return;
                if (key.isEmpty())  return;
                if (this->subscriptions.contains(exchange)) {
                    AMQPQueueExchangeSubscription& sub
                        = this->subscriptions[exchange];
                    sub.markUnsubscribed(key);
                    if (sub.isEmpty())
                        this->subscriptions.remove(exchange);
                }
            }

            /*! Mark currently subscribed keys as to_subscribe. */
            void markAllToSubscribe()
            {
                QHash<QString, AMQPQueueExchangeSubscription>::iterator it;
                for (it = this->subscriptions.begin();
                        it != this->subscriptions.end();
                        it++)
                    it.value().markAllToSubscribe();
            }

            /*! Remove to_unsubscribe keys from subscribed. */
            void removeAllToUnsubscribe()
            {
                QHash<QString, AMQPQueueExchangeSubscription>::iterator it;
                for (it = this->subscriptions.begin();
                        it != this->subscriptions.end();
                        it++)
                    it.value().removeAllToUnsubscribe();
            }

            /*! Is the subscription list up to date? */
            bool isUpToDate() const
            {
                QHash<QString, AMQPQueueExchangeSubscription>::const_iterator it;
                for (it = this->subscriptions.begin();
                        it != this->subscriptions.end();
                        it++)
                    if (!it.value().isUpToDate())
                        return false;
                return true;
            }

            virtual void replaceInstance(QAmqpQueue* queue);
            virtual void newState(AMQPQueueState state);
            virtual void declare();
            virtual void setQOS();

            /*!
             * Process all pending binding/unbinding requests.
             * @retval  true    Bindings up to date.
             * @retval  false   More bindings pending.
             */
            virtual bool rebind();

            /*!
             * Process all pending binding/unbinding requests for
             * a given exchange.
             *
             * @retval  true    Bindings up to date.
             * @retval  false   More bindings pending.
             */
            virtual bool rebind(
                const AMQPQueueExchangeSubscription&
                        subscription);

            /*!
             * Process all pending binding/unbinding requests for
             * a given exchange.
             *
             * @retval  true    Bindings up to date.
             * @retval  false   More bindings pending.
             */
            virtual bool rebind(
                AMQPExchangeRef exchange,
                const AMQPQueueExchangeSubscription&
                        subscription);

            /*!
             * Log category instance.
             */
            log4cpp::Category& getLog()
            {
                const QString& name = this->getName();
                if (name.isEmpty() || name.isNull())
                    return log4cpp::Category::getInstance(
                            "AMQPQueueHandler");
                else
                    return log4cpp::Category::getInstance(
                        QString("AMQPQueueHandler.%1").arg(name).toStdString());
            }

        protected slots:
            /* Internal slots */
            virtual void processState();

            /* Signals from AMQPConnectionHandler */

            /*! Connection has been lost. */
            virtual void connectionLost(
                    AMQPConnectionRef connection,
                    bool reconnecting);

            /*! Connection has been established. */
            virtual void connected(AMQPConnectionRef connection);

            /* Signals from AMQPExchangeHandler */
            virtual void declared(AMQPExchangeRef exchange);

            /* Signals from the QAmqpQueue */

            virtual void declared();
            virtual void bound();
            virtual void unbound();
            virtual void messageReceived();
            virtual void consuming(const QString &consumer_tag);
            virtual void cancelled(const QString &consumer_tag);
            virtual void qosDefined();

        private:

        friend class AMQPConnectionHandler;
    };

    inline std::ostream& operator<<(std::ostream& out,
            AMQPQueueHandler::AMQPQueueState s)
    {
        switch(s) {
            case AMQPQueueHandler::Q_DISCONNECTED:
                out << "Q_DISCONNECTED";
                break;
            case AMQPQueueHandler::Q_DECLARING:
                out << "Q_DECLARING";
                break;
            case AMQPQueueHandler::Q_DECLARED:
                out << "Q_DECLARED";
                break;
            case AMQPQueueHandler::Q_CONSUMING:
                out << "Q_CONSUMING";
                break;
            case AMQPQueueHandler::Q_BINDING:
                out << "Q_BINDING";
                break;
            case AMQPQueueHandler::Q_UNBINDING:
                out << "Q_UNBINDING";
                break;
            case AMQPQueueHandler::Q_QOS:
                out << "Q_QOS";
                break;
            default:
                out << "Q_?";
        }
        return out;
    }
}

#endif
/*
 * MeterMaster Data Access Layer
 * (C) 2015 VRT Systems
 *
 * The following Software is the proprietary property of VRT Systems.
 * Unauthorised disclosure or use of this Software is forbidden to the extent
 * permitted by law.
 *
 * vim: set ts=8 sts=4 noet tw=78 sw=8 si:
 */
#include "amqp/AMQPConnectionHandler.h"
#include "amqp/AMQPExchangeHandler.h"
#include "amqp/AMQPQueueHandler.h"

using namespace MeterMasterDAL;

#include <QUuid>

AMQPQueueHandler::~AMQPQueueHandler()
{
}

AMQPQueueHandler::AMQPQueueHandler(
        AMQPConnectionRef connection,
        const QString& name,
        int options)
:   QObject(connection.data()),
    AMQPConnectionObject(connection),
    state(Q_DISCONNECTED),
    intended_state(Q_DECLARED),
    name(name),
    options(options),
    c_options(0),   /* no consume options until consume() sets them */
    qos_set(false),
    qos_prefetch_count(0),
    qos_prefetch_size(0),
    queue(NULL)
{
    this->process_state = false;
    if (this->connection->isInState(AMQPConnectionHandler::CONN_CONNECTED))
        this->declare();
}

/*! Bind to an exchange */
void AMQPQueueHandler::bind(AMQPExchangeRef exchange,
        const QString& key)
{
    const QString& name(exchange->getName());
    this->getLog().debugStream()
        << "Stashing reference to exchange "
        << name;
    this->subscriptions[name].exchange = exchange.toWeakRef();
    this->bind(name, key);
}

/*! Bind to an exchange, passively declaring it */
void AMQPQueueHandler::bind(const QString& exchange,
        const QString& key)
{
    this->getLog().debugStream()
        << "Adding "
        << exchange
        << " key "
        << key
        << " to \"to subscribe\" list.";
    this->markToSubscribe(exchange, key);
    this->rebind();
}

/*! Unbind from an exchange */
void AMQPQueueHandler::unbind(AMQPExchangeRef exchange,
        const QString& key)
{
    this->unbind(exchange->getName(), key);
}

/*! Unbind from an exchange */
void AMQPQueueHandler::unbind(const QString& exchange,
        const QString& key)
{
    if (!this->subscriptions.contains(exchange))
        return;

    this->subscriptions[exchange].markToUnsubscribe(key);
    this->rebind();
}

/*! Start consuming */
void AMQPQueueHandler::consume(int consume_options)
{
    this->intended_state = Q_CONSUMING;
    if (consume_options >= 0)
        this->c_options = consume_options;
    /* The queue instance does not exist until we are connected */
    if (this->queue != NULL && this->queue->isDeclared())
        this->queue->consume(this->c_options);
}

/*! Set the QOS for the queue */
void AMQPQueueHandler::setQOS(qint16 prefetch_count,
        qint32 prefetch_size)
{
    this->qos_set           = false;
    this->qos_prefetch_size     = prefetch_size;
    this->qos_prefetch_count    = prefetch_count;
    QMetaObject::invokeMethod(this, "processState");
}

void AMQPQueueHandler::setQOS()
{
    if (!this->qos_set) {
        this->getLog().debugStream()
            << "Setting QOS parameters: "
               "Count="
            << this->qos_prefetch_count
            << ", Size="
            << this->qos_prefetch_size;
        this->newState(Q_QOS);
        this->queue->qos(this->qos_prefetch_count,
                this->qos_prefetch_size);
    }
}

/*!
 * Connection has been lost.
 */
void AMQPQueueHandler::connectionLost(AMQPConnectionRef connection,
        bool reconnecting)
{
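    /* Drop all pointers to the old queue */
    this->replaceInstance(NULL);
    /* The consumer went away with the connection; forget its tag so
     * that processState() consumes again after re-declaring. */
    this->consumer_tag.clear();
    this->newState(Q_DISCONNECTED);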
}

/*!
 * Connection has been established.
 */
void AMQPQueueHandler::connected(AMQPConnectionRef connection)
{
    this->declare();
}

void AMQPQueueHandler::replaceInstance(QAmqpQueue* queue)
{
    if (this->queue != NULL)
        /* Disconnect the old instance's signals from our slots */
        disconnect(this->queue, 0, this, 0);
    this->queue = queue;
    if (queue != NULL) {
        connect(this->queue,    SIGNAL(declared()),
            this,       SLOT(declared()));
        connect(this->queue,    SIGNAL(bound()),
            this,       SLOT(bound()));
        connect(this->queue,    SIGNAL(unbound()),
            this,       SLOT(unbound()));
        connect(this->queue,    SIGNAL(consuming(const QString&)),
            this,       SLOT(consuming(const QString&)));
        connect(this->queue,    SIGNAL(cancelled(const QString&)),
            this,       SLOT(cancelled(const QString&)));
        connect(this->queue,    SIGNAL(messageReceived()),
            this,       SLOT(messageReceived()));
        connect(this->queue,    SIGNAL(qosDefined()),
            this,       SLOT(qosDefined()));
    }
}

void AMQPQueueHandler::newState(AMQPQueueState state)
{
    if (!this->isInState(state)) {
        this->getLog().debugStream()
            << "Entering state "
            << state
            << " (old state "
            << this->state
            << ")";
        this->state = state;
        emit changedState(this->ref());
    }
}

void AMQPQueueHandler::declare()
{
    QAmqpQueue* queue;
    this->newState(Q_DECLARING);

    if (this->name.isEmpty() || this->name.isNull()) {
        this->removeAllToUnsubscribe();
        queue = this->client->createQueue();
    } else {
        queue = this->client->createQueue(this->name);
    }
    this->replaceInstance(queue);
    this->declared_name.clear();
    this->markAllToSubscribe();
    this->queue->declare(this->options);
}

bool AMQPQueueHandler::rebind()
{
    log4cpp::Category& log(this->getLog());
    log.debugStream() << "Re-binding all subscriptions.";
    QHash<QString, AMQPQueueExchangeSubscription>::iterator it;
    for (it = this->subscriptions.begin();
            it != this->subscriptions.end();
            it++) {
        const QString& name = it.key();
        log.debugStream()
            << "Checking exists " << name;

        AMQPQueueExchangeSubscription& sub = it.value();
        AMQPExchangeRef exchange(sub.exchange);
        if (exchange.isNull())
            exchange = this->connection->getExchange(name);

        if (exchange.isNull())
            continue;

        log.debugStream()
            << "Checking " << name
            << " is declared ("
            << exchange->getState()
            << ")";
        /* Can't do this one yet */
        if (!exchange->isInState(AMQPExchangeHandler::EXC_DECLARED))
            continue;

        if (!this->rebind(exchange, sub))
            return false;
    }
    QMetaObject::invokeMethod(this, "processState");
    return true;
}

bool AMQPQueueHandler::rebind(
    const AMQPQueueExchangeSubscription&
            subscription)
{
    AMQPExchangeRef exchange(subscription.exchange);
    if (!exchange.isNull())
        return this->rebind(exchange, subscription);
    return true;
}

bool AMQPQueueHandler::rebind(
    AMQPExchangeRef exchange,
    const AMQPQueueExchangeSubscription&
            subscription)
{
    log4cpp::Category& log(this->getLog());
    QSet<QString>::const_iterator it;
    log.debugStream()
        << "Checking " << exchange->getName()
        << " is declared";

    if (!exchange->isInState(AMQPExchangeHandler::EXC_DECLARED))
        return true;
    log.debugStream()
        << "Checking 'to subscribe' list. ("
        << subscription.to_subscribe.size()
        << " actions)";

    if (!subscription.to_subscribe.empty()) {
        /* Process subscriptions */
        it = subscription.to_subscribe.constBegin();
        if (it != subscription.to_subscribe.constEnd()) {
            this->rebind_exchange = exchange;
            this->rebind_key = *it;
            this->newState(Q_BINDING);
            log.debugStream()
                << "Binding " << exchange->getName()
                << " key " << *it;

            this->queue->bind(exchange->getName(), *it);
            return false;
        }
    }

    log.debugStream()
        << "Checking 'to unsubscribe' list. ("
        << subscription.to_unsubscribe.size()
        << " actions)";
    if (!subscription.to_unsubscribe.empty()) {
        /* Process unsubscriptions */
        it = subscription.to_unsubscribe.constBegin();
        if (it != subscription.to_unsubscribe.constEnd()) {
            this->rebind_exchange = exchange;
            this->rebind_key = *it;
            this->newState(Q_UNBINDING);
            log.debugStream()
                << "Unbinding " << exchange->getName()
                << " key " << *it;
            this->queue->unbind(exchange->getName(), *it);
            return false;
        }
    }
    return true;
}

void AMQPQueueHandler::declared()
{
    this->newState(Q_DECLARED);
    this->getLog().debugStream()
        << "Queue declared, officially known as "
        << this->queue->name();
    this->declared_name = this->queue->name();
    this->rebind();
    QMetaObject::invokeMethod(this, "processState");
}

void AMQPQueueHandler::bound()
{
    AMQPExchangeRef exchange(this->rebind_exchange);
    QString key(this->rebind_key);
    this->rebind_exchange.clear();
    this->rebind_key.clear();
    if (exchange.isNull() || key.isNull() || key.isEmpty())
        return;

    const QString& name = exchange->getName();
    this->subscriptions[name].markSubscribed(key);

    /* See if there's anything more to do */
    this->rebind();
}

void AMQPQueueHandler::unbound()
{
    AMQPExchangeRef exchange(this->rebind_exchange);
    QString key(this->rebind_key);
    this->rebind_exchange.clear();
    this->rebind_key.clear();
    if (exchange.isNull() || key.isNull() || key.isEmpty())
        return;

    const QString& name = exchange->getName();
    this->subscriptions[name].markUnsubscribed(key);

    /* See if there's anything more to do */
    this->rebind();
}

void AMQPQueueHandler::messageReceived()
{
    log4cpp::Category& log(this->getLog());
    while (!this->queue->isEmpty()) {
        QAmqpMessage raw_msg(this->queue->dequeue());
        log.debugStream()
            << "New message received.";
        try {
            BaseMessage* msg = BaseMessage::fromMessage(raw_msg);
            if (msg != NULL) {
                emit messageReceived(
                        this->ref(),
                        MessageRef(msg));
            } else {
                log.debugStream()
                    << "Bad message received.  Dropped.";
            }
        } catch (std::exception& ex) {
            log.errorStream()
                    << "Bad message received: "
                    << ex.what();
        }
    }
    log.debugStream()
        << "Queue is now empty.";
}

void AMQPQueueHandler::consuming(const QString &consumer_tag)
{
    if (this->consumer_tag.isNull() || this->consumer_tag.isEmpty()) {
        this->consumer_tag = consumer_tag;
        QMetaObject::invokeMethod(this, "processState");
    }
}

void AMQPQueueHandler::cancelled(const QString &consumer_tag)
{
    if (this->consumer_tag == consumer_tag) {
        this->consumer_tag.clear();
        QMetaObject::invokeMethod(this, "processState");
    }
}

void AMQPQueueHandler::qosDefined()
{
    this->getLog().debugStream()
        << "QOS settings defined.";
    this->qos_set = true;
    QMetaObject::invokeMethod(this, "processState");
}

void AMQPQueueHandler::declared(AMQPExchangeRef exchange)
{
    QString name(exchange->getName());
    this->getLog().debugStream()
        << "Received \"declared\" signal from exchange "
        << name;
    if (this->subscriptions.contains(name)) {
        this->rebind(exchange, this->subscriptions[name]);
        QMetaObject::invokeMethod(this, "processState");
    }
}

void AMQPQueueHandler::processState()
{
    LogContext lc(this->getLog(), "processState");
    if (this->process_state) return;
    this->process_state = true;

    /* Is the queue declared? */
    if (this->queue == NULL) {
        lc.log.debugStream()    << "Not yet declared, declaring.";
        this->declare();
        this->process_state = false;
        return;
    }
    if (!this->queue->isDeclared()) {
        lc.log.debugStream()    << "Not yet declared, waiting.";
        this->process_state = false;
        return;
    }

    /* Is the QOS set? */
    if (!this->qos_set) {
        this->setQOS();
        this->process_state = false;
        return;
    }

    /* Bindings not up to date */
    if (!this->isUpToDate()) {
        lc.log.debugStream()    << "Subscriptions out of date.";
        this->rebind();
        this->process_state = false;
        return;
    }

    /* Are we supposed to be consuming? */
    lc.log.debugStream()    << "Intended state:"
                << this->intended_state;
    if (this->intended_state == Q_CONSUMING) {
        /* Are we consuming? */
        if (this->consumer_tag.isNull()) {
            this->consume();
            this->process_state = false;
            return;
        } else {
            this->newState(Q_CONSUMING);
        }
    } else {
        this->newState(Q_DECLARED);
    }
    this->process_state = false;
}

Essentially the Queue has many states. The idea is that, from the moment the object is instantiated, you call whatever operations you need on it and the handler takes care of trying to make them happen. For example, if an exchange isn't yet declared, it hooks the declared signal from that exchange and waits for notification before binding. The goal is to make the objects set-and-forget.
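
A minimal sketch of that "wait for the exchange, then bind" behaviour, in plain C++ with no Qt or QAMQP dependency. `DeferredBinder` and all of its members are hypothetical names made up for illustration; they are not part of QAMQP or of the handler code above.

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for the handler idea; not QAMQP API.
class DeferredBinder {
public:
    // Record the desired binding. It is issued immediately if the exchange
    // is already declared; otherwise it waits for exchangeDeclared().
    void subscribe(const std::string &exchange, const std::string &key) {
        bool fresh = wanted_[exchange].insert(key).second;
        if (fresh && declared_.count(exchange))
            bind(exchange, key);
    }

    // Call this from whatever slot receives the exchange's "declared" signal.
    void exchangeDeclared(const std::string &exchange) {
        declared_.insert(exchange);
        auto it = wanted_.find(exchange);
        if (it == wanted_.end())
            return; // nobody asked for bindings on this exchange
        for (const std::string &key : it->second)
            bind(exchange, key);
    }

    // Bindings actually issued, in order, as "exchange/key".
    const std::vector<std::string> &issued() const { return issued_; }

private:
    void bind(const std::string &exchange, const std::string &key) {
        // A real handler would send queue.bind here.
        issued_.push_back(exchange + "/" + key);
    }

    std::map<std::string, std::set<std::string>> wanted_;
    std::set<std::string> declared_;
    std::vector<std::string> issued_;
};
```

The caller only states what it wants; whether the bind goes out now or later depends on what the object already knows about the exchange.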

The same idea could work within QAMQP, with the Queue object storing a list of the exchanges/topics (in a structure perhaps similar to the above). Whether we use smart pointers like above or just rely on code in the constructor/destructor to maintain pointer links is a different story.

mbroadst commented 9 years ago

@sjlongland whoa, huge comment there :) I think adding helper classes to QAMQP would be a nice thing, but I still want to make sure that the fundamentals are strong. I'm going to merge in my fix for your case, and research incorporating state machines into the channels so you don't run into these issues in the future. Until that happens I fear that we're going to run into a number of different hard-to-track-down errors, unfortunately.

sjlongland commented 9 years ago

Yeah, I realise it's a long comment, much of it copy-paste, but it serves to describe what I've been thinking about. I've started looking into state-machine integration in a separate branch:

https://github.com/sjlongland/qamqp/tree/state-machine is where I've been doing this a little more cleanly. Some of the boolean flags I'm converting to enumerations so I can represent the transition states.
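
To illustrate why an enumeration helps here, a rough sketch follows. A single boolean can say "consuming" or "not consuming", but not "consume requested, reply pending", which is exactly the window in which a second consume call causes trouble. The state names and the `Consumer` struct are assumptions for illustration and may not match what the branch actually uses.

```cpp
#include <string>

// Hypothetical states; the branch may name or split these differently.
enum class ConsumerState {
    Idle,        // no consumer, none requested
    Requesting,  // basic.consume sent, waiting for consume-ok
    Consuming,   // consumer tag assigned
    Cancelling   // basic.cancel sent, waiting for cancel-ok
};

struct Consumer {
    ConsumerState state = ConsumerState::Idle;
    std::string tag;
    int requestsSent = 0;

    // Returns true only if a new consume request was actually issued.
    bool consume() {
        if (state != ConsumerState::Idle)
            return false; // already consuming, or a request is in flight
        state = ConsumerState::Requesting;
        ++requestsSent;   // a real channel would send basic.consume here
        return true;
    }

    // Call when the broker confirms the consumer.
    void consumeOk(const std::string &newTag) {
        if (state != ConsumerState::Requesting)
            return;       // unexpected reply; ignore in this sketch
        tag = newTag;
        state = ConsumerState::Consuming;
    }
};
```

With the transitional state represented explicitly, repeated `consume()` calls in quick succession result in a single request.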

mbroadst commented 9 years ago

@sjlongland it might be worth looking into the QStateMachine classes (though they are a little heavy-weight and geared towards GUI state machines). I'm totally into an initial refactoring switching all bool guards to enums, that sounds like a great first incremental step.

The next step, which is considerably more difficult, is somehow encapsulating the possible behaviors in each state. I noticed in your branch that you've ended up adding more bools while cutting out a few (transitioning them to the enums), which I think indicates the complexity that can come with the transitions. Frankly I think a better approach is going to be to encapsulate each state in its own class rather than just an enum, giving each of them access to a baton (or just the dptr itself), and defining entry/exit behaviors. QSM allows for this (you don't HAVE to use it for ui state machines), and it also allows for transitions based on signals (e.g. the tcp socket is closed => ClosingState), which is why I found it particularly interesting.
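
A rough sketch of the state-per-class idea with entry/exit behaviours, written in plain C++ rather than with QStateMachine so that it stands alone. `Baton`, `State`, `OpenState`, `ClosingState` and `Machine` are hypothetical names for illustration only; nothing here is QAMQP or Qt API.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Shared data handed to every state (the "baton"). Hypothetical.
struct Baton {
    std::vector<std::string> log;
};

// Each state is its own class with optional entry/exit hooks.
class State {
public:
    virtual ~State() = default;
    virtual void onEntry(Baton &) {}
    virtual void onExit(Baton &) {}
};

class OpenState : public State {
public:
    void onEntry(Baton &b) override { b.log.push_back("enter Open"); }
    void onExit(Baton &b) override { b.log.push_back("exit Open"); }
};

class ClosingState : public State {
public:
    void onEntry(Baton &b) override { b.log.push_back("enter Closing"); }
};

class Machine {
public:
    explicit Machine(Baton &baton) : baton_(baton) {}

    // Leave the current state (if any), then enter the next one.
    void transitionTo(std::unique_ptr<State> next) {
        if (current_)
            current_->onExit(baton_);
        current_ = std::move(next);
        current_->onEntry(baton_);
    }

private:
    Baton &baton_;
    std::unique_ptr<State> current_;
};
```

In Qt's framework the corresponding pieces would be a QState subclass overriding `onEntry()`/`onExit()`, with `QState::addTransition()` wiring a signal to the target state, so the `transitionTo()` call above would be driven by a signal such as the socket disconnecting instead of being called by hand.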

sjlongland commented 9 years ago

On 08/04/15 07:44, Matt Broadstone wrote:

> @sjlongland it might be worth looking into the QStateMachine classes (though they are a little heavy-weight and geared towards GUI state machines).

Indeed, I did see those, and while I haven't dived into them yet, they sound interesting.

> I'm totally into an initial refactoring switching all bool guards to enums, that sounds like a great first incremental step.

> The next step, which is considerably more difficult, is somehow encapsulating the possible behaviors in each state. I noticed in your branch that you've ended up adding more bools while cutting out a few (transitioning them to the enums), which I think indicates the complexity that can come with the transitions. Frankly I think a better approach is going to be to encapsulate each state in its own class rather than just an enum, giving each of them access to a baton (or just the dptr itself), and defining entry/exit behaviors. QSM allows for this (you don't HAVE to use it for ui state machines), and it also allows for transitions based on signals (e.g. the tcp socket is closed => ClosingState), which is why I found it particularly interesting.

One thing I'm conscious of is just how complicated some of these state machines get. I found QAmqpQueue was cleanest when considered as 3 related state machines: one to look after declaration of the queue, one to look after binding and a third to look after the consumer.
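
A sketch of what that split might look like, with one small enumeration per concern instead of one combined state. All names are hypothetical, and the rule that consuming waits for bindings is an assumption that mirrors the ordering in the `processState()` code earlier in the thread; AMQP itself only needs the queue to exist.

```cpp
// Hypothetical enums; one small machine per concern.
enum class DeclareState { Undeclared, Declaring, Declared };
enum class BindState    { Unbound, Binding, Bound };
enum class ConsumeState { Idle, Requesting, Consuming };

struct QueueState {
    DeclareState declare = DeclareState::Undeclared;
    BindState    bind    = BindState::Unbound;
    ConsumeState consume = ConsumeState::Idle;

    // The machines are related: later ones depend on earlier ones.
    bool canBind() const { return declare == DeclareState::Declared; }
    bool canConsume() const { return canBind() && bind == BindState::Bound; }

    // Losing the declaration (e.g. after a reconnect) resets the dependents.
    void undeclare() {
        declare = DeclareState::Undeclared;
        bind    = BindState::Unbound;
        consume = ConsumeState::Idle;
    }
};
```

Kept separate, each machine has three states to reason about; flattened into one, the same information needs up to 27 combined states, which goes some way to explaining the whiteboard result.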

I tried doing it as one big state machine on the whiteboard here, and it turned into an Unholy Mess™. Even graphviz struggled with it.

The use of signals triggering transitions sounds like a plan though, as I'm doing quite a lot of that. That'll be the next step I guess: get tests passing again, then have a look at QStateMachine to see if we can make use of that framework.

Stuart Longland (aka Redhatter, VK4MSL)

I haven't lost my mind... ...it's backed up on a tape somewhere.