GuiyinLi / XDMyo-Dataset

Upper-limb gesture dataset based on surface electromyography (sEMG) and inertial measurement unit (IMU) signals.
GNU General Public License v3.0
2 stars 0 forks source link

你好,有关于这篇论文的代码吗? #1

Open shenliaoxx opened 1 month ago

GuiyinLi commented 1 month ago

数据采集可以参考:https://github.com/sebastiankmiec/PythonMyoLinux.git 数据预处理:代码找不到了,原始CSV数据的分割方法在readme.md和论文里面说明了

GuiyinLi commented 1 month ago

这是CSV数据格式化的代码

import glob

from scipy.io import loadmat
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
import shutil
import myo
from dataprocess import Dataset
from pykalman import KalmanFilter
from utils import plot

def mk_dir(path):
    """Create directory *path* if it does not exist yet.

    Missing parent directories are created as well, and an already existing
    directory is not an error, so the call is safe to repeat.

    Args:
        path: Directory to create.
    """
    # makedirs(exist_ok=True) avoids both the "parent missing" failure of
    # os.mkdir and the race between an exists() check and the creation.
    os.makedirs(path, exist_ok=True)

class CSV2DAT:
    """Convert labelled recording CSV files into per-segment ``.dat`` files.

    Pipeline (see :meth:`get_dat`): every ``*.csv`` below ``csv_root`` is
    relabelled with a repetition counter, cut into one binary ``.dat`` file
    per labelled segment, and the resulting subject folders are finally sorted
    into pre-training / evaluation directories.
    """

    def __init__(self, csv_root="../datasets/csv/", save_root="../datasets/dat/"):
        """Remember the input (``csv_root``) and output (``save_root``) directories."""
        self.csv_root = csv_root
        self.save_root = save_root

    def get_dat(self):
        """Relabel and convert every CSV under ``csv_root``, then sort the output folders."""
        for f in tqdm(glob.glob(os.path.join(self.csv_root, '*.csv'))):
            stem = os.path.splitext(os.path.basename(f))[0]
            # Each input file gets its own output folder named after the file.
            csv_path = self.csv_add_relabel(f, os.path.join(self.save_root, stem))
            self.csv_to_dat(csv_path)
        self.move_csv(self.save_root)

    @staticmethod
    def csv_add_relabel(csvfile, savepath):
        """Write a cleaned copy of *csvfile* with a ``repetition`` column.

        Spaces are stripped from the column names, the first column is
        replaced by the repetition counter, rows whose ``Label`` is ``-1`` are
        removed before counting and rows whose ``Label`` is ``>= 8`` are
        removed afterwards.

        Args:
            csvfile: Source CSV; its second column is used as the label and it
                must expose a ``Label`` column once spaces are stripped.
            savepath: Directory that receives the rewritten CSV (created if
                missing).

        Returns:
            Path of the rewritten CSV inside *savepath*.
        """
        os.makedirs(savepath, exist_ok=True)
        df = pd.read_csv(csvfile)
        col_names = [name.replace(" ", "") for name in df.columns]
        col_names[0] = 'repetition'
        df.columns = col_names

        df.drop(df[df.Label == -1].index, inplace=True)

        # Read the label column by position explicitly: integer keys on a
        # string-labelled Series (``row[1]``) are deprecated in pandas 2.x.
        labels = df.iloc[:, 1].tolist()

        prior = 1
        rep = 1
        rep_col = []
        for label in labels:
            if label > 0 and label != prior:
                if prior > 0:
                    # Direct switch from one active label to another: restart.
                    rep = 1
                elif prior == 0:
                    # Active label resumes after label 0: next repetition.
                    rep += 1
            rep_col.append(rep if label > 0 else 0)
            prior = label
        df['repetition'] = rep_col

        df.drop(df[df.Label >= 8].index, inplace=True)
        df['repetition'] = df['repetition'].astype(int)

        stem = os.path.splitext(os.path.basename(csvfile))[0]
        out_path = os.path.join(savepath, stem + ".csv")
        df.to_csv(out_path, index=False)
        return out_path

    @staticmethod
    def csv_to_dat(csvfile):
        """Cut a relabelled CSV into ``training0/classe_<k>.dat`` files.

        Columns 2..9 (treated as the EMG channels) of consecutive rows with
        ``repetition > 0`` are concatenated and written as raw ``int16``. A
        segment ends at the last row, before a row labelled 0, or when the
        label changes. The file index is
        ``label - 1 + max_label * (repetition - 1)``.

        Args:
            csvfile: CSV with ``Label`` and ``repetition`` columns, as written
                by :meth:`csv_add_relabel`. Output goes next to this file.
        """
        df = pd.read_csv(csvfile)
        s_emg = df.iloc[:, 2:10].values
        out_dir = os.path.join(os.path.dirname(csvfile), 'training0')
        os.makedirs(out_dir, exist_ok=True)

        s_label = np.reshape(df['Label'].values, (df['Label'].values.shape[0]))
        s_repetition = np.reshape(df['repetition'].values, (df['repetition'].values.shape[0]))
        n_rows = s_repetition.shape[0]
        if n_rows == 0:
            # Nothing to export; np.max would raise on an empty array.
            return

        num_label = int(np.max(s_label))
        emg = []
        for i in range(n_rows):
            if s_repetition[i] <= 0:
                continue
            emg.extend(s_emg[i])
            if (i + 1 == n_rows) or (s_label[i + 1] == 0) or (s_label[i + 1] != s_label[i]):
                index = int(s_label[i]) - 1 + num_label * (int(s_repetition[i]) - 1)
                np.array(emg).astype(np.int16).tofile(
                    os.path.join(out_dir, 'classe_' + str(index) + '.dat'))
                emg = []

    def move_csv(self, savedir):
        """Sort subject folders ``s<id>`` into pre-training / evaluation sets.

        Subject ids come from ``Dataset.pre_train['subjects']`` and
        ``Dataset.eval_train['subjects']``; folders that do not exist are
        skipped. Afterwards the evaluation subjects are split with
        :meth:`spilt_dataset`.

        Args:
            savedir: Directory that currently holds the ``s<id>`` folders.
        """
        pre_train_path = os.path.join(savedir, 'PreTrainingDataset')
        eval_path = os.path.join(savedir, 'EvaluationDataset')
        os.makedirs(pre_train_path, exist_ok=True)
        os.makedirs(eval_path, exist_ok=True)

        for i in Dataset.pre_train['subjects']:
            src = os.path.join(savedir, 's' + str(i))
            if os.path.exists(src):
                shutil.move(src, os.path.join(pre_train_path, 's' + str(i)))
                print(savedir + 's' + str(i) + 'move successes')
        for i in Dataset.eval_train['subjects']:
            src = os.path.join(savedir, 's' + str(i))
            if os.path.exists(src):
                shutil.move(src, os.path.join(eval_path, 's' + str(i)))
                print(savedir + 's' + str(i) + 'move successes')
        self.spilt_dataset(eval_path, 6)

    @staticmethod
    def spilt_dataset(path, subjects, classes=7):
        """Move the held-out files of every evaluation subject to ``test0``.

        Files ``classe_<k>.dat`` in ``<path>/s<id>/training0`` with
        ``4 * classes <= k < 6 * classes`` are moved to ``<path>/s<id>/test0``
        and renumbered to ``k - 4 * classes``.

        Args:
            path: Directory holding the evaluation subject folders.
            subjects: Unused; the subject ids are taken from
                ``Dataset.eval_train['subjects']``. Kept for compatibility.
            classes: Number of file indices per repetition.
        """
        for i in Dataset.eval_train['subjects']:
            subject_dir = os.path.join(path, 's' + str(i))
            train_path = os.path.join(subject_dir, 'training0')
            if not os.path.isdir(train_path):
                # move_csv skips subjects without data, so do the same here
                # instead of crashing on the missing folder.
                continue
            test0_path = os.path.join(subject_dir, 'test0')
            os.makedirs(test0_path, exist_ok=True)
            for file in os.listdir(train_path):
                sep = file.index('_')
                dot = file.index('.')
                classe = int(file[sep + 1:dot])
                if not 4 * classes <= classe < 6 * classes:
                    continue
                # Rebuild the name around the parsed index rather than using
                # str.replace, which would touch any matching digits.
                renamed = file[:sep + 1] + str(classe - 4 * classes) + file[dot:]
                redst = os.path.join(test0_path, renamed)
                shutil.move(os.path.join(train_path, file), redst)
                print(redst)

