Azure / azure-event-hubs-java

☁️ Java client library for Azure Event Hubs
https://azure.microsoft.com/services/event-hubs
MIT License
51 stars 60 forks source link

Receiver only receives one event at a time #125

Closed viniciusccarvalho closed 6 years ago

viniciusccarvalho commented 7 years ago

Actual Behavior

I might be misunderstanding how EH works, but I thought that a call to receive would return an Iterable of as many events as maxEventCount was set to. So I've created a simple loop to send 100 events and then receive them, but no matter what I do, I only get one event.

latch = new CountDownLatch(100);
        byte[] payload = new byte[40];
        Arrays.fill(payload,(byte)1);
        long start = System.currentTimeMillis();
        for(int i=0;i<100;i++){
            client.send(new EventData(payload));
        }
        long now = System.currentTimeMillis();
        PartitionReceiver receiver = client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
                "0",
                PartitionReceiver.START_OF_STREAM,
                false
        );
        receiver.setReceiveTimeout(Duration.ofSeconds(20));

        receiver.receive(100).thenAccept(this::decrease);
        latch.await();

the decrease method only countdown the latch.

I've also tried receiveSync and it always return one event only.

Is this expected?

Versions

SreeramGarlapati commented 7 years ago

this line to send event is asynchronous: client.send(new EventData(payload)); Right now as soon as an event is sent to event hub - it is delivering it to your receiverHandler.

if your intention is to send all events at once and see all of them coming to your receiverHandler as an iterable - change it to synchronous send by using client.sendSync() - then you will see many events coming in at-once...

viniciusccarvalho commented 7 years ago

Ok, but my code would block until all 100 events are received. I'm assuming that the loop will guarantee delivery of all 100 events.

So perhaps the question is that I can't just receive(100) as I could get just 1 event, but instead call receive(100) on a loop until I read 100 events, and I could be receiving anything from 1...100 events on each call, is that right?

viniciusccarvalho commented 7 years ago

I've modified the code to use sendSync and also to work on a while loop waiting for all events to arrive, although it now works, I still only receive one event at a time, I was expecting that after sending all 100 events, my first poll on receiver.receive(100) would receive more than only one event. Instead, my while loop gets executed 100 times.

SreeramGarlapati commented 7 years ago

hmm.. that doesn't seem normal.. with that very small message size - I usually see atleast 10...

however, this is common when you are running a high latency scenario - like running your receiver from a m/c in east US to EH in Japan etc.. checkout a m/c in the exact same region as of EH - to get the best latency.

(I guess you are already trying the latest 0.14.0)

viniciusccarvalho commented 7 years ago

Yes, Running 0.14.1.

What do you mean m/c? I ran the receiver/producer on my machine sending data to an EH on east coast.

rkrashr commented 7 years ago

I tested the same scenario. Setup: pre-generate 10,000 events to an event hub with 4 partitions then registerEventProcessorFactory with options set to max prefetch of 900 and batch size 300 All 10,000 messages were successfully delivered. I ran the test for 2 configurations: -- consumer is running on my local workstation -- consumer was running on Azure VM

Expected: getting batches of events of ~300 or of that order (well, definitely >100) Actual behaviour: in the plot of received batch vs partition id (0-3) screenshot from 2017-09-13 20-00-40 You can see, that most batches were size one, some were size 13-15 Performance was different in 2 scenarios (I think, because of network bandwidth) -- 400 msg/s vs 1600 msg/s (4 times increase) But still, delivering only 1 message is amazingly inefficient and reduces possible performance by several orders

sjkwak commented 6 years ago

@romanjsonar does test application have any processing logic after receiving events, or doing very little processing inside the ProcessEvents API?

Here are some details about the receive logic. When an Event Processor Host starts, it creates a connection to the service, creates a receiver and sends flow to the service to fetch messages. Upon the receive of the flow, the service will start delivering messages to the client and it will continue delivering messages until flow (prefetch) is exhausted. Messages which are delivered to the client are accumulated in a local cache (prefetch queue) and the client sends new flow (per every 100 consumed messages) to the service to fetch more messages as messages are removed from the local cache to serve receive calls.

When receive API is invoked, the client checks local cache, dequeue messages from the cache and return them to the caller. In case there are no available messages in the local cache, the receive call will wait until either timeout elapses or messages arrive.

