Squadrick / shadesmar

Fast C++ IPC using shared memory
MIT License
555 stars 85 forks source link

a naive design for shared memory #65

Closed bethebest0622 closed 1 year ago

bethebest0622 commented 1 year ago

I read your code, it's really masterpiece.

but could you have a look at my design, it's naive and for a very simple purpose:

multi-producer multi-consumer shared memory, and avoid CPU busy-waiting (so use a conditional_variable)

I have a header with conditional_variable and pthread_mutex_t. will create when shared_memory are allocated.

  // header.hpp
    #pragma once
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/types.h>
    #include <semaphore.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <string.h>
    #include <error.h>
    #include <errno.h>
    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/sysinfo.h>
    #include <atomic>
    #include <memory>
    #include <mutex>
    #include <condition_variable>
    #include <iostream>
    using namespace std;

    struct Header {
      const size_t size_;
      std::atomic_int32_t tail_;
      std::mutex cv_mut_;
      std::condition_variable cv_;
      pthread_mutex_t mut_;
      Header(size_t size, int32_t tail) : size_(size), tail_(tail) {
        pthread_mutexattr_t mutexattr;
        pthread_mutexattr_init(&mutexattr);
        pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
        pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST);
        pthread_mutex_init(&mut_, &mutexattr); 
      }
    };

I also have a shm_worker which used to init and allocate memory. the only thing it do is to init, create or connect to existed.

     // shm_worker.hpp
    #pragma once
    #include "./header.hpp"

    class ShmWorker {
     public:
      ShmWorker() : is_init(false) {}

      virtual ~ShmWorker() {
        shmdt(m_data);
        if (create_new) shmctl(shmid, IPC_RMID, 0); 
      }

     protected:
      key_t get_keyid(const std::string & name) {
        const std::string & s = "./" + name;
        if (access(s.c_str(), F_OK) == -1) { const std::string & s1 = "touch " + s; system(s1.c_str()); }
        key_t semkey = ftok(name.c_str(), 1); 
        if (semkey == -1) { printf("shm_file:%s not existed\n", s.c_str()); exit(1); }
        return semkey;
      }

      template <typename T>
      void init(const std::string& name, int size) {  // one word to conclude: if not existed, create, else connect
        if (is_init) { printf("this shmworker has beed inited!\n"); return; }
        m_key = get_keyid(name);
        shmid = shmget(m_key, 0, 0); 
        if (shmid == -1) {
          if (errno == ENOENT || errno == EINVAL) {
            shmid = shmget(m_key, sizeof(Header) + sizeof(T) * size, 0666 | IPC_CREAT | O_EXCL);
            if (shmid == -1) { printf("both connet and create are failed for shm\n"); exit(1); }
            printf("creating new shm %s\n", name.c_str());
            create_new = true;
            m_data = (char*)shmat(shmid, NULL, 0); 
            Header* header = new Header(size, 0); 
            memcpy(m_data, header, sizeof(Header));
          } else {
            exit(1);
          }   
        } else {
          m_data = (char*)shmat(shmid, 0, 0); 
          // m_size = reinterpret_cast<std::atomic_int*>(m_data)->load();
        }   

        if (m_data == (char*)(-1)) { perror("shmat"); exit(1); }
        is_init = true;
      }

      int m_key;
      int shmid;
      char* m_data;
      bool is_init;
      bool create_new = false;
    };

the producer code is:

 #pragma once
    #include <mutex>
    #include <fcntl.h>
    #include <fstream>
    #include "./shm_worker.hpp"

    template <typename T>
    class ShmSender: public ShmWorker {
     public:
      ShmSender(const std::string& key, int size = 4096) {
        init <T> (key, size);
        header_ = (Header*)m_data;
      }

      virtual ~ShmSender() = default;

      void Send(const T& shot) {
        // pthread_mutex_lock(&header_->mut_);
        {   
          std::lock_guard<std::mutex> lk(header_->cv_mut_);
          memcpy(m_data + sizeof(Header) + header_->tail_.load() % header_->size_ * sizeof(T), &shot, sizeof(T));
          header_->tail_.fetch_add(1);
        }   
        // pthread_mutex_unlock(&header_->mut_);
        header_->cv_.notify_all();
      }

     private:
      Header * header_;
    };

the consumer code is:

 #pragma once
    #include <unistd.h>
    #include "./shm_worker.hpp"

    template <typename T>
    class ShmRecver : public ShmWorker {
     public:
      ShmRecver(const std::string & key, int size = 4096) {
        init <T> (key, size);
        header_ = (Header*)m_data;
        read_index = header_->tail_.load();
      }

      virtual ~ShmRecver() {}

      void Recv(T& t) {
        std::unique_lock<std::mutex> lk(header_->cv_mut_);
        header_->cv_.wait(lk, [&] { return read_index != header_->tail_; });  // wait tail changed by Sender
        t = *(T*)(m_data + sizeof(*header_) + (read_index ++));
      }

     private:
      int read_index;
      Header* header_;
    };

but, when i call sender.Send(), the recver.Recv() not response.

seems conditional_variable notify_all didn't wake up Recver process's wait

could you help on this?

and it would be great if you can kindly give some advice about this design.

Squadrick commented 1 year ago

Hey @bethebest0622, this is an admirable effort, but unfortunately I don't have the time to review this nor is this the right place for such a review. I should suggest you reach out to https://softwareengineering.stackexchange.com instead.