Netflix / curator

ZooKeeper client wrapper and rich ZooKeeper framework
http://netflix.github.com/curator
Other
2.16k stars 435 forks source link

CF Client doesn't recover if there is a failure in the ZK cluster #163

Closed connectwithnara closed 12 years ago

connectwithnara commented 12 years ago

I have two tests running same logic using ZK API and CF API. CF API doesn't quite recover if there is a failure in the ZK cluster.

I have attached the test program. To see the problem, you could try the following:

Now try the same with testFailoverUsingCF. You will notice, unless the CF is closed and a new instance created, it doesn't recover from the situation.

import com.netflix.curator.framework.CuratorFramework; import com.netflix.curator.framework.CuratorFrameworkFactory; import com.netflix.curator.framework.api.BackgroundCallback; import com.netflix.curator.framework.api.CuratorEvent; import com.netflix.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Test;

import java.util.Date;

public class FailoverTests {

private class Count {
    int value;
}

@Test
public void testFailoverWithZK() throws Exception {

    final ZooKeeper zk = new ZooKeeper(
            "host1:2181,host2:2181,host3:2181", 15000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            info("Event: " + event.getType());
        }
    });

    while (true) {

        for (int t=0; t<5; t++) {

            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {

                    for (int batchIndex=0; batchIndex<1000; batchIndex++) {

                        try {

                            long startTime = System.currentTimeMillis();

                            final Count count = new Count();
                            int noOfOps = 100;

                            for (int i=0; i<noOfOps; i++) {

                                zk.setData(
                                        "/testNode",
                                        new Date().toString().getBytes(),
                                        -1,
                                        new AsyncCallback.StatCallback() {
                                            @Override
                                            public void processResult(int rc, String path, Object ctx, Stat stat) {
                                                count.value++;
                                                //info("Upate result:" + rc);
                                            }
                                        }, null);
                            }

                            while (true) {

                                if ((System.currentTimeMillis() - startTime) > (30 * 1000) ) {
                                    break;
                                }

                                if (count.value == noOfOps) {
                                    break;
                                }

                                Thread.sleep(500);
                            }

                            info("Done batch " + batchIndex);

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                }
            });

            thread.start();

        }

        Thread.sleep(60 * 1000);

        info("Starting again.....");

    }

}

CuratorFramework cf;
boolean connectionRestarted = false;

@Test
public void testFailoverUsingCF() throws Exception {

    cf  = CuratorFrameworkFactory.newClient(
            "host1:2181,host2:2181,host3:2181",
            new ExponentialBackoffRetry(5000, 5));
    cf.start();

    while (true) {

        for (int t=0; t<5; t++) {

            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {

                    for (int batchIndex=0; batchIndex<1000; batchIndex++) {

                        try {

                            long startTime = System.currentTimeMillis();

                            final Count count = new Count();
                            int noOfOps = 100;

                            for (int i=0; i<noOfOps; i++) {

                                cf.setData()
                                        .inBackground(new BackgroundCallback() {
                                            @Override
                                            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                                                count.value++;

// logger.info("Update result {}", event.getResultCode()); } }) .forPath("/testNode");

                            }

                            while (true) {

                                if ((System.currentTimeMillis() - startTime) > (30 * 1000) ) {

// if (!connectionRestarted) { // connectionRestarted = true; // // cf.close(); // cf = CuratorFrameworkFactory.newClient( // "host1:2181,host2:2181,host3:2181", // new ExponentialBackoffRetry(5000, 5)); // cf.start(); // }

                                    break;
                                }

                                if (count.value == noOfOps) {
                                    break;
                                }

                                Thread.sleep(100);
                            }

                            info("Done batch " + batchIndex);

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                }
            });

            thread.start();

        }

        Thread.sleep(60 * 1000);

        info("Starting again.....");

    }
}
private void info(String message) {
    System.out.println(message);
}

}

Randgalt commented 12 years ago

I can't reproduce this. Here's the modified test (to use Curator's testing cluster):

https://gist.github.com/3701773

Here's the utility class used to find the leader:

https://gist.github.com/3701779

Randgalt commented 12 years ago

Per your email to me - this was a side effect of retries. So, I'm closing this.