DaanDeMeyer / reproc

A cross-platform (C99/C++11) process library
MIT License
552 stars 65 forks source link

Read output in an infinite loop #23

Closed Milerius closed 4 years ago

Milerius commented 4 years ago

Hello @DaanDeMeyer, I'm writing a new program using your library:

#pragma once

#include <filesystem>
#include <string>
#include <future>
#include <entt/entity/registry.hpp>
#include <reproc++/reproc.hpp>
#include <antara/gaming/core/real.path.hpp>
#include <antara/gaming/ecs/system.hpp>

namespace fs = std::filesystem;

namespace antara::gaming::blockchain {

    /*struct nspv_output
    {
        nspv_output(reproc::process& background) noexcept;
        std::string output;
        std::mutex output_mutex;
        std::future<std::error_code> async_drain;
    };*/

    /// System derived from ecs::logic_update_system that owns one
    /// reproc::process per coin name.
    class nspv final : public ecs::logic_update_system<nspv> {
        // Coin name -> child process spawned for that coin.
        using nspv_registry = std::unordered_map<std::string, reproc::process>;

    public:
        /// @param registry   entity registry handed to the base system.
        /// @param tools_path directory used both as the location of the "nspv"
        ///                   executable and as its working directory; defaults
        ///                   to core::assets_real_path() / "tools".
        nspv(entt::registry &registry, fs::path tools_path = core::assets_real_path() / "tools") noexcept;

        /// Stops every process held in the registry.
        ~nspv() noexcept final;

        /// Drains the standard output of every registered process and logs it.
        void update() noexcept final;

        /// Starts an nspv process for `coin`.
        /// @return false when the process could not be started, true otherwise.
        bool spawn_nspv_instance(const std::string &coin) noexcept;

    private:
        fs::path tools_path_;
        nspv_registry registry_;
    };
}

REFL_AUTO(type(antara::gaming::blockchain::nspv))

#include <loguru.hpp>
#include <future>
#include <mutex>
#include <antara/gaming/blockchain/nspv.system.hpp>
#include <reproc++/sink.hpp>

namespace antara::gaming::blockchain {
    /// Passes `registry` to the `system` initializer, takes ownership of
    /// `tools_path`, and logs the resulting tools directory.
    nspv::nspv(entt::registry &registry, fs::path tools_path) noexcept
            : system(registry),
              tools_path_(std::move(tools_path))
    {
        LOG_SCOPE_FUNCTION(INFO);
        DVLOG_F(loguru::Verbosity_INFO, "assets tool path: {}", tools_path_.string());
    }

    void nspv::update() noexcept {
        for (auto &&[coin, background] : registry_) {
            std::stringstream ss;
            background.drain(reproc::stream::out, reproc::sink::ostream(ss));
            DVLOG_F(loguru::Verbosity_INFO, "nspv output: {}", ss.str());
        }
    }

    /// Stops every registered process: terminate with a 2000 ms timeout, then
    /// kill with no timeout. A failing stop is logged and the loop continues.
    nspv::~nspv() noexcept {
        LOG_SCOPE_FUNCTION(INFO);
        for (auto &entry : registry_) {
            reproc::process &child = entry.second;
            const auto stop_error = child.stop(reproc::cleanup::terminate,
                                               reproc::milliseconds(2000),
                                               reproc::cleanup::kill,
                                               reproc::infinite);
            if (stop_error) {
                VLOG_SCOPE_F(loguru::Verbosity_ERROR, "error: %s", stop_error.message().c_str());
            }
        }
    }

    bool nspv::spawn_nspv_instance(const std::string &coin) noexcept {
        LOG_SCOPE_FUNCTION(INFO);
        registry_[coin] = reproc::process(reproc::cleanup::terminate, reproc::milliseconds(2000),
                                          reproc::cleanup::kill, reproc::infinite);
        std::array<std::string, 1> args = {tools_path_ / "nspv"};
        auto ec = registry_[coin].start(args, tools_path_.string().c_str());
        if (ec) {
            DVLOG_F(loguru::Verbosity_ERROR, "error: {}", ec.message());
            return false;
        }
        return true;
    }
}

In the update function, which is called in an infinite loop, I try to read the output of each process that I'm running.

But unfortunately it seems to block on the first try.

What I'm looking for is a way to retrieve the output at each frame.

I looked at the examples, but they only seem to read the stream once execution has finished, no?

Milerius commented 4 years ago

If I remember correctly, flushing the output is not supported out of the box, but it should be possible to implement it.

