gearpump / gearpump

Lightweight real-time big data streaming engine over Akka
https://gearpump.github.io/gearpump/
Apache License 2.0

Gearpump Storage Framework #1197

Closed whjiang closed 8 years ago

whjiang commented 9 years ago

In general, a Gearpump application requires the following storage support:

  1. Jar-file storage, to store the application jar file(s).
  2. Application logs. Currently we store logs on each node, which makes application log analysis difficult.
  3. Application metrics.
  4. Application configuration.
  5. Data source offset store (for at-least-once semantics of streaming applications).
  6. Application state checkpoint store (for transaction semantics).

The general idea is:

  1. Provide a storage system that satisfies the above requirements.
  2. Assume this storage is highly available. That is, it is the user's duty to provide such storage. For testing, users can use a non-HA storage system, but in production it must be HA.
  3. Isolate usage from implementation. That is, Gearpump doesn't rely on Hadoop-common, HDFS, or any one specific implementation to provide such storage. Users are free to implement their own storage.
  4. This is daemon-provided functionality that every Gearpump application can use.
  5. This storage shall provide data retention and access control.
  6. This storage provides a set of APIs to meet the above requirements instead of one low-level API.
  7. Users can override the system setting to provide a dedicated implementation for a particular sub-storage system, e.g. the checkpoint store.
  8. Akka replication shall store minimal information for an application and leave the majority to this storage system. That is, Akka replication acts more like a seed for this storage system.
  9. In a release, each storage implementation (e.g. storage-hdfs) is a standalone module/artifact.

A draft of this storage API looks like this (very early, subject to change):

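import scala.util.Try

// Placeholders so the draft type-checks on its own; not final definitions.
trait AppMetricsStore
trait LogAppender
trait UserConfig

trait Storage {
    // Assumes an application is identified by a String name and an Int id.
    def createAppStorage(appName: String, appId: Int): AppStorage
    def getAppStorage(appId: Int): Option[AppStorage]
}

trait AppStorage {
    def open(): Unit
    def close(): Unit
    def getJarStore: JarStore
    def getMetricsStore: AppMetricsStore
    // KVStore is generic, so the accessor carries the key and value types.
    def getKVStore[K, V]: KVStore[K, V]
    def getLogAppender: LogAppender
    // Per-processor configuration; processor ids are assumed to be Ints.
    def getConfiguration(processorId: Int): UserConfig
    def setConfiguration(processorId: Int, config: UserConfig): Unit
}

trait JarStore {
    // Paths are plain Strings in this draft.
    def copyFromLocal(localPath: String, remotePath: String): Unit
    def copyToLocal(remotePath: String, localPath: String): Unit
}

// Assumes K is sortable.
trait KVStore[K, V] {
    def append(key: K, value: V): Unit
    def read(key: K): Try[Option[V]]
}

kkasravi commented 9 years ago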

Perhaps the different types of storage could share a common API, or aspects of collections, so that AppStorage doesn't need to change every time a new type of storage is added. Also, at least for JarStore, see #1204, where the lack of streaming causes the Master to exit.

Should AppStorage contain write, read methods?
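
One way to read the first suggestion is to make AppStorage a typed registry of sub-stores rather than a trait with one getter per store. The following is only a minimal sketch under that assumption; `SubStore`, `register`, and `get` are hypothetical names, and the two store classes are empty placeholders.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Marker for any kind of sub-store (jar, metrics, KV, ...). Hypothetical name.
trait SubStore

// Two placeholder sub-stores, only here to exercise the registry.
class JarStore extends SubStore
class MetricsStore extends SubStore

// AppStorage as a typed registry: a new store type needs no new method here.
class AppStorage {
  private val stores = mutable.Map.empty[Class[_], SubStore]

  // Registers a store under its static type S.
  def register[S <: SubStore](store: S)(implicit tag: ClassTag[S]): Unit =
    stores.update(tag.runtimeClass, store)

  // Looks up the store registered for exactly the type S, if any.
  def get[S <: SubStore](implicit tag: ClassTag[S]): Option[S] =
    stores.get(tag.runtimeClass).map(_.asInstanceOf[S])
}
```

With this shape, `storage.register(new JarStore)` followed by `storage.get[JarStore]` returns the registered instance, while `storage.get[MetricsStore]` returns `None` until a metrics store is registered. The cost is that a missing store is only discovered at runtime.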

whjiang commented 9 years ago

Hi @kkasravi, I agree that sharing a common API is one goal. This is why the checkpoint store and the offset store in the current codebase are unified.

The read/write methods belong to KVStore, not AppStorage. AppStorage is a fairly general concept: it includes everything an application needs for storage (except connectors).
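
To illustrate how one KVStore abstraction can serve both the offset store and the checkpoint store, here is a minimal in-memory sketch. It is not the code in the current codebase: `InMemoryKVStore` and `KVStoreDemo` are hypothetical names, the trait signatures are a simplified guess at the draft, and the exact-match behaviour of `read` is an assumption, since the draft does not say how lookups on a sortable key should behave.

```scala
import scala.collection.immutable.TreeMap
import scala.util.Try

// Simplified form of the KVStore trait from the draft; signatures are assumptions.
trait KVStore[K, V] {
  def append(key: K, value: V): Unit
  def read(key: K): Try[Option[V]]
}

// Non-HA, in-memory implementation (hypothetical), suitable for tests only.
// Entries are kept sorted by key, matching the "K is sortable" assumption.
class InMemoryKVStore[K: Ordering, V] extends KVStore[K, V] {
  private var entries = TreeMap.empty[K, V]

  override def append(key: K, value: V): Unit = synchronized {
    entries = entries.updated(key, value)
  }

  // Exact-match lookup; a real store might return the closest earlier key instead.
  override def read(key: K): Try[Option[V]] = Try(synchronized(entries.get(key)))
}

object KVStoreDemo {
  // The same abstraction serves both purposes, keyed by timestamp.
  val offsets = new InMemoryKVStore[Long, Long]       // timestamp -> source offset
  val checkpoints = new InMemoryKVStore[Long, String] // timestamp -> serialized state

  offsets.append(1000L, 42L)
  checkpoints.append(1000L, "state-at-1000")
}
```

An HA implementation would keep the same two methods and replace the in-memory map with a remote store.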

clockfly commented 9 years ago

+1 for the ideas. The difficulty will be making the storage implementation pluggable, so that users can choose the storage system they want, such as Cassandra, HDFS, or HBase...
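
One common way to get this kind of pluggability on the JVM is to name the implementation class in configuration and instantiate it reflectively, so the engine never references a concrete backend at compile time. This is a minimal sketch under that assumption, not an agreed design; `StorageLoader` and `InMemoryStorage` are hypothetical names, and the `Storage` trait is reduced to a single member for brevity.

```scala
import scala.util.Try

// Reduced stand-in for the Storage trait from the draft.
trait Storage {
  def name: String
}

// A trivial implementation with a no-argument constructor (hypothetical).
class InMemoryStorage extends Storage {
  val name: String = "in-memory"
}

object StorageLoader {
  // Instantiates the class with the given fully qualified name through its
  // no-argument constructor. Fails if the class is missing, has no such
  // constructor, or does not implement Storage.
  def load(className: String): Try[Storage] = Try {
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[Storage]
  }
}
```

The class name would come from user configuration; `StorageLoader.load(classOf[InMemoryStorage].getName)` yields a `Success`, and an unknown class name yields a `Failure` instead of an exception. Each backend (for example a storage-hdfs module) can then ship as its own artifact and only needs to be on the classpath when selected.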

netcomm commented 8 years ago

Maybe I am missing something, but why not just build on Akka Persistence? It now supports HBase/Cassandra/... plug-ins.

clockfly commented 8 years ago

@netcomm

I am still trying to grasp the benefits and limitations of Akka Persistence.

Here are some considerations for selecting a persistence layer:

  1. As a streaming platform, we want the classpath to be as short as possible. We don't want the user app to fail if it depends on a different version of HBase or Cassandra. Using Akka Persistence would create a classpath dependency on an external storage system.
  2. Akka Persistence requires sequential reads and writes, whereas in our case we need a KV store to hold application data such as the clock value.
  3. For the Master layer, we use Akka Cluster to implement master HA, and we sync the data between masters. I don't know how data replication works in Akka Persistence.
  4. The Akka Persistence serialization framework is not very performant.

I need further investigation on akka persistence.
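
To make point 2 concrete, the sketch below shows how a key-value view can be layered on top of an append-only journal by replaying events, which is the general event-sourcing pattern. It is plain Scala and does not use the Akka Persistence API; `Put`, `Journal`, and `JournalBackedKV` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// An event as it might be written to an append-only journal (hypothetical type).
final case class Put(key: String, value: Long)

// Stand-in for a sequential journal: events are only appended and replayed in order.
class Journal {
  private val events = ArrayBuffer.empty[Put]
  def append(e: Put): Unit = { events += e; () }
  def replay: Seq[Put] = events.toList
}

// A key-value view derived from the journal; the latest write for a key wins.
class JournalBackedKV(journal: Journal) {
  private var state = Map.empty[String, Long]

  // Recovery: rebuild the in-memory view by replaying every stored event.
  journal.replay.foreach(applyEvent)

  private def applyEvent(e: Put): Unit = state = state.updated(e.key, e.value)

  def put(key: String, value: Long): Unit = {
    val e = Put(key, value)
    journal.append(e) // persist first
    applyEvent(e)     // then update the in-memory view
  }

  def get(key: String): Option[Long] = state.get(key)
}
```

Random reads are served from memory, but every write still goes through the sequential log, and recovery time grows with the journal length unless snapshots are taken. Whether that trade-off is acceptable for data such as the clock value is the open question.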

netcomm commented 8 years ago

Hi @clockfly

1. As a streaming platform, we want the classpath to be as short as possible. We don't want the user app to fail if it depends on a different version of HBase or Cassandra. Using Akka Persistence would create a classpath dependency on an external storage system. ----- Classpath? Can you give more detail?

2. Akka Persistence requires sequential reads and writes, whereas in our case we need a KV store to hold application data such as the clock value. ----- Yes. Because the Akka Persistence implementation is hosted on actors, it is sequential inside an actor. But you can choose LevelDB or whatever more powerful persistence plug-in you like; if you look only at the persistence layer, it is not sequential.

3. For the Master layer, we use Akka Cluster to implement master HA, and we sync the data between masters. I don't know how data replication works in Akka Persistence. ----- I see that Gearpump already uses Akka's CRDTs. Because the CRDTs live in memory, you can simply save them using Akka Persistence.

4. The Akka Persistence serialization framework is not very performant. ----- Sorry, I have no actual data, so I cannot say. Maybe I am missing something; please correct me if so.

clockfly commented 8 years ago

@netcomm

----- Classpath? Can you give more detail?

For example, if you want to run a job on Gearpump, the job has its own Java classpath and the Gearpump engine has its own classpath. The two may conflict, because the app executor JVM contains both. That is why, in Gearpump, we shade most dependencies. But if we introduce a heavy library like Cassandra, it may be too difficult to do the classpath shading.
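
For readers unfamiliar with shading, here is what a rename rule typically looks like with the sbt-assembly plugin. This is only an illustrative build fragment: it assumes sbt-assembly is enabled, and the package names are placeholders, not Gearpump's actual shading rules.

```scala
// build.sbt fragment; assumes the sbt-assembly plugin is enabled.
// Package names are placeholders chosen for illustration only.
assembly / assemblyShadeRules := Seq(
  // Relocate the library's classes so they cannot clash with the version
  // that a user application brings on its own classpath.
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)
```

Older sbt versions write the key as `assemblyShadeRules in assembly`. Every library with a large transitive dependency tree multiplies the number of such rules, which is the difficulty described above.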

----- Yes. Because the Akka Persistence implementation is hosted on actors, it is sequential inside an actor. But you can choose LevelDB or whatever more powerful persistence plug-in you like; if you look only at the persistence layer, it is not sequential.

I am not familiar with Akka Persistence. Can you give us a link to the docs on the "non-sequential" API? Can you find some examples?

----- I see that Gearpump already uses Akka's CRDTs. Because the CRDTs live in memory, you can simply save them using Akka Persistence.

It would be great if akka-persistence can do this.

4. The Akka Persistence serialization framework is not very performant.
----- Sorry, I have no actual data, so I cannot say.

There are two use cases for us. The first is storing cluster data that does not require high performance. For this part, I think we may be able to use akka-persistence, if it can be used as a generic KV store as you said.

The second use case requires high-performance writes: we need to store application-level checkpoint files. In the early days of Gearpump, we tested Akka serialization performance and it was not very good, which is why we wrote our own serialization implementation. If the akka-persistence layer is strongly bound to the Akka serialization framework, that may become a problem. If the binding is not that strong, we can bypass it.

We are interested in the "non-sequential" usage of Akka Persistence; please give us some pointers or examples. Thanks.

netcomm commented 8 years ago

@clockfly

Maybe you can find some useful info in http://doc.akka.io/docs/akka/2.4.1/scala/persistence.html, e.g. "The persistAsync method provides a tool for implementing high-throughput persistent actors. It will not stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks." If this is not what you want, please let me know; maybe I can help.

clockfly commented 8 years ago

@netcomm, I opened #1822. Are you interested in writing a demo project for akka-persistence?

netcomm commented 8 years ago

OK, I will try it.

manuzhang commented 8 years ago

Moved to https://issues.apache.org/jira/browse/GEARPUMP-63