telefonicaid / fiware-cygnus

A connector in charge of persisting context data sources into other third-party databases and storage systems, creating a historical view of the context
https://fiware-cygnus.rtfd.io/
GNU Affero General Public License v3.0

HA issues #92

Open frbattid opened 10 years ago

frbattid commented 10 years ago

(edited after renaming the issue title)

A. There exist several potential failure points within a transaction that must be analyzed:

Orion -(1)-> source -(2)-> channel -(3)-> sink -(4)-> HDFS (or MySQL or CKAN)
             |_________ Cygnus agent ________|

B. In addition to the failure points within a transaction, the Cygnus agent components themselves may fail.

  1. Before Flume 1.0 there was a Flume Master component in charge of monitoring the health of the other components through heartbeats. This is no longer used since Flume 1.0, so we should study whether an alternative mechanism replaces it, or whether it must be re-implemented by us if found useful.
  2. Maybe this kind of "deep" monitoring is not necessary and it is enough to monitor whether the entire Cygnus agent crashes; in that case, a redundant Cygnus agent starts running (active/passive configuration).

C. Regarding the channel implementation, we are currently using a memory-based channel whose information is lost if the Cygnus agent crashes (see the configuration sketch after point D).

  1. File-based channels have been proposed since they ensure the data is not lost within an agent. Using them seems to be a matter of configuration.
  2. We must study how these channels could be shared among several sinks running within different Cygnus processes.

D. Regarding scalability, a simple load balancer (active/active) configuration is enough. In that case, if a Cygnus agent crashes, its load is moved to the remaining one.
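
Regarding point C, this is a minimal sketch of the memory-based channel configuration currently in place (the channel name follows the Cygnus configuration shown later in this issue; the capacity figures are illustrative assumptions, not real settings):

cygnusagent.channels.hdfs-channel.type = memory
# events live only in the JVM heap, so anything not yet taken by a sink is lost if the Cygnus process dies
cygnusagent.channels.hdfs-channel.capacity = 1000
# maximum number of events handled in a single put/take transaction
cygnusagent.channels.hdfs-channel.transactionCapacity = 100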


Interesting documentation:

fgalan commented 10 years ago

Related to this, @mrutid found the following (https://cwiki.apache.org/confluence/display/FLUME/Home%3bjsessionid=944F516714CC72DF17C3D89B996D7B37):

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Its main goal is to deliver data from applications to Apache Hadoop's HDFS. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic applications. Please click here for the user guide.

which I guess is a kind of confirmation that Flume (when properly used) could implement HA.

frbattid commented 10 years ago

The first comment has been edited, please have a look at it.

frbattid commented 10 years ago

Good news:

Flume provides the concept of a Sink Processor. This processor is in charge of managing a pool of sinks that can be used in several ways depending on the configuration:

By default, no sink processor is used (as Cygnus currently does).
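
For reference, sink processors are configured in Flume through sink groups; besides the default (no processor), there are "failover" and "load_balance" types. This is a minimal sketch of a failover setup (the sink names hdfs-sink1/hdfs-sink2 and the priority values are assumptions for illustration, not the current Cygnus configuration):

cygnusagent.sinkgroups = sg1
cygnusagent.sinkgroups.sg1.sinks = hdfs-sink1 hdfs-sink2
# failover: events always go to the highest-priority sink that is alive
cygnusagent.sinkgroups.sg1.processor.type = failover
cygnusagent.sinkgroups.sg1.processor.priority.hdfs-sink1 = 10
cygnusagent.sinkgroups.sg1.processor.priority.hdfs-sink2 = 5
# milliseconds a failed sink is blacklisted before being retried
cygnusagent.sinkgroups.sg1.processor.maxpenalty = 10000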

fgalan commented 10 years ago

Based on your analysis I think it would be interesting to develop a Rush-based retry for notifications at Orion (issue https://github.com/telefonicaid/fiware-orion/issues/484)

frbattid commented 10 years ago

More interesting things regarding the channels:

My bet is to use the JDBC channel since:

fgalan commented 10 years ago

I agree, the JDBC channel sounds much better than file or hybrid channels. As far as I remember, Derby DB is an in-memory database, right?

We should refine the architecture a little more, taking into account Cygnus instances and Derby instances. I understand that we would have two Cygnus instances, each one in a different VM, but using the same Derby-based channel. Would the Derby instances run in a different VM? Does Derby support HA? What about failover and recovery sequences in each of the components (i.e. Derby failover/recovery, Cygnus agent failover/recovery)?

frbattid commented 10 years ago

Regarding data storage, Derby DB is based on files, as described here: http://db.apache.org/derby/docs/10.0/manuals/develop/develop13.html. These files are what must be shared.

Regarding the Derby DB management software, yes, I think there will be 2 in-memory instances managing the same data files mentioned above. I think this is not a problem since this is only "the logic", not "the data".

Regarding Derby HA, I'm not sure we are able to guarantee HA for Derby. This Derby DB is directly managed by Flume and I do not see any configuration parameter allowing us to configure HA. As I wrote above, in past versions of Flume there was a Flume Master checking that the other components were OK, but now there is nothing similar. Anyway, this could be going too far; I mean, in the end the DB is just files... Let's suppose we use a simple file-based channel: should we replicate the file in order to prevent file system corruption (it can happen)? If the filesystem crashes there is nothing to do in either case :)

fgalan commented 10 years ago

At this moment, I think we need an architecture picture showing Cygnus, Derby, the filesystem that backs the data, load balancers, VMs, etc. in order to clarify the intended setup :)

frbattid commented 10 years ago

I've generated this:

[image: ha_architecture]

fgalan commented 10 years ago

As a next step, I'd suggest having a new picture, mapping each logical process to a given VM, and trying to figure out what would happen on selective failures of those VMs.

mrutid commented 10 years ago

"Failure point (2): putting a Flume event into the channel Nevertheless, the channel capability may be reached. It seems Flume simply reports an error in this case. What to do?"

In this case an error must be returned (synchronously) to the source system, which should be in charge of retrying. Using RUSH may be an option (but please check with JJ); if it doesn't fit, a retry mechanism should be implemented by Orion.
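
As a side note, the point at which the channel "fills up" is governed by its capacity settings; a sketch with illustrative (not tuned) values would be:

# maximum number of events the channel can hold; beyond this, put attempts are rejected
cygnusagent.channels.hdfs-channel.capacity = 100000
# maximum number of events moved per source/sink transaction
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

When the channel rejects a put, Flume's HTTP source answers the notification with an error status (503 according to the Flume documentation), which is what Orion (or RUSH) would have to detect in order to retry.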

frbattid commented 10 years ago

The file-based channel has been tested successfully. The following steps were done:

  1. To run Cygnus with a bad HDFS endpoint configuration (the sink port was 14001 instead of 14000) so that a data persistence attempt fails.
  2. To send a notification in order to start a data persistence attempt which is known in advance to fail. The key point is that the Flume event containing the data to be persisted is successfully stored within a channel-related file.
  3. To re-run Cygnus with a good HDFS endpoint configuration in order to replay the previously not-persisted events. It has been checked that the events are now persisted.

In addition, a second correct re-run of Cygnus has demonstrated that the already replayed events are not replayed again.

The only additional Cygnus configuration needed is:

cygnusagent.channels.hdfs-channel.type = file
cygnusagent.channels.hdfs-channel.checkpointDir = /home/opendata/fiware/fiware-channels/file/checkpoint
cygnusagent.channels.hdfs-channel.dataDirs = /home/opendata/fiware/fiware-channels/file/data

frbattid commented 10 years ago

Regarding JDBC, the same test as above was done, also with success.

The only additional Cygnus configuration needed is:

cygnusagent.channels.hdfs-channel.type = jdbc
cygnusagent.channels.hdfs-channel.db.type = DERBY
cygnusagent.channels.hdfs-channel.sysprop.user.home = /home/opendata/fiware/flume-channels

frbattid commented 10 years ago

I have opened a question on Stack Overflow regarding remote channel sharing, since my tests involved agents running on the same machine: http://stackoverflow.com/questions/25618024/events-queue-in-a-different-machine-than-the-agent-one

frbattid commented 9 years ago

It seems the current JDBC channel is not suitable for our purposes, thus a custom channel has to be created.

The first obvious design seems to be a JDBC channel that can be accessed remotely by both the active and the passive Cygnus instances, because this is the feature we were asking about on Stack Overflow. Typically, the supporting database for this JDBC channel would be deployed in a third machine.

Nevertheless, since we have to start from scratch, why not create a smarter custom channel? I mean, JDBC channels are slower than memory ones, and even slower if the access is not local but remote to a third machine. In addition, what happens if the third machine supporting the database crashes too?

My proposal is to implement a double event put. The first put will be done in the local memory-based channel (the channel for the currently active Cygnus). The second put will be done in the remote JDBC-based channel (the channel for the currently passive Cygnus). When a sink processes a local memory-based event, this sink has to delete the copy of the event in the remote JDBC-based channel. A specific thread can be created for this purpose, so the sinks do not waste time with this additional work.

If the active Cygnus crashes, then the passive Cygnus starts working, processing both the fresh incoming events stored in its memory-based channel and the events not processed by the old, now-crashed active Cygnus. Of course, the new active Cygnus has to implement the same strategy, i.e. it has to try to put a copy of the new incoming events in the remote JDBC-based channel (it has to try because the remote Cygnus was the previously active one and may still be in a crashed state).
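
Purely as a sketch of how such a custom channel might be wired in, assuming a hypothetical channel class and made-up parameter names (none of these exist in Flume or Cygnus today):

# HYPOTHETICAL custom channel implementing the double put described above
cygnusagent.channels.hdfs-channel.type = com.telefonica.iot.cygnus.channels.ReplicatedMemoryChannel
# local in-memory queue served to the sinks of the active instance
cygnusagent.channels.hdfs-channel.capacity = 1000
# HYPOTHETICAL remote JDBC-based replica: written on every put, purged by a background thread once the sink commits
cygnusagent.channels.hdfs-channel.remote.url = jdbc:derby://remote-host:1527/cygnuschannel
cygnusagent.channels.hdfs-channel.remote.purge.interval.seconds = 30

From the active instance's point of view, the JDBC replica lives on the passive instance's machine, which is why the URL above points at a remote host.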