Texera / texera

Collaborative Machine-Learning-Centric Data Analytics Using Workflows
https://texera.github.io
Apache License 2.0

Operator Materialization I #1257

Closed jiashu-z closed 2 years ago

jiashu-z commented 3 years ago

Implement operator materialization so that users can cache some operators for future use. The first step is to implement a centralized materialization.

The second step is to implement memory management for operator materialization, so that OOM caused by materialization is avoided.

jiashu-z commented 3 years ago

Currently, I'm implementing memory-managed materialization with JCS. It is on this branch. TODO in this phase:


jiashu-z commented 3 years ago

Materialization Discussion Summary

2021 Aug 11

Introduction

In Texera, the materialization of intermediate results and the operator result service consume a lot of memory and may cause out-of-memory (OOM) issues. A memory management mechanism is needed to limit memory usage and to spill data from memory to cheap, large storage devices, typically disks, when the data size exceeds memory capacity. To implement such a mechanism, we had a discussion about the storage layer, including the data format (serialization and compression) and the storage engine, based on Texera's typical use cases.

Use Cases

The storage layer mainly handles three types of requests from the computation layer.

  1. Batch read
  2. Batch write
  3. Random read

Among them, 1 and 2 are used for materialization and the result service, and 3 is used for paginated reads from the operator result service. The granularity of a random read is determined by the page size and the row width. Based on these use cases, we proposed some designs and selected a subset of them for further evaluation.
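As a purely illustrative sketch, a storage-layer interface covering these three request types might look like the following; the trait and method names are assumptions, not Texera's actual API:

```scala
// Hypothetical storage-layer interface for the three request types.
// All names and signatures here are illustrative assumptions.
trait IntermediateResultStore {
  // 1. Batch write: append a block of serialized tuples for an operator.
  def batchWrite(operatorId: String, tuples: Iterable[Array[Byte]]): Unit

  // 2. Batch read: scan back all tuples of an operator in write order.
  def batchRead(operatorId: String): Iterator[Array[Byte]]

  // 3. Random read: fetch one page for the paginated result service.
  // The page size and the row width together decide the read granularity.
  def randomRead(operatorId: String, pageIndex: Int, pageSize: Int): Seq[Array[Byte]]
}
```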

Individual Storage Engine

This kind of solution uses a standalone storage engine that runs as a separate process. There are several kinds of storage engines we could use.

RDBMS

  • RDBMS with connectors like JDBC
  • Serialization: Serialize the list of tuples as a table. No compression.
  • One operator, one table
  • Random read: Use an auto-increment key generated by the RDBMS, with an index on it, as the key for page ordering (see the sketch below)
  • Parameters are tuned to optimize performance.
  • MySQL
  • PostgreSQL
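As an illustration of the auto-increment-key design, a minimal JDBC sketch might look like this (the table and column names are assumptions; no actual schema was specified):

```scala
// Minimal JDBC sketch of auto-increment-key pagination; table/column
// names are assumptions. Assumes seq values are dense (no deletes).
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/texera", "user", "pass")

// One operator, one table; `seq` is the RDBMS-generated auto-increment key.
conn.createStatement().execute(
  """CREATE TABLE IF NOT EXISTS op_result (
    |  seq BIGINT AUTO_INCREMENT PRIMARY KEY,
    |  tuple BLOB
    |)""".stripMargin)

// Random read of one page: the primary-key index on `seq` provides
// the page ordering.
def readPage(pageIndex: Int, pageSize: Int) = {
  val stmt = conn.prepareStatement(
    "SELECT tuple FROM op_result WHERE seq > ? ORDER BY seq LIMIT ?")
  stmt.setLong(1, pageIndex.toLong * pageSize)
  stmt.setInt(2, pageSize)
  stmt.executeQuery()
}
```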

Currently, our evaluation does not focus on RDBMSs, since it is unlikely that we would eventually choose engines like MySQL or PostgreSQL.

Document Storage

  • Storage engine with connectors like JDBC
  • Serialization: Serialize the list of tuples as JSON objects. Compression is not mandatory.
  • One operator, multiple objects (to stay under the 16 MB limit on one single object). Maintain metadata elsewhere.
  • Random read: Use an auto-increment key generated by the user's code (or by the engine, if it has such a feature), with an index on it, as the key for page ordering (see the sketch below).
  • MongoDB
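A minimal sketch of this design against MongoDB, using the official Java sync driver from Scala (the database, collection, and field names are assumptions):

```scala
// Illustrative MongoDB sketch; db/collection/field names are assumptions.
import com.mongodb.client.MongoClients
import com.mongodb.client.model.{Filters, Indexes, Sorts}
import org.bson.Document

val coll = MongoClients.create("mongodb://localhost:27017")
  .getDatabase("texera")
  .getCollection("op_result")

// Index on the user-generated auto-increment key used for page ordering.
coll.createIndex(Indexes.ascending("seq"))

// Batch write: each tuple is stored as one JSON document with its seq,
// so no single object approaches the per-document size limit.
def batchWrite(tuples: Seq[String], startSeq: Long): Unit = {
  val docs = tuples.zipWithIndex.map { case (json, i) =>
    Document.parse(json).append("seq", Long.box(startSeq + i))
  }
  coll.insertMany(java.util.Arrays.asList(docs: _*))
}

// Random read: one page via a range query on the indexed key
// (assumes seq values are dense: 0, 1, 2, ... with no gaps).
def readPage(pageIndex: Int, pageSize: Int) =
  coll.find(Filters.gte("seq", pageIndex.toLong * pageSize))
    .sort(Sorts.ascending("seq"))
    .limit(pageSize)
```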

KV Storage

  • Storage engine with connectors
  • Serialization: Serialize the list of tuples as bytes or strings. Compression is not mandatory.
  • One operator, multiple KV pairs. Each row is a KV pair.

    • Random read: Handled by the user's code. Maintain metadata elsewhere.
  • One operator, one KV pair. Redis supports a list as the value (see the sketch below).

    • Random read: Not true random read, because a list in Redis is implemented using a linked list. See here.
  • Redis
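A minimal sketch of the Redis list approach using the Jedis client (the key naming is an assumption):

```scala
// Illustrative Redis sketch using Jedis; key naming is an assumption.
import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379)

// "One operator, one KV pair": the value is a Redis list of serialized rows.
def batchWrite(operatorId: String, rows: Seq[String]): Unit =
  jedis.rpush(s"opresult:$operatorId", rows: _*)

// Paginated read via LRANGE (start and stop indices are inclusive).
// Not true random access: Redis walks the list, so the cost grows
// with the start offset.
def readPage(operatorId: String, pageIndex: Int, pageSize: Int) =
  jedis.lrange(s"opresult:$operatorId",
    pageIndex.toLong * pageSize,
    pageIndex.toLong * pageSize + pageSize - 1)
```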

Others

  • Greenplum: I'm not familiar with this kind of database, so I'll take a look at it.

Embedded Storage Engine

We also discussed some SDKs for cache and storage. We decided not to use embedded storage engines for now.

Cache SDK

  • Apache JCS: Memory cache + disk storage library. No memory-size management. Not very popular.
  • Caffeine: Memory cache (see the sketch below).
  • Guava: A large general-purpose library that includes caching modules.
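For example, a bounded in-memory cache built with Caffeine might look like this (the key/value types and the weight budget are assumptions):

```scala
// Illustrative Caffeine sketch: a size-bounded in-memory cache for
// serialized result pages. Types and the 512 MB budget are assumptions.
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, Weigher}

val weigher: Weigher[String, Array[Byte]] =
  (_: String, page: Array[Byte]) => page.length

val pageCache: Cache[String, Array[Byte]] = Caffeine.newBuilder()
  .maximumWeight(512L * 1024 * 1024) // cap total cached bytes at ~512 MB
  .weigher(weigher)
  .build[String, Array[Byte]]()

// Look up a page, loading it from the backing store on a miss.
def getPage(key: String, load: () => Array[Byte]): Array[Byte] =
  pageCache.get(key, _ => load())
```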

Storage SDK

  • HaloDB: KV storage engine. Uses memory for the index and disk for the data. Not actively maintained; has download issues with Maven.
  • MapDB: KV storage engine with a memory cache.

Cache Eviction Algorithm

A small LRU segment plus a big MRU (Most Recently Used) segment, because most requests are linear scans: under repeated scans, a pure LRU would evict exactly the entries the next scan is about to re-read.
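The MRU half is the unusual part; here is a minimal sketch of an MRU cache (illustrative only, not Texera code):

```scala
import scala.collection.mutable

// Minimal MRU cache sketch. Under repeated linear scans, LRU evicts
// exactly the entries about to be re-read; evicting the most recently
// used entry instead keeps a stable subset of the scan resident.
class MruCache[K, V](capacity: Int) {
  private val entries = mutable.LinkedHashMap[K, V]()

  def get(key: K): Option[V] = entries.remove(key).map { v =>
    entries.put(key, v) // move to the "most recent" end
    v
  }

  def put(key: K, value: V): Unit = {
    if (!entries.contains(key) && entries.size >= capacity)
      entries.remove(entries.last._1) // evict the most recently used entry
    entries.put(key, value)
  }
}
```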

Conclusion

Since storage is not our primary or distinguishing feature, what we need for now is a simple but working solution without an overly complex implementation. Therefore, we decided to use a standalone storage engine, and we are evaluating different engines. We plan to find a storage engine that can be deployed in a distributed manner. We are not limited to the engines mentioned above, so feel free to add more storage engines to evaluate.

This is a repository for storage engine evaluation.

Currently, the focus of our evaluation is MongoDB. We are deploying it on Google Cloud to test its usability and performance.

jiashu-z commented 3 years ago

Currently, for memory management and distributed storage, we plan to use MongoDB. We will finish the implementation and start testing soon.

jiashu-z commented 3 years ago

Currently, we have implemented most of the main functions of materialization, and we are merging and testing the implementation. After merging our branch into the master branch, we found several problems.

  1. Logging issue on the frontend. Currently you can see only very little log output, which makes debugging very hard.
  2. When you click a cached operator on the frontend, the backend does not receive any request. So although materialization works on the backend, you cannot view the materialized results on the frontend. (A screenshot of the log issue was attached.)

There are also some known TODOs we are focusing on.

jiashu-z commented 3 years ago

Cache status update demo (GIF: operator_materialization_demo_1).

jiashu-z commented 3 years ago

Operator Materialization Update

Operator materialization is also called operator cache. We may use the two terms interchangeably; they have the same meaning.

#1326 #1327 #1328 #1329

These pull requests are our current implementation of operator materialization. Here is a summary of the design and implementation.

Usage

Operator materialization should be used for expensive operators, such as sentiment analysis, because for these operators, retrieving the materialized results is much faster than generating the results again. Note that materialization changes the workflow being executed on the backend, so the operators (materialized or not) should not have side effects. The load balancing of the cluster might be affected as well, because currently the materialization and retrieval operators are executed on the master.

Storage

Currently, we support the following storage options.

These storage options are implemented as sub-classes of OpResultStorage. You can inherit from this class to create other storage backends. Currently, we support these APIs.
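This thread does not spell out the actual OpResultStorage API, so the following is a purely hypothetical sketch of what adding a backend could look like; every name and signature below is an assumption:

```scala
// Purely hypothetical sketch; the real OpResultStorage API in Texera
// may differ. All signatures here are assumptions.
abstract class OpResultStorage {
  def put(key: String, tuples: List[Array[Byte]]): Unit
  def get(key: String): List[Array[Byte]]
  def remove(key: String): Unit
  def close(): Unit
}

// Example subclass: a trivial in-memory backend.
class InMemoryResultStorage extends OpResultStorage {
  private val store =
    scala.collection.concurrent.TrieMap[String, List[Array[Byte]]]()
  override def put(key: String, tuples: List[Array[Byte]]): Unit =
    store.put(key, tuples)
  override def get(key: String): List[Array[Byte]] =
    store.getOrElse(key, List.empty)
  override def remove(key: String): Unit = store.remove(key)
  override def close(): Unit = store.clear()
}
```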

Backend

To implement operator materialization, we first define a pair of operators, CacheSourceOp and CacheSinkOp. On the backend, we maintain the results and metadata of materialization across different executions in the same session. A WorkflowRewriter is used to determine the validity of materialization, add CacheSourceOp and CacheSinkOp to the original workflow, remove the operators from the original workflow that are replaced by CacheSourceOp, and update the results and metadata. Here, we do not modify the original workflow directly; instead, we create a new, rewritten workflow. On the backend, there are several materialization states for an operator.

When the session is closed, all materializations and metadata of that session are removed. When an operator transitions from "materialized and valid" to "materialized but invalid" during rewriting, the outdated materialization of that operator is also cleared, and the metadata is updated accordingly. Currently, we do batch reads and batch writes from and to OpResultStorage, and CacheSinkOp and CacheSourceOp run only on the master.

We also support instant cache-status updates. When the user changes the workflow on the frontend without executing it, the frontend sends a CacheStatusUpdateRequest to the backend, and the backend responds with a CacheStatusUpdateEvent to update the materialization status. The validity of materialization is still determined by the WorkflowRewriter, but in this case the rewriter must be side-effect free, because it does not modify the materialization and metadata of the session.
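As a rough illustration of the rewrite (heavily simplified: no DAG edges, and all names beyond CacheSourceOp, CacheSinkOp, and WorkflowRewriter are assumptions):

```scala
// Heavily simplified, hypothetical sketch of the rewrite described above;
// the real WorkflowRewriter operates on a DAG and tracks metadata.
sealed trait CacheState
case object NotMaterialized extends CacheState
case object MaterializedValid extends CacheState
case object MaterializedInvalid extends CacheState

final case class Op(id: String, cacheRequested: Boolean)
final case class Workflow(ops: List[Op])

def rewrite(workflow: Workflow, states: Map[String, CacheState]): Workflow = {
  val rewritten = workflow.ops.flatMap { op =>
    states.getOrElse(op.id, NotMaterialized) match {
      // Valid materialization: replace the operator (and, in the real
      // rewriter, its now-unneeded upstream operators) with a
      // CacheSourceOp that reads the stored result.
      case MaterializedValid =>
        List(Op(s"CacheSourceOp[${op.id}]", cacheRequested = false))
      // Caching requested but not yet (validly) materialized: keep the
      // operator and attach a CacheSinkOp that writes its output.
      case _ if op.cacheRequested =>
        List(op, Op(s"CacheSinkOp[${op.id}]", cacheRequested = false))
      case _ => List(op)
    }
  }
  Workflow(rewritten) // a new workflow; the original is not modified
}
```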

Frontend

On the frontend, the user can cache or un-cache an operator and can view the paginated materialized results.

zuozhiw commented 2 years ago

Group discussion 1/13: this task is completed