openzipkin / zipkin

Zipkin is a distributed tracing system
https://zipkin.io/
Apache License 2.0
16.95k stars 3.09k forks

MIGRATING_FROM variable to ease upgrades #1679

Open codefromthecrypt opened 7 years ago

codefromthecrypt commented 7 years ago

Sometimes we need to switch storage implementations, either from one type to another or because of a schema incompatibility. For example, you may switch from mysql to cassandra, from cassandra to cassandra3, or from elasticsearch 2.x to 6.x. Migrating data or maintaining multiple copies of storage code is one way out. However, because trace data ages out on its own, a fan-out proxy is another. This issue discusses the fan-out proxy approach.

The proposal is to use a MIGRATING_FROM variable whose value is the query endpoint of an existing install (e.g. http://zipkin-v129:9411). During a transition, you leave the old service up until your migration is complete. The new server runs the new code and queries across both itself and the MIGRATING_FROM endpoint until the latter is unset.

For example, suppose you want to upgrade an environment like this:
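To make the proposal a bit more concrete, here is a minimal sketch of how a server might read the variable and derive the old install's trace endpoint. MIGRATING_FROM is only proposed in this issue, so the class and method names (MigratingFrom, parse, traceUrl) are hypothetical. The /api/v1/trace/{traceId} path is the v1 query API; the trace ID in main is a made-up example.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: nothing here exists in Zipkin, it only illustrates the proposal. */
final class MigratingFrom {
  /** Returns the old install's base URL, or empty when MIGRATING_FROM is unset or blank. */
  static Optional<URI> parse(Map<String, String> env) {
    String value = env.get("MIGRATING_FROM");
    if (value == null || value.trim().isEmpty()) return Optional.empty();
    String trimmed = value.trim();
    // Drop trailing slashes so paths can be appended without doubling them.
    while (trimmed.endsWith("/")) trimmed = trimmed.substring(0, trimmed.length() - 1);
    return Optional.of(URI.create(trimmed));
  }

  /** Builds the v1 "get trace" URL on the old install for a hex trace ID. */
  static URI traceUrl(URI base, String traceIdHex) {
    return URI.create(base + "/api/v1/trace/" + traceIdHex);
  }

  public static void main(String[] args) {
    Optional<URI> old = parse(System.getenv());
    if (old.isPresent()) {
      System.out.println("also querying " + traceUrl(old.get(), "463ac35c9f6413ad"));
    } else {
      System.out.println("MIGRATING_FROM unset: querying local storage only");
    }
  }
}
```

Treating an unset or blank value as "no fan-out" keeps the end of a migration simple: unset the variable and the server queries only its own storage.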

                                .─────────.      
                             ,─'           '─.   
                            ╱                 ╲  
                           ╱                   ╲ 
                          ;                     :
                          :    Applications     ;
   .─────────.             :                   ; 
 ,'           `.           :                   ; 
(   Zipkin UI   )           `.               ,'  
 '─.         ,─'              '─.         ,─'    
    `───┬───'                    │───────'       
        │                        │               
        │             ┌──────────┘               
        │             │                          
┌───────▼─────────────▼──────┐                   
│  ┌───────────┐ ┌─────────┐ │                   
│  │ Query Api │ │Collector│ │                   
│  └──────┬────┘ └──┬──────┘ │                   
│         │         │        │                   
│    ┌────▼─────────▼────┐   │                   
│    │ Elasticsearch 5.x │   │                   
│    │ Storage Component │   │                   
│    │                   │   │                   
│    └───────────────────┘   │                   
│ Zipkin 1.29                │                   
└────────────────────────────┘                   

You'd install the new version and point users to it, setting MIGRATING_FROM to the old version's endpoint. You can cut write traffic to the old version immediately, or at some point in the future. The new version will query across both itself and the old one.

                                   .───────.                       
                                ,─'         '─.                    
                               ;   Migrated    :                   
          ┌────────────────────: Applications  ;     .───────.     
          │                     ╲             ╱   ,─'         '─.  
          │       .─────────.    '─.       ,─'   ╱               ╲ 
          │     ,'           `.     `─────'     ;  Applications   :
          │    (   Zipkin UI   )                :                 ;
          │     '─.         ,─'                  ╲               ╱ 
          │        `──┬────'                      ╲             ╱  
          │           │                            '─.       ,─'   
          │           │                               `──┬──'      
┌─────────▼───────────▼──────┐                           │         
│  ┌───────────┐ ┌─────────┐ │ ┌────────────────┐        │         
│  │ Collector │ │Query Api├─┼─┤ Migrating From │        │         
│  └──────┬────┘ └──┬──────┘ │ └────────────┬───┘        │         
│         │         │        │              │            │         
│    ┌────▼─────────▼────┐   │              │            │         
│    │ Elasticsearch 6.x │   │              │            │         
│    │ Storage Component │   │      ┌───────▼────────────▼───────┐ 
│    │                   │   │      │  ┌───────────┐ ┌─────────┐ │ 
│    └───────────────────┘   │      │  │ Query Api │ │Collector│ │ 
│ Zipkin 1.30                │      │  └──────┬────┘ └──┬──────┘ │ 
└────────────────────────────┘      │         │         │        │ 
                                    │    ┌────▼─────────▼────┐   │ 
                                    │    │ Elasticsearch 5.x │   │ 
                                    │    │ Storage Component │   │ 
                                    │    │                   │   │ 
                                    │    └───────────────────┘   │ 
                                    │ Zipkin 1.29                │ 
                                    └────────────────────────────┘ 

This eliminates the need to keep multiple copies of storage code in the same process. Since retention is usually a matter of days, you can simply cut off the MIGRATING_FROM when you are ready.
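As a rough illustration of "when you are ready": if the old install stops receiving writes at a known time, its data has fully aged out one retention period later. The sketch below is just that arithmetic. The class name, the cut-over time and the 7-day retention are all made-up values for the example, not Zipkin defaults.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for deciding when MIGRATING_FROM can be unset. */
final class MigrationCutoff {
  /** Earliest moment at which every span written to the old install has aged out. */
  static Instant safeToUnset(Instant lastWriteToOld, Duration retention) {
    return lastWriteToOld.plus(retention);
  }

  /** True once "now" has reached the moment computed above. */
  static boolean canUnset(Instant now, Instant lastWriteToOld, Duration retention) {
    return !now.isBefore(safeToUnset(lastWriteToOld, retention));
  }

  public static void main(String[] args) {
    Instant lastWrite = Instant.parse("2017-08-01T00:00:00Z"); // assumed cut-over time
    Duration retention = Duration.ofDays(7);                   // assumed retention

    System.out.println(safeToUnset(lastWrite, retention)); // 2017-08-08T00:00:00Z
    System.out.println(canUnset(Instant.parse("2017-08-05T00:00:00Z"), lastWrite, retention)); // false
    System.out.println(canUnset(Instant.parse("2017-08-08T00:00:00Z"), lastWrite, retention)); // true
  }
}
```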

codefromthecrypt commented 7 years ago

cc @openzipkin/elasticsearch @openzipkin/cassandra @openzipkin/core for input on this

codefromthecrypt commented 7 years ago

Note this technique can also be used to address a transition from one storage type to another, for example from mysql to cassandra or elasticsearch.

codefromthecrypt commented 7 years ago

PS: another way out would be to write a proxy that merges read API responses. This would work, but it would add another process to manage and upgrade. Yet another way out would be to do the fan-out in JavaScript in the UI code. This would be a bit complicated, especially due to cross-origin requests.

semyonslepov commented 7 years ago

How will it work if there is an index with the same date in old and new storage? Scan through both for traceId until the first match is found?

P.S. In general sounds good for me.

codefromthecrypt commented 7 years ago

> How will it work if there is an index with the same date in old and new storage? Scan for traceId until the first match is found?

It would speculatively query the old one with the same parameters and merge any results (which would delay requests a bit).

> P.S. In general sounds good for me.

cool!
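The speculative query described above could be sketched roughly like this, using CompletableFuture rather than Zipkin's own callback types. Everything here is a stand-in: Span is a hypothetical two-field class, and merge simply drops a span whose trace ID and span ID were already seen, whereas a real merge would combine the two copies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of querying new and old storage with the same parameters. */
final class SpeculativeQuery {
  /** Stand-in span holding only the IDs needed to merge. */
  static final class Span {
    final String traceId;
    final String id;

    Span(String traceId, String id) {
      this.traceId = traceId;
      this.id = id;
    }
  }

  /** Groups spans from both answers by trace ID, skipping span IDs already seen. */
  static Map<String, List<Span>> merge(List<Span> fromNew, List<Span> fromOld) {
    Map<String, List<Span>> byTrace = new LinkedHashMap<>();
    Set<String> seen = new HashSet<>();
    for (List<Span> source : Arrays.asList(fromNew, fromOld)) {
      for (Span span : source) {
        if (!seen.add(span.traceId + "/" + span.id)) continue; // duplicate across stores
        byTrace.computeIfAbsent(span.traceId, k -> new ArrayList<>()).add(span);
      }
    }
    return byTrace;
  }

  /** Waits for both answers; a failure of the old store is treated as "no results". */
  static CompletableFuture<Map<String, List<Span>>> query(
      CompletableFuture<List<Span>> newStore, CompletableFuture<List<Span>> oldStore) {
    CompletableFuture<List<Span>> lenientOld =
        oldStore.exceptionally(error -> Collections.emptyList());
    return newStore.thenCombine(lenientOld, SpeculativeQuery::merge);
  }

  public static void main(String[] args) {
    List<Span> fromNew = Arrays.asList(new Span("t1", "s1"), new Span("t2", "s3"));
    List<Span> fromOld = Arrays.asList(new Span("t1", "s1"), new Span("t1", "s2"));
    Map<String, List<Span>> merged =
        query(CompletableFuture.completedFuture(fromNew),
              CompletableFuture.completedFuture(fromOld)).join();
    System.out.println(merged.keySet());          // [t1, t2]
    System.out.println(merged.get("t1").size());  // 2
  }
}
```

Because the result waits on both stores, the slower of the two sets the response time, which is the delay mentioned above.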

ImFlog commented 7 years ago

Could it also be a way to migrate from Span V1 => V2 in the same storage? Or any breaking change in general (even if it only happens once in a long while)?

codefromthecrypt commented 7 years ago

> Could it also be a way to migrate from Span V1 => V2 in the same storage?

Yes, for example I have this in mind for the transition to Span V2 (which is the same transition as the one to Elasticsearch 6.x).

> Or any breaking change in general (even if it only happens once in a long while)?

Indeed

codefromthecrypt commented 7 years ago

As part of #1674 I'm using an internal type like this, which is working so far. It could likely be generalized (where right is an HTTP API span store):

/**
 * This makes redundant read commands, concatenating results if two answers come back, or accepting
 * one if there's an error on the other.
 */
public final class LenientDoubleAsyncSpanStore implements AsyncSpanStore {
  final AsyncSpanStore left;
  final AsyncSpanStore right;

--snip--
  @Override public void getTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
    GetTraceDoubleCallback doubleCallback = new GetTraceDoubleCallback(callback);
    left.getTrace(traceIdHigh, traceIdLow, doubleCallback);
    right.getTrace(traceIdHigh, traceIdLow, doubleCallback);
  }

  static final class GetTraceDoubleCallback extends LenientDoubleCallback<List<Span>> {
    static final Logger LOG = Logger.getLogger(GetTraceDoubleCallback.class.getName());

    GetTraceDoubleCallback(Callback<List<Span>> delegate) {
      super(LOG, delegate);
    }

    @Override List<Span> merge(List<Span> v1, List<Span> v2) {
      List<Span> result = new ArrayList<>(v1);
      result.addAll(v2);
      return MergeById.apply(result);
    }
  }

--snip--

/** Callback that succeeds if at least one value does. The first error is logged. */
abstract class LenientDoubleCallback<V> implements Callback<V> {
  final Logger log;
  final Callback<V> delegate;

  V v;
  Throwable t;

  LenientDoubleCallback(Logger log, Callback<V> delegate) {
    this.log = log;
    this.delegate = delegate;
  }

  abstract V merge(V v1, V v2);

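  // Note: null doubles as "no value yet", so this assumes each side succeeds with a non-null
  // value.
  @Override public final synchronized void onSuccess(V value) {
    if (t != null) { // the other side already failed: accept this value alone
      delegate.onSuccess(value);
    } else if (v == null) { // first answer: hold it until the other side reports
      v = value;
    } else { // second answer: merge both
      delegate.onSuccess(merge(v, value));
    }
  }

  @Override public final synchronized void onError(Throwable throwable) {
    if (v != null) { // the other side already succeeded: accept its value
      delegate.onSuccess(v);
    } else if (t == null) { // first error: log and hold it until the other side reports
      log.log(Level.INFO, "first error", throwable);
      t = throwable;
    } else { // both sides failed: propagate the second error
      delegate.onError(throwable);
    }
  }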
}
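To see how the callback above behaves in its three cases (both succeed, one fails, both fail), here is a small self-contained harness. It is only a sketch: Callback is a stand-in interface with the two methods used above, Lenient repeats the same state machine without logging, and Recording and concatenating are hypothetical helpers for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical harness exercising the lenient double-callback logic. */
final class LenientCallbackDemo {
  /** Stand-in for the Callback type used above (assumed to have these two methods). */
  interface Callback<V> {
    void onSuccess(V value);
    void onError(Throwable t);
  }

  /** Same state machine as LenientDoubleCallback, minus the logger. */
  abstract static class Lenient<V> implements Callback<V> {
    final Callback<V> delegate;
    V v;
    Throwable t;

    Lenient(Callback<V> delegate) {
      this.delegate = delegate;
    }

    abstract V merge(V v1, V v2);

    @Override public final synchronized void onSuccess(V value) {
      if (t != null) delegate.onSuccess(value);
      else if (v == null) v = value;
      else delegate.onSuccess(merge(v, value));
    }

    @Override public final synchronized void onError(Throwable throwable) {
      if (v != null) delegate.onSuccess(v);
      else if (t == null) t = throwable;
      else delegate.onError(throwable);
    }
  }

  /** Records the single outcome that reaches the delegate. */
  static final class Recording implements Callback<List<String>> {
    List<String> value;
    Throwable error;

    @Override public void onSuccess(List<String> value) { this.value = value; }
    @Override public void onError(Throwable t) { this.error = t; }
  }

  /** Lenient callback whose merge concatenates the two lists. */
  static Lenient<List<String>> concatenating(Callback<List<String>> delegate) {
    return new Lenient<List<String>>(delegate) {
      @Override List<String> merge(List<String> v1, List<String> v2) {
        List<String> result = new ArrayList<>(v1);
        result.addAll(v2);
        return result;
      }
    };
  }

  public static void main(String[] args) {
    Recording both = new Recording();
    Lenient<List<String>> first = concatenating(both);
    first.onSuccess(Arrays.asList("a"));
    first.onSuccess(Arrays.asList("b"));
    System.out.println(both.value); // [a, b]

    Recording oneFailed = new Recording();
    Lenient<List<String>> second = concatenating(oneFailed);
    second.onError(new RuntimeException("old install down"));
    second.onSuccess(Arrays.asList("a"));
    System.out.println(oneFailed.value); // [a]

    Recording bothFailed = new Recording();
    Lenient<List<String>> third = concatenating(bothFailed);
    third.onError(new RuntimeException("first"));
    third.onError(new RuntimeException("second"));
    System.out.println(bothFailed.error.getMessage()); // second
  }
}
```

In each case the delegate is called exactly once, and only after both sides have reported.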