hosseinmoein / DataFrame

C++ DataFrame for statistical, Financial, and ML analysis -- in modern C++ using native types and contiguous memory storage
https://hosseinmoein.github.io/DataFrame/
BSD 3-Clause "New" or "Revised" License
2.54k stars, 313 forks

Writing a DataFrame fails under multithreading #126

Closed: westfly closed this issue 3 years ago

westfly commented 3 years ago
  1. Should write() be implemented on DataFrame with a SpinLock guard?
  2. HeteroVector holds a global (static) map without a lock. Could this map be defined as thread_local (available since C++11), with the SpinLock also thread_local?
    struct  HeteroVector  {
    ...
    template<typename T>
    inline static std::unordered_map<const HeteroVector *, std::vector<T>>
       vectors_ {  };
    ...
hosseinmoein commented 3 years ago

I don't think thread_local is the solution, since the scoping is not going to work and also it would waste/duplicate a big data structure.
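To illustrate the scoping problem, here is a minimal sketch using a hypothetical Holder type (not part of DataFrame) whose per-object data lives in a static map keyed by this, in the same way as HeteroVector::vectors_. If that map were thread_local, every thread would get its own empty copy, so data written through an object in one thread would be invisible to any other thread using the same object.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a HeteroVector-style type (not DataFrame code).
struct Holder {
    // Each thread gets its own, independent instance of this map.
    static thread_local std::unordered_map<const Holder *, std::vector<double>> vectors_;

    ~Holder() { vectors_.erase(this); }  // erases from the destroying thread's map only

    void push(double v) { vectors_[this].push_back(v); }

    // Number of elements the *calling* thread can see for this object.
    std::size_t size_seen_here() const {
        const auto citer = vectors_.find(this);
        return citer == vectors_.end() ? 0 : citer->second.size();
    }
};

thread_local std::unordered_map<const Holder *, std::vector<double>> Holder::vectors_;

// Fills a Holder in the calling thread, then asks a second thread how many
// elements it sees for the very same object.
std::size_t size_seen_from_other_thread() {
    Holder h;
    h.push(1.0);
    h.push(2.0);
    std::size_t seen = 0;
    std::thread t([&h, &seen] { seen = h.size_seen_here(); });
    t.join();
    return seen;  // 0: the second thread's map has no entry for &h
}
```

The same duplication is also what makes the approach wasteful: every thread that touches the type pays for its own copy of the map.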

Did you declare a global spin lock and did you give it to the DataFrame? Do you have the stack trace from your crash? Do you have any printouts from the crash? write() uses the spin lock, if given, when calling get_column().

westfly commented 3 years ago

I carefully reviewed the code but couldn't find the spin lock in write(). write() (taking the JSON format as an example) calls HeteroVector::change(), which eventually calls HeteroVector::change_impl_help_().

vectors_, a static map in class HeteroVector, is searched with find() without the spin lock:

 const auto  citer = vectors_<U>.find (this);

Code snippet for the JSON format:

template<typename I, typename H>
template<typename S, typename ... Ts>
bool DataFrame<I, H>::write (S &o, io_format iof, bool columns_only) const  {

    if (iof != io_format::csv &&
        iof != io_format::json &&
        iof != io_format::csv2)
        throw NotImplemented("write(): This io_format is not implemented");

    if (iof == io_format::json)
        o << "{\n";

    bool            need_pre_comma = false;
    const size_type index_s = indices_.size();

    if (iof == io_format::json)  {
        if (! columns_only)  {
            _write_json_df_header_<S, IndexType>(o, DF_INDEX_COL_NAME, index_s);

            o << "\"D\":[";
            if (index_s != 0)  {
                _write_json_df_index_(o, indices_[0]);
                for (size_type i = 1; i < index_s; ++i)  {
                    o << ',';
                    _write_json_df_index_(o, indices_[i]);
                }
            }
            o << "]}";
            need_pre_comma = true;
        }

        for (const auto &iter : column_list_)  {
            print_json_functor_<Ts ...> functor (iter.first.c_str(),
                                                 need_pre_comma,
                                                 o);

            data_[iter.second].change(functor);
            need_pre_comma = true;
        }
    }
   .....
    if (iof == io_format::json)
        o << "\n}";
    o << std::endl;
    return (true);
}

And change_impl_help_() is as below:


template<typename T, typename U>
void HeteroVector::change_impl_help_ (T &functor) const  {
    const auto  citer = vectors_<U>.find (this);
    if (citer != vectors_<U>.end())
        functor(citer->second);
}
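Whether this unlocked find() is safe depends on what other threads are doing: concurrent const calls such as find() on a standard container are safe only while no thread modifies it, and an insert from another thread (for example, one building a new DataFrame) can rehash the map under the reader. A minimal sketch of a lookup that takes the same lock as the writers, assuming a hypothetical Registry type (put() and visit() are illustrative names, and std::mutex stands in for hmdf::SpinLock):

```cpp
#include <cassert>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical registry modeled on HeteroVector::vectors_<T>: a static map
// from an owner pointer to that owner's column data.
template<typename T>
struct Registry {
    static inline std::unordered_map<const void *, std::vector<T>> map_;
    static inline std::mutex lock_;  // stand-in for the global hmdf::SpinLock

    // Writer: an insert may rehash, which invalidates iterators held by readers.
    static void put(const void *owner, std::vector<T> v) {
        const std::lock_guard<std::mutex> guard(lock_);
        map_[owner] = std::move(v);
    }

    // Reader: find() and the use of the found vector both happen under the
    // lock, since a concurrent put() could rehash or overwrite the entry.
    template<typename F>
    static bool visit(const void *owner, F &&functor) {
        const std::lock_guard<std::mutex> guard(lock_);
        const auto citer = map_.find(owner);
        if (citer == map_.end())
            return false;
        functor(citer->second);
        return true;
    }
};
```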
hosseinmoein commented 3 years ago

The code snippet you posted only reads the static part. The exception is when a single DataFrame instance is used from multiple threads; in that case you have to protect that instance with your own mutex. It is worth reading the multithreading section of the docs here: https://htmlpreview.github.io/?https://github.com/hosseinmoein/DataFrame/blob/master/docs/HTML/DataFrame.html
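A minimal sketch of "protect that single instance with your own mutex", assuming a hypothetical SharedFrame wrapper in which a std::vector<double> stands in for one DataFrame shared by several threads. The mutex belongs to the caller and is separate from the lock passed to set_lock().

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical wrapper around one shared object (a vector stands in for the DataFrame).
class SharedFrame {
public:
    void append(double v) {
        const std::lock_guard<std::mutex> guard(mtx_);
        data_.push_back(v);
    }
    std::size_t size() const {
        const std::lock_guard<std::mutex> guard(mtx_);
        return data_.size();
    }
private:
    mutable std::mutex  mtx_;   // caller-owned lock guarding this one instance
    std::vector<double> data_;
};

// Several threads append to the same instance; the mutex serializes access.
std::size_t fill_concurrently(int threads, int per_thread) {
    SharedFrame frame;
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
        workers.emplace_back([&frame, per_thread] {
            for (int i = 0; i < per_thread; ++i)
                frame.append(static_cast<double>(i));
        });
    for (auto &w : workers)
        w.join();
    return frame.size();
}
```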

In any case, when I get a chance I will take a deeper look.

hosseinmoein commented 3 years ago

Is there any way I can reproduce the problem?

westfly commented 3 years ago

Thanks for your reply. These days I'm busy trying to reproduce it by stripping off non-relevant code.

I modified dataframe_tester_3.cc, which includes groupby (which modifies the global vectors_) and write (which reads vectors_), as below, but couldn't reproduce the crash. As you suggested, I added set_lock() at startup, but even with set_lock() commented out the test case still works. I'm a little confused.

#include <thread>
int main(int argc, char *argv[]) {
    hmdf::SpinLock locker;
    //MyDataFrame::set_lock(&locker);
    const int   kThreadCount = 10;
    std::thread threads[kThreadCount];
    for (size_t j = 0; j < kThreadCount; ++j) {
        threads[j] = std::thread(test_groupby_edge);
        threads[j].join();  // joined inside the loop, so the threads run one at a time
    }
    MyDataFrame::remove_lock();
    test_groupby_edge();
    test_concat_view();

    return (0);
}
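One likely reason this test never crashes: threads[j].join() is called right after each thread is created, so each thread finishes before the next one starts and no two ever run concurrently (which would also explain why commenting out set_lock() makes no difference). A minimal sketch that measures peak concurrency under both loop shapes; worker() is a hypothetical stand-in for test_groupby_edge:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>
#include <vector>

std::atomic<int> running{0};  // worker bodies currently executing
std::atomic<int> peak{0};     // highest value 'running' has reached

void worker() {
    const int now = ++running;
    int prev = peak.load();
    // Raise 'peak' to 'now' if it is lower (lock-free max update).
    while (prev < now && !peak.compare_exchange_weak(prev, now)) {}
    std::this_thread::sleep_for(std::chrono::milliseconds(10));  // simulated work
    --running;
}

// join_in_loop == true mirrors the loop above; false starts every thread
// before joining any of them.
int peak_concurrency(int count, bool join_in_loop) {
    running = 0;
    peak = 0;
    std::vector<std::thread> threads;
    for (int j = 0; j < count; ++j) {
        threads.emplace_back(worker);
        if (join_in_loop)
            threads.back().join();
    }
    if (!join_in_loop)
        for (auto &t : threads)
            t.join();
    return peak.load();
}
```

With join_in_loop set, the peak is always 1; the later version of this test in the thread starts all threads before joining any, which is what allows them to overlap.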
westfly commented 3 years ago

Our scenario is relatively simple.

  1. Convert an Arrow-like format to DataFrames.
  2. Each request gets multiple DataFrames, defined as using MyDataFrame = StdDataFrame.
  3. The DataFrames converted from Arrow tables are accessed to generate more DataFrames.
  4. All DataFrames may hold the same column types, including std::string, std::string_view, long, DateTime, etc.
  5. Class Record is used in a multithreaded environment.
using MyDataFrame = StdDataFrame<unsigned long>;
struct Record {
   MyDataFrame click;
   MyDataFrame impl;
   int Init(Arrow& table) {   // Arrow: our own Arrow-like table type
     // time_vect is built from table (conversion code elided)
     click.load_column<DateTime>("time", std::move(time_vect));
     return 0;
   }
   MyDataFrame Compute() {
     auto df1 = click.groupby1<>();   // groupby arguments elided
     auto df2 = impl.groupby1<>();
     return df1.concat(df2);
   }
};

So should we define only one global lock to make it thread-safe?

#include <mutex>

void InitLock() {
  static hmdf::SpinLock hmdf_lock;
  static std::once_flag flag;
  // Statics need no capture (capturing one is ill-formed; some compilers reject it)
  std::call_once(flag, []() { MyDataFrame::set_lock(&hmdf_lock); });
}
hosseinmoein commented 3 years ago

That is right. You need only one global spin lock for all instances of DataFrame. The lock in DataFrame is a static member.
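A minimal sketch of why one lock is enough: because the lock pointer is a static member, a single set_lock() call covers every instance. FrameSketch and SpinLockSketch are hypothetical stand-ins, not DataFrame's code; only the set_lock()/remove_lock() names and the "lock only if one was given" behavior come from this thread.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical stand-in for hmdf::SpinLock: a minimal test-and-set spin lock.
class SpinLockSketch {
public:
    void lock() noexcept { while (flag_.test_and_set(std::memory_order_acquire)) {} }
    void unlock() noexcept { flag_.clear(std::memory_order_release); }
private:
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// Hypothetical frame type: the lock pointer is static, so it is shared by all instances.
class FrameSketch {
public:
    static void set_lock(SpinLockSketch *sl) noexcept { lock_ = sl; }
    static void remove_lock() noexcept { lock_ = nullptr; }
    static SpinLockSketch *current_lock() noexcept { return lock_; }

    // Touches state shared by every instance (like HeteroVector's static map).
    void touch_shared_state() {
        if (lock_ != nullptr) {  // locking is opt-in: no lock given, no guard
            const std::lock_guard<SpinLockSketch> guard(*lock_);
            ++shared_counter_;
        }
        else {
            ++shared_counter_;
        }
    }
    static long counter() noexcept { return shared_counter_; }

private:
    static inline SpinLockSketch *lock_ = nullptr;
    static inline long            shared_counter_ = 0;
};
```

As in the thread, set_lock() is meant to be called once, before any worker threads start, since the pointer itself is not synchronized.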

westfly commented 3 years ago

Thanks for your reply. I think the code below reproduces the core dump with a probability of about 20% once I add auto m = df; to simulate returning a DataFrame instance.

using MyDataFrame = StdDataFrame<unsigned long>;

// -----------------------------------------------------------------------------

static void test_groupby_edge(int j)  {

    std::cout << "\nTesting groupby( "<< j << ") ..." << std::endl;

    MyDataFrame                df;
    std::vector<unsigned long>  idxvec =
        { 1UL, 2UL, 3UL, 10UL, 5UL, 7UL, 8UL, 12UL, 9UL, 12UL, 10UL, 13UL,
          10UL, 15UL, 14UL };
    std::vector<double>         dblvec =
        { 0.0, 15.0, -14.0, 2.0, 1.0, 12.0, 11.0, 8.0, 7.0, 6.0, 5.0, 4.0,
          3.0, 9.0, 10.0};
    std::vector<double>         dblvec2 =
        { 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.55, 107.34, 1.8,
          111.0, 112.0, 113.0, 114.0, 115.0, 116.0};
    std::vector<int>            intvec = { 1, 2, 3, 4, 5, 8, 6, 7, 11, 14, 9 };
    std::vector<std::string>    strvec =
        { "zz", "bb", "cc", "ww", "ee", "ff", "gg", "hh", "ii", "jj", "kk",
          "ll", "mm", "nn", "oo" };
#if 0
    dblvec2.resize(1);
    dblvec.resize(1);
    strvec.resize(1);
    intvec.resize(1);
    idxvec.resize(1); // making this greater than one avoids the core dump (a hack)
#endif
    df.load_data(std::move(idxvec),
                 std::make_pair("dbl_col", dblvec),
                 std::make_pair("dbl_col_2", dblvec2),
                 std::make_pair("str_col", strvec));
    df.load_column("int_col", std::move(intvec),
                   nan_policy::dont_pad_with_nans);
    df.write<std::ostream, std::string, double, int, bool>
        (std::cout, io_format::json);
    FactorizeVisitor<double>    fact([] (const double &f) -> bool {
                                         return (f > 11106.0 && f < 114.0);
                                     });
    df.load_column("bool_col",
                   df.single_act_visit<double>("dbl_col_2", fact).get_result());

    auto& xvec = df.get_column<std::string>("str_col");
    auto& yvec = df.get_column<double>("dbl_col_2");
    auto m = df;
    m.write<std::ostream, std::string, double, int, bool>
        (std::cout, io_format::json);

    auto bool_df = m.groupby1<bool>(
        "bool_col",
        LastVisitor<MyDataFrame::IndexType, MyDataFrame::IndexType>(),
        std::make_tuple("dbl_col_2", "sum_dbl2", SumVisitor<double>()),
        std::make_tuple("dbl_col_2", "cnt_dbl2", CountVisitor<double>()));
    bool_df.write<std::ostream, double, std::size_t, bool>
        (std::cout, io_format::json);
}

#include <thread>
int main(int argc, char *argv[]) {
    hmdf::SpinLock locker;
    MyDataFrame::set_lock(&locker);
    const int   kThreadCount = 10;
    std::thread threads[kThreadCount];
    for (size_t j = 0; j < kThreadCount; ++j) {
        threads[j] = std::thread(test_groupby_edge, j);
    }
    for (size_t j = 0; j < kThreadCount; ++j) {
        threads[j].join();
    }
    MyDataFrame::remove_lock();
    test_groupby_edge(kThreadCount);
    test_concat_view();

    return (0);
}

The core dump stack may differ each time. One example:

Core was generated by `bin/dataframe_tester_3'.
Program terminated with signal 11, Segmentation fault.
#0  0x000000000040ddfb in std::vector<double, std::allocator<double> >& hmdf::HeteroVector::get_vector<double>() ()
(gdb) bt
#0  0x000000000040ddfb in std::vector<double, std::allocator<double> >& hmdf::HeteroVector::get_vector<double>() ()
#1  0x000000000040e025 in hmdf::type_declare<hmdf::HeteroVector, double>::type& hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::get_column<double>(char const*) ()
#2  0x000000000040e0c8 in hmdf::FactorizeVisitor<double, unsigned long>& hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::single_act_visit<double, hmdf::FactorizeVisitor<double, unsigned long> >(char const*, hmdf::FactorizeVisitor<double, unsigned long>&, bool) ()
#3  0x0000000000403f8f in test_groupby_edge(int) ()
#4  0x00007f4923b0fc9f in ?? ()
#5  0x0000000000000000 in ?? ()
hosseinmoein commented 3 years ago

I'll take a look when I get a chance. Thx.

westfly commented 3 years ago

I noticed that DataFrame's copy operations are compiler-generated (marked = default), while ~DataFrame() is implemented with a lock.

   ...
    DataFrame &operator= (const DataFrame &) = default;
    DataFrame &operator= (DataFrame &&) = default;

    // Because of thread safety, this needs tender loving care
    //
    ~DataFrame();
     ...
hosseinmoein commented 3 years ago

will take a look

hosseinmoein commented 3 years ago

@westfly, I made the copy constructor and assignment operators thread-safe. I still have to look into write() when I get a chance.
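Why copying needs the lock at all can be sketched as follows, assuming (as the HeteroVector excerpt above suggests) that column data lives in a static map keyed by this. Copying, assigning and destroying such an object all mutate that shared map, so each must take the same global lock as everything else. ColumnSketch is hypothetical and uses std::mutex in place of hmdf::SpinLock; it is not the actual change made in the repository.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical HeteroVector-like type: its data lives in a static map keyed by 'this'.
class ColumnSketch {
public:
    ColumnSketch() = default;

    // Copying inserts a new entry into the shared map, so it takes the lock.
    ColumnSketch(const ColumnSketch &that) {
        const std::lock_guard<std::mutex> guard(lock_);
        const auto citer = map_.find(&that);
        if (citer != map_.end()) {
            std::vector<double> copy = citer->second;  // copy before inserting
            map_[this] = std::move(copy);
        }
    }

    ColumnSketch &operator=(const ColumnSketch &that) {
        if (this != &that) {
            const std::lock_guard<std::mutex> guard(lock_);
            const auto citer = map_.find(&that);
            if (citer == map_.end()) {
                map_.erase(this);
            }
            else {
                std::vector<double> copy = citer->second;
                map_[this] = std::move(copy);
            }
        }
        return *this;
    }

    // Destruction also mutates the shared map.
    ~ColumnSketch() {
        const std::lock_guard<std::mutex> guard(lock_);
        map_.erase(this);
    }

    void push_back(double v) {
        const std::lock_guard<std::mutex> guard(lock_);
        map_[this].push_back(v);
    }

    std::size_t size() const {
        const std::lock_guard<std::mutex> guard(lock_);
        const auto citer = map_.find(this);
        return citer == map_.end() ? 0 : citer->second.size();
    }

private:
    static inline std::unordered_map<const ColumnSketch *, std::vector<double>> map_;
    static inline std::mutex lock_;  // stand-in for the single global spin lock
};
```

A defaulted copy constructor on a type like this would copy none of the data; and in a real class, member copy constructors that touch the shared map would run without the lock the destructor takes.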

hosseinmoein commented 3 years ago

@westfly, I tightened the thread locking all around. Please take a look.

westfly commented 3 years ago

Thanks for your reply. I will test it in my project ASAP. Could you make a release tag if it works?