towhee-io / towhee

Towhee is a framework that is dedicated to making neural data processing pipelines simple and fast.
https://towhee.io
Apache License 2.0

[DesignProposal]: DataCollection API #642

Closed: reiase closed this issue 2 years ago

reiase commented 2 years ago

Background and Motivation

  1. For data scientists, towhee should accelerate their work while staying close to their favorite toolkits;
  2. For machine learning engineers, towhee should be scalable and as simple to use as a building block of a production system;

Features

  1. Pythonic functional API;
  2. Dispatch function to towhee operators; #712
  3. Better error handling for large scale datasets; #649
  4. Better performance by async execution; #706
  5. Mixin extensions; #672
  6. DAG execution; # TODO
  7. Execution backends; # TODO

Design

Design Goal:

  1. Pythonic API: feels natural to Python users;
  2. Connective: integrates well with the Python data science ecosystem;

Core API

DataCollection provides a set of functional programming APIs that allow users to assemble a data processing pipeline by chaining functions and operators. This method-chaining style of API has proven powerful in data science and is widely adopted by the community.

The core API of DataCollection is designed to stay close to Scala's and Spark's collection APIs, with additional improvements that speed up common work in Python.

  1. Creating

    A DataCollection is always created from an iterable:

    dc = DataCollection([ list | tuple | Iterator | Iterable ])

Concatenating small data collections produces a larger one, which is useful when data is collected from different sources:

small_dc_1 = DataCollection(range(10)) # [0...9]
small_dc_2 = DataCollection(range(10, 20)) # [10...19]
large_dc = small_dc_1 + small_dc_2 # [0...19]
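Construction from an iterable and concatenation with + could be sketched as follows, assuming a lazy wrapper over Python iterators. MiniDataCollection is a hypothetical stand-in for illustration, not towhee's actual implementation:

```python
from itertools import chain
from typing import Any, Iterable, Iterator


class MiniDataCollection:
    """Hypothetical stand-in for DataCollection: wraps any iterable lazily."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._iterable = iterable

    def __iter__(self) -> Iterator[Any]:
        return iter(self._iterable)

    def __add__(self, other: "MiniDataCollection") -> "MiniDataCollection":
        # Concatenate lazily: all elements of self first, then those of other.
        return MiniDataCollection(chain(self, other))

    def to_list(self) -> list:
        return list(self)


small_dc_1 = MiniDataCollection(range(10))
small_dc_2 = MiniDataCollection(range(10, 20))
large_dc = small_dc_1 + small_dc_2
print(large_dc.to_list() == list(range(20)))  # True
```

Because the concatenated collection wraps a one-shot itertools.chain iterator, it can be consumed only once; an eager, list-backed variant would trade memory for reusability.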

Data collections can be zipped into a wider one:

dc_1 = DataCollection(range(10)) # [0...9]
dc_2 = DataCollection(range(10, 20)) # [10...19]

dc_1.zip(dc_2) # [(0, 10), (1, 11), ..., (9, 19)]

Zipping two data collections is useful when benchmarking a model on a dataset, because computing model metrics requires pairing each prediction with its label.
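Pairing predictions with labels for a metric can be sketched like this. MiniDataCollection, its zip method, and the sample data are hypothetical; the sketch only shows how element-wise pairing feeds an accuracy calculation:

```python
from typing import Any, Iterable, Iterator


class MiniDataCollection:
    """Hypothetical stand-in for DataCollection with a zip method."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._iterable = iterable

    def __iter__(self) -> Iterator[Any]:
        return iter(self._iterable)

    def zip(self, other: "MiniDataCollection") -> "MiniDataCollection":
        # Pair elements position by position (the built-in zip is used here);
        # like the built-in, it stops at the shorter input.
        return MiniDataCollection(zip(self, other))

    def to_list(self) -> list:
        return list(self)


# Hypothetical benchmarking data: model predictions and ground-truth labels.
predictions = MiniDataCollection([1, 0, 1, 1, 0])
labels = MiniDataCollection([1, 0, 0, 1, 0])

pairs = predictions.zip(labels).to_list()
accuracy = sum(pred == label for pred, label in pairs) / len(pairs)
print(accuracy)  # 0.8
```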

  2. Collection API

    The Collection API is a set of higher-order functional APIs that apply a function/operator to the collection and produce a new data collection.

    map: apply a function/operator to each element and generate a new collection of the results
    dc.map(lambda x: x+1) # apply a lambda function

    Multi-line functions are also supported via decorator syntax:

    @dc.map
    def result(x):
        return x+1

Note that the function name becomes the variable name of the resulting data collection. In other words, result is a data collection, not a function.

filter: apply a predicate to each element and generate a new collection that contains only the elements that pass it.
dc.filter(lambda x: x<3)
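The decorator form works because Python rebinds the decorated name to whatever the decorator returns, so dc.map applied to result yields a collection that replaces the function. A minimal sketch of map and filter under that assumption, using a hypothetical list-backed MiniDataCollection:

```python
from typing import Any, Callable, Iterable, Iterator


class MiniDataCollection:
    """Hypothetical list-backed stand-in for DataCollection."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._items = list(iterable)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)

    def map(self, fn: Callable[[Any], Any]) -> "MiniDataCollection":
        # Apply fn to every element and return a new collection.
        return MiniDataCollection(fn(x) for x in self._items)

    def filter(self, pred: Callable[[Any], bool]) -> "MiniDataCollection":
        # Keep only the elements for which pred returns a truthy value.
        return MiniDataCollection(x for x in self._items if pred(x))

    def to_list(self) -> list:
        return list(self._items)


dc = MiniDataCollection(range(5))
print(dc.map(lambda x: x + 1).to_list())  # [1, 2, 3, 4, 5]
print(dc.filter(lambda x: x < 3).to_list())  # [0, 1, 2]


@dc.map
def result(x):
    return x + 1


# The decorator returned a collection, so result is no longer a function.
print(type(result).__name__, result.to_list())  # MiniDataCollection [1, 2, 3, 4, 5]
```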
  3. Advanced Collection API

    dc = DataCollection(range(10))
    dc.batch(5) # [[0,1,2,3,4], [5,6,7,8,9]]
    dc.rolling(3) # [[0,1,2], [1,2,3], ..., [7,8,9]]
    dc.batch(2).flaten() # [0, 1, 2, ..., 9], unbatch the data collection
  4. Chaining functions

    There are two ways to chain functions/operators:

      1. Chained collection API calls

        (
          dc
          .map(lambda x: x+1)
          .map(lambda x: x*2)
        )
      2. The pipeline operator >> (each lambda must be parenthesized)

        (
          dc
          >> (lambda x: x+1)
          >> (lambda x: x*2)
        )
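The advanced APIs and the >> operator could be sketched as below, assuming a hypothetical list-backed MiniDataCollection where >> is simply sugar for map. The method name flaten mirrors the spelling used in the proposal. In Python a lambda must be parenthesized to appear as an operand of >>:

```python
from typing import Any, Callable, Iterable, Iterator


class MiniDataCollection:
    """Hypothetical list-backed stand-in for DataCollection."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._items = list(iterable)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._items)

    def map(self, fn: Callable[[Any], Any]) -> "MiniDataCollection":
        return MiniDataCollection(fn(x) for x in self._items)

    def batch(self, size: int) -> "MiniDataCollection":
        # Consecutive chunks of `size` elements; the last chunk may be shorter.
        items = self._items
        return MiniDataCollection(items[i:i + size] for i in range(0, len(items), size))

    def rolling(self, size: int) -> "MiniDataCollection":
        # Sliding windows of `size` consecutive elements, advancing by one.
        n = len(self._items)
        return MiniDataCollection(self._items[i:i + size] for i in range(n - size + 1))

    def flaten(self) -> "MiniDataCollection":
        # Undo one level of batching (named as in the proposal).
        return MiniDataCollection(x for chunk in self._items for x in chunk)

    def __rshift__(self, fn: Callable[[Any], Any]) -> "MiniDataCollection":
        # dc >> fn is sugar for dc.map(fn), enabling pipeline-style chaining.
        return self.map(fn)

    def to_list(self) -> list:
        return list(self._items)


dc = MiniDataCollection(range(10))
print(dc.batch(5).to_list())  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(dc.rolling(3).to_list()[:2])  # [[0, 1, 2], [1, 2, 3]]
print(dc.batch(2).flaten().to_list() == dc.to_list())  # True

# Method chaining and the >> operator build the same pipeline.
chained = dc.map(lambda x: x + 1).map(lambda x: x * 2)
piped = dc >> (lambda x: x + 1) >> (lambda x: x * 2)
print(chained.to_list() == piped.to_list())  # True
```

An eager, list-backed design keeps rolling windows and reuse simple; a streaming implementation would need a bounded buffer (for example collections.deque with maxlen) instead of slicing.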

Using DataCollection with towhee

DataCollection supports a call dispatcher, which redirects function calls on a data collection to extended APIs. For example:

@register_dispatcher
def add_1(x):
    return x+1

dc.add_1() # [add_1(x) for x in dc]

Function calls can also be dispatched to towhee operators, which makes it much easier to assemble a towhee pipeline:

dc = DataCollection(load_a_set_of_image())
(
  dc.towhee.decode_image() # op_name = 'towhee/decode_image'
  .towhee.image_embedding(model_name='resnet101') # op_name = 'towhee/image_embedding'
  .towhee.vector_norm(norm='l2') # op_name = 'towhee/vector_norm'
)

Example: building an image search engine with towhee

  1. Build the image index
    dc = DataCollection.glob('path/*.jpg')
    (
      dc.load_image()
      .towhee.image_embedding_pipeline(model='resnet50')
      .batch(128) # batched inserts are usually more efficient
      .to_milvus('https+milvus://host:port/db/db_name/table_name')
    )
  2. Query
    dc = DataCollection.from_files('query.jpg')
    result = (
      dc.load_image()
      .towhee.image_embedding_pipeline(model='resnet50')
      .query_milvus('https+milvus://host:port/db/db_name/table_name', top_k=20)
      .sortBy(lambda x: x.distance)
      .to_list()
    )
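The index-then-query flow can be sketched without a database. InMemoryIndex is a hypothetical brute-force stand-in for Milvus (no Milvus client API is used), and the embeddings are made-up vectors in place of image_embedding_pipeline output:

```python
import heapq
import math
from dataclasses import dataclass
from typing import Dict, List, Sequence, Tuple


@dataclass
class Hit:
    item_id: str
    distance: float


class InMemoryIndex:
    """Hypothetical brute-force stand-in for a vector database such as Milvus."""

    def __init__(self) -> None:
        self._vectors: Dict[str, List[float]] = {}

    def insert_batch(self, batch: Sequence[Tuple[str, Sequence[float]]]) -> None:
        # One call stores a whole batch of (id, vector) pairs.
        for item_id, vec in batch:
            self._vectors[item_id] = list(vec)

    def query(self, vec: Sequence[float], top_k: int) -> List[Hit]:
        # Euclidean (L2) distance to every stored vector; keep the top_k nearest.
        hits = (Hit(i, math.dist(vec, v)) for i, v in self._vectors.items())
        return heapq.nsmallest(top_k, hits, key=lambda h: h.distance)


# Made-up embeddings standing in for image_embedding_pipeline output.
embeddings = [("a.jpg", [0.0, 0.0]), ("b.jpg", [3.0, 4.0]), ("c.jpg", [1.0, 0.0])]

index = InMemoryIndex()
batch_size = 2
for start in range(0, len(embeddings), batch_size):
    index.insert_batch(embeddings[start:start + batch_size])

result = index.query([0.0, 0.0], top_k=2)
print([(h.item_id, h.distance) for h in result])  # [('a.jpg', 0.0), ('c.jpg', 1.0)]
```

heapq.nsmallest already returns hits in ascending distance order, which is the role the sortBy step plays in the query example; a real vector database would replace the linear scan with an approximate nearest-neighbor index.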


reiase commented 2 years ago

This API requires further review and discussion @GuoRentong @fzliu

filip-halt commented 2 years ago

@reiase Is there any way we can separate the high-level Pythonic pipeline creation from DataCollection? I feel like a user should be able to create a pipeline without having to deal with the extra complexity that comes with DataCollection.