phaistos-networks / TANK

A very high performance distributed log service
Apache License 2.0

Simultaneous Consumes Confuse Response Messages #64

Closed gregschrock closed 6 years ago

gregschrock commented 6 years ago

Hello,

My company has been using TANK to great success in a recent project. Thank you!

I recently uncovered a bug where consuming from different topics at the same time results in the same message being returned for both request IDs. I've created a (fairly) minimal test to demonstrate the issue.

I'm using the Google Test framework, but the test can be run elsewhere with minimal changes. It assumes that TANK is running and that "topic1" and "topic2" exist, each with a single partition. I can update this snippet to create the topics through the client if that would be helpful.

#include <gtest/gtest.h>

#define LEAN_SWITCH
#include <tank_client.h>
#undef LEAN_SWITCH

#include <chrono>
#include <iostream>
#include <string>

TEST(TankClient, readsFromMultipleTopicsSimultaneously)
{
    // Helpers to convert TANK's strwlen8_t/strwlen32_t views to std::string
    auto stringFrom8 = [](strwlen8_t string) {
        return std::string(string.data(), string.size());
    };

    auto stringFrom32 = [](strwlen32_t string) {
        return std::string(string.data(), string.size());
    };

    ::TankClient client;
    client.set_default_leader("127.0.0.1:11011");

    auto msSinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch());

    uint64_t timestamp = static_cast<uint64_t>(msSinceEpoch.count());

    const std::string topic1{"topic1"};
    const std::string topic2{"topic2"};

    const std::string topic1Message{"I am from topic 1"};
    const std::string topic2Message{"I am from topic 2"};

    // Produce one message to partition 0 of each topic
    const auto produceTopic1Id = client.produce({
        {
            {topic1.c_str(), 0},
            {{topic1Message.c_str(), timestamp, ""}}
        }
    });

    const auto produceTopic2Id = client.produce({
        {
            {topic2.c_str(), 0},
            {{topic2Message.c_str(), timestamp, ""}}
        }
    });

    // Consume each topic from the beginning (sequence number 0),
    // fetching up to 4 KiB per partition
    const auto consumeTopic1Id = client.consume({
            {
                {topic1.c_str(), 0},
                {0, 4 * 1024}
            }
        },
        2000,
        0
    );

    const auto consumeTopic2Id = client.consume({
            {
                {topic2.c_str(), 0},
                {0, 4 * 1024}
            }
        },
        2000,
        0
    );

    std::cout << topic1 << " produce: " << produceTopic1Id << std::endl;
    std::cout << topic2 << " produce: " << produceTopic2Id << std::endl;
    std::cout << topic1 << " consume: " << consumeTopic1Id << std::endl;
    std::cout << topic2 << " consume: " << consumeTopic2Id << std::endl;

    // Expect two produce acks and two consume responses in total
    size_t expectedResponses = 4;
    size_t receivedResponses = 0;

    while (receivedResponses < expectedResponses)
    {
        client.poll(1000);

        // Drain acknowledged produce requests
        for (const auto& produceAck : client.produce_acks())
        {
            std::cout << "Produced " << produceAck.clientReqId << " to "
                      << stringFrom8(produceAck.topic) << ":" << produceAck.partition
                      << std::endl;

            ++receivedResponses;
        }

        // Drain consumed message sets, one entry per (topic, partition)
        for (const auto& partitionContent : client.consumed())
        {
            std::cout << "\nRequest  : " << partitionContent.clientReqId << std::endl;
            std::cout << "Topic    : " << stringFrom8(partitionContent.topic) << std::endl;
            std::cout << "Partition: " << partitionContent.partition << std::endl;

            for (auto pMessage : partitionContent.msgs)
            {
                auto receivedMessage = stringFrom32(pMessage->content);

                if (partitionContent.clientReqId == consumeTopic1Id)
                {
                    EXPECT_STREQ(topic1Message.c_str(), receivedMessage.c_str());
                }
                else if (partitionContent.clientReqId == consumeTopic2Id)
                {
                    EXPECT_STREQ(topic2Message.c_str(), receivedMessage.c_str());
                }
                else
                {
                    std::cout << "    *** This consume response is unexpected ***" << std::endl;
                }

                std::cout << "  Timestamp: " << pMessage->ts << std::endl;
                std::cout << "  Content  : " << stringFrom32(pMessage->content)  << std::endl;
            }

            ++receivedResponses;
        }
    }
}

The test fails at this snippet:

                if (partitionContent.clientReqId == consumeTopic1Id)
                {
                    EXPECT_STREQ(topic1Message.c_str(), receivedMessage.c_str());
                }

with the response matching consumeTopic1Id containing the message "I am from topic 2".

Any insight you can give me into this behavior would be much appreciated.

markpapadakis commented 6 years ago

@gregschrock thank you for the great bug report.

I am going to look into it as soon as I can, although I am currently working on some major improvements to TANK, so it may take a while until this new branch is pushed upstream. Specifically, there are a few new optimizations that improve consumer request responsiveness for "tailing" semantics, and TANK is going to become cluster-aware (similar to Kafka in terms of semantics, but hooking into Consul instead of ZooKeeper).

gregschrock commented 6 years ago

Hey @markpapadakis,

I spent some time investigating this issue today and found its source. This consumption optimization is problematic when process_consume is called multiple times within a single poll. The range created by {consumptionList.data(), cnt} still points into consumptionList's storage when that list is reused by subsequent calls; in other words, each earlier range now aliases a buffer holding messages for a different partition. Ultimately, all partition content created through the optimization will share some number of messages. The exact number, and which messages, vary with how many messages each partition contains and the order in which the responses are processed.
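To illustrate the failure mode outside of TANK, here's a minimal self-contained sketch (hypothetical code, not TANK's internals): two "responses" keep {data(), count} views into one reused vector, so refilling the vector clobbers what the earlier view points at.

#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Stand-in for a per-partition response: a view into shared storage.
struct MsgRange {
    const std::string* data;
    size_t count;
};

int main() {
    std::vector<std::string> consumptionList; // reused across responses
    std::vector<MsgRange> responses;

    // First partition: fill the list and keep a {data(), count} view of it.
    consumptionList.push_back("I am from topic 1");
    responses.push_back({consumptionList.data(), consumptionList.size()});

    // Second partition: the list is cleared and refilled, but the first
    // view still points at the same backing storage.
    consumptionList.clear();
    consumptionList.push_back("I am from topic 2");
    responses.push_back({consumptionList.data(), consumptionList.size()});

    // Both views now read the second partition's message.
    for (const auto& r : responses)
        std::printf("%s\n", r.data[0].c_str()); // prints "I am from topic 2" twice

    return 0;
}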

My test passed when I disabled that optimization, but I'm not sure what the long-term fix should be. The optimization is still valid for the last processed partition, but it seems impossible to know when the last partition (across all consume responses, rather than just the current one) is being processed.

Let me know what you think the solution should be and I can put up a PR if you like.

markpapadakis commented 6 years ago

Good morning @gregschrock :)

It should be fixed now; please update from the repo and try again. It was actually a bit more subtle: consumptionList shouldn't have been clear()ed in that method context, and each (topic, partition) message set consumed was not being tracked appropriately.
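Roughly, the idea (a simplified sketch, not the actual patch): keep appending to a single list for the entire poll, track each (topic, partition) message set as an index range rather than a pointer range, and only resolve pointers once the list can no longer grow (since push_back may reallocate and move the data).

#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

// Track each (topic, partition) message set by indices, not pointers.
struct IndexRange {
    size_t offset; // index of the set's first message in the shared list
    size_t count;  // number of messages in the set
};

int main() {
    std::vector<std::string> consumptionList; // NOT cleared between partitions
    std::vector<IndexRange> tracked;

    // Partition 1's messages.
    size_t base = consumptionList.size();
    consumptionList.push_back("I am from topic 1");
    tracked.push_back({base, consumptionList.size() - base});

    // Partition 2's messages append after partition 1's instead of replacing them.
    base = consumptionList.size();
    consumptionList.push_back("I am from topic 2");
    tracked.push_back({base, consumptionList.size() - base});

    // Only now materialize pointers: the list will not grow again, so data()
    // is stable and each set sees its own messages.
    for (const auto& r : tracked) {
        const std::string* msgs = consumptionList.data() + r.offset;
        for (size_t i = 0; i < r.count; ++i)
            std::printf("%s\n", msgs[i].c_str());
    }

    return 0;
}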

Thank you very much for your report. If everything's fine, please close this issue. The new TANK major release may take a while because I am tied up with other things, but it shouldn't take too long.

gregschrock commented 6 years ago

Thanks so much @markpapadakis!

The quick turnaround is appreciated! I applied your patch, and our related tests are passing.

However, I'm finding that next.seqNum has been affected. In my test above, updating the expectations with:

                if (partitionContent.clientReqId == consumeTopic1Id)
                {
                    EXPECT_STREQ(topic1Message.c_str(), receivedMessage.c_str());
                    EXPECT_EQ(2, partitionContent.next.seqNum);  // <- added expectation
                }

produces a failure where partitionContent.next.seqNum is 0 instead of 2.

markpapadakis commented 6 years ago

Thank you @gregschrock. I'll look into it within an hour and will post an update.

markpapadakis commented 6 years ago

I tried and couldn't reproduce that (thousands of runs). Any chance you didn't erase the topic/partition data before you ran the tests?

gregschrock commented 6 years ago

Okay, it must be something in my build. We don't have C++17 support, so I've needed to make minimal changes to for loops, etc., in order to be C++14-compatible. But the patch applied and built cleanly, so I expected everything to be fine. I must have introduced this issue somehow. Sorry to take up your time.

gregschrock commented 6 years ago

Okay, I've found the cause. I had pulled in "Fixes #64" but not "Fixed tiny client issue/type". The latter addressed the next-index computation issue from the first. All's good now.

markpapadakis commented 6 years ago

Thank you very much :)