alanxz / SimpleAmqpClient

Simple C++ Interface to rabbitmq-c
MIT License

Message properties not being sent to RabbitMQ hub #287

Open humayunkhan opened 3 years ago

humayunkhan commented 3 years ago

I have used vcpkg to build rabbitmq-c as described in the https://github.com/alanxz/rabbitmq-c build instructions:

git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
./vcpkg install librabbitmq
./vcpkg install boost

Then I built https://github.com/alanxz/SimpleAmqpClient (latest release version 2.5.1)

The problem I am facing is that message properties such as CorrelationId, Type, etc. are not being sent to the RabbitMQ hub. The same code worked before I upgraded to the latest release.

Here is the code I use to build and publish the message:

void RabbitMqNode::SendResponse(google::protobuf::Message *response, NodeEnvelope *envelope, std::map<string, string> headers)
{
    auto message = GetBasicResponseMessage(response, envelope, &headers);
    auto returnAddress = envelope->returnAddress;
    channel->BasicPublish(std::string(), returnAddress, message);
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicResponseMessage(google::protobuf::Message *response, NodeEnvelope * envelope, std::map<string, string>* headers)
{
    auto message = GetBasicMessage(response, headers);
    message->CorrelationId(envelope->corelationId);
    return message;
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::map<string, string> *headers)
{
    auto body = serializer.SerializeMessageToJson(inputMessage);
    return GetBasicMessage(inputMessage, body, headers);
}

AmqpClient::BasicMessage::ptr_t RabbitMqNode::GetBasicMessage(google::protobuf::Message *inputMessage, std::string messageBody, std::map<string, string> *headers)
{
    auto name = GetMessageName(inputMessage);
    auto message = AmqpClient::BasicMessage::Create(messageBody);
    message->DeliveryMode(AmqpClient::BasicMessage::delivery_mode_t::dm_nonpersistent);
    message->ContentType(MIME_JSON);
    message->Type(name);
    message->AppId(nodeId);
    auto headerTable = message->HeaderTable();
    InitializeHeaders(&headerTable, headers);
    message->HeaderTable(headerTable);
    message->Timestamp(utility.GetCurrentUnixTimestampMilliseconds());

    return message;
}

void RabbitMqNode::InitializeHeaders(AmqpClient::Table *headersTable, std::map<string, string> *headers)
{
    if (headers->empty())
    {
        return;
    }
    std::map<string, string>::iterator it = headers->begin();
    while (it != headers->end())
    {
        auto key = (std::string)it->first;
        auto value = it->second;

        headersTable->insert(std::pair<string, string>(key, value));

        // Increment the Iterator to point to next entry
        it++;
    }
}

Note: The body of the message and the headers are being sent without any issues.

Observation: If I don't initialize the headers, only the AppId property is sent correctly. If I do initialize the headers, the headers are sent but no other property reaches the hub. The body is always sent correctly.

Could someone please identify the mistake?

humayunkhan commented 3 years ago

@alanxz I think I know where the problem is. In Channel.cpp, the function CreateAmqpProperties seems to handle the flags incorrectly. For example:

if (mes.TimestampIsSet()) {
    ret.timestamp = mes.Timestamp();
    ret._flags = AMQP_BASIC_TIMESTAMP_FLAG;
}

This assignment overwrites any flags that were set earlier, so I replaced it with:

if (mes.TimestampIsSet()) {
    ret.timestamp = mes.Timestamp();
    ret._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
}

I made the same change for every other flag that was being overwritten. After this change the properties arrive and everything seems to work.
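For anyone wanting to see the effect in isolation, here is a minimal standalone sketch of the difference between `=` and `|=` on a flags bitmask. The flag names and values below are hypothetical stand-ins chosen for illustration; they are not the real rabbitmq-c constants, and the functions are not taken from Channel.cpp.

```cpp
#include <cstdint>

// Hypothetical flag bits for illustration only (not rabbitmq-c's values).
constexpr std::uint32_t kTypeFlag      = 1u << 0;
constexpr std::uint32_t kTimestampFlag = 1u << 1;
constexpr std::uint32_t kAppIdFlag     = 1u << 2;

// Buggy pattern: each plain assignment discards the bits set before it,
// so only the last property marked this way is flagged as present.
std::uint32_t BuildFlagsWithAssign() {
    std::uint32_t flags = 0;
    flags = kTypeFlag;
    flags = kTimestampFlag;
    flags = kAppIdFlag;
    return flags;
}

// Fixed pattern: bitwise OR-assignment accumulates bits, so every
// property that was marked stays flagged as present.
std::uint32_t BuildFlagsWithOr() {
    std::uint32_t flags = 0;
    flags |= kTypeFlag;
    flags |= kTimestampFlag;
    flags |= kAppIdFlag;
    return flags;
}

// Returns true when the given bit is set in the mask.
bool IsSet(std::uint32_t flags, std::uint32_t bit) {
    return (flags & bit) != 0;
}
```

With these definitions, `IsSet(BuildFlagsWithAssign(), kTypeFlag)` is false while `IsSet(BuildFlagsWithOr(), kTypeFlag)` is true. That is consistent with what I observed: whichever property is assigned last with `=` is the only one that survives.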

Could you please look into this and let me know whether this is the correct fix?

Thank you.

alanxz commented 3 years ago

This appears to be fixed in https://github.com/alanxz/SimpleAmqpClient/commit/4818655450f385d7a9b68c3ebc60dc71fe42e2a3.

humayunkhan commented 3 years ago

That is correct; I had only taken the latest release. Maybe it would be better to create a new release with this fix, or at least update the release notes to list this known issue.

It would save a lot of time for people who are unaware of this.