Azure / azure-event-hubs-node

Node client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License
50 stars 46 forks source link

Receiving all messages from storage after returning context.checkpoint() #80

Closed HaykHayrapetyan closed 6 years ago

HaykHayrapetyan commented 6 years ago

//Receiver.js

async function main() {
  // Create the Event Processo Host
  const eph = EventProcessorHost.createFromConnectionString(
    EventProcessorHost.createHostName("my-host"),
    storageCS,
    ehCS,
    {
      eventHubPath: path
    }
  );

  // Message event handler
  const onMessage = (context/*PartitionContext*/, data /*EventData*/) => {
    console.log(">>>>> Rx message from ", context.partitionId, ': ', data.body);
    //console.log('context', context)
    return context.checkpoint();
  };
  // Error event handler
  const onError = (error) => {
    console.log(">>>>> Received Error: %O", error);
  };
  // Register the event handlers
  eph.on(EventProcessorHost.message, onMessage);
  eph.on(EventProcessorHost.error, onError);
  // start the EPH
  await eph.start();
}

main().catch((err) => {
  console.log(err);
});

After receiving all events once, I checkpoint them. But after every restart of node instance it receives them once more. How can I checkpoint the offset??

Grigsuv commented 6 years ago

I have the same problem

amarzavery commented 6 years ago

@HaykHayrapetyan @Grigsuv - Can you provide the checkpoint info that was stored in the storage blob container? You can find that when you view the blob in the portal.

We try to get the checkpoint info from Storage blob based on the info provided in storage connection string. More details over here. If the offset is provided then we use that else we default to receiving events from start.

HaykHayrapetyan commented 6 years ago

@amarzavery here is one of the info's stored in the storage:

{"partitionId":"1","owner":"my-host-b77442e5-7870-4d42-a834-9595b3a74b05","token":"53d17dad-6fad-458b-ba8a-11cc689c3fd3","epoch":1528209623048,"sequenceNumber":1,"offset":"88"}

Is this the way it was supposed to be stored?

amarzavery commented 6 years ago

Okay So I figured what is going on... The checkpoint info is stored in the blob container. The name of the blob container is the name of the EPH. In the above sample, a new blob container will be created every time the sample is run (i.e. once all the messages are received, you close the script and then run it again at a later time). If you pass the name of the previously created blob container, then the EPH will not read all the messages from start. It will read the checkpoint info if it exists in the blob and read messages after that point..

This parameter EventProcessorHost.createHostName("my-host"), should be replaced by the name of the previously created container name (which can be found from the portal).

This is not very obvious and should definitely be handled in a better way then what we are doing currently..

If you have any suggestion to handle this in a better way then please feel free to share them.

amarzavery commented 6 years ago

Actually, I have built-in provision to provide a leaseContainerName. You can find all the optional parameters over here. If you do not provide the lease container name then we use the host name as the lease container name.

amarzavery commented 6 years ago

I have updated the sample in README that provides a leaseContainerName. This should make it clear.

HaykHayrapetyan commented 6 years ago

@amarzavery It seemed to work fine as the receiver got the old messages only once(when I first ran it, all messages were checkpointed and I didn't get the new ones). However after process restart, I don't get new messages sent to eventhub.

amarzavery commented 6 years ago

@HaykHayrapetyan - Thanks for reporting this. There is a small bug. Upon fixing it, I was able to get EPH sample working correctly with restart and send/receive as expected.

I am attaching the screen shot of the fix.

screen shot 2018-06-13 at 11 19 26 am

You can make the above mentioned changes in the _attachReceiver() function in node_modules/azure-event-processor-host/dist/lib/eventProcessorHost.js file to unblock yourself.

I will be sending a PR with a fix at the earliest.

amarzavery commented 6 years ago

fixed by https://github.com/Azure/azure-event-hubs-node/pull/82