andreiavrammsd / cpp-channel

Thread-safe container for sharing data between threads
https://blog.andreiavram.ro/cpp-channel-thread-safe-container-share-data-threads/
MIT License

Deadlock, never ends #25

Closed: lasorda closed 1 year ago

lasorda commented 1 year ago

This is the code. threadpool.h is from https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h. ProcessV2 works fine, but Process never ends.

#include <cstdio>
#include <fstream>
#include <memory>
#include <string>
#include "msd/channel.hpp"
#include "threadpool.h"

using namespace std;
ThreadPool pool(10);

class BaseConverter {
 protected:
    msd::channel<std::string> lines;

 public:
    auto Process(const std::string &filename) -> shared_ptr<msd::channel<string>> {
        pool.enqueue([&filename, this]() {
            std::ifstream input(filename);
            if (!input.is_open()) {
                printf("open failed");
                return -1;
            }

            string line;
            while (getline(input, line)) {
                // fmt::println("read line:{}", line);
                line >> lines;
            }
            printf("read done");
            lines.close();
            return 0;
        });

        auto thread_num = 10;
        msd::channel<int> worker_done(thread_num);
        auto output = make_shared<msd::channel<string>>();
        for (int i = 0; i < thread_num; i++) {
            i >> worker_done;
            pool.enqueue([this, &worker_done, &output]() {
                for (const auto &e : lines) {
                    e >> *output;
                }
                int t;
                t << worker_done;
                if (worker_done.empty()) {
                    output->close();
                }
            });
        }
        return output;
    }
    auto ProcessV2(const std::string &filename) -> int {
        pool.enqueue([&filename, this]() {
            std::ifstream input(filename);
            if (!input.is_open()) {
                printf("open failed");
                return -1;
            }

            string line;
            while (getline(input, line)) {
                // fmt::println("read line:{}", line);
                line >> lines;
            }
            printf("read done");
            lines.close();
            return 0;
        });

        auto thread_num = 10;
        msd::channel<int> worker_done(thread_num);
        msd::channel<string> output;
        for (int i = 0; i < thread_num; i++) {
            i >> worker_done;
            pool.enqueue([this, &worker_done, &output]() {
                for (const auto &e : lines) {
                    e >> output;
                }
                int t;
                t << worker_done;
                if (worker_done.empty()) {
                    output.close();
                }
            });
        }
        for(const auto &e : output) {
            printf("%s", e.c_str());
        }
        return 0;
    }
};
auto main(int argc, char *argv[]) -> int {
    BaseConverter().ProcessV2("./threadpool.h");
    auto ret = BaseConverter().Process("./threadpool.h");
    for (const auto &e : *ret) {
        printf("%s", e.c_str());
    }
    return 0;
}

lasorda commented 1 year ago

It also core dumps sometimes.

andreiavrammsd commented 1 year ago

I have not tested the library exhaustively, and I'm sure there are cases where it fails. I don't know when I'll make progress on it.

And I'm not sure that I understand what you are trying to do. Please take a look at the close and multithreading examples. Maybe they'll help you. I don't have a quick fix for your situation.
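
One possible reading of the code in the report, not confirmed anywhere in this thread: Process returns while its pool tasks are still running, yet those tasks capture the locals worker_done and output by reference, plus filename and this, which belong to temporaries in main. ProcessV2 blocks inside the function until output is closed, so its locals stay alive for most of the work. The sketch below illustrates only that lifetime point with the standard library. It does not use msd::channel or the ThreadPool header, and Output, process_async, and wait_all are hypothetical names made up for illustration. Shared state is held in std::shared_ptr objects captured by value, and an atomic counter decides which worker closes the output.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an output channel: a mutex-guarded buffer
// with a "closed" flag. This is NOT the msd::channel API.
struct Output {
    std::mutex mtx;
    std::condition_variable cv;
    std::vector<std::string> items;
    bool closed = false;

    void push(std::string s) {
        std::lock_guard<std::mutex> lock(mtx);
        items.push_back(std::move(s));
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            closed = true;
        }
        cv.notify_all();
    }

    // Blocks until close() has been called, then returns a copy of the items.
    std::vector<std::string> wait_all() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return closed; });
        return items;
    }
};

// Starts `workers` background threads and returns before they finish.
// Everything the threads touch is owned by shared_ptrs captured BY VALUE,
// so nothing dangles once this function has returned.
std::shared_ptr<Output> process_async(std::vector<std::string> lines, int workers) {
    auto input = std::make_shared<const std::vector<std::string>>(std::move(lines));
    auto output = std::make_shared<Output>();
    auto next = std::make_shared<std::atomic<std::size_t>>(0);
    auto remaining = std::make_shared<std::atomic<int>>(workers);

    if (workers <= 0) {  // no worker would ever close the output
        output->close();
        return output;
    }

    for (int i = 0; i < workers; ++i) {
        std::thread([input, output, next, remaining] {
            for (;;) {
                const std::size_t idx = next->fetch_add(1);  // claim one line
                if (idx >= input->size()) break;
                output->push((*input)[idx]);
            }
            // fetch_sub returns the previous value, so exactly one worker
            // (the last to finish) observes 1 and closes the output.
            if (remaining->fetch_sub(1) == 1) output->close();
        }).detach();
    }
    return output;
}
```

A caller would keep the returned pointer and block on it, for example `auto out = process_async(lines, 10); auto result = out->wait_all();`. Items arrive in no particular order. The atomic countdown replaces the worker_done.empty() check from the report, which more than one worker (or none) could observe as true.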