Azure / azure-event-hubs-node

Node client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License
50 stars 44 forks source link

how to make startAfterTime: [last closed time] #28

Closed willin closed 6 years ago

willin commented 7 years ago

as #26 reported connection closed often.

and i have no idea about the err handling.

const createReceiver = async ({ connStr = '', partitionIds = [], errorHandler = defaultErrorHandler, messageHandler = defaultMessageHandler } = {}) => {
  const client = EventHubClient.fromConnectionString(connStr);
  await client.open();
  partitionIds.forEach(async (partitionId) => {
    // how to make startAfterTime to the time connection closed?
    const receiver = await client.createReceiver('$Default', partitionId, { startAfterTime: Date.now() });
    receiver.on('errorReceived', async (err) => {
      // right?
      if (err.transport && err.transport.name === 'AmqpProtocolError') {
        console.log(`worker #${cluster.worker.id} PID:${process.pid} Restart #${partitionId}`);
        // right?
        await createReceiver({ connStr, partitionIds: [partitionId], errorHandler, messageHandler });
      } else {
        await errorHandler(err);
      }
    });
    receiver.on('message', messageHandler);
  });
};

i guess perhaps it may be related to checkpoint and storage,

like sample in dot net: https://github.com/Azure/azure-event-hubs/blob/master/samples/DotNet/Microsoft.Azure.EventHubs/SampleEphReceiver/SimpleEventProcessor.cs

https://blogs.msdn.microsoft.com/kaevans/2015/02/24/scaling-azure-event-hubs-processing-with-worker-roles/

and java: https://github.com/Azure/azure-event-hubs/blob/002a2056dda20fa79237fbc8bbdab3ef80065307/samples/Java/src/main/java/com/microsoft/azure/eventhubs/samples/Basic/EventProcessorSample.java

amarzavery commented 6 years ago

We are working on an EPH implementation and there will be an implementation out shortly.