Netflix / suro

Netflix's distributed Data Pipeline
Apache License 2.0
794 stars 170 forks source link

move away from rxjava Observable to ArrayBlockingQueue for metadata wait... #207

Closed stevenzwu closed 9 years ago

stevenzwu commented 9 years ago

... messages

cloudbees-pull-request-builder commented 9 years ago

suro-pull-requests #200 FAILURE Looks like there's a problem with this pull request

cloudbees-pull-request-builder commented 9 years ago

NetflixOSS » suro » suro-pull-requests #30 ABORTED

cloudbees-pull-request-builder commented 9 years ago

NetflixOSS » suro » suro-pull-requests #33 SUCCESS This pull request looks good

cloudbees-pull-request-builder commented 9 years ago

suro-pull-requests #203 SUCCESS This pull request looks good

stevenzwu commented 9 years ago

got you. let me try again.

stevenzwu commented 9 years ago

updated PR with poison-msg for shutdown. verified with a simple test.

shutdown took 1 ms

package com.netflix.suro.sink.kafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.suro.message.MessageContainer;
import com.netflix.suro.message.StringMessage;
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Stand-alone check of the poison-message shutdown approach: a single worker thread drains a
 * bounded queue and leaves its loop as soon as it dequeues {@link #SHUTDOWN_POISON_MSG}.
 */
public class ExecutorTest {

    /** Sentinel compared by reference identity; enqueueing it tells the worker to stop. */
    private static final MessageContainer SHUTDOWN_POISON_MSG = new StringMessage("suro-KafkaSink-shutdownMsg-routingKey",
            "suro-KafkaSink-shutdownMsg-body");

    /**
     * Starts the worker, enqueues the poison message and verifies that the executor
     * terminates within the 10 second wait.
     *
     * @throws Exception if the test thread is interrupted while waiting for termination
     */
    @Test
    public void shutdown() throws Exception {
        final BlockingQueue<MessageContainer> metadataWaitingQueue = new ArrayBlockingQueue<MessageContainer>(1024);
        ExecutorService executor = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setDaemon(false).setNameFormat("KafkaSink-MetadataFetcher-%d").build());
        executor.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    final MessageContainer message;
                    try {
                        message = metadataWaitingQueue.poll(1, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop: swallowing the interrupt would make
                        // the worker impossible to cancel through shutdownNow().
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (message == null) {
                        System.out.println("got nothing");
                        continue;
                    }
                    // check poison msg for shutdown (identity comparison is intentional)
                    if (message == SHUTDOWN_POISON_MSG) {
                        return;
                    }
                }
            }
        });

        // nanoTime is monotonic, so the measured duration is immune to wall-clock adjustments
        long start = System.nanoTime();
        boolean terminated = false;
        try {
            if (!metadataWaitingQueue.offer(SHUTDOWN_POISON_MSG)) {
                throw new AssertionError("could not enqueue the shutdown poison message");
            }
            executor.shutdown();
            terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
        } finally {
            if (!terminated) {
                // The worker thread is non-daemon; interrupt it so a failed run cannot keep the JVM alive.
                executor.shutdownNow();
            }
        }
        if (!terminated) {
            throw new AssertionError("executor did not terminate within 10 seconds after the poison message");
        }
        System.out.println(String.format("shutdown took %d ms",
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)));
    }
}
cloudbees-pull-request-builder commented 9 years ago

NetflixOSS » suro » suro-pull-requests #34 SUCCESS This pull request looks good

cloudbees-pull-request-builder commented 9 years ago

suro-pull-requests #204 FAILURE Looks like there's a problem with this pull request

stevenzwu commented 9 years ago

do you mean I should add ExecutorTest.java?

stevenzwu commented 9 years ago

I guess you meant close latency. added this to TestKafkaSink.java

    /**
     * Verifies that closing the sink returns quickly.
     * <p>
     * open will succeed because localhost is a resolvable hostname;
     * note that there is no broker running at "localhost:1".
     *
     * @throws Exception if the sink configuration cannot be parsed or the sink fails to open/close
     */
    @Test
    public void testCloseLatency() throws Exception {
        String sinkstr = "{\n" +
                "    \"type\": \"kafka\",\n" +
                "    \"client.id\": \"kafkasink\",\n" +
                "    \"bootstrap.servers\": \"localhost:1\",\n" +
                "    \"kafka.etc\": {\n" +
                "          \"acks\": \"1\"\n" +
                "      }\n" +
                "}";
        KafkaSink sink = jsonMapper.readValue(sinkstr, new TypeReference<Sink>(){});
        sink.open();
        Assert.assertTrue(sink.isOpened());
        // nanoTime is monotonic, so the measurement cannot be skewed by wall-clock adjustments
        long start = System.nanoTime();
        sink.close();
        long closeLatency = (System.nanoTime() - start) / 1000000L;
        System.out.println("closeLatency = " + closeLatency);
        // close should take less than 500 ms
        Assert.assertTrue("close took " + closeLatency + " ms, expected less than 500 ms", closeLatency < 500);
    }