Closed apomykal closed 9 years ago
@apomykal QAbstractSocket and its subclasses do not play well with multiple threads, but AMQP encourages multiplexing multiple channels over a single connection. QAMQP creates these channels (QAmqpExchange, QAmqpQueue) as children of the connection on which you call the relevant "create" method (createExchange, createQueue). With this in mind, you must make sure that you:
Without seeing a sample of your code I can't really speak further to the issue, but it's likely that you aren't following one of these rules.
@apomykal an easy way to avoid this problem, without getting into QueuedConnections, is to ensure that each thread you spawn has its own QAmqpClient. You only need to get into QueuedConnections if you want to limit the number of actual client connections you open to your AMQP broker (generally not a huge concern).
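To make the "one client per thread" rule concrete, here is a minimal sketch in standard C++ rather than Qt. `AffineClient` is a hypothetical stand-in for a socket-backed object that may only be used from the thread that created it; it is not part of QAMQP, and the function names are made up for illustration. The sketch only models the ownership rule, not real networking.

```cpp
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a thread-affine, socket-backed client.
// It remembers which thread constructed it and refuses calls from others.
class AffineClient {
public:
    AffineClient() : owner_(std::this_thread::get_id()) {}

    // Returns true only when called from the thread that created the client.
    bool publish(const std::string& message) {
        if (std::this_thread::get_id() != owner_) {
            return false;  // models "cannot be used from another thread"
        }
        sent_.push_back(message);
        return true;
    }

    std::size_t sentCount() const { return sent_.size(); }

private:
    std::thread::id owner_;
    std::vector<std::string> sent_;
};

// Problem case: the client is built on the calling thread,
// but publish() is invoked from a worker thread.
bool publishFromForeignThread() {
    AffineClient client;  // owned by the caller's thread
    bool ok = true;
    std::thread worker([&] { ok = client.publish("Hello World: 0"); });
    worker.join();
    return ok;
}

// Suggested shape: the worker thread builds and uses its own client.
bool publishFromOwningThread() {
    bool ok = false;
    std::thread worker([&] {
        AffineClient client;  // owned by the worker thread
        ok = client.publish("Hello World: 0");
    });
    worker.join();
    return ok;
}
```

The same shape carries over to Qt if the QAmqpClient is constructed inside the thread that publishes, rather than in the constructor of a QThread subclass, which runs on the thread that created the QThread object.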
Thanks, will double check my code and post an example if the problem persists.
Here is a quick and dirty sample:
#include "TestThread.h"
#include <QDebug>

TestThread::TestThread()
{
    m_exchange = NULL;
    mExchangeDeclared = false;
    // Rabbit Connection
    qDebug() << "TestThread::TestThread()";
    connect(&m_client, SIGNAL(connected()), this, SLOT(clientConnected()));
    m_client.connectToHost("localhost");
}

void TestThread::run()
{
    m_continue = true;
    int i = 0;
    while (m_continue)
    {
        if (!m_exchange)
            continue;
        if (!mExchangeDeclared)
            continue;
        QString severity = "info";
        QString message;
        message = "Hello World: ";
        message += QString::number(i);
        i++;
        m_exchange->publish(message, severity);
        qDebug(" [x] Sent %s:%s", severity.toLatin1().constData(), message.toLatin1().constData());
        sleep(5);
    }
}

void TestThread::close()
{
    m_client.disconnectFromHost();
    m_continue = false;
}

void TestThread::clientConnected()
{
    qDebug("clientConnected()");
    QString queue="direct_logs";
    if (m_exchange == NULL)
    {
        m_exchange = m_client.createExchange(queue);
        qDebug() << "m_client.createExchange(" << queue << ")";
        connect(m_exchange, SIGNAL(declared()), this, SLOT(exchangeDeclared()));
    }
    m_exchange->declare(QAmqpExchange::FanOut);
}

void TestThread::exchangeDeclared()
{
    qDebug("exchangeDeclared()");
    if (!m_exchange)
        return;
    mExchangeDeclared = true;
}
called from a QDialog:
void buttonClicked()
{
    QThread *newThread = new TestThread();
    if (newThread)
    {
        newThread->start();
        if (newThread->isRunning() )
        {
            newThread->setPriority(QThread::NormalPriority);
        }
    }
}
@apomykal well first of all you're doing QThreads wrong. Give me a few minutes to put together a better example for you
On Thu, Mar 05, 2015 at 08:43:21AM -0800, Matt Broadstone wrote:
@apomykal well first of all you're doing QThreads wrong. Give me a few minutes to put together a better example for you
This has a good example,
http://doc.qt.io/qt-5/qthread.html
Or even better, use Qt Concurrent module for your workers,
http://doc.qt.io/qt-5/qtconcurrent-index.html
Combined with thread pooling (even with the default pool), it's much less expensive than starting a QThread for each job, and you are less likely to cause thrashing by starting too many threads.
Thanks for the pointers. I'll work on my thread example. However, I do have qamqp working in my application and it's performing brilliantly.
@apomykal good to hear, I'll close this for now then.
I set up my application to spawn a new QThread to process data (convert to a JSON format) and then send to another application using a FanOut exchange. The new thread creates the exchange and processes data in the run method.
The message is being sent out (received by the other program), but I receive the following error: QSocketNotifier: Socket notifiers cannot be enabled or disabled from another thread
I don't receive the error if the QDialog class creates the exchange directly and processes the data.