hosseinmoein / DataFrame

C++ DataFrame for statistical, Financial, and ML analysis -- in modern C++ using native types and contiguous memory storage
https://hosseinmoein.github.io/DataFrame/
BSD 3-Clause "New" or "Revised" License

Multiple threads process dataframe #181

Closed: zhouqi1727 closed this issue 2 years ago

zhouqi1727 commented 2 years ago

One thread produces data and another processes it. With only the producer thread running, the program runs for a long time (more than a day), but when the processing thread runs at the same time, an error is reported within a few hours. The error is reported in append_column. Here is the code of the two functions:

producer:
while (true) {
    lock_guard<std::mutex> lk(mutex1);
    if (dataframe.get_index().size() > 300) {
        try {
            auto index = dataframe.get_index()[0];
            auto endIndex = dataframe.get_index()[99];
            dataframe.remove_data_by_idx<double>({index, endIndex});
        } catch (const std::exception& e) {
            cout << "dataframe remove data error:" << e.what() << endl;
        }
        continue;
    }
    dataframe.append_index((time + 1) * interval);
    dataframe.append_column("agg_price", agg_price);
    dataframe.append_column("agg_vol", agg_vol);
    dataframe.append_column("buy_price", buy_price);
    dataframe.append_column("sell_price", sell_price);
}

sales:
while (1) {
    if (dataframere.get_index().size() > 100) {
        std::lock_guard<std::mutex> lck2(mutex2);
        new_dataframere = dataframere;
        vec = get_feats(new_dataframere, "b");
    }
    usleep(10000);
}

The two functions are executed in different threads.

hosseinmoein commented 2 years ago

You should read the multithreading section of the documentation and see the code samples it points you to.

DataFrame is not multithread-safe. If you use the same DataFrame instance in multiple threads, you need to provide a SpinLock as well as your own mutex.
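DataFrame ships its own SpinLock class (it is used with `set_lock()` later in this thread). As a generic illustration only, here is what a spin lock does: a thread busy-waits on an atomic flag instead of blocking in the kernel. `ToySpinLock` and `run_spinlock_demo` are hypothetical names; this is a minimal sketch, not hmdf::SpinLock, whose implementation may differ.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// A minimal test-and-set spin lock for illustration only; it is NOT the
// library's hmdf::SpinLock.
class ToySpinLock {
public:
    void lock() noexcept {
        // Busy-wait until test_and_set() reports the flag was previously clear.
        while (flag_.test_and_set(std::memory_order_acquire)) {
            std::this_thread::yield();  // give other threads a chance while spinning
        }
    }
    void unlock() noexcept { flag_.clear(std::memory_order_release); }

private:
    std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};

// Several threads append to one shared vector; the spin lock serializes
// the appends, so no push_back is lost and the vector is never corrupted.
std::size_t run_spinlock_demo(int threads, int per_thread) {
    ToySpinLock lock;
    std::vector<double> shared;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&lock, &shared, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                std::lock_guard<ToySpinLock> guard(lock);  // any BasicLockable works
                shared.push_back(static_cast<double>(i));
            }
        });
    }
    for (auto &th : pool) th.join();
    return shared.size();
}
```

A spin lock suits very short critical sections; for longer work, such as copying a whole frame, a `std::mutex` that lets waiting threads sleep is usually the better fit.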

Also, I noticed that in the sales thread

if(dataframere.get_index().size()>100)

is not inside the mutex protection.
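The point about the size check can be shown with a small sketch, assuming a plain `std::vector<double>` stands in for the DataFrame (`SharedColumn` and `snapshot_if_larger` are hypothetical names). The check and the copy happen under the same lock, so the producer cannot remove rows between them.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <vector>

// Hypothetical stand-in for the shared DataFrame: data guarded by one mutex.
struct SharedColumn {
    std::mutex mtx;
    std::vector<double> data;
};

// Check the size AND copy the data while holding the same lock; a check made
// outside the lock can be stale by the time the copy runs.
std::optional<std::vector<double>>
snapshot_if_larger(SharedColumn &shared, std::size_t threshold) {
    std::lock_guard<std::mutex> guard(shared.mtx);
    if (shared.data.size() > threshold)
        return shared.data;  // copy made under the lock
    return std::nullopt;
}
```

Both threads must lock the same mutex object for this to work; locking two different mutexes around the same data protects nothing.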

hosseinmoein commented 2 years ago

BTW, what is the error?

zhouqi1727 commented 2 years ago

The error is a segmentation fault in dataframe.append_column. I have provided a SpinLock in the thread, thanks.

zhouqi1727 commented 2 years ago

Thread 6 "main" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff585d700 (LWP 446988)]
0x00005555555a58d7 in __gnu_cxx::__enable_if<std::is_scalar::value, double*>::type std::__fill_n_a<double, unsigned long, double>(double, unsigned long, double const&) ()
(gdb) bt

0 0x00005555555a58d7 in __gnu_cxx::__enable_if<std::is_scalar::value, double*>::type std::__fill_n_a<double, unsigned long, double>(double, unsigned long, double const&) ()

1 0x0000555555596ab8 in double std::fill_n<double, unsigned long, double>(double*, unsigned long, double const&) ()

2 0x000055555558c22b in double std::uninitialized_fill_n::uninit_fill_n<double, unsigned long, double>(double*, unsigned long, double const&) ()

3 0x000055555557e9c2 in double std::uninitialized_fill_n<double, unsigned long, double>(double*, unsigned long, double const&) ()

4 0x00005555555724f1 in double std::__uninitialized_fill_n_a<double, unsigned long, double, double>(double*, unsigned long, double const&, std::allocator&) ()

5 0x00005555555b050d in std::vector<double, std::allocator >::_M_fill_insert(__gnu_cxx::__normal_iterator<double*, std::vector<double, std::allocator > >, unsigned long, double const&) ()

6 0x00005555555a55bf in std::vector<double, std::allocator >::resize(unsigned long, double const&) ()

7 0x0000555555596628 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor::operator()<std::vector<double, std::allocator > >(std::vector<double, std::allocator >&) const ()

8 0x000055555558bd76 in void hmdf::HeteroVector::change_implhelp<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&, double>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&) ()

9 0x000055555557ddf1 in void hmdf::HeteroVector::changeimpl<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&, hmdf::HeteroVector::type_list, double>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&, hmdf::HeteroVector::type_list) ()

10 0x000055555557166d in void hmdf::HeteroVector::change<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistentfunctor&) ()

11 0x000055555556a067 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::make_consistent() ()

12 0x0000555555566fa9 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::remove_data_by_idx(hmdf::Index2D) ()

zhouqi1727 commented 2 years ago

I updated my code like this:

mutex mutex1, mutex2;
SpinLock lock;

producer:
while (true) {
    lock_guard<std::mutex> lk(mutex1);
    dataframe.set_lock(&lock);
    if (dataframe.get_index().size() > 300) {
        auto index = dataframe.get_index()[0];
        auto endIndex = dataframe.get_index()[99];
        dataframe.remove_data_by_idx<double>({index, endIndex});
        dataframe.remove_lock();
        continue;
    }
    dataframe.append_index((time + 1) * interval);
    dataframe.append_column("agg_price", agg_price);
    dataframe.append_column("agg_vol", agg_vol);
    dataframe.append_column("buy_price", buy_price);
    dataframe.append_column("sell_price", sell_price);
    dataframe.remove_lock();
}

sales:
while (1) {
    if (dataframere.get_index().size() > 100) {
        std::lock_guard<std::mutex> lck2(mutex2);
        dataframe.set_lock(&lock);
        new_dataframere = dataframere;
        vec = get_feats(new_dataframere, "b");
        dataframe.remove_lock();
    }
    usleep(10000);
}

But I got this error:

Thread 7 "main" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff505c700 (LWP 454182)]
0x00005555555af84b in std::detail::_Hash_code_base<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::__detail::_Select1st, std::hash<hmdf::HeteroVector const*>, std::detail::_Mod_range_hashing, std::detail::_Default_ranged_hash, false>::_M_bucket_index(std::detail::_Hash_node<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, false> const, unsigned long) const ()
(gdb) bt

0 0x00005555555af84b in std::detail::_Hash_code_base<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::__detail::_Select1st, std::hash<hmdf::HeteroVector const*>, std::detail::_Mod_range_hashing, std::detail::_Default_ranged_hash, false>::_M_bucket_index(std::detail::_Hash_node<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, false> const, unsigned long) const ()

1 0x00005555555a44f1 in std::_Hashtable<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const>, std::hash<hmdf::HeteroVector const>, std::detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_bucket_index(std::__detail::_Hash_node<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, false>*) const ()

2 0x0000555555595793 in std::_Hashtable<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const>, std::hash<hmdf::HeteroVector const>, std::detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_find_before_node(unsigned long, hmdf::HeteroVector const const&, unsigned long) const ()

3 0x000055555558a7c0 in std::_Hashtable<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const>, std::hash<hmdf::HeteroVector const>, std::detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_find_node(unsigned long, hmdf::HeteroVector const const&, unsigned long) const ()

4 0x000055555557c96d in std::_Hashtable<hmdf::HeteroVector const, std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const>, std::hash<hmdf::HeteroVector const>, std::detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::find(hmdf::HeteroVector const const&) ()

5 0x0000555555570a65 in std::unordered_map<hmdf::HeteroVector const, std::vector<double, std::allocator >, std::hash<hmdf::HeteroVector const>, std::equal_to<hmdf::HeteroVector const>, std::allocator<std::pair<hmdf::HeteroVector const const, std::vector<double, std::allocator > > > >::find(hmdf::HeteroVector const* const&) ()

6 0x0000555555569703 in std::vector<double, std::allocator >& hmdf::HeteroVector::get_vector() ()

7 0x00005555555666c8 in hmdf::type_declare<hmdf::HeteroVector, double>::type& hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::get_column(char const*) ()

8 0x0000555555566ef1 in unsigned long hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::append_column(char const*, double const&, hmdf::nan_policy) ()

hosseinmoein commented 2 years ago

I really don't know the logic of your program, so I am guessing here. If you only have one instance of DataFrame in your entire process and you are protecting it with your own mutex, then you do not need a SpinLock.
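The second backtrace crashes inside `std::unordered_map<hmdf::HeteroVector const*, std::vector<double>>::find`, a hash map keyed by a HeteroVector's address. Assuming that map is static and therefore shared by every DataFrame (which would explain why a SpinLock is needed at all), two threads touching two *different* instances, such as `dataframe` in the producer and the copy `new_dataframere` in the sales thread, still race on it, and guarding them with two different mutexes (`mutex1`, `mutex2`) does not help. The hypothetical `ToyColumns` type below only mimics that layout to show why one process-wide lock is needed; it is not the library's code.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical toy type: every instance keeps its data in ONE static map keyed
// by the instance's address. Because all instances share that map, every access
// takes the same process-wide lock, even when threads use different objects.
class ToyColumns {
public:
    ToyColumns() = default;

    ToyColumns(const ToyColumns &other) {
        std::lock_guard<std::mutex> guard(registry_lock());
        std::vector<double> copy;
        auto it = registry().find(&other);
        if (it != registry().end())
            copy = it->second;
        registry()[this] = std::move(copy);  // inserting may rehash the shared map
    }

    ToyColumns &operator=(const ToyColumns &) = delete;  // omitted for brevity

    ~ToyColumns() {
        std::lock_guard<std::mutex> guard(registry_lock());
        registry().erase(this);
    }

    void append(double v) {
        std::lock_guard<std::mutex> guard(registry_lock());
        registry()[this].push_back(v);
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> guard(registry_lock());
        auto it = registry().find(this);
        return it == registry().end() ? 0 : it->second.size();
    }

private:
    using Map = std::unordered_map<const ToyColumns *, std::vector<double>>;

    static Map &registry() {
        static Map map;  // shared by ALL ToyColumns objects
        return map;
    }
    static std::mutex &registry_lock() {
        static std::mutex m;  // the single lock guarding that map
        return m;
    }
};

// Each thread works only on its own objects, yet both touch the shared map.
std::size_t run_registry_demo(int iterations) {
    std::size_t total = 0;
    std::mutex total_mtx;
    auto worker = [&total, &total_mtx, iterations] {
        ToyColumns mine;
        for (int i = 0; i < iterations; ++i) {
            mine.append(static_cast<double>(i));
            ToyColumns snapshot(mine);  // analogous to new_dataframere = dataframere
            (void)snapshot;
        }
        std::lock_guard<std::mutex> guard(total_mtx);
        total += mine.size();
    };
    std::thread a(worker), b(worker);
    a.join();
    b.join();
    return total;
}
```

Removing `registry_lock()` from this toy would let the two threads rehash the same map concurrently, which is undefined behavior even though neither thread ever touches the other's object.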

Again, I don't know your logic, but from what I can tell, you want something like this:

std::mutex   mutex1;

// producer:
while (true)  {
    std::lock_guard<std::mutex> lk (mutex1);

    if (dataframe.get_index().size() > 300)  {
        auto index = dataframe.get_index()[0];
        auto endIndex = dataframe.get_index()[99];

        dataframe.remove_data_by_idx<double>({ index, endIndex });
        continue;
    }
    dataframe.append_index ((time + 1) * interval);
    dataframe.append_column ("agg_price",agg_price);
    dataframe.append_column ("agg_vol",agg_vol);
    dataframe.append_column ("buy_price",buy_price);
    dataframe.append_column ("sell_price",sell_price);
}

// sales:
while (true)  {
    {
        std::lock_guard<std::mutex> lk (mutex1);

        if (dataframere.get_index().size() > 100)  {
            new_dataframere = dataframere ;
            vec = get_feats(new_dataframere ,"b");
        } 
    }

    usleep(10000);
}

zhouqi1727 commented 2 years ago

I had already tried the scheme you gave me, but it doesn't work: get_feats never runs. Execution never enters the sales part; the thread stays locked out while the producer holds the lock.

hosseinmoein commented 2 years ago

As I said, I don’t know the logic of your process, so you have to adjust the code I gave you. You should probably also sleep in the producer.
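A minimal sketch of that suggestion, assuming a `std::vector<double>` stands in for the DataFrame; `Shared`, `run_pipeline` and the bounded loops are hypothetical and exist only so the example terminates. `std::mutex` gives no fairness guarantee, so a producer that releases the lock and immediately re-acquires it in a tight loop can keep winning, which matches the symptom reported above. Here the producer sleeps after its `lock_guard` scope ends (sleeping while still holding the lock would not help), and both threads lock the same mutex.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-ins: `frame` plays the shared DataFrame, `mtx` plays
// mutex1. Both threads must lock this SAME mutex.
struct Shared {
    std::mutex mtx;
    std::vector<double> frame;
};

// Runs a bounded producer/consumer pair; returns how many snapshots the
// consumer took.
std::size_t run_pipeline(int produce_count) {
    Shared s;
    std::atomic<bool> done{false};
    std::size_t snapshots = 0;  // written only by the consumer thread

    std::thread producer([&] {
        for (int i = 0; i < produce_count; ++i) {
            {
                std::lock_guard<std::mutex> lk(s.mtx);
                if (s.frame.size() > 300)  // drop the oldest 100 rows
                    s.frame.erase(s.frame.begin(), s.frame.begin() + 100);
                s.frame.push_back(static_cast<double>(i));
            }  // lock released here, BEFORE sleeping
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        done = true;
    });

    std::thread consumer([&] {
        for (;;) {
            const bool finished = done.load();
            std::vector<double> copy;
            {
                std::lock_guard<std::mutex> lk(s.mtx);
                if (s.frame.size() > 100)
                    copy = s.frame;  // snapshot under the lock
            }
            if (!copy.empty())
                ++snapshots;  // get_feats(copy, ...) would run here, without the lock
            if (finished)
                break;  // one last pass after the producer has stopped
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });

    producer.join();
    consumer.join();
    return snapshots;
}
```

Copying under the lock and computing the features after releasing it keeps the critical section short, so the producer is blocked only for the duration of the copy. A `std::condition_variable` is an alternative to fixed sleeps if the consumer should wake exactly when new rows arrive.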