Milerius commented 4 years ago

What would be your advice and your approach to retrieve the output whenever I want to print it in the logger ?

Milerius commented 4 years ago

Ideally I would like to do :

 // Desired usage inside update(): drain stdout into a string stream, then log it.
 std::stringstream ss;
            background.drain(reproc::stream::out, reproc::sink::ostream(ss));
            DVLOG_F(loguru::Verbosity_INFO, "nspv output: {}", ss.str());

Without thread, just in my update function

I also tried the string sink from upstream, but unfortunately it's not working either (still stuck in an infinite loop).

Milerius commented 4 years ago

I found a solution that works:

#include <loguru.hpp>
#include <future>
#include <mutex>
#include <antara/gaming/blockchain/nspv.system.hpp>
#include <reproc++/sink.hpp>

namespace antara::gaming::blockchain {
    /// Passes `registry` to the `system` initializer, takes ownership of
    /// `tools_path`, and logs the resulting tools directory.
    nspv::nspv(entt::registry &registry, fs::path tools_path) noexcept
            : system(registry),
              tools_path_(std::move(tools_path))
    {
        LOG_SCOPE_FUNCTION(INFO);
        DVLOG_F(loguru::Verbosity_INFO, "assets tool path: {}", tools_path_.string());
    }

    /// Logs and clears whatever stdout/stderr text has accumulated for each
    /// registered process since the previous call.
    ///
    /// The buffers are swapped out while holding the process mutex and logged
    /// only after it is released, so the sink that appends to them under the
    /// same mutex is never blocked behind logger I/O.
    void nspv::update() noexcept {
        for (auto &&[coin, process] : registry_) {
            std::string pending_out;
            std::string pending_err;
            {
                // Critical section: two O(1) swaps, which also leave the
                // shared buffers empty (same end state as clear()).
                std::lock_guard<std::mutex> lock(process.process_mutex);
                pending_out.swap(process.out);
                pending_err.swap(process.err);
            }
            if (not pending_out.empty()) {
                DVLOG_F(loguru::Verbosity_INFO, "nspv output: {}", pending_out);
            }
            if (not pending_err.empty()) {
                DVLOG_F(loguru::Verbosity_ERROR, "nspv err output: {}", pending_err);
            }
        }
    }

    /// Stops the background process of every registry entry: terminate with a
    /// 2000 ms timeout, then kill with no timeout. A failing stop is logged
    /// and the loop continues.
    nspv::~nspv() noexcept {
        LOG_SCOPE_FUNCTION(INFO);
        for (auto &entry : registry_) {
            reproc::process &child = entry.second.background;
            const auto stop_error = child.stop(reproc::cleanup::terminate,
                                               reproc::milliseconds(2000),
                                               reproc::cleanup::kill,
                                               reproc::infinite);
            if (stop_error) {
                VLOG_SCOPE_F(loguru::Verbosity_ERROR, "error: %s", stop_error.message().c_str());
            }
        }
    }

    bool nspv::spawn_nspv_instance(const std::string &coin) noexcept {
        LOG_SCOPE_FUNCTION(INFO);
        auto bg = reproc::process(reproc::cleanup::terminate, reproc::milliseconds(2000),
                                  reproc::cleanup::kill, reproc::infinite);

        auto res = registry_.try_emplace(coin, reproc::process(reproc::cleanup::terminate, reproc::milliseconds(2000),
                                                               reproc::cleanup::kill, reproc::infinite)).second;
        if (not res) {
            return false;
        }
        std::array<std::string, 1> args = {tools_path_ / "nspv"};
        auto ec = registry_.at(coin).background.start(args, tools_path_.string().c_str());
        if (ec) {
            DVLOG_F(loguru::Verbosity_ERROR, "error: {}", ec.message());
            return false;
        }
        using namespace std::chrono_literals;
        auto error = registry_.at(coin).background.wait(2s);
        if (error != reproc::error::wait_timeout) {
            DVLOG_F(loguru::Verbosity_ERROR, "error: {}", error.message());
            return false;
        }
        return true;
    }
}
#pragma once

#include <filesystem>
#include <string>
#include <future>
#include <entt/entity/registry.hpp>
#include <reproc++/reproc.hpp>
#include <antara/gaming/core/real.path.hpp>
#include <antara/gaming/ecs/system.hpp>

namespace fs = std::filesystem;