For a case where processing rate is much faster than fetching rate (for e.g., slow network or fast processing case), it is likely that receive call will be mostly in wait because of no available messages in the cache, and the receive call will be satisfied as soon as messages arrive. That is, a receive call in wait will be awaken and notified as soon as a message is enqueued in the prefetch queue. And, if a subsequent receive call is invoked before the second message is enqueued to the queue, then the pattern will repeat and receive will keep delivering a single message as there won't be enough time for the cache to accumulate more messages. Note that even set of messages are being delivered from the service, deserializing messages and enqueueing them to the local prefetch queue will happen per a message and hence, if there is a receive in wait, the receive call will be satisfied with a single message.

To minimize the number of receive calls and satisfy max batch count, the client library could let receive call wait till more messages are accumulated in the cache instead of notifying it immediately once a message is available. But, that wouldn't provide much gain in most cases since a) in many cases there will be processing logic with events and it will provide enough time to the prefetch to accumulate messages in the queue, and b) receive call wouldn't necessarily cause remote IO as prefetching logic handles it and for latency sensitive applications, it would be better if receive call delivers messages as soon as they are available instead of unnecessarily holding them. I hope this explains a bit, but if you have questions on the above, let me know.

rkrashr commented 6 years ago

@sjkwak, thanks for the detailed explanation!

As you are explaining in the last paragraph, the assumption is that processing logic is taking more time than delivery of a message. In this case, messages will accumulate in the prefetch queue.

Let me explain what we do and how we consume messages. Our application is a typical data warehouse application, which fetches messages from multiple sources and accumulates them on an internal data store for processing and analysis. It is a part of a larger aggregation pipeline. We don't process data incrementally, all operations act on larger sets. As such, the assumption of long processing time for each delivered message doesn't hold for every application. Our case is an example.

The rate of event processing in our case will always be higher than rate of event receiving. Network performance is the bottleneck in our case. Well, at least, we would like it to be the case and ideally, fill all available network bandwidth.

However, it looks like performance for this use case could be improved, if, for example, consumer on the client side would receive 10 batches of1000 messages instead of 10,000 batches of 1 message. If average message size is 1000 bytes, then one batch of 1000 messages takes ~1MB and will use network more efficiently. Even a consumer on a home connection could receive around 10K msg in 1 sec instead of 25 sec. The other question is, if 1000 messages can be quickly extracted from the event hub queue on the server side and sent to the client side. This should not be a problem. In our test (see message with plot above) all 10,000 messages are already sitting in the event hub. Of course, I don't know details of server-side implementation.

rkrashr commented 6 years ago

@sjkwak, following our discussion, I did another experiment:

Force certain fixed processing time (implemented as a delay/sleep period, no real work) for each delivered batch of messages. Receive all messages from the event hub, and record min/max batch sizes for the received batches during this run. Do several runs for a range of fixed processing times. This resulted in a plot event hub batch size as a function of delay

As you can see, the faster we process a batch of messages, the less efficient it gets. And in reverse, the slower we process each batch, the bigger each batch gets, up to the maximum size of 100 messages. As a side-note, it is up to a client side, how well it can optimize message processing. Ideally, it will take zero time to process a batch. It is almost as Event Hub compensates for efficient processing of messages on the client side by introducing inefficient networking.

SreeramGarlapati commented 6 years ago

hi @romanjsonar

Network latency to receive a batch of Events depends on these 2 factors:

1) what is the average event size? if the event is smaller - receive will be relatively faster than for large events - less data to transmit on wire.. 2) how far is your "code running - which is receiving from EventHub" from the "EventHub"? If the code is running on Azure and within the same data center (region - selected while creating your EventHubs namespace) - you should see the best per message latencies.

600ms to receive 40 events - (assuming you have small events <1k size) - I would guess - you could be running your receiver from your local machine (which will introduce a lot of delays) - If my guess is right - I would urge you to - checkout a VM on azure on the same region and measure your numbers!

rkrashr commented 6 years ago

Hi @SreeramGarlapati, Thanks for additional tips, every bit helps. I already ran this experiment with local vs Azure VM (see my post above with batch size distribution), where at the end I mentioned, that

Performance was different in 2 scenarios (I think, because of network bandwidth) -- 400 msg/s vs 1600 msg/s (4 times increase)

However, even when running on Azure VM, in the same region, performance is still sub-par as batch mostly contains single message. I will test the idea of a streaming scenario, where large volume of messages doesn't reside in a partition, but rather put by a producer in small batches and then promptly extracted by a consumer. It was raised during the discussion.

sjkwak commented 6 years ago

