securefederatedai / openfl

An Open Framework for Federated Learning.
https://openfl.readthedocs.io/en/latest/index.html
Apache License 2.0

ValueError: buffer size must be a multiple of element size #1142

Open HubgitCCL opened 1 day ago

HubgitCCL commented 1 day ago

Describe the bug
I trained my model both on my PC and on the lab testbed. After training finished, I tried to save the model with the 'fx model save' command, but it failed with this error:

INFO     Loading OpenFL model protobuf:  🠆 C:\Users\users\openfl\openfl-server-main\save\best.pbuf                                 model.py:154
EXCEPTION : buffer size must be a multiple of element size
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\users\openfl\openfl-server-main\.venv\Scripts\fx.exe\__main__.py", line 7, in <module>
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\interface\cli.py", line 328, in entry
    error_handler(e)
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\interface\cli.py", line 246, in error_handler
    raise error
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\interface\cli.py", line 326, in entry
    cli(max_content_width=120)
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 1078, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\core.py", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\click\decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\interface\model.py", line 106, in save_
    task_runner = get_model(plan_config, cols_config, data_config, model_protobuf_path)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\interface\model.py", line 158, in get_model
    tensor_dict, _ = utils.deconstruct_model_proto(model_protobuf, NoCompressionPipeline())
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\protocols\utils.py", line 236, in deconstruct_model_proto
    tensor_dict[key] = compression_pipeline.backward(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\pipelines\pipeline.py", line 162, in backward
    data = transformer.backward(data=data, metadata=transformer_metadata.pop(), **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\users\openfl\openfl-server-main\.venv\Lib\site-packages\openfl\pipelines\pipeline.py", line 90, in backward
    flat_array = np.frombuffer(data, dtype=np.float32)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: buffer size must be a multiple of element size

To Reproduce
The required Python files are attached to this description.

Steps to reproduce the behavior:

  1. Trained the model
  2. Run 'fx model save -i save/best.pbuf' (or another .pbuf file such as last.pbuf)
  3. See error

Expected behavior
I expected to get a saved model that I could use for prediction and testing outside the framework.
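For context, the ValueError comes from the np.frombuffer call in the last traceback frame: NumPy refuses any byte buffer whose length is not a whole multiple of the target dtype's item size. A minimal standalone reproduction (independent of OpenFL):

```python
import numpy as np

# 12 bytes is a whole number of float32 elements (12 / 4 = 3), so this works:
print(np.frombuffer(bytes(12), dtype=np.float32))  # -> [0. 0. 0.]

# 10 bytes is NOT a multiple of 4, which raises the exact error from the report:
try:
    np.frombuffer(bytes(10), dtype=np.float32)
except ValueError as e:
    print(e)  # buffer size must be a multiple of element size
```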


kta-intel commented 23 hours ago

Hey @HubgitCCL, it seems like a mismatch between the buffer size and the expected data type size. Are you able to double-check that the size in your .pbuf file is a multiple of 4 bytes? Wonder if the model is being saved as float64, which could be causing this issue.
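The dtype-mismatch hypothesis above can be reproduced outside OpenFL: bytes written as float64 and reinterpreted as float32 either silently double the element count or, when the length is not a multiple of 4, raise exactly this ValueError. A standalone sketch:

```python
import numpy as np

arr64 = np.arange(3, dtype=np.float64)  # 3 elements * 8 bytes = 24 bytes
buf = arr64.tobytes()

# 24 happens to divide by 4, so frombuffer "succeeds" but yields 6 garbage values:
wrong = np.frombuffer(buf, dtype=np.float32)
print(wrong.size)  # 6 elements instead of 3

# A slice whose length is not a multiple of 4 triggers the reported error:
try:
    np.frombuffer(buf[:10], dtype=np.float32)
except ValueError as e:
    print(e)  # buffer size must be a multiple of element size
```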

HubgitCCL commented 22 hours ago

Hi @kta-intel, I checked the file with the code below, and I think the values are saved as float64.

I attached my last.pbuf here for reference. last.zip

import struct

file_path = 'save/best.pbuf'

float64_size = 8
float32_size = 4

def read_and_check_data(file_path):
    with open(file_path, 'rb') as f:
        file_size = f.seek(0, 2)  # seek to the end to get the file size
        f.seek(0)

        print(f"File size: {file_size} bytes")

        byte_offset = 0
        while byte_offset < file_size:
            # Re-seek on every iteration so the read position always matches
            # byte_offset, even after a short read near the end of the file.
            f.seek(byte_offset)

            data_64 = f.read(float64_size)
            if len(data_64) == float64_size:
                # struct.unpack never fails on a correctly sized buffer,
                # so no try/except is needed here.
                value_64 = struct.unpack('d', data_64)
                print(f"Offset {byte_offset}: As float64: {value_64}")
                byte_offset += float64_size
                continue

            f.seek(byte_offset)
            data_32 = f.read(float32_size)
            if len(data_32) == float32_size:
                value_32 = struct.unpack('f', data_32)
                print(f"Offset {byte_offset}: As float32: {value_32}")
                byte_offset += float32_size
                continue

            byte_offset += 1  # fewer than 4 bytes left; skip forward

read_and_check_data(file_path)
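One caveat with byte-scanning: a .pbuf file is a serialized protobuf, so the float payloads are wrapped in varint framing and metadata, and decoding the whole file as raw floats will mostly produce garbage. A quicker sanity check on any byte count (a minimal sketch) is plain divisibility:

```python
def float_divisibility(n_bytes):
    """Report which raw float dtypes a buffer of this length could hold."""
    return {
        "float32": n_bytes % 4 == 0,  # 4-byte elements
        "float64": n_bytes % 8 == 0,  # 8-byte elements
    }

print(float_divisibility(16))   # both fit: {'float32': True, 'float64': True}
print(float_divisibility(179))  # neither fits: not a raw float array at all
```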
HubgitCCL commented 19 hours ago

What's more, I made a small modification to pipeline.py (shown below), and the error now reports the sizes involved:

EXCEPTION : Buffer size (179 bytes) must be a multiple of element size (4 bytes).
Traceback (most recent call last):
  File "C:\Users\CCL\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 192, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\CCL\AppData\Local\Programs\Python\Python38\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "D:\openfl\openfl-server\.venv\Scripts\fx.exe\__main__.py", line 7, in <module>
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\interface\cli.py", line 328, in entry
    error_handler(e)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\interface\cli.py", line 246, in error_handler
    raise error
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\interface\cli.py", line 326, in entry
    cli(max_content_width=120)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\click\decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "D:\openfl\openfl-server\.venv\Lib\site-packages\openfl\interface\model.py", line 106, in save_
    task_runner = get_model(plan_config, cols_config, data_config, model_protobuf_path)
  File "D:\openfl\openfl-server\.venv\Lib\site-packages\openfl\interface\model.py", line 158, in get_model
    tensor_dict, _ = utils.deconstruct_model_proto(model_protobuf, NoCompressionPipeline())
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\protocols\utils.py", line 236, in deconstruct_model_proto
    tensor_dict[key] = compression_pipeline.backward(
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\pipelines\pipeline.py", line 171, in backward
    data = transformer.backward(data=data, metadata=transformer_metadata.pop(), **kwargs)
  File "D:\openfl\openfl-server\.venv\lib\site-packages\openfl\pipelines\pipeline.py", line 95, in backward
    raise ValueError(f"Buffer size ({buffer_size} bytes) must be a multiple of element size ({element_size} bytes).")
ValueError: Buffer size (179 bytes) must be a multiple of element size (4 bytes).

The buffer size is 179 bytes. How does this happen, and what can I do?

Below is my modification.

(screenshot of the modified pipeline.py attached)
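One observation on the numbers: 179 leaves a remainder of 3 against both 4 and 8, so the buffer handed to np.frombuffer cannot be a raw float32 or float64 array of any length. That suggests this particular entry holds something other than (or in addition to) raw tensor bytes:

```python
n = 179  # buffer size from the traceback above
print(divmod(n, 4), divmod(n, 8))  # (44, 3) (22, 3): 3 bytes left over either way
```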

HubgitCCL commented 17 hours ago

I am training a BiLSTM model with one aggregator and two collaborators, and each collaborator has a different dataset.

But I noticed that when I run 'fx model save' on the aggregator, there is an output line like:

INFO     Received Data Path: data/Anomalous_data/V_1S_1.csv                                                                  kerasLSTM_inmemory.py:14

However, collaborator 1 was trained with this V_1S_1.csv, while collaborator 2 was trained with another file, V_2S_2.csv. Could this cause the error?

HubgitCCL commented 15 hours ago

Anyway, I managed to work around this by rewriting the Aggregator class in OpenFL. I save the weights to an .npz file, then use that file together with my model structure to rebuild the model in the SavedModel format, and that succeeds. Now I am able to get the saved model. My changes to the Aggregator class are as follows:

    def _save_model(self, round_number, file_path):
        """Save the best or latest model.

        Args:
            round_number (int): Model round to be saved.
            file_path (str): Either the best model or latest model file path.

        Returns:
            None
        """
        import os

        import numpy as np
        import tensorflow as tf

        # Extract the model from TensorDB and set it to the new model
        og_tensor_dict, _ = utils.deconstruct_model_proto(
            self.model, compression_pipeline=self.compression_pipeline
        )
        tensor_keys = [
            TensorKey(k, self.uuid, round_number, False, ("model",))
            for k, v in og_tensor_dict.items()
        ]
        tensor_dict = {}
        for tk in tensor_keys:
            tk_name, _, _, _, _ = tk
            tensor_dict[tk_name] = self.tensor_db.get_tensor_from_cache(tk)
            if tensor_dict[tk_name] is None:
                self.logger.info(
                    "Cannot save model for round %s. Continuing...",
                    round_number,
                )
                return

        if file_path == self.best_state_path:
            self.best_tensor_dict = tensor_dict
        if file_path == self.last_state_path:
            self.last_tensor_dict = tensor_dict

        # Save the Keras model weights before generating the pbuf
        try:
            # Build the save path
            save_dir = os.path.dirname(file_path)
            keras_weights_path = os.path.join(save_dir, f'keras_weights_round_{round_number}')

            # Make sure the directory exists
            os.makedirs(save_dir, exist_ok=True)

            # Convert tensor_dict into a format that Keras can load
            weights_dict = {}
            for name, value in tensor_dict.items():
                # Strip a trailing '/kernel' or '/bias' from the tensor name
                clean_name = name.replace('/kernel', '').replace('/bias', '')
                if name.endswith('/kernel'):
                    weights_dict.setdefault(clean_name, []).insert(0, value)
                elif name.endswith('/bias'):
                    weights_dict.setdefault(clean_name, []).append(value)
                else:
                    weights_dict[clean_name] = [value]

            # Save the raw tensors as a NumPy .npz archive
            np.savez(keras_weights_path, **tensor_dict)
            self.logger.info(f"Keras weights saved to {keras_weights_path}.npz")

            # Also save in the TensorFlow SavedModel format (if the model structure is available)
            try:
                if hasattr(self, 'keras_model'):
                    # Set the weights
                    self.keras_model.set_weights([tensor_dict[name] for name in tensor_dict])
                    # Save the full model
                    tf_save_path = keras_weights_path + '_saved_model'
                    self.keras_model.save(tf_save_path)
                    self.logger.info(f"Full Keras model saved to {tf_save_path}")
            except Exception as e:
                self.logger.warning(f"Could not save full model: {str(e)}")

        except Exception as e:
            self.logger.error(f"Error saving Keras weights: {str(e)}")

        # Then generate and save the pbuf file
        self.model = utils.construct_model_proto(
            tensor_dict, round_number, self.compression_pipeline
        )
        utils.dump_proto(self.model, file_path)
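For anyone hitting the same issue, the rebuild step described above (npz + model structure to SavedModel) can be sketched like this; build_model and the .npz path are placeholders for your own code, and it assumes the tensors were saved in the order model.get_weights() expects:

```python
import numpy as np

def rebuild_from_npz(npz_path, build_model):
    """Recreate a Keras model from an .npz weight archive.

    build_model is a zero-argument callable that reconstructs the same
    architecture that was trained (a placeholder for your own code).
    """
    weights = np.load(npz_path)
    model = build_model()
    # Assumes archive order matches the order set_weights() expects.
    model.set_weights([weights[name] for name in weights.files])
    return model

# Usage (paths are illustrative):
# model = rebuild_from_npz('save/keras_weights_round_10.npz', build_model)
# model.save('save/rebuilt_saved_model')  # TensorFlow SavedModel format
```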

Thanks for your continued help, @kta-intel.