cyita opened this issue 3 years ago
From the MLPerf team: to avoid the all-to-all time for syncing embedding inputs, each worker needs to have the full data of the column (or columns) corresponding to its embedding(s). Basically, a collect operation that gathers all the data of a column onto a certain node.
Operations for RecSys
| names |
|---|
| [user1, user2, user3] |
| [user1, user4] |
| [user3, user5, user6] |
If possible, assign larger integers to the strings that appear less frequently.
`{'Quote': 0, 'Retweet': 1, 'TopLevel': 2}` (BTW, do we need to fix the column names?)
If the space of categories is fixed and known, it would be more efficient to directly pass a map for encoding instead of using `gen_string_idx`.
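If the categories were known up front, the encoding would reduce to a dict lookup. A minimal pure-Python sketch of the idea (the `encode_fixed` helper is made up for illustration, not the library API):

```python
# Hypothetical helper: replace string categories with fixed integer ids.
# Pure-Python stand-in for the table operation, for illustration only.
def encode_fixed(rows, col, mapping):
    # dict literals allow a variable key, so col resolves to "type" below
    return [{**r, col: mapping[r[col]]} for r in rows]

rows = [{"type": "Quote"}, {"type": "Retweet"}, {"type": "TopLevel"}]
encoded = encode_fixed(rows, "type", {"Quote": 0, "Retweet": 1, "TopLevel": 2})
# encoded == [{"type": 0}, {"type": 1}, {"type": 2}]
```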
Operations for Wechat Challenge
[x] groupby and aggregation

| animal | age | height |
|---|---|---|
| cat | 1 | 6 |
| dog | 8 | 30 |
| cat | 2 | 10 |
| cat | 5 | 12 |
| dog | 3 | 13 |
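For reference, the intended group-by-and-aggregate semantics on the table above can be sketched in plain Python (illustrative only; a mean aggregation on `age` is assumed here, not the FeatureTable API):

```python
from collections import defaultdict

# Group the rows of the table above by animal and average the age column.
rows = [
    {"animal": "cat", "age": 1, "height": 6},
    {"animal": "dog", "age": 8, "height": 30},
    {"animal": "cat", "age": 2, "height": 10},
    {"animal": "cat", "age": 5, "height": 12},
    {"animal": "dog", "age": 3, "height": 13},
]
groups = defaultdict(list)
for r in rows:
    groups[r["animal"]].append(r["age"])
avg_age = {animal: sum(ages) / len(ages) for animal, ages in groups.items()}
# avg_age["dog"] == 5.5
```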
```python
# Example from the pyspark documentation
# function: pyspark.sql.DataFrame.columns
# return: all column names as a list
print(df.columns)
>>> ['animal', 'age', 'height']
```
It seems PySpark cannot support drop_duplicates_with_last (which keeps the last duplicate in the table), because there is no notion of a row index in PySpark.
```python
# Example from the pyspark documentation
# function: pyspark.sql.DataFrame.drop_duplicates(subset=None)
# return: a new DataFrame with duplicate rows removed, optionally only
#         considering certain columns
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
df.drop_duplicates().show()
>>> +-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+
df.dropDuplicates(['name', 'height']).show()
>>> +-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+
```
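A possible workaround for keep-last semantics: attach an explicit row index and keep the row with the largest index per key. A plain-Python sketch of the idea (the function name is hypothetical):

```python
# Keep the LAST duplicate per key by letting later rows overwrite
# earlier ones in a dict keyed on the subset columns.
def drop_duplicates_keep_last(rows, subset):
    seen = {}
    for i, r in enumerate(rows):
        seen[tuple(r[c] for c in subset)] = (i, r)   # later i wins
    return [r for _, r in sorted(seen.values(), key=lambda t: t[0])]

rows = [
    {"name": "Alice", "age": 5, "height": 80},
    {"name": "Alice", "age": 5, "height": 80},
    {"name": "Alice", "age": 10, "height": 80},
]
last = drop_duplicates_keep_last(rows, ["name", "height"])
# last == [{"name": "Alice", "age": 10, "height": 80}]
```

In Spark the analogous trick would be to add an id column (e.g. via monotonically_increasing_id) and take the max-id row per key.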
```python
# Example from the pyspark documentation
# function: pyspark.sql.DataFrame.sample(withReplacement=None, fraction=None, seed=None)
# param withReplacement: bool; optional; sample with replacement or not (default False)
# param fraction: float; required; fraction of rows to generate, range [0.0, 1.0]
# param seed: int; optional; seed for sampling (default a random seed)
# return: a sampled subset of this DataFrame
df.sample(fraction=0.5, withReplacement=True, seed=2021)
>>> +------+---+------+
|animal|age|height|
+------+---+------+
|   cat|  1|     6|
|   dog|  3|    13|
|   cat|  1|     6|
+------+---+------+
```
e.g. for the table above, `tbl.col_to_list("age")` should return `[1, 8, 2, 5, 3]`.
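A minimal stand-in for the intended semantics (pure Python; in Spark this would amount to collecting a single column in row order):

```python
def col_to_list(rows, col):
    # Collect the values of one column, preserving row order.
    return [r[col] for r in rows]

ages = col_to_list(
    [{"age": 1}, {"age": 8}, {"age": 2}, {"age": 5}, {"age": 3}], "age")
# ages == [1, 8, 2, 5, 3]
```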
Dummy pipeline unsupported operations:
Operations for Booking Challenge
[x] read_csv: read a csv file and convert it into a FeatureTable
[x] union: append more data row-wise
```python
# Example for the FeatureTable union method
# Union two tables, aligning their columns by name
# :param tbl: feature table
# :return: FeatureTable
df1.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df2.show()
>>> +---+---+
| y | x |
+---+---+
| c | 3 |
| d | 4 |
+---+---+
df3 = df1.union(df2)
df3.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 3 | c |
| 4 | d |
+---+---+
```
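The union-by-name behaviour shown above can be sketched in plain Python as follows (illustrative only; Spark itself offers unionByName for DataFrames):

```python
# Align the second table's columns to the first table's column order
# by name, then append the rows.
def union_by_name(rows1, rows2):
    cols = list(rows1[0].keys())
    return rows1 + [{c: r[c] for c in cols} for r in rows2]

df1 = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}]
df2 = [{"y": "c", "x": 3}, {"y": "d", "x": 4}]
df3 = union_by_name(df1, df2)
# x column of df3: [1, 2, 3, 4]
```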
[x] append_columns: append a new column with a constant value

```python
# Append a column with a constant value to the table
# :param col: the name of the new column
# :param value: the constant value to append
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df.append_columns("z", 0)
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 0 |
| 2 | b | 0 |
+---+---+---+
```
Operations for Booking Challenge
```python
# Set the value of a column at the given row index (or indices)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
+---+---+
df = df.iloc("x", 0, "d")
df.show()
>>> +---+---+
| x | y |
+---+---+
| d | a |
| 2 | b |
+---+---+
df = df.iloc("x", [0, 1], "d")
df.show()
>>> +---+---+
| x | y |
+---+---+
| d | a |
| d | b |
+---+---+
```
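A plain-Python sketch of this iloc-style update (the helper name and the copy-on-write behaviour are assumptions, not the actual API):

```python
# Set column `col` to `value` at the given row index or list of indices,
# returning a new table and leaving the input unchanged.
def iloc_set(rows, col, idx, value):
    indices = idx if isinstance(idx, list) else [idx]
    out = [dict(r) for r in rows]
    for i in indices:
        out[i][col] = value
    return out

rows = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}]
updated = iloc_set(rows, "x", 0, "d")
# x column of updated: ["d", 2]; the original rows are unchanged
```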
Operations for Booking Challenge
```python
# Factorise the given column into integer codes in the output column
# :param in_col: input column
# :param out_col: output column
df1.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 3 | a |
| 4 | a |
| 5 | c |
+---+---+
df2 = df1.factorise("y", "z")
df2.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 0 |
| 2 | b | 1 |
| 3 | a | 0 |
| 4 | a | 0 |
| 5 | c | 2 |
+---+---+---+
```
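The factorise semantics can be sketched as assigning dense integer codes in order of first appearance (pure Python, illustrative only):

```python
# Map each distinct value of in_col to a dense integer code, in order
# of first appearance, and write the codes to out_col.
def factorise(rows, in_col, out_col):
    codes = {}
    out = []
    for r in rows:
        code = codes.setdefault(r[in_col], len(codes))
        out.append({**r, out_col: code})
    return out

rows = [{"y": v} for v in ["a", "b", "a", "a", "c"]]
coded = factorise(rows, "y", "z")
# z column of coded: [0, 1, 0, 0, 2]
```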
Operations for Booking Challenge
[x] sort the rows of a table by the given column(s)

```python
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.sort("x")
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 1 | c |
| 2 | b |
+---+---+
df = df.sort("x", False)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 2 | b |
| 1 | a |
| 1 | c |
+---+---+
df = df.sort(["x", "y"], [True, False])
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | c |
| 1 | a |
| 2 | b |
+---+---+
```
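A sketch of the multi-column sort with per-column ascending flags, using Python's stable sort and applying the keys from last to first (illustrative only, not the table API):

```python
# Sort rows by one or more columns; ascending is a matching list of
# bools (default all ascending). Stable sorts applied in reverse key
# order give the same result as a single multi-key sort.
def sort_table(rows, cols, ascending=None):
    cols = cols if isinstance(cols, list) else [cols]
    if ascending is None:
        ascending = [True] * len(cols)
    elif not isinstance(ascending, list):
        ascending = [ascending]
    out = list(rows)
    for col, asc in reversed(list(zip(cols, ascending))):
        out.sort(key=lambda r: r[col], reverse=not asc)
    return out

rows = [{"x": 1, "y": "a"}, {"x": 2, "y": "b"}, {"x": 1, "y": "c"}]
result = sort_table(rows, ["x", "y"], [True, False])
# result rows: (1, c), (1, a), (2, b)
```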
[x] append a column from a given list of (index, value) pairs

```python
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.append_list("z", [(0, 1), (1, 2), (2, 3)])
df.show()
>>> +---+---+---+
| x | y | z |
+---+---+---+
| 1 | a | 1 |
| 1 | c | 2 |
| 2 | b | 3 |
+---+---+---+
df = df.append_list("h", [(0, 1), (2, 3)])
df.show()
>>> +---+---+---+------+
| x | y | z |    h |
+---+---+---+------+
| 1 | a | 1 |    1 |
| 1 | c | 2 | null |
| 2 | b | 3 |    3 |
+---+---+---+------+
```
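A plain-Python sketch of append_list: values are attached by row index, and rows without a matching pair get null (None here). Illustrative only:

```python
# Attach a new column from (row_index, value) pairs; missing indices
# are filled with None, matching the null shown in the example above.
def append_list(rows, col, pairs):
    values = dict(pairs)
    return [{**r, col: values.get(i)} for i, r in enumerate(rows)]

rows = [{"x": 1, "y": "a"}, {"x": 1, "y": "c"}, {"x": 2, "y": "b"}]
out = append_list(rows, "h", [(0, 1), (2, 3)])
# h column of out: [1, None, 3]
```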
Operations for Booking Challenge
```python
# Shift the values of a column down into a new column
# optional third argument: number of rows to shift (default 1)
# optional fourth argument: fill value for the vacated rows (default null)
df.show()
>>> +---+---+
| x | y |
+---+---+
| 1 | a |
| 2 | b |
| 1 | c |
+---+---+
df = df.shift("x", "z")
df.show()
>>> +---+---+------+
| x | y |    z |
+---+---+------+
| 1 | a | null |
| 1 | c |    1 |
| 2 | b |    2 |
+---+---+------+
df = df.shift("x", "z", 2)
df.show()
>>> +---+---+------+
| x | y |    z |
+---+---+------+
| 1 | a | null |
| 1 | c | null |
| 2 | b |    1 |
+---+---+------+
df = df.shift("x", "z", 2, -1)
df.show()
>>> +---+---+----+
| x | y |  z |
+---+---+----+
| 1 | a | -1 |
| 1 | c | -1 |
| 2 | b |  1 |
+---+---+----+
```
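Assuming shift(in_col, out_col, n, fill) copies in_col moved down by n rows and fills the vacated slots, a plain-Python sketch of the semantics (not the actual implementation):

```python
# Copy in_col shifted down by n rows into out_col, padding the first
# n rows with `fill` (None stands in for null).
def shift(rows, in_col, out_col, n=1, fill=None):
    values = [r[in_col] for r in rows]
    shifted = [fill] * n + values[:len(values) - n]
    return [{**r, out_col: v} for r, v in zip(rows, shifted)]

rows = [{"x": 1}, {"x": 1}, {"x": 2}]
out = shift(rows, "x", "z", 2, -1)
# z column of out: [-1, -1, 1]
```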
Also operations to handle timestamp, including:
- [ ] f.from_unixtime
- [ ] f.hour
- [ ] f.minute
- [ ] f.second
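For reference, the plain-Python equivalents of those Spark SQL functions, using a fixed example timestamp:

```python
from datetime import datetime, timezone

# f.from_unixtime / f.hour / f.minute / f.second equivalents in the
# standard library, for illustration only.
ts = 1_600_000_000                        # a unix timestamp in seconds
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
parts = (dt.hour, dt.minute, dt.second)
# 1600000000 is 2020-09-13 12:26:40 UTC, so parts == (12, 26, 40)
```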
will do tmr
Also operations to convert to and from pandas DataFrames, and to sort.
Need to persist the table to avoid repeated computation in iterative pipelines.
Operations supported by NVTabular:
`def transform_python_udf(self, in_col, out_col, udf_func)`
StatOperator
TODO
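The transform_python_udf signature listed above suggests applying a Python function to one column and storing the result in another; a rough plain-Python sketch of that behaviour (the body is an assumption, only the signature comes from the list):

```python
# Apply udf_func to each value of in_col and write the result to out_col.
def transform_python_udf(rows, in_col, out_col, udf_func):
    return [{**r, out_col: udf_func(r[in_col])} for r in rows]

rows = [{"age": 1}, {"age": 8}]
doubled = transform_python_udf(rows, "age", "age2", lambda a: a * 2)
# age2 column of doubled: [2, 16]
```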