wsc0751 / jetlang

Automatically exported from code.google.com/p/jetlang
0 stars 0 forks source link

Unsubscribe not supported #2

Closed GoogleCodeExporter closed 9 years ago

GoogleCodeExporter commented 9 years ago
One thing missing (that is defined in pubsub specification) is the
possibility to unsubscribe from a channel.

Original issue reported on code.google.com by sthru...@gmail.com on 16 Apr 2009 at 12:02

GoogleCodeExporter commented 9 years ago
Unsubscribe is fully supported for every subscription or when you dispose a 
fiber all
subscriptions will be removed.

Single Subscription:

@Test
    public void unsubscribe() throws InterruptedException {
        Fiber fiber = new ThreadFiber();
        fiber.start();
        final CountDownLatch reset = new CountDownLatch(1);
        final CountDownLatch two = new CountDownLatch(2);
        Callback<String> runnable = new Callback<String>() {
            public void onMessage(String message) {
                reset.countDown();
                two.countDown();
            }
        };
        Channel<String> channel = new MemoryChannel<String>();
        Disposable unsub = channel.subscribe(fiber, runnable);
        channel.publish("one");
        Assert.assertTrue(reset.await(5000, TimeUnit.MILLISECONDS));
        unsub.dispose();
        channel.publish("two");
        Assert.assertFalse(two.await(10, TimeUnit.MILLISECONDS));
        fiber.dispose();
    }

Original comment by mike.ret...@gmail.com on 17 Apr 2009 at 9:57