class MAT2DAT:
    """Convert ``.mat`` recordings (``emg`` / ``stimulus`` / ``repetition``) to ``.dat`` or CSV."""

    def __init__(self, mat_root="../datasets/mat"):
        """Remember the root directory of the ``.mat`` files.

        The original spelled this method ``__int__``, so it never ran as the
        constructor: ``MAT2DAT(path)`` raised ``TypeError`` and ``mat_root``
        was never set.
        """
        self.mat_root = mat_root

    def get_dat(self, exercises, subjects=10):
        """Run :meth:`mat_to_dat` for subjects ``1..subjects`` of one exercise.

        Args:
            exercises: Exercise number used in the ``S<i>_E<exercises>`` names.
            subjects: Number of subjects to process, starting at 1.
        """
        e = str(exercises)
        for i in tqdm(range(1, subjects + 1)):
            s = str(i)
            session = 'S' + s + '_E' + e
            self.mat_to_dat(os.path.join(self.mat_root, 's' + s, session, session + '_A1.mat'))

    @staticmethod
    def split_label(csvfile):
        """Split a CSV into one file per label, keeping the centre of each repetition.

        For every (label, repetition) group longer than 1000 rows, only the
        central rows are kept (about 1000, subject to float truncation);
        shorter groups are kept whole. The groups of one label are written to
        ``<csvfile stem>__<label><ext>``.

        Args:
            csvfile: CSV with ``label`` and ``repetition`` columns.
        """
        table = pd.read_csv(csvfile)
        for key, label_rows in table.groupby('label'):
            out_path = csvfile[:-4] + '__' + str(key) + csvfile[-4:]
            # The first group written creates the file with a header and the
            # rest are appended. The original decided this from
            # ``repetition != 1`` and so lost data or the header whenever
            # repetition 1 was not the first key.
            first = True
            for _, rep_rows in label_rows.groupby('repetition'):
                total = rep_rows.shape[0]
                coe = ((total - 1000) / total) / 2
                # Clamp so that short groups (negative coe) are kept whole.
                lo = max(int(total * coe), 0)
                hi = min(int(total * (1 - coe)), total)
                rep_rows.iloc[lo:hi].to_csv(
                    out_path, index=False, header=first, mode='w' if first else 'a')
                first = False

    @staticmethod
    def mat_to_dat(matfile):
        """Cut a ``.mat`` recording into ``training0/classe_<k>.dat`` files.

        The first 8 EMG columns of consecutive rows with ``repetition > 0``
        are concatenated; a segment ends at the last row or before a row whose
        stimulus is 0. The file index is
        ``stimulus - 1 + max_stimulus * (repetition - 1)``. Samples are
        written with numpy's default integer type (platform dependent).

        Args:
            matfile: ``.mat`` file to read. Output goes to a ``training0``
                folder next to it (created if missing).
        """
        s = loadmat(matfile)
        s_emg = s['emg']
        s_label = np.reshape(s['stimulus'], (s['stimulus'].shape[0]))
        s_repetition = np.reshape(s['repetition'], (s['repetition'].shape[0]))
        n_rows = s_repetition.shape[0]
        if n_rows == 0:
            # Nothing to export; np.max would raise on an empty array.
            return

        out_dir = os.path.join(os.path.dirname(matfile), 'training0')
        os.makedirs(out_dir, exist_ok=True)

        num_label = int(np.max(s_label))
        emg = []
        for i in range(n_rows):
            if s_repetition[i] <= 0:
                continue
            emg.extend(s_emg[i][0:8])
            if (i + 1 == n_rows) or (s_label[i + 1] == 0):
                # Convert to Python ints first: the MAT arrays may use small
                # integer dtypes in which this arithmetic could wrap around.
                index = int(s_label[i]) - 1 + num_label * (int(s_repetition[i]) - 1)
                np.array(emg).astype(int).tofile(
                    os.path.join(out_dir, 'classe_' + str(index) + '.dat'))
                emg = []

    @staticmethod
    def spilt_dat(path, subjects, exercise=2, classes=17):
        """Move held-out repetitions from ``training0`` to ``Test0`` / ``Test1``.

        For every subject ``i`` in ``1..subjects``, files ``classe_<k>.dat``
        in ``<path>/s<i>/S<i>_E<exercise>/training0`` are moved to ``Test0``
        when ``4 * classes <= k < 5 * classes`` and to ``Test1`` when
        ``5 * classes <= k < 6 * classes``. They are renumbered to
        ``k - 4 * classes`` and ``k - 5 * classes`` respectively.

        Args:
            path: Root directory holding the ``s<i>`` folders.
            subjects: Number of subjects to process, starting at 1.
            exercise: Exercise number used in the folder names.
            classes: Number of file indices per repetition.
        """
        for i in range(1, subjects + 1):
            session_dir = os.path.join(path, 's' + str(i), 'S' + str(i) + '_E' + str(exercise))
            train_path = os.path.join(session_dir, 'training0')
            targets = (
                (4, os.path.join(session_dir, 'Test0')),
                (5, os.path.join(session_dir, 'Test1')),
            )
            for file in os.listdir(train_path):
                sep = file.index('_')
                dot = file.index('.')
                classe = int(file[sep + 1:dot])
                for block, target_dir in targets:
                    if block * classes <= classe < (block + 1) * classes:
                        # The original never created the target folders.
                        os.makedirs(target_dir, exist_ok=True)
                        renamed = file[:sep + 1] + str(classe - block * classes) + file[dot:]
                        redst = os.path.join(target_dir, renamed)
                        shutil.move(os.path.join(train_path, file), redst)
                        print(redst)
                        break

    @staticmethod
    def mat_to_csv(matfile):
        """Export a ``.mat`` recording to a CSV next to it.

        The columns are ``repetition``, ``EMG_01`` .. ``EMG_<n>`` and
        ``label``; rows whose stimulus is 0 are dropped.

        Args:
            matfile: ``.mat`` file to read; the CSV gets the same name with a
                ``csv`` extension.
        """
        s = loadmat(matfile)
        s_emg = s['emg']
        s_label = s['stimulus']
        s_repetition = s['repetition']

        # Merge into one table and drop the rows labelled 0.
        s_data = np.column_stack((s_repetition, s_emg, s_label))
        s_data = s_data[s_data[:, -1] != 0]

        header = ['repetition']
        header.extend('EMG_' + str(i).rjust(2, '0') for i in range(1, s_emg.shape[1] + 1))
        header.append('label')

        # column_stack promotes everything to a common dtype; restore the
        # original dtypes of the two bookkeeping columns before saving.
        frame = pd.DataFrame(s_data)
        last = frame.shape[1] - 1
        frame[0] = frame[0].astype(s_repetition.dtype)
        frame[last] = frame[last].astype(s_label.dtype)

        frame.to_csv(matfile[:-3] + 'csv', index=False, header=header)

