hyunsik-yoon / study

Personal repository for self-study
Apache License 2.0
0 stars 0 forks source link

Learn Multithreading with Modern C++ #4 #148

Open hyunsik-yoon opened 1 year ago

hyunsik-yoon commented 1 year ago

let's study. From https://www.udemy.com/course/learn-modern-cplusplus-concurrency/learn/lecture/22341398

hyunsik-yoon commented 1 year ago

C++ recap

std::chrono

#include <chrono>

using namespace std::chrono;

// Durations built from the named duration types.
auto _10_sec = std::chrono::seconds(10);
auto _20_ms = std::chrono::milliseconds(20);
auto _30_us = std::chrono::microseconds(30);

// Durations built from the chrono literal suffixes (C++14); the
// using-directive above makes the suffixes visible.
auto _2s = 2s;      // 2 seconds
auto _20ms = 20ms;  // 20 milliseconds
auto _50us = 50us;  // 50 microseconds


### random
- **random number engine**: generates a sequence of random numbers
- **distribution object:** maps the engine's raw output onto values in a chosen range/distribution

```cpp
#include <random>

// integer
mt19937 mt; // random number engine instance (default-constructed, so no explicit seed is given)
uniform_int_distribution<int> uid(0, 100);  // distribution object (functor) for the closed interval [0, 100]

for (auto i = 0; i < 10; ++i) cout << uid(mt) << endl; 

// real number
uniform_real_distribution<double> did(0, 1);  // distribution object (functor) for the half-open interval [0, 1)
for (auto i = 0; i < 10; ++i) cout << did(mt) << endl; 
    // Two engines constructed the same way produce the same sequence,
    // so both columns printed below are always equal.
    mt19937 mt1;
    mt19937 mt2;
    uniform_int_distribution<int> uid(0, 100);

    for (auto i = 0; i < 3; ++i) cout << uid(mt1) << "==" << uid(mt2) << endl; 
    /* output is always
        92==92
        44==44
        95==95     */
hyunsik-yoon commented 1 year ago

std::thread

// Starts hello() on a new thread and waits for it to finish.
int main() {
    std::thread tom(hello);
    // What if the thread object is destroyed without being joined?
    // ~thread() calls std::terminate() when the thread is still joinable,
    // and at that point the thread may not have finished yet.

    tom.join();  // wait until the thread finishes
}

/* Without join() the program aborts at runtime with the message below.
   The abort comes from std::terminate() being called by ~thread() on a
   joinable thread, not from the shell.

terminate called without an active exception
Aborted */


- thread creation using **functor**
```cpp
// Callable object usable as a thread entry point: invoking it prints "Howdy".
struct Hello {
    void operator()() {
        std::cout << "Howdy";
    }
};

// Runs the Hello functor on a new thread and waits for it to finish.
int main() {
    Hello hello;
    std::thread tom(hello);  // the thread invokes the functor's operator()
    tom.join();
}
hyunsik-yoon commented 1 year ago

passing param during creation of thread

int main() {

std::string name = "Anne";

std::thread anne1(hello_func1, name);
anne1.join();

std::thread anne2(hello_func2, std::move(name));
// name is now ""
anne2.join();

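// compile error: std::thread copies/moves its arguments, so they cannot bind to a
// non-const reference parameter; note that `&name` is a std::string*, not a reference.
// std::thread anne3(hello_func3, &name);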

name = "Anne";
std::thread anne3(hello_func3, std::ref(name));  // use std::ref() reference_wrapper
anne3.join();
// name = "Gilbert" 로 바뀜

- lambda 는 capture 로 reference 받을 수 있음
```cpp
    std::string name = "Anne";

    // The lambda captures `name` by reference, so the assignment made on the
    // new thread is visible here once join() has returned.
    std::thread anne([&name]    // capture by reference works
                     (string middle, string last){  // two param
                            name = name + " " + middle + " " + last;
                     },
                     "Sherly",      // param 1
                     "Kusbert");    // param 2
    anne.join();

    std::cout << name;  // "Anne Sherly Kusbert"
hyunsik-yoon commented 1 year ago

sleep_for(), get_id(), native_handle()

// Thread body: prints the id of the executing thread, then sleeps for one second.
void hello() {
    // std::this_thread is a namespace, so get_id() is a free function.
    const auto self = std::this_thread::get_id();
    std::cout << "thread id    : " << self << std::endl;

    const std::chrono::seconds pause(1);
    std::this_thread::sleep_for(pause);  // suspends only the calling thread
}

// Prints the thread's native handle and id while it runs and again after join().
int main() {
    std::thread tom(hello);

    // native_handle() is an instance method of std::thread
    std::cout << "native handle: " << tom.native_handle() << std::endl;

    std::this_thread::sleep_for(std::chrono::seconds(2)); // make main thread sleep
    tom.join();

    // After join() the std::thread object no longer refers to a running thread;
    // the recorded output shows handle 0 and a "non-executing thread" id.
    std::cout << "after finishing ---" << std::endl;
    std::cout << "native handle: " << tom.native_handle() << std::endl;

    std::thread::id thread_id = tom.get_id();
    std::cout << "thread id    : " << thread_id << std::endl;
}

/* output

native handle: 140603136325376
thread id    : 140603136325376
after finishing ---
native handle: 0
thread id    : thread::id of a non-executing thread
*/
hyunsik-yoon commented 1 year ago

passing thread as param

// Thread body: reports the id of the thread it is running on.
void hello() {
    const std::thread::id self = std::this_thread::get_id();
    std::cout << "hello) thread id:\t" << self << std::endl;
}

// Starts a thread running hello() and hands ownership of it to the caller.
std::thread create() {
    std::thread th(hello);
    // Prints the id of the *calling* thread, not the id of `th`.
    std::cout << "create) thread id:\t" << std::this_thread::get_id() << std::endl;

    return th; // std::thread is move-only: the local is moved out (or elided), never copied
}

// Prints the calling thread's id, then blocks until `th` has finished.
void wait_join(std::thread &&th) {
    const auto caller = std::this_thread::get_id();
    std::cout << "wait_join) thread id:\t" << caller << std::endl;
    th.join();
}

// Takes ownership of a thread by value (callers must std::move it in).
// The parameter is destroyed on return, and destroying a joinable std::thread
// calls std::terminate(), so the thread is joined first.
void try_call_by_value(std::thread th) {
    if (th.joinable()) {
        th.join();
    }
}

// Borrows the caller's thread by lvalue reference; intentionally does nothing with it.
void try_call_by_ref(std::thread &th) { }

// Shows the ways a std::thread can be passed around: returned by value,
// borrowed by reference, and moved into an rvalue-reference parameter.
int main() {
    // Note: this local variable shadows the function hello() inside main.
    std::thread hello = create();
    std::cout << "main) thread id:\t" << hello.get_id() << std::endl;

    // try_call_by_value(hello); <- compile err: error: use of deleted function ‘std::thread::thread(const std::thread&)’
    try_call_by_ref(hello);
    wait_join(std::move(hello));  // wait_join() joins the thread
}

/* output

create) thread id:      139930813339456
main) thread id:        139930813335296
wait_join) thread id:   139930813339456
hello) thread id:       139930813335296
*/
hyunsik-yoon commented 1 year ago

exception inside thread

detaching

exception 2.

private: std::thread _th; };

int main() {
    std::thread hello([]() {
        cout << "hello";
        std::this_thread::sleep_for(std::chrono::seconds(2));
    });
    ThreadGuard th_guard(std::move(hello));

    // Whether or not an exception is thrown below, RAII guarantees that the
    // ThreadGuard created above ends up calling join() on the thread.
    try {
        throw runtime_error("err");
    } catch (exception &e) {
    }
}



## stopping thread
- c++ 에는 thread.stop(), thread.pause()같은게 **없음**
  - resource 차원에서 위험할 수 있어서 그런듯
hyunsik-yoon commented 1 year ago

torn writes and reads

managing data races

hyunsik-yoon commented 1 year ago

mutex

std::mutex mtx;

// Appends 10 to `v` while holding the global `mtx`, and keeps the lock for
// one second so that a concurrent hello_2() has to wait.
// lock()/unlock() are called manually here: if anything between them threw,
// the mutex would stay locked.
void hello_1(std::vector<int> &v) {
    mtx.lock();
    v.emplace_back(10);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    mtx.unlock();
}

// Polls the global `mtx` every 10 ms until it can be taken, then appends 20 to `v`.
void hello_2(std::vector<int> &v) {
    // try_lock() returns false if the lock could not be acquired, true on success.
    while (!mtx.try_lock())
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    v.emplace_back(20);
    mtx.unlock();
}

int main() { std::vector v; std::thread th1(hello_1, std::ref(v)); std::thread th2(hello_2, std::ref(v)); th1.join(); th2.join();

std::for_each(v.begin(), v.end(), [](int k){ std::cout << k << " "; }); // "10 20"

}



- lock()을 한 후, 처리하다 exception이 나면 catch 등에서 unlock() 처리를 확실히 해줘야 한다.
  - 이게 복잡해서 mutex 를 직접 쓰는 일은 적음.
  - RAII 패턴으로 이를 피해가기 위해 아래의 `std::lock_guard`  사용
hyunsik-yoon commented 1 year ago

std::lock_guard

// Appends 10 to `v` under the global `mtx`, then sleeps for one second
// with the mutex already released.
void hello_1(std::vector<int> &v) {
    try {
        {
            // The guard locks mtx to protect `v` and unlocks it at the end of this scope.
            std::lock_guard<std::mutex> guard(mtx);
            v.emplace_back(10);
        }
        const std::chrono::seconds nap(1);
        std::this_thread::sleep_for(nap);
    } catch (std::exception &) {
        /* empty */
    }
}
hyunsik-yoon commented 1 year ago

std::unique_lock

// Always fails: throws std::runtime_error with the message "OMG".
void foo() {
    throw std::runtime_error{"OMG"};
}

// Appends 10 to `v` under a std::unique_lock and then calls foo(), which throws.
// The exception is swallowed by the catch block at the end.
void hello_1(std::vector<int> &v) {
    try
    {
        std::unique_lock<std::mutex> u_lock(mtx);
        // Even if an exception is thrown here, unique_lock's destructor unlocks the mutex.
        v.emplace_back(10);
        foo();  // foo() always throws, so the statements below are not reached
        // Unlocking manually is also allowed; the destructor does not unlock a second time.
        u_lock.unlock();

        std::this_thread::sleep_for(std::chrono::seconds(1));
    } catch (std::exception&) { /* empty */}
}

- std::adopt_lock: constructor에서 mutex가 이미 lock되어 있다고 가정함
```cpp
    // Lock the mutex manually first, then let a unique_lock adopt it.
    std::mutex mutex;
    mutex.lock();
    std::unique_lock<std::mutex> lock(mutex, std::adopt_lock); // takes over a mutex that is already locked
    std::cout << "Mutex locked!" << std::endl;
    lock.unlock();
hyunsik-yoon commented 1 year ago

std::recursive_mutex

hyunsik-yoon commented 1 year ago

try_lock_for, try_lock_until

system clock, steady clock

// Times the same one-second sleep with two different clocks and prints
// each elapsed time in milliseconds.

// system clock
auto start_system = std::chrono::system_clock::now();
std::this_thread::sleep_for(std::chrono::seconds(1));
auto end_system = std::chrono::system_clock::now();
std::cout << "System clock duration: "
            << std::chrono::duration_cast<std::chrono::milliseconds>(end_system - start_system).count()
            << " ms\n";

// steady clock
auto start_steady = std::chrono::steady_clock::now();
std::this_thread::sleep_for(std::chrono::seconds(1));
auto end_steady = std::chrono::steady_clock::now();
std::cout << "Steady clock duration: "
            << std::chrono::duration_cast<std::chrono::milliseconds>(end_steady - start_steady).count()
            << " ms\n";
hyunsik-yoon commented 1 year ago

static local variable

singleton

hyunsik-yoon commented 1 year ago

translation unit

hyunsik-yoon commented 1 year ago

thread_local

// Greeting builder that logs its own construction and destruction.
// `_msg` starts as "Hello, " and name() appends to it.
class Msg {
public:
    Msg() : _msg("Hello, ") {
        std::cout << "constructor" << std::endl;
    }

    ~Msg() {
        std::cout << "destructor" << std::endl;
    }

    // Appends `s` to the stored greeting.
    void name(std::string s) {
        _msg += s;
    }

    std::string _msg;
};

// thread_local keyword: every thread gets its own separate instance of `msg`
thread_local Msg msg;

// Appends `name` to the calling thread's own `msg` and prints the result.
void hello(std::string name) {
    msg.name(name);
    std::cout << msg._msg << std::endl;
}

// Runs hello() on two threads; the recorded output shows one
// constructor/destructor pair per thread.
int main() {
    std::thread th1(hello, "Anne");
    std::thread th2(hello, "Gilbert");

    th1.join(); th2.join();
    // NOTE(review): the two threads are not ordered relative to each other,
    // so the lines below may presumably appear in a different order.
    /* output:

    constructor
    Hello, Anne
    destructor
    constructor
    Hello, Gilbert
    destructor
    */
}
hyunsik-yoon commented 1 year ago

대입문이 어떻게 처리되나.

ptr = new Foo();

lazy initialization

void thread_func() { if (sock == nullptr) {
unique_lock lock(mtx); if (sock == nullptr)
sock = new SocketImpl(); // note: '=' 는 atomic 에 의해 overload되어 automic 하게 처리됨. 객체 대입 방법 2'의 문제가 생기지 않음. } SocketImpl *s = sock; // automic 으로는 member 함수 못부름. s->listen(); }

hyunsik-yoon commented 1 year ago

deadlock

nested lock

// OR

// defer_lock: associate each unique_lock with its mutex without locking yet.
std::unique_lock lock1(mtx1, std::defer_lock);
std::unique_lock lock2(mtx2, std::defer_lock);
// Lock both through the unique_lock objects (not the raw mutexes) so the
// locks own the mutexes and release them on destruction; std::lock acquires
// both without deadlocking.
std::lock(lock1, lock2);

hyunsik-yoon commented 1 year ago

Locking guideline

shared data는 class로 만들기

hyunsik-yoon commented 1 year ago

condition variable

std::mutex mtx;
std::condition_variable cv;

int data = 0;

// Waits five times for a notification on `cv` and prints `data` after each one.
// NOTE(review): wait() is used without a predicate, so a notification sent
// before this thread starts waiting is missed, and a spurious wakeup would
// print stale data.
void reader(int id)
{
    for (int i = 0; i < 5; ++i)
    {
        std::unique_lock<std::mutex> lock(mtx);
        // REMEMBER
        // cv.wait() must be called with the mutex locked.
        cv.wait(lock);

        // Read the data.
        std::cout << id << ": " << data << std::endl;

        // `lock` is destroyed here, which unlocks the mutex.
    }
}

// Increments `data` ten times, notifying all waiting readers after each change.
void writer()
{
    for (int i = 0; i < 10; ++i)
    {
        std::unique_lock<std::mutex> lock(mtx);
        {
            ++data;
        }
        // REMEMBER
        // The author calls cv.notify_all() with the mutex unlocked and reports
        // that the program hangs otherwise. The data modification is finished
        // at this point, so releasing the lock here is natural.
        // NOTE(review): notifying while holding the mutex is permitted, so the
        // hang probably had another cause (e.g. missed notifications) — confirm.

        lock.unlock();

        cv.notify_all();

        // REMEMBER
        // Without the sleep the program hangs. The author's guess: the writer
        // gets back to the top of the loop and takes the lock faster than the
        // notification is delivered.
        // sleep() is a helper not shown here — TODO confirm its unit.
        sleep(10);
    }
}
hyunsik-yoon commented 1 year ago

conditional variable with predicate

int data = 0; bool readable = false;

// Waits five times until `readable` is set, prints `data`, then clears the flag.
void reader(int id) {
    for (int i = 0; i < 5; ++i) {
        std::unique_lock<std::mutex> lock(mtx);

        // wait(lock, pred) is entered with the mutex locked and evaluates the
        // predicate (second argument):
        //  - true : wait() returns immediately, with the mutex still locked.
        //  - false: the mutex is released and the thread blocks until notified;
        //           on wakeup the mutex is re-acquired and the predicate is
        //           checked again.
        cv.wait(lock, []() { return readable; });

        // Read the data.
        std::cout << id << ": " << data << std::endl;
        readable = false;

        // `lock` is destroyed here, which unlocks the mutex.
    }
}

// Publishes ten values: bumps `data`, marks it readable under the lock,
// then notifies all readers with the lock released.
void writer() {
    for (int i = 0; i < 10; ++i) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            ++data;
            readable = true;
        }
        cv.notify_all();
        sleep(10);  // wait for readers to read data
        // Quiz: without the sleep only about two values (e.g. 1 and 10) are
        // printed and then the program hangs. Think about why.
    }
}  // prints 1 through 10 as expected

hyunsik-yoon commented 1 year ago

std::promise, std::future

// Fulfils the promise with 42 after a pause, pauses again, then logs that it
// is about to exit. sleep() is a helper that is not shown here.
void producer(std::promise<int> &&promise)
{
    sleep(1000);

    const int answer = 42;
    promise.set_value(answer);  // wakes up the thread blocked in future.get()

    sleep(1000);
    std::cout << "producer is about to terminate.." << std::endl;
}

// Blocks until the future holds a value, then prints it.
void consumer(std::future<int> &&future)
{
    const int received = future.get();  // blocks until the value has been set
    std::cout << "Consumer received value: " << received << std::endl;
}

// Connects a producer thread and a consumer thread through a promise/future pair.
int main()
{
    // Create a promise and a future.
    std::promise<int> promise;
    std::future<int> future = promise.get_future();

    // Start the producer and consumer threads.
    // Both objects are handed over with std::move.
    std::thread t1(producer, std::move(promise));
    std::thread t2(consumer, std::move(future));

    // Wait for the threads to finish.
    t1.join();
    t2.join();

    return 0;
}
/* output 

Consumer received value: 42
producer is about to terminate..

*/
hyunsik-yoon commented 1 year ago

exception in std::promise, std::future

// Throws inside the producer and forwards the in-flight exception to the
// consumer through the promise.
void producer(std::promise<int> &&promise)
{
    try {
        throw std::runtime_error("exception_from_producer");
    } catch (...) {
        // Store the current exception in the shared state.
        std::exception_ptr failure = std::current_exception();
        promise.set_exception(failure);
    }
}

// Retrieves the result; an exception stored in the shared state is rethrown
// by get() and reported here.
void consumer(std::future<int> &&future)
{
    try {
        future.get();  // the stored exception is thrown from here
    } catch (const std::exception &caught) {
        const char *reason = caught.what();
        std::cout << "catching " << reason << std::endl;
    }
}
hyunsik-yoon commented 1 year ago

std::shared_future

// Two reader threads obtain the same result from one promise via std::shared_future.
int main() {
    std::promise<int> p;

    std::shared_future<int> f = p.get_future();

    // The following also work (they all end up in the converting move constructor):
    //
    // std::shared_future<int> f{p.get_future()};
    // std::shared_future<int> f{std::move(p.get_future())}; <-- the author finds this clearest,
    //                                                           but the std::move is redundant
    // std::shared_future<int> f{p.get_future().share()};

    // Each thread captures its own copy of the shared_future: concurrent access
    // is only safe when every thread goes through its own copy.
    std::thread t1([f]() {
        std::cout << f.get() << std::endl;
    });

    std::thread t2([f]() {
        std::cout << f.get() << std::endl;
    });

    std::thread t(setter, std::ref(p));

    // A std::thread that is still joinable when destroyed calls std::terminate().
    t1.join();
    t2.join();
    t.join();
}
output:

42 42

hyunsik-yoon commented 1 year ago

atomic

void increment(int id) { for (int i = 0; i < 1000000; ++i) // counter = counter + 1; // --> (1) NOT OK ++counter; // --> (2) OK // counter.fetch_add(1); --> (3) OK // counter += 1 --> (4) OK }

int main() { std::thread t1(increment, 1); std::thread t2(increment, 2);

/* With (2), (3) or (4) the result is 2000000; with (1) it is smaller than 2000000 and differs on every run (e.g. 1291737, 1304097). */



- 위의 (2), (3), (4) 가 되는 이유는
  - `atomic` type의 operation overloading으로 정의되어 있어서 그럼
  - `fetch_add()`, `fetch_sub()`
  - `fetch_and()`, `fetch_or()`, `fetch_xor()`
  - `++`, `--`
  - `+=`, `-=`
  - `&=`, `|=`, `^=`
  - `=`
hyunsik-yoon commented 1 year ago

spin lock

std::atomic_flag

// Always initialize with this macro; it puts the flag in the clear (false) state.
// A set (true) flag means some thread is inside the critical section.
std::atomic_flag flag = ATOMIC_FLAG_INIT;

// For x = 1..3: takes the spin lock, overwrites every element of `v` with x,
// prints each element prefixed with `id`, then releases the lock.
// sleep() is a helper that is not shown here.
void process(std::vector<int> &v, char id)
{
    for (int x = 1; x < 4; ++x)
    {
        // Spin until test_and_set() reports the flag was previously clear.
        while (flag.test_and_set()) {
            sleep(5);
        }

        // --- critical section begins ---
        for (int &slot : v) {
            slot = x;
            sleep(5);
        }
        for (int value : v) {
            std::cout << id << value << ",";
            sleep(5);
        }
        std::cout << std::endl;

        // --- critical section ends ---
        flag.clear();
        sleep(5);
    }
}

// Runs process() on two threads ('a' and 'b') that share one ten-element vector.
int main() {
    std::vector<int> v(10, 0);

    // std::ref passes the vector by reference instead of copying it.
    std::thread t1(process, std::ref(v), 'a');
    std::thread t2(process, std::ref(v), 'b');

    t1.join(); t2.join();
    return 0;
}
hyunsik-yoon commented 1 year ago

lock free programming

hyunsik-yoon commented 1 year ago

std::packaged_task

// Integer division of x by y.
// Throws std::runtime_error("Divide by zero error") when y is 0.
int divide(int x, int y) {
    if (y != 0) {
        return x / y;
    }
    throw std::runtime_error("Divide by zero error");
}

// Wraps divide() in a packaged_task, runs it on another thread with a zero
// divisor, and reads the outcome through the task's future.
int main() {
    std::packaged_task<int(int, int)> task(divide);
    std::future<int> result = task.get_future();  // must be obtained before the task is moved away

    // Run the task in a separate thread
    std::thread thread(std::move(task), 10, 0);

    try {
        // divide(10, 0) throws; the exception reaches this thread via get().
        int quotient = result.get();
        std::cout << "Result: " << quotient << std::endl;
    } catch (std::exception& e) {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }

    thread.join();
    return 0;
}
hyunsik-yoon commented 1 year ago

std::async

#include <iostream>
#include <future>

// Always fails: throws std::runtime_error describing an error in foo().
void foo()
{
    const char *reason = "An error occurred in foo()";
    throw std::runtime_error(reason);
}

// Runs foo() through std::async and reports the exception it throws.
int main()
{
    // std::launch::async asks for foo to be executed on a separate thread.
    std::future<void> future = std::async(std::launch::async, foo);

    try {
        future.get();  // foo's exception is rethrown here
    }
    catch(const std::exception& e) {
        std::cout << "Exception caught: " << e.what() << std::endl;
    }

    return 0;
}
hyunsik-yoon commented 1 year ago

어떤 thread object를 써야 하나?

hyunsik-yoon commented 1 year ago

parallelism

hyunsik-yoon commented 1 year ago

thread safe queue

private: std::mutex mtx; std::condition_variable cv;

std::queue<int> q;

};

hyunsik-yoon commented 1 year ago

thread pool

// Pool of worker threads that execute tasks taken from a shared work queue.
class thread_pool {
    concurrent_queue_cv work_queue;    // pending tasks
    std::vector<std::thread> threads;  // worker threads, created in the constructor
    void worker();                     // loop executed by every worker thread
public:
    thread_pool();
    ~thread_pool();
    void submit(Func f);               // enqueue a task
};

void thread_pool::worker() { while (true) { Func task; work_queue.pop(task); task(); } }

// Starts one worker thread per hardware thread.
// hardware_concurrency() may return 0 when the value cannot be determined;
// fall back to a single worker so that submitted tasks are still executed.
thread_pool::thread_pool() {
    const unsigned detected = thread::hardware_concurrency();
    const unsigned thread_count = detected != 0 ? detected : 1;
    threads.reserve(thread_count);
    for (unsigned i = 0; i < thread_count; ++i)
        threads.emplace_back(&thread_pool::worker, this);  // construct the thread in place
}

// Joins every worker thread.
// NOTE(review): worker() as written never returns, so these joins would block
// forever — confirm that a shutdown path exists.
thread_pool::~thread_pool() {
    for (auto &worker_thread : threads)
        worker_thread.join();
}

// Queues task `f` for execution by one of the worker threads.
void thread_pool::submit(Func f) {
    work_queue.push(f);
}

hyunsik-yoon commented 1 year ago

concurrent read/write file

fcntl

read lock using fcntl (shared lock)

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

// Places a read (F_RDLCK) lock on the whole file behind `fd` using F_SETLK.
// On failure prints the error with perror("fcntl") and exits with status 1.
void lock_reading(int fd) {
    struct flock request;
    memset(&request, 0, sizeof(request)); // just in case

    request.l_whence = SEEK_SET;
    request.l_start = 0;
    request.l_len = 0;          // 0 means entire file
    request.l_type = F_RDLCK;   // read lock

    const int rc = fcntl(fd, F_SETLK, &request);
    if (rc == -1) {
        perror("fcntl");
        exit(1);
    }
}

// Releases the lock on `fd` by issuing an F_UNLCK request.
// Every other field of the request is left zeroed by memset.
// On failure prints the error with perror("fcntl") and exits with status 1.
void unlock_file(int fd) {
    struct flock request;
    memset(&request, 0, sizeof(request)); // just in case
    request.l_type = F_UNLCK;

    const int rc = fcntl(fd, F_SETLK, &request);
    if (rc == -1) {
        perror("fcntl");
        exit(1);
    }
}

// Usage: <program> <file>
// Opens the file read-only, holds a read lock on it until the user presses
// Enter, then unlocks and closes it.
int main(int argc, char** argv) {
    // argv[1] is only valid when a path argument was actually supplied.
    if (argc < 2) {
        fprintf(stderr, "usage: %s <file>\n", argc > 0 ? argv[0] : "prog");
        return 1;
    }

    // Open file
    int fd = open(argv[1], O_RDONLY);
    if (fd == -1) { perror("open"); return 1; }

    lock_reading(fd);

    // Do some work with the locked file...
    printf("file LOCKED. press [Enter] to unlock.\n");
    getchar();

    unlock_file(fd);

    // Close file
    close(fd);
    return 0;
}

write lock using fcntl (exclusive lock)

#include <fcntl.h>
#include <unistd.h>
#include <cstring>

// Requests a write (F_WRLCK) lock on the whole file behind `fd` using F_SETLKW.
// A failing fcntl() call is currently ignored.
void lock_writing(int fd) {
    struct flock request;
    std::memset(&request, 0, sizeof(request));
    request.l_whence = SEEK_SET;
    request.l_start = 0;
    request.l_len = 0;          // Lock the entire file
    request.l_type = F_WRLCK;   // Set write lock

    const int rc = fcntl(fd, F_SETLKW, &request);
    if (rc == -1) {
        // Handle error
    }
}

// Opens example.txt, takes a write lock, writes a greeting, unlocks and closes.
// Returns 0 on success and 1 if the file cannot be opened or the write is short.
int main() {
    // Open file
    int fd = open("example.txt", O_RDWR);
    if (fd == -1) {
        return 1;  // do not lock or write through an invalid descriptor
    }

    // Lock for writing
    lock_writing(fd);

    // Write to file
    const char message[] = "Hello, world!";
    const ssize_t expected = static_cast<ssize_t>(sizeof(message) - 1);  // without the trailing '\0'
    const ssize_t written = write(fd, message, sizeof(message) - 1);

    // unlock
    unlock_file(fd);

    // Close file
    close(fd);
    return written == expected ? 0 : 1;
}
hyunsik-yoon commented 1 year ago

more to know

thread starvation

livelock

daemon thread

hyunsik-yoon commented 1 year ago

live lock

    // Obtain the underlying pthread handle of the std::thread `t1`.
    pthread_t thread1 = t1.native_handle(); // pthread_t type, which is the POSIX thread ID

    // set the scheduling parameters for thread1 to a higher priority

    struct sched_param params;

    // Use the highest priority available for the SCHED_FIFO policy.
    params.sched_priority = sched_get_priority_max(SCHED_FIFO); 
    // SCHED_FIFO is a real-time scheduling policy

    // NOTE(review): `ret` is not checked in this fragment.
    int ret = pthread_setschedparam(thread1, SCHED_FIFO, &params);

priority problem

priority inversion