@romanjsonar, thanks for sharing test results. I just want to point out one thing: there is no difference in terms of networking efficiency between slow processing case (batch size 1) and fast processing case (batch size 50). Note that the service will push messages as fast as network bandwidth allows based on flow credit (prefetching) and receive API call doesn't trigger network I/O since fetching messages is done through prefetching logic.

Here is how prefetching works in a simplified version. Let's assume that prefetch value is set to 500. 1) a receiver starts and send a flow (prefetch value, 500) to the service. 2) the service pushes 500 messages to the receiver. 3) the receiver starts getting messages and keeps them in cache as they are delivered. 4) receive API is invoked with batch size of 10, it checks the cache, grabs 10 messages from the cache. If the cache has less than 10 messages, it will get all messages in the cache. 5) as messages are consumed, the receiver will send a new flow (100) every 100 consumed messages. 6) the service receives new flow and it pushes 100 messages to the client.

So, network cost will be the same for both fast processing and slow processing case.

On a separate note, current max prefetch value is low in some cases. We're going to increase the limit from 999 to 5,000.

rkrashr commented 6 years ago

@sjkwak, thanks for the detailed explanation. I agree, this is how ideally it should work. I checked in debugger, in my test (described above)

public CompletableFuture<Iterable<EventData>> PartitionReceiver:: receive(final int maxEventCount)

is receiving only a single message, regardless of the space available in the pre-fetch queue, even if it is empty.

In item 5, you are saying, that if there is enough space in pre-fetch queue, then server side should send 100 messages for your example. Then, of course, network cost will be the same, and bandwidth will be used efficiently. However, I see only 1 message received most of the time. And this is the whole point of these experiments. As far as I understand, the source of this is not in the azure-eventhubs library, that implements consumer, but rather on the server side.

SreeramGarlapati commented 6 years ago

we tested our API extensively several times to repro this scenario but in vain.

Here's what we did:

1) we pre-populated the EventHub with >1M messages of each 100 bytes size- to make sure - messages are always available to receiver and will not contribute to the slowness of receives 2) I checked out an Azure vm in the exact same region as of the EventHub - to make sure latency will be optimal while I receive 3) I ran this baseline test and tried to chart out the events received per receive call over a timeline..

package trails;

import com.microsoft.azure.eventhubs.*;

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ReceiverPerf {

    static Executor executorService = Executors.newWorkStealingPool();

    public static void main( String[] args ) throws Exception    {

        final EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(
                "------ConnectionStringFromAzurePortal------");

        final PartitionReceiver receiver = ehClient.createReceiverSync(
                EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
                "0", 
                PartitionReceiver.START_OF_STREAM);
        receiver.receiveSync(1); // invoke 1 receive call to exclude initialization overhead

        System.out.println(String.format("%s, %s", 0, 0));

        final long initialNanoCheckpoint = System.nanoTime();
        long eventCount = 0;
        while (true) {
            final Iterable<EventData> events = receiver.receiveSync(999);
            eventCount += getSize(events);
            System.out.println(String.format("%s, %s", (System.nanoTime() - initialNanoCheckpoint)/1000000, eventCount));
            Thread.sleep(10); // introduce 10ms sleep to simulate - down-stream IO Operation 
        }
    }

    static long getSize(Iterable<EventData> events) {
        if (events ==null)
            return 0;

        if (!events.iterator().hasNext())
            return 0;

        if (events instanceof Collection) {
            return ((Collection<?>) events).size();
        }

        throw new RuntimeException();
    }
}

Here's the cart we got (x-axis is the timeline in seconds & y-axis is the messages per batch):

image

Average batch size we got over these ~6k samples is 182. However, the contract of the client API is to return as soon as 1 message is available - so you might observe very few receives with <10 messages as well.

We are closing this issue as of now, as we cannot reproduce - receiving only 1 message at a time.

rkrashr commented 6 years ago

Great, looks like it was fixed!

birdayz commented 5 years ago

i am having huge issues with too small batches. what is taken into consideration for the batch size? i set prefetch to 2k and batch size to 1k. usually i get batches between 1 and 20, but mostly < 5. I made sure that there are enough events in the partition for my test (pre-filled it); so it's not the producer. What can i do? Eventhub is horribly slow this way; once a device from iot hub sends a lot of messages i am lagging VERY MUCH behind..

i used the code sample from above by @SreeramGarlapati .

JamesBirdsall commented 5 years ago

@birdayz If you increase the sleep length, does the batch size get any bigger? Each receive call should return whatever is in the prefetch buffer. If waiting longer increases the batch size, then that part is working.

Where are you running your test code at? How is the network between there and our service?

What version of the client are you using?