def csv_to_npy(csvfile, savepath, gestures, person):
    """Export every contiguous run of each gesture in *csvfile* as a ``.npy`` file.

    For each gesture the rows are scanned in order. Consecutive rows whose
    second column equals the gesture form one segment; each segment is saved
    as a ``float32`` array to ``<savepath><person>_<gesture>_<n>.npy`` with
    ``n`` counting from 1. Every row contributes columns 2..9 (used as EMG)
    followed by the two arrays returned by ``myo.IMU.coordinate_convert`` for
    columns 10..13, 14..16 and 17..19.

    Args:
        csvfile: Recording CSV to read.
        savepath: Prefix of the output files (normally a directory ending in
            a path separator). Its directory part is created if missing.
        gestures: Iterable of gesture labels to export.
        person: Identifier placed at the start of every output file name.
    """
    data_value = pd.read_csv(csvfile).values

    out_dir = os.path.dirname(savepath)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    for gesture in gestures:
        times = 1
        data_gesture = []
        for d in data_value:
            if d[1] == gesture:
                emg = d[2:10]
                acc_conv, gyro_conv = myo.IMU.coordinate_convert(d[10:14], d[14:17], d[17:20])
                imu_conv = np.concatenate((acc_conv, gyro_conv))
                data_gesture.append(np.concatenate((emg, imu_conv)))
            elif data_gesture:
                # The gesture just ended: store the finished segment.
                np.save(savepath + str(person) + '_' + str(gesture) + '_' + str(times) + '.npy',
                        np.array(data_gesture).astype(np.float32))
                data_gesture = []
                times = times + 1
        if data_gesture:
            # The file ended while the gesture was still active; the original
            # silently dropped this last segment.
            np.save(savepath + str(person) + '_' + str(gesture) + '_' + str(times) + '.npy',
                    np.array(data_gesture).astype(np.float32))

