nv-morpheus / Morpheus

[FEA]: Add c++ level 'DataManager', and high level python 'PayloadManager' objects for managing payloads on `ControlMessage` objects #1206

Open drobison00 opened 1 year ago

drobison00 commented 1 year ago

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

High

Please provide a clear description of problem this feature solves

This feature will support three primary goals:

Describe your ideal solution

Pseudocode

Data Manager Object

Management Object

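#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

class DataRecord;  // Defined below

class DataManager {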
public:
  // Constructors & Destructors
  DataManager();
  ~DataManager();

  // CRUD Operations
  bool Create(const std::string& key, const std::vector<uint8_t>& data, const std::string& type);
  bool Read(const std::string& key, std::vector<uint8_t>& data_out);
  bool Update(const std::string& key, const std::vector<uint8_t>& new_data);
  bool Delete(const std::string& key);

private:
  std::unordered_map<std::string, std::unique_ptr<DataRecord>> data_map_;
};

Base DataRecord class

class DataRecord {
public:
  virtual ~DataRecord() = default;
  virtual bool Create(const std::vector<uint8_t>& data) = 0;
  virtual bool Read(std::vector<uint8_t>& data_out) = 0;
  virtual bool Update(const std::vector<uint8_t>& new_data) = 0;
  virtual bool Delete() = 0;
};

Derived in-memory class

class MemoryRecord : public DataRecord {
public:
  bool Create(const std::vector<uint8_t>& data) override;
  bool Read(std::vector<uint8_t>& data_out) override;
  bool Update(const std::vector<uint8_t>& new_data) override;
  bool Delete() override;

private:
  std::vector<uint8_t> data_;
};

Derived on-disk record class

class DiskRecord : public DataRecord {
public:
  bool Create(const std::vector<uint8_t>& data) override;
  bool Read(std::vector<uint8_t>& data_out) override;
  bool Update(const std::vector<uint8_t>& new_data) override;
  bool Delete() override;

private:
  std::string m_file_path;
};
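
To make the intended CRUD semantics concrete, here is a minimal Python sketch of the same key-to-record layout. It is an illustration under assumptions, not the proposed C++ implementation: the `record_type` strings ("in_memory", "disk"), the `disk_dir` parameter, and the choice to use the key as the file name are all hypothetical.

```python
import os
import tempfile
from abc import ABC, abstractmethod


class DataRecord(ABC):
    """Byte-level storage for a single payload."""

    @abstractmethod
    def create(self, data: bytes) -> bool: ...

    @abstractmethod
    def read(self) -> bytes: ...

    @abstractmethod
    def update(self, new_data: bytes) -> bool: ...

    @abstractmethod
    def delete(self) -> bool: ...


class MemoryRecord(DataRecord):
    def __init__(self):
        self._data = b""

    def create(self, data):
        self._data = bytes(data)
        return True

    def read(self):
        return self._data

    def update(self, new_data):
        self._data = bytes(new_data)
        return True

    def delete(self):
        self._data = b""
        return True


class DiskRecord(DataRecord):
    def __init__(self, file_path):
        self._file_path = file_path

    def create(self, data):
        with open(self._file_path, "wb") as f:
            f.write(data)
        return True

    def read(self):
        with open(self._file_path, "rb") as f:
            return f.read()

    def update(self, new_data):
        # Overwrite the whole file; a real record might patch in place.
        return self.create(new_data)

    def delete(self):
        if os.path.exists(self._file_path):
            os.remove(self._file_path)
        return True


class DataManager:
    def __init__(self, disk_dir=None):
        # Hypothetical: disk records live in a temporary directory by default.
        self._disk_dir = disk_dir or tempfile.mkdtemp()
        self._records = {}

    def create(self, key, data, record_type="in_memory"):
        if key in self._records:
            return False  # refuse to overwrite an existing key
        if record_type == "in_memory":
            record = MemoryRecord()
        elif record_type == "disk":
            record = DiskRecord(os.path.join(self._disk_dir, key))
        else:
            return False
        record.create(data)
        self._records[key] = record
        return True

    def read(self, key):
        record = self._records.get(key)
        return None if record is None else record.read()

    def update(self, key, new_data):
        record = self._records.get(key)
        return False if record is None else record.update(new_data)

    def delete(self, key):
        record = self._records.pop(key, None)
        return False if record is None else record.delete()
```

The manager only decides which record type backs a key; callers see the same four operations regardless of where the bytes live.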

Payload Manager Object

This example assumes we're working with DataFrames. That does not have to be the case long term, but it serves to illustrate the process.

from contextlib import contextmanager
from threading import Lock


class DataFragment:
    def __init__(self, data_manager, ...):
        self._cached_df = None  # Cached DataFrame for this object
        self._data_manager = data_manager  # Reference to the C++ DataManager
        self._lock = Lock()  # Lock for thread safety

    def create(self):
        # Perform create operation
        # Use self._data_manager to perform byte-level CRUD
        pass

    def read(self):
        # Perform read operation
        # Use self._data_manager to perform byte-level CRUD
        pass

    def update(self):
        # Perform update operation
        # Use self._data_manager to perform byte-level CRUD
        pass

    def delete(self):
        # Perform delete operation
        # Use self._data_manager to perform byte-level CRUD
        pass

class DataFragmentSet:
    def __init__(self):
        self.fragments = []  # List to hold DataFragment objects
        self.fragment_sizes = []  # List to hold the sizes of each DataFragment
        self.total_size = 0  # Total size of all DataFragments in the set

    def add_fragment(self, fragment):
        ...

    def remove_fragment(self, fragment):
        ...

class PayloadManager:
    def __init__(self):
        self._data_manager = DataManager()  # Reference to the C++ DataManager
        self._data_record_sets = {}

    def _create_record_set(self, data_object, storage_type):
        # ... If storage_type == "reference", we just store the object reference.
        # ... Otherwise fragment, collect metadata, and persist to backing store
        return DataFragmentSet(...)

    def _gather(self, data_object_id, row_offset, row_count):
        # ... collect object fragments and return as a list
        return fragment_list

    def create(self, data_object, data_object_id=None, storage_type="in_memory"):
        # ... Validation
        # ... Create in backing store
        self._data_record_sets[data_object_id] = self._create_record_set(data_object, storage_type)

        return data_object_id

    @contextmanager
    def read_mutable(self, data_object_id, row_offset, row_count):
        fragments = self._gather(data_object_id, row_offset, row_count)
        # Lock fragments, create concatenated DataFrame, cache and yield it.
        yield df
        # Synchronize changes back to DataFragments/DataFragmentSets and backing storage.
        self.update(data_object_id, df, row_offset, row_count, ...)

    def read(self, data_object_id, row_offset, row_count):
        fragments = self._gather(data_object_id, row_offset, row_count)
        # Lock fragments, create concatenated DataFrame, and cache it
        return df.copy_on_write()  # Return a copy-on-write DataFrame

    def update(self, data_object_id, df, row_offset, row_count):
        fragments = self._gather(data_object_id, row_offset, row_count)

        # ... Update stored objects from df ... this may result in re-balancing
        return data_object_id

    def delete(self, data_object_id):
        # ... Delete from local cache
        # ... Delete from backing store
        pass
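
The `_gather` step is the part of the pseudocode that carries the most logic, so here is one way it could work. This is a sketch only: `FragmentSet` is a hypothetical stand-in for `DataFragmentSet`, each fragment is a plain list of rows rather than a DataFrame, and the `(fragment_index, start, stop)` return shape is an assumption.

```python
from bisect import bisect_right
from itertools import accumulate


class FragmentSet:
    """Hypothetical stand-in for DataFragmentSet; each fragment is a list of rows."""

    def __init__(self):
        self.fragments = []
        self.fragment_sizes = []
        self.total_size = 0

    def add_fragment(self, rows):
        self.fragments.append(list(rows))
        self.fragment_sizes.append(len(rows))
        self.total_size += len(rows)

    def remove_fragment(self, index):
        removed = self.fragments.pop(index)
        self.total_size -= self.fragment_sizes.pop(index)
        return removed

    def gather(self, row_offset, row_count):
        """Return (fragment_index, start, stop) slices covering the row range."""
        if row_offset < 0 or row_count < 0 or row_offset + row_count > self.total_size:
            raise IndexError("row range is outside the fragment set")
        # Cumulative end row (exclusive) of each fragment.
        ends = list(accumulate(self.fragment_sizes))
        slices = []
        row = row_offset
        remaining = row_count
        # First fragment whose end lies past the starting row.
        index = bisect_right(ends, row)
        while remaining > 0:
            frag_start = ends[index] - self.fragment_sizes[index]
            start = row - frag_start
            stop = min(self.fragment_sizes[index], start + remaining)
            if stop > start:  # skip empty fragments
                slices.append((index, start, stop))
            taken = stop - start
            row += taken
            remaining -= taken
            index += 1
        return slices

    def read(self, row_offset, row_count):
        rows = []
        for index, start, stop in self.gather(row_offset, row_count):
            rows.extend(self.fragments[index][start:stop])
        return rows
```

Returning slice descriptors instead of data keeps `gather` cheap; only `read` touches the fragment contents, which is where a disk-backed fragment would be loaded.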

Additional context

Example use case(s)

Suppose we take our existing ControlMessage object and update it to use our PayloadManager structure.

class ControlMessage
{
  public:
    ...
  private:
    std::shared_ptr<PayloadManager> m_payload_manager{nullptr};
};

Single object payload with no backing medium -- we don't need to supply a data_object_id

# Read a copy of the allocated DataFrame
df = control_message.payload().read()

# Modify df; the ControlMessage-owned DataFrame is unaffected.
do_work_on_df(df)

# Obtain a reference to the mutable DataFrame owned by the ControlMessage
with control_message.payload().read_mutable() as df:
    # Modify df; the ControlMessage-owned DataFrame immediately reflects the modifications
    do_work_on_df(df)

    # on __exit__, nothing happens because df has no backing medium

Single object payload with a backing medium -- we don't need to supply a data_object_id

# Read a copy of the allocated DataFrame
df = control_message.payload().read()

# Modify df; the ControlMessage-owned DataFrame is unaffected.
do_work_on_df(df)

# Obtain a reference to the mutable DataFrame owned by the ControlMessage
with control_message.payload().read_mutable() as df:
    # Modify df; the ControlMessage-owned DataFrame immediately reflects the modifications
    do_work_on_df(df)

    # on __exit__, all transactional changes on df will be propagated to the backing store.
    # This will likely need to be restricted in various ways to avoid unnecessary complexity.
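
The difference between `read()` and `read_mutable()` in the two cases above can be sketched with a context manager. Everything here is illustrative: `Payload` is a hypothetical class, a dict of column lists stands in for a DataFrame, the backing store is any dict-like object holding bytes, and `read()` makes an eager deep copy rather than a true copy-on-write view.

```python
import copy
import json
from contextlib import contextmanager
from threading import Lock


class Payload:
    """Hypothetical payload holder; a dict of column lists stands in for a DataFrame."""

    def __init__(self, frame, backing_store=None, key="payload"):
        self._frame = frame
        self._backing_store = backing_store  # dict-like bytes store, or None
        self._key = key
        self._lock = Lock()
        self._sync()

    def _sync(self):
        # With no backing medium there is nothing to propagate.
        if self._backing_store is not None:
            self._backing_store[self._key] = json.dumps(self._frame).encode()

    def read(self):
        # Return an independent copy; edits to it never reach the owner.
        with self._lock:
            return copy.deepcopy(self._frame)

    @contextmanager
    def read_mutable(self):
        # Hold the lock for the whole with-block and hand out the owned object.
        with self._lock:
            yield self._frame
            # Reached only if the with-block did not raise.
            self._sync()
```

In this sketch the backing store is written once, on exit, so it lags the in-memory object for the duration of the `with` block. If the block raises, the write-back is skipped but the in-memory object keeps whatever edits were already made, which is one of the cases a real implementation would need to restrict or roll back.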

cwharris commented 1 year ago

I think I understand how it would translate, but I'd really like to see an example that's framed around a stage, specifically the map operation of the stage (where by "map" I mean self.process_message in mrc.operators.map(self.process_message)).

drobison00 commented 1 year ago

@cwharris I think I might need a bit more context on your question. In the context of usage in a stage, here is a snippet from the dfp_training code, and how it would be adapted to produce the same behavior as before:

Before

        while (control_message.has_task("training")):
            control_message.remove_task("training")

            user_id = control_message.get_metadata("user_id")
            message_meta = control_message.payload()

            with message_meta.mutable_dataframe() as dfm:
                final_df = dfm.to_pandas()

            model = AutoEncoder(**model_kwargs)

            # Only train on the feature columns
            train_df = final_df[final_df.columns.intersection(feature_columns)]
            model.fit(train_df)

After

        while (control_message.has_task("training")):
            control_message.remove_task("training")

            user_id = control_message.get_metadata("user_id")
            final_df = control_message.payload().read()

            model = AutoEncoder(**model_kwargs)

            # Only train on the feature columns
            train_df = final_df[final_df.columns.intersection(feature_columns)]
            model.fit(train_df)

In the context of having a very large, backed data source, something like this would probably be more appropriate:

After -- as_data_loader

        while (control_message.has_task("training")):
            control_message.remove_task("training")

            user_id = control_message.get_metadata("user_id")
            df_loader = control_message.payload().as_data_loader(feature_columns) # Note, this isn't in the pseudo code.

            model = AutoEncoder(**model_kwargs)

            # Only train on the feature columns
            model.fit(df_loader)
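
Since `as_data_loader` isn't in the pseudocode, here is a rough sketch of what it might do: stream batches of rows, restricted to the feature columns, without materializing the whole payload. The function signature, the `batch_size` parameter, and the row-as-dict representation are all assumptions made for illustration.

```python
def as_data_loader(fragments, feature_columns, batch_size):
    """Yield batches of rows restricted to feature_columns.

    `fragments` is any iterable of row lists (each row a dict). It may be a
    lazy iterable that loads one fragment at a time from a backing store.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batch = []
    for fragment in fragments:
        for row in fragment:
            # Keep only the requested columns that the row actually has,
            # mirroring columns.intersection(feature_columns).
            batch.append({c: row[c] for c in feature_columns if c in row})
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        yield batch  # final partial batch
```

Because this is a generator, at most one fragment and one batch need to be resident at a time, which is the point of using a loader for a large, backed data source.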

cwharris commented 1 year ago

Question: Since reading/writing might require IO, would it be beneficial to have async versions of the CRUD operations? On the C++ side, I imagine we'd release the GIL. That allows other threads to continue processing, but without an async API in Python, we're blocking the current thread from reading more messages while any CRUD is in-flight.

drobison00 commented 1 year ago

@cwharris Yes, for most operations we'll likely want to drop the GIL or perform them asynchronously. I won't say this is 100% true, because for some record managers, we may utilize something like fsspec via pybind for the initial implementation.

In the context of a Python call, we should see other Python threads have a chance to grab the GIL and start executing as soon as we drop it in the C++ function. Similarly, when the C++ function issues a syscall for something like a file or network read, we should see it yield as well and let some other thread execute.

If I'm understanding the context correctly, the only thing we're likely to be blocked on is processing more messages within the current Python node; I don't know of any current situations where this would be a problem.
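
For reference, one possible shape for an async wrapper over blocking CRUD calls is sketched below. It assumes Python 3.9+ for `asyncio.to_thread`; `BlockingStore` and `AsyncStore` are hypothetical names, and `time.sleep` stands in for a C++ call that releases the GIL while it waits on IO.

```python
import asyncio
import time


class BlockingStore:
    """Hypothetical store whose reads block, standing in for disk or network IO."""

    def __init__(self, data, delay=0.05):
        self._data = dict(data)
        self._delay = delay

    def read(self, key):
        time.sleep(self._delay)  # releases the GIL, like a blocking syscall
        return self._data[key]


class AsyncStore:
    """Async facade over a blocking store."""

    def __init__(self, store):
        self._store = store

    async def read(self, key):
        # Run the blocking call on a worker thread so the event loop stays free.
        return await asyncio.to_thread(self._store.read, key)


async def read_all(store, keys):
    # Issue all reads concurrently; results come back in the order of `keys`.
    return await asyncio.gather(*(store.read(k) for k in keys))
```

With this shape the calling coroutine can keep accepting messages while reads are in flight, at the cost of one worker thread per outstanding call.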