namespace antara::gaming::blockchain {
    /// Callable sink that appends process output to two caller-owned strings,
    /// taking a caller-owned mutex around every append. Only references are
    /// stored, so the strings and the mutex must outlive the sink.
    class thread_safe_string_sink {
    public:
        /// @param out   receives data tagged reproc::stream::out.
        /// @param err   receives data tagged reproc::stream::err.
        /// @param mutex locked for the duration of each call.
        thread_safe_string_sink(std::string &out, std::string &err, std::mutex &mutex)
                : out_(out), err_(err), mutex_(mutex) {}

        /// Appends `size` bytes starting at `buffer` to the string that matches
        /// `stream`; data tagged reproc::stream::in is ignored.
        /// @return always true.
        bool operator()(reproc::stream stream, const uint8_t *buffer, unsigned int size) {
            const char *bytes = reinterpret_cast<const char *>(buffer);
            std::lock_guard<std::mutex> guard(mutex_);
            if (stream == reproc::stream::out) {
                out_.append(bytes, size);
            } else if (stream == reproc::stream::err) {
                err_.append(bytes, size);
            }
            return true;
        }

    private:
        std::string &out_;
        std::string &err_;
        std::mutex &mutex_;
    };

    /// Bundles a reproc::process with the buffers its output is collected in
    /// and the thread that fills them.
    struct nspv_process {
        /// Takes ownership of `background_` and immediately launches a thread
        /// that drains the process into `out` / `err`, guarded by
        /// `process_mutex`.
        ///
        /// NOTE(review): the drain thread is launched here, while the visible
        /// caller only invokes background.start() after this object has been
        /// constructed -- confirm that draining a not-yet-started process and
        /// calling start()/wait()/stop() concurrently with drain() is
        /// supported by reproc++.
        nspv_process(reproc::process background_) noexcept
                : background(std::move(background_))
        {
            sink_thread = std::thread([this]() {
                this->background.drain(thread_safe_string_sink(out, err, process_mutex));
            });
        }

        // The thread holds `this`, and std::mutex / std::thread already make
        // the type non-copyable; spell that out so the object is never
        // relocated while the thread is running.
        nspv_process(const nspv_process &) = delete;
        nspv_process &operator=(const nspv_process &) = delete;
        nspv_process(nspv_process &&) = delete;
        nspv_process &operator=(nspv_process &&) = delete;

        /// Waits for the drain thread to finish before the members go away.
        ~nspv_process() {
            if (sink_thread.joinable()) {
                sink_thread.join();
            }
        }

        reproc::process background;  // child process being drained
        std::string out;             // accumulated stdout text
        std::string err;             // accumulated stderr text
        std::mutex process_mutex;    // guards `out` and `err`
        std::thread sink_thread;     // runs background.drain(...)
    };

    /// System derived from ecs::logic_update_system that owns one nspv_process
    /// per coin name.
    class nspv final : public ecs::logic_update_system<nspv> {
        // Coin name -> process bundle (child process, buffers, drain thread).
        using nspv_registry = std::unordered_map<std::string, nspv_process>;

    public:
        /// @param registry   entity registry handed to the base system.
        /// @param tools_path directory used both as the location of the "nspv"
        ///                   executable and as its working directory; defaults
        ///                   to core::assets_real_path() / "tools".
        nspv(entt::registry &registry, fs::path tools_path = core::assets_real_path() / "tools") noexcept;

        /// Stops the background process of every registry entry.
        ~nspv() noexcept final;

        /// Logs and clears the stdout/stderr text buffered for each process.
        void update() noexcept final;

        /// Registers and starts an nspv process for `coin`.
        /// @return false when `coin` is already registered or the process did
        ///         not start and keep running; true otherwise.
        bool spawn_nspv_instance(const std::string &coin) noexcept;

    private:
        fs::path tools_path_;
        nspv_registry registry_;
    };
}

REFL_AUTO(type(antara::gaming::blockchain::nspv))
Milerius commented 4 years ago

I hope this is correct !

DaanDeMeyer commented 4 years ago

Yes, that's currently the best way to implement this. However, with my latest changes I should be able to extend reproc_read to read from any of a list of processes instead of only a single one. That would allow this to work with a single thread which would be a huge improvement. I'll see what I can come up with.

Milerius commented 4 years ago

Ah yeah that's an excellent idea !

Milerius commented 4 years ago

Would you mind exposing the thread-safe sink from the reproc sink header?

DaanDeMeyer commented 4 years ago

Done. It's available as reproc::sink::thread_safe::string.

Milerius commented 4 years ago

Thanks!