def calculate_bias(csvfile='../datasets/Bias/myo_wear_rest.csv'):
    """Print the per-axis standard deviation of converted IMU data.

    Rows whose ``Label`` equals 1 are selected; columns 10..13, 14..16 and
    17..19 of each row are passed to ``myo.IMU.coordinate_convert`` and the
    standard deviation of each of the first three components of both returned
    arrays is printed.

    Args:
        csvfile: Recording to analyse. Defaults to the path that used to be
            hard-coded, so existing calls without arguments behave as before.
    """
    df = pd.read_csv(csvfile)
    signal = df[(df.Label == 1)].values

    acc = []
    gyro = []
    for s in signal:
        a_conv, g_conv = myo.IMU.coordinate_convert(s[10:14], s[14:17], s[17:20])
        acc.append(a_conv)
        gyro.append(g_conv)
    acc = np.array(acc)
    gyro = np.array(gyro)

    # Values noted by the author in the original source (two figures per line):
    #   ACC_X:  0.001875685910043594   0.003409516280756376
    #   GYRO_X: 0.06200327295444661    0.5893605072465599
    #   ACC_Y:  0.0018913260039597713  0.004465990601111007
    #   GYRO_Y: 0.06963855596098587    0.34299795842099245
    #   ACC_Z:  0.0028977735727613507  0.0039070980272106004
    #   GYRO_Z: 0.05068126393504067    0.303374151213909
    for axis, name in enumerate('XYZ'):
        print('ACC_' + name + ':', acc[:, axis].std())
        print('GYRO_' + name + ':', gyro[:, axis].std())

    print('end')

if __name__ == "__main__":
    # Export gestures 1..13 of the s0 recording; the output files are
    # prefixed with person id 22.
    # To convert several subjects instead, loop over their ids and pass
    # '../datasets/csv/s<id>.csv' together with the matching id.
    gesture_ids = range(1, 14)
    csv_to_npy(
        csvfile='../datasets/csv/s0.csv',
        savepath='../datasets/npy/',
        gestures=gesture_ids,
        person=22,
    )
shenliaoxx commented 1 month ago

谢谢