awarebayes / RecNN

Reinforced Recommendation toolkit built around pytorch 1.7
Apache License 2.0

Pandas Performance suggestion #1

Closed truongthanh96 closed 5 years ago

truongthanh96 commented 5 years ago

Hello, I am working with your awesome paper and awesome project, and I think I can improve performance when working with the ratings data in pandas, following https://realpython.com/fast-flexible-pandas/

The code with long loops and dict management can be sped up by replacing:

ratings = pd.read_csv('../data/ml-20m/ratings.csv')
users = np.array(list(set(ratings['userId'])))
ratings_by_user = dict([(i, pd.DataFrame()) for i in users])
for u in tqdm(list(users)):
        ratings_by_user[u] = ratings.loc[ratings['userId'] == u].sort_values(by='timestamp').reset_index(drop=True)

import pickle

ratings_by_user = pickle.load(open('../data/rbu.pkl', 'rb'))

for user in tqdm(ratings_by_user.keys()):
    ratings_by_user[user] = ratings_by_user[user].drop(['timestamp', 'userId'], axis=1)
    ratings_by_user[user]['rating'] = ratings_by_user[user]['rating'].apply(lambda i: 2*(i-2.5))

to_delete = []
for user in tqdm(list(ratings_by_user.keys())):
    ratings_by_user[user]['rating'] = ratings_by_user[user]['rating'].apply(lambda i: 2*(i-2.5))
    # filter on the 'rating' column while this is still a DataFrame
    ratings_by_user[user] = ratings_by_user[user][ratings_by_user[user]['rating'] >= 0]
    ratings_by_user[user] = ratings_by_user[user].values
    # specify your frame size here! 1 end choice + # of movies to be fed into the model
    if len(ratings_by_user[user]) < 11:
        to_delete.append(user)  # remember the key, not the data

for user in to_delete:
    del ratings_by_user[user]

class ML20mDataset(Dataset):
    def __init__(self):
        self.set_dataset(1)

    def set_dataset(self, u):
        self.user = u
        self.dataset = ratings[u]

    def __len__(self):
        return max(len(self.dataset) - frame_size, 0)

    def __getitem__(self, idx):
        ratings = self.dataset[idx:frame_size+idx+1]
        movie_chosen = ratings[:, 0][-1]
        films_watched = ratings[:, 0][:-1]

        films_lookup = torch.stack([movies[id_to_index[i]] for i in ratings[:, 0]])

        state = films_lookup[:-1].to(cuda).float()
        next_state = films_lookup[1:].to(cuda).float()

        rewards = torch.tensor(ratings[:, 1][:frame_size]).to(cuda).float()
        next_rewards = torch.tensor(ratings[:, 1][1:frame_size+1]).to(cuda).float()

        action = films_lookup[-1].to(cuda)

        reward = torch.tensor(ratings[:, 1][-1].tolist()).to(cuda).float()
        done = torch.tensor(idx == self.__len__() - 1).to(cuda).float()

        state = (state, rewards)
        next_state = (next_state, next_rewards)

        return state, action, reward, next_state, done

with array-based processing on the pandas DataFrame, which still yields the corresponding result:


        print("Start update ratings %s" % (datetime.datetime.now(),))
        train_df = train_df.copy()
        train_df["rating"] = train_df['rating'].apply(lambda i: 2 * (i - 2.5))
        train_df = train_df[train_df["rating"] >= 0]
        users = train_df[["user","item"]].groupby(["user"]).size()
        users = users[users >= self.frame_size + 1]
        train_df = train_df[train_df["user"].isin(users.index)]
        user_rated = train_df.sort_values(by=["user", 'timestamp']).drop(columns=["timestamp"]).set_index("user")

        self.user_rated = user_rated

        print("Done update ratings %s" % (datetime.datetime.now(),))

        batch_bar = tqdm(total=self.get_total_user())

        for user, df in self.user_rated.groupby(level=0):
            batch_bar.update(1)
            size = max(len(df) - self.frame_size, 0)
            for idx in range(0, size):
                if np.random.rand() > 0.2:  # intake percents
                    continue
                ratings = df[idx:self.frame_size + idx + 1]
                ratings = ratings[["item", "rating"]].values

                movie_chosen = ratings[:, 0][-1]
                films_watched = ratings[:, 0][:-1]
                ...

The process above took about 22 seconds:

Start update ratings 2019-04-09 16:44:01.957343
Done update ratings 2019-04-09 16:44:24.071254
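For reference, the rescaling step in the snippet above can also be written as plain column arithmetic instead of a per-row apply. This is only a minimal sketch on a toy frame, assuming the column names user, item, rating and timestamp used above; prepare_ratings and its arguments are hypothetical names, not part of RecNN, and no timing is claimed for it.

```python
import pandas as pd


def prepare_ratings(train_df: pd.DataFrame, frame_size: int) -> pd.DataFrame:
    """Hypothetical helper: rescale ratings and keep users with enough rows."""
    df = train_df.copy()
    # Vectorized rescaling from [0.5, 5] to [-4, 5]; no per-row lambda needed.
    df["rating"] = 2 * (df["rating"] - 2.5)
    # Same filter as in the snippet above: keep non-negative ratings only.
    df = df[df["rating"] >= 0]
    # Count the remaining ratings per user and broadcast the count to each row.
    counts = df.groupby("user")["item"].transform("count")
    df = df[counts >= frame_size + 1]
    # Order each user's history by time, then index by user for groupby(level=0).
    return (
        df.sort_values(by=["user", "timestamp"])
        .drop(columns=["timestamp"])
        .set_index("user")
    )


# Toy data standing in for ratings.csv (assumed, not real MovieLens rows).
toy = pd.DataFrame(
    {
        "user": [1, 1, 1, 2, 2, 1],
        "item": [10, 11, 12, 10, 13, 14],
        "rating": [5.0, 3.0, 4.0, 4.5, 1.0, 2.0],
        "timestamp": [3, 1, 2, 1, 2, 4],
    }
)
user_rated = prepare_ratings(toy, frame_size=2)
print(user_rated)
```

With frame_size=2 only user 1 keeps enough non-negative ratings, so the result holds that user's three rows in timestamp order.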

Instead of manually looping over each element to get its index:

id_to_index = dict([(u, i) for i, u in enumerate(pd.read_csv('../data/ml-20m/movies.csv')['movieId'].values)])

You can use

movies_series = pd.read_csv('../data/ml-20m/movies.csv')["movieId"].reset_index(drop=True)
movies_series = pd.Series(movies_series.index.values, index=movies_series)
id_to_index = movies_series.to_dict()
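A quick way to check that the two constructions agree is to run both on a small stand-in for movies.csv. The sketch below assumes a toy movieId column rather than the real file, and the names movies_df, loop_map and series_map are made up for the illustration.

```python
import pandas as pd

# Toy stand-in for movies.csv; the real file is not read here.
movies_df = pd.DataFrame({"movieId": [1, 3, 7, 50]})

# Loop-based version from the original code.
loop_map = dict([(u, i) for i, u in enumerate(movies_df["movieId"].values)])

# Series-based version: positions become the values, movie ids become the index.
movies_series = movies_df["movieId"].reset_index(drop=True)
movies_series = pd.Series(movies_series.index.values, index=movies_series)
series_map = movies_series.to_dict()

print(series_map == loop_map)
```

Both dictionaries map each movieId to its row position, so either can be used for the lookup in __getitem__.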

This is my first issue on GitHub, and I want to contribute to your awesome project.

awarebayes commented 5 years ago

Thanks! I haven't started on the data loader yet; the one I did is deprecated. But I will make sure to use your suggestions! The update will probably come out today. Thanks again, I'd been running into major performance issues.

awarebayes commented 5 years ago

Though the line train_df = train_df[train_df["rating"] >= 0] is wrong, because it discards all the negatively rated reviews, which are really helpful for the critic. DDPG is supposed to work with negative rewards and learn 'what not to do' from them.
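One way to keep the negative rewards is to drop the rating filter and only filter on the number of ratings per user. This is a sketch on toy data, assuming the same user, item and rating column names as the suggested code; frame_size = 2 is an arbitrary value for the example.

```python
import pandas as pd

frame_size = 2  # assumed value for the toy example

# Toy ratings (made up); user 2 has too few rows for one frame.
train_df = pd.DataFrame(
    {
        "user": [1, 1, 1, 2, 2],
        "item": [10, 11, 12, 10, 13],
        "rating": [5.0, 1.0, 4.0, 4.5, 0.5],
    }
)

# Rescale to [-4, 5]; low ratings become negative rewards.
train_df["rating"] = 2 * (train_df["rating"] - 2.5)

# No `rating >= 0` filter here: negative rewards stay in the data.
sizes = train_df.groupby("user").size()
eligible = sizes[sizes >= frame_size + 1].index
kept = train_df[train_df["user"].isin(eligible)]

print(kept)
```

Here user 1 is kept together with the negative reward for item 11, while user 2 is dropped only because the history is shorter than frame_size + 1.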

truongthanh96 commented 5 years ago

You are right, I have double-checked the original paper and found the difference. I will come back with new code soon, as I am implementing that paper myself. In the paper they also use epsilon-greedy to generate random actions, and at each step they re-rank all remaining items and select the highest-ranked one. However, because N is fixed, I have to make sure the user's initial state includes his K positive items; I am not sure whether these should be his first N positive items by timestamp or just N random positive items from his session.

Observe current state $s_t = f(H_t)$, where $H_t = \{i_1, \ldots, i_n\}$
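The selection step described in this comment could look roughly like the sketch below. It is one reading of the comment, not the paper's exact procedure: select_item, the scores array and the boolean remaining mask are all hypothetical, and the scores would come from whatever ranking model is in use.

```python
import numpy as np


def select_item(scores, remaining, epsilon, rng):
    """Pick one item index from `remaining` (boolean mask) epsilon-greedily."""
    candidates = np.flatnonzero(remaining)
    if candidates.size == 0:
        raise ValueError("no items left to recommend")
    if rng.random() < epsilon:
        # Explore: uniform random choice among the remaining items.
        return int(rng.choice(candidates))
    # Exploit: re-rank the remaining items and take the best-scoring one.
    return int(candidates[np.argmax(scores[candidates])])


rng = np.random.default_rng(0)
scores = np.array([0.1, 0.9, 0.4, 0.7])  # hypothetical ranking scores
remaining = np.ones(len(scores), dtype=bool)

picked = []
for _ in range(len(scores)):
    # epsilon=0.0 makes the toy run purely greedy and therefore deterministic.
    item = select_item(scores, remaining, epsilon=0.0, rng=rng)
    remaining[item] = False  # an item is recommended at most once
    picked.append(item)

print(picked)
```

With a positive epsilon the same function occasionally returns a random remaining item instead of the top-ranked one.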

awarebayes commented 5 years ago

Done! I have also started working on an asynchronous version, but it doesn't seem to be functioning properly just yet. If you need the code, it is located in the notes under Async Dataloader. I used aiostream with async generators and streams for synchronization. Everything is just copy-paste. My vision is that the ratings need to be split by user and fed into async workers that will yield the mini-batches into the stream.
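The idea of splitting ratings by user and letting async workers feed mini-batches into one stream can be sketched with the standard library alone. Note that this uses asyncio.Queue rather than aiostream, it is not the code from the notes, and user_worker, stream_batches and the toy dictionary are hypothetical names for the illustration.

```python
import asyncio


async def user_worker(user_ids, ratings_by_user, batch_size, queue):
    """Put mini-batches built from one shard of users onto the shared queue."""
    batch = []
    for user in user_ids:
        for row in ratings_by_user[user]:
            batch.append((user, row))
            if len(batch) == batch_size:
                await queue.put(batch)
                batch = []
    if batch:
        await queue.put(batch)  # flush the last partial batch


async def stream_batches(ratings_by_user, n_workers=2, batch_size=2):
    """Async generator that merges the workers' mini-batches into one stream."""
    queue = asyncio.Queue(maxsize=4)
    users = list(ratings_by_user)
    # Round-robin split of the users across the workers.
    shards = [users[i::n_workers] for i in range(n_workers)]
    tasks = [
        asyncio.create_task(user_worker(s, ratings_by_user, batch_size, queue))
        for s in shards
    ]

    async def close_when_done():
        await asyncio.gather(*tasks)
        await queue.put(None)  # sentinel: all workers finished

    closer = asyncio.create_task(close_when_done())
    while (batch := await queue.get()) is not None:
        yield batch
    await closer


async def collect(ratings_by_user):
    return [b async for b in stream_batches(ratings_by_user)]


# Toy per-user rows (made up) in place of real rating frames.
toy = {1: ["a", "b", "c"], 2: ["d"], 3: ["e", "f"]}
batches = asyncio.run(collect(toy))
print(batches)
```

In this sketch a mini-batch may mix rows from different users of the same shard, and a failing worker is not handled; a real loader would need to decide both points.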