Closed arulselvam closed 6 years ago
This is interesting. We are closing the receiver link at the end of receiveBatch(), so ideally this should not happen. I will investigate further and get back to you.
Is it possible to provide debug logs?
You can set the DEBUG environment variable like this:
set DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
Logs can also be stored in a file, as mentioned here.
The initial part of the debug logs will contain the connection string. Make sure to remove that before posting the logs here.
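If it helps, the secret can be masked automatically before logs are shared. The following is only a sketch: redactConnectionString is a hypothetical helper (not part of the SDK), and it assumes the usual connection string layout of "Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...".

```typescript
// Hypothetical helper, not part of azure-event-hubs.
// Assumption: the secret appears as "SharedAccessKey=<value>" and the value
// ends at the next ';', whitespace, or quote character.
function redactConnectionString(logText: string): string {
  // "SharedAccessKeyName=" is left alone because the pattern requires '='
  // directly after "SharedAccessKey".
  return logText.replace(/(SharedAccessKey)=[^;\s"']+/g, "$1=<redacted>");
}
```

Running the captured log text through this function before pasting it keeps the endpoint and key name visible for debugging while hiding the key itself.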
I see the issue now. The receiveBatch() method closes the link correctly after receiving the messages. Since there are no active links on the connection, the connection is idle. An empty connection (without any links) can remain idle for only 5 minutes (300,000 milliseconds), hence the timeout error.
After you are done receiving messages, you should call await client.close() to close the connection.
Otherwise, you can use client.receive(...) to receive messages. That will keep the link open.
Thanks @amarzavery, I'll try to get the debug logs in a couple of days. I don't think the issue is caused by an idle connection; we were getting the error in the middle of fetching messages from Event Hubs. Strangely, when I reduced the number of events from 1000 to 200 in the client.receiveBatch call, we stopped receiving the error for the last 2 days. We don't want to close the connection after calling await client.receiveBatch(...), as we'll be periodically polling every few secs and we would like to re-use the existing connection. Also even if the underlying connection is closed, is there a reason for client.receiveBatch not to check and re-establish the closed connection? On your suggestion to use client.receive(..), is there a way to apply backpressure? I would like to buffer the messages and, if the buffer gets full, stop receiving temporarily and resume once the buffered messages have been processed.
In the client.receive() method you can provide async versions of the onMessage and onError handlers and await internally on the messages being processed. Then you wouldn't need to worry about back pressure.
const { EventHubClient, EventPosition } = require("azure-event-hubs");

async function main() {
  // str and path are the Event Hubs connection string and entity path.
  const client = EventHubClient.createFromConnectionString(str, path);
  const onMessage = async (eventData) => {
    console.log(">>> EventDataObject: ", eventData);
    // This should help you solve the back pressure problem. My understanding is that
    // node.js's event loop would queue the event handler calls for you.
    await processEvent(eventData.body);
  };
  const onError = (err) => {
    console.log(">>>>> Error occurred: ", err);
  };
  const options = {
    eventPosition: EventPosition.fromEnqueuedTime(Date.now()),
    enableReceiverRuntimeMetric: true
  };
  const rcvHandler = client.receive("0", onMessage, onError, options);
  console.log("rcvHandler: ", rcvHandler.name);
  // If for some reason you want to stop receiving messages then you can
  await rcvHandler.stop();
}
> We don't want to close the connection after calling await client.receiveBatch(...), as we'll be periodically polling every few secs and we would like to re-use the existing connection.
As long as you poll again within 5 minutes after receiveBatch() has returned, things will be fine and the same connection will be reused.
> Also even if the underlying connection is closed, is there a reason for client.receiveBatch not to check and re-establish the closed connection?
receiveBatch() does check whether the connection is open and opens it if it is not open yet. That is what the method does when it is called for the very first time: it opens the connection, creates a receiver link, and tries to receive the configured number of messages within the specified amount of time. Once it is done receiving messages, it closes the receiver link and returns the result. Note: the link is closed, but the underlying AMQP connection is still open. If receiveBatch() is called again within 5 minutes of the previous call, the next call reuses the existing open connection. If that does not happen and there is no other sender/receiver link on the connection (i.e. it is an empty connection), then the service forcefully closes the connection due to the 5-minute idle timeout.
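To make the timing rule concrete, here is a small sketch of the arithmetic. The 300000 ms limit comes from the service error message in this thread; connectionLikelyReused and the 30-second safety margin are hypothetical choices for illustration, not SDK behavior.

```typescript
// Idle limit quoted in the service error message (300000 milliseconds).
const IDLE_TIMEOUT_MS = 300000;

// Hypothetical helper: returns true when the gap between two receiveBatch()
// calls is short enough that the connection should still be open.
// Assumption: a 30 s safety margin to allow for timer drift and network delay.
function connectionLikelyReused(pollIntervalMs: number, safetyMarginMs: number = 30000): boolean {
  return pollIntervalMs < IDLE_TIMEOUT_MS - safetyMarginMs;
}
```

With a poll every few seconds the check passes comfortably, whereas a gap of 309000 ms (5 minutes and 9 seconds) exceeds the limit, so the service would be expected to close the empty connection in between.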
> In the client.receive() method you can provide async version of onMessage and onError handlers and await internally on messages being processed. Then you wouldn't need to worry about back pressure
We have to do batch processing on messages, so processing one message at a time with an async version of the onMessage handler won't work for us. I think I can implement backpressure using a combination of client.receive() and receiveHandler.stop(). Would you recommend any other way to implement backpressure?
I see. Given your requirements, I feel client.receiveBatch() would be ideal. If used correctly within an async function, you wouldn't have to worry about the connection getting disconnected: receiveBatch() opens the connection if needed on every invocation. However, if the connection goes down while receiving messages, it will reject the promise with an error. If that happens, you can retry.
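A retry wrapper along these lines might look like the sketch below. It is not SDK code: BatchClient and receiveBatchWithRetry are hypothetical names, the interface only mirrors the receiveBatch(partitionId, maxMessageCount, maxWaitTimeInSeconds) call used in this thread, and the attempt count and delay are arbitrary defaults. It relies on the retryable flag that the translated errors carry in the debug logs posted in this thread.

```typescript
// Minimal shape of the client, mirroring the receiveBatch() call used in this thread.
interface BatchClient<T> {
  receiveBatch(partitionId: string, maxMessageCount: number, maxWaitTimeInSeconds: number): Promise<T[]>;
}

// Hypothetical helper: fetches one batch and retries only when the rejected
// error is marked retryable. Defaults (3 attempts, 1 s delay) are assumptions.
async function receiveBatchWithRetry<T>(
  client: BatchClient<T>,
  partitionId: string,
  maxMessageCount: number,
  maxWaitTimeInSeconds: number,
  maxAttempts: number = 3,
  retryDelayMs: number = 1000
): Promise<T[]> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await client.receiveBatch(partitionId, maxMessageCount, maxWaitTimeInSeconds);
    } catch (err) {
      lastError = err;
      const retryable =
        typeof err === "object" && err !== null && (err as { retryable?: boolean }).retryable === true;
      // Give up immediately on non-retryable errors or when attempts are exhausted.
      if (!retryable || attempt === maxAttempts) break;
      await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs));
    }
  }
  throw lastError;
}
```

Calling this in a loop and processing each returned batch before requesting the next one also gives a simple form of backpressure, since no new events are pulled while a batch is still being handled.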
I modified the batch receive example and it worked as expected.
import { EventHubClient, EventData, delay } from "../lib";
import * as dotenv from "dotenv";
dotenv.config();

const connectionString = "EVENTHUB_CONNECTION_STRING";
const entityPath = "EVENTHUB_NAME";
const str = process.env[connectionString] || "";
const path = process.env[entityPath] || "";
const client = EventHubClient.createFromConnectionString(str, path);

async function main(): Promise<void> {
  const result: EventData[] = await client.receiveBatch("0", 10, 20);
  console.log(">>> EventDataObjects: ", result);
  let i = 0;
  for (const data of result) {
    console.log("### Actual message (%d):", ++i, data.body ? data.body.toString() : null);
  }
  // not closing the client. This should trigger the connection forced timeout error after 5 minutes.
  // await client.close();
  // sleeping for 5 minutes and 9 seconds...
  await delay(309000);
}

main().then(() => {
  console.log("still doing something after 5 minutes. We should get here even if the connection goes down.");
  // This should call client.receiveBatch() again and it should open a new connection since the old one was dead...
  return main();
}).catch((err) => {
  console.log("error: ", err);
});
output:
### Actual message (7): Hello EH 2
### Actual message (8): Hello EH 3
### Actual message (9): Hello EH 4
### Actual message (10): [object Object]
rhea:frames [connection-1]:0 -> empty +2m
rhea:raw [connection-1] SENT: 8 0000000802000000 +2m
rhea:io [connection-1] read 35 bytes +3m
rhea:io [connection-1] got frame of size 35 +1ms
rhea:raw [connection-1] RECV: 35 0000002302000000005313c0160b52017000001388520270000007fe40404040404140 +56s
rhea:frames [connection-1]:0 <- flow#13 {"next_incoming_id":1,"incoming_window":5000,"next_outgoing_id":2,"outgoing_window":2046,"echo":true} +56s
rhea:frames [connection-1]:0 -> empty +1m
rhea:raw [connection-1] SENT: 8 0000000802000000 +1m
rhea:io [connection-1] read 267 bytes +2m
rhea:io [connection-1] got frame of size 267 +0ms
rhea:raw [connection-1] RECV: 267 0000010b02000000005316c0fe03434100531dc0f603a316616d71703a636f6e6e656374696f6e3a666f72636564a1da54686520636f6e6e656374696f6e2077617320696e61637469766520666f72206d6f7265207468616e2074686520616c6c6f77656420333030303030206d696c6c697365636f6e647320616e6420697320636c6f73656420627920636f6e7461696e657220274c696e6b547261636b6572272e20547261636b696e6749643a36306138643631626465336434313936386538373762363732366130623439665f47372c2053797374656d547261636b65723a67617465776179352c2054696d657374616d703a382f31302f3230313820383a31353a343520504d40 +60s
rhea:frames [connection-1]:0 <- detach#16 {"closed":true,"error":{"condition":"amqp:connection:forced","description":"The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM"}} +60s
rhea:events [connection-1] Link got event: sender_error +5m
azure:amqp-common:error An error occurred on the cbs sender link.. { ConnectionForcedError: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM
azure:amqp-common:error at Object.translate (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/lib/amqp-common/errors.ts:454:19)
azure:amqp-common:error at Sender._cbsSenderReceiverLink.sender.registerHandler (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/lib/amqp-common/cbs.ts:84:27)
azure:amqp-common:error at emitOne (events.js:116:13)
azure:amqp-common:error at Sender.emit (events.js:211:7)
azure:amqp-common:error at Sender.link.dispatch (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/link.js:59:37)
azure:amqp-common:error at Sender.link.on_detach (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/link.js:159:32)
azure:amqp-common:error at Session.on_detach (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/session.js:709:27)
azure:amqp-common:error at Connection.(anonymous function) [as on_detach] (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/connection.js:684:30)
azure:amqp-common:error at c.dispatch (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/types.js:909:33)
azure:amqp-common:error at Transport.read (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/transport.js:108:36)
azure:amqp-common:error name: 'ConnectionForcedError',
azure:amqp-common:error translated: true,
azure:amqp-common:error retryable: true,
azure:amqp-common:error info: null,
azure:amqp-common:error condition: 'amqp:connection:forced' } +5m
rhea:events [connection-1] Link got event: sender_close +13ms
rhea:events [connection-1] Session got event: sender_close +0ms
rhea:events [connection-1] Connection got event: sender_close +0ms
rhea:events [3f74cb13-1678-d240-8577-5bbf9a6921d5] Container got event: sender_close +0ms
rhea:frames [connection-1]:0 -> detach#16 {"closed":true} +14ms
rhea:raw [connection-1] SENT: 22 0000001602000000005316d000000006000000024341 +15ms
rhea:io [connection-1] read 548 bytes +15ms
rhea:io [connection-1] got frame of size 268 +0ms
rhea:raw [connection-1] RECV: 268 0000010c02000000005316c0ff0352014100531dc0f603a316616d71703a636f6e6e656374696f6e3a666f72636564a1da54686520636f6e6e656374696f6e2077617320696e61637469766520666f72206d6f7265207468616e2074686520616c6c6f77656420333030303030206d696c6c697365636f6e647320616e6420697320636c6f73656420627920636f6e7461696e657220274c696e6b547261636b6572272e20547261636b696e6749643a36306138643631626465336434313936386538373762363732366130623439665f47372c2053797374656d547261636b65723a67617465776179352c2054696d657374616d703a382f31302f3230313820383a31353a343520504d40 +0ms
rhea:frames [connection-1]:0 <- detach#16 {"handle":1,"closed":true,"error":{"condition":"amqp:connection:forced","description":"The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM"}} +1ms
rhea:events [connection-1] Link got event: receiver_error +1ms
azure:amqp-common:error An error occurred on the cbs receiver link.. { ConnectionForcedError: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM
azure:amqp-common:error at Object.translate (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/lib/amqp-common/errors.ts:454:19)
azure:amqp-common:error at Receiver._cbsSenderReceiverLink.receiver.registerHandler (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/lib/amqp-common/cbs.ts:88:27)
azure:amqp-common:error at emitOne (events.js:116:13)
azure:amqp-common:error at Receiver.emit (events.js:211:7)
azure:amqp-common:error at Receiver.link.dispatch (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/link.js:59:37)
azure:amqp-common:error at Receiver.link.on_detach (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/link.js:159:32)
azure:amqp-common:error at Session.on_detach (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/session.js:709:27)
azure:amqp-common:error at Connection.(anonymous function) [as on_detach] (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/connection.js:684:30)
azure:amqp-common:error at c.dispatch (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/types.js:909:33)
azure:amqp-common:error at Transport.read (/Users/amarz/sdk/rh/final/azure-event-hubs-node/client/node_modules/rhea/lib/transport.js:108:36)
azure:amqp-common:error name: 'ConnectionForcedError',
azure:amqp-common:error translated: true,
azure:amqp-common:error retryable: true,
azure:amqp-common:error info: null,
azure:amqp-common:error condition: 'amqp:connection:forced' } +14ms
rhea:events [connection-1] Link got event: receiver_close +1ms
rhea:events [connection-1] Session got event: receiver_close +0ms
rhea:events [connection-1] Connection got event: receiver_close +0ms
rhea:events [3f74cb13-1678-d240-8577-5bbf9a6921d5] Container got event: receiver_close +0ms
rhea:io [connection-1] got frame of size 15 +1ms
rhea:raw [connection-1] RECV: 15 0000000f02000000005317c0020140 +2ms
rhea:frames [connection-1]:0 <- end#17 {} +2ms
rhea:events [connection-1] Session got event: session_close +1ms
rhea:events [connection-1] Connection got event: session_close +0ms
rhea:events [3f74cb13-1678-d240-8577-5bbf9a6921d5] Container got event: session_close +0ms
rhea:io [connection-1] got frame of size 265 +1ms
rhea:raw [connection-1] RECV: 265 0000010902000000005318c0fc0100531dc0f603a316616d71703a636f6e6e656374696f6e3a666f72636564a1da54686520636f6e6e656374696f6e2077617320696e61637469766520666f72206d6f7265207468616e2074686520616c6c6f77656420333030303030206d696c6c697365636f6e647320616e6420697320636c6f73656420627920636f6e7461696e657220274c696e6b547261636b6572272e20547261636b696e6749643a36306138643631626465336434313936386538373762363732366130623439665f47372c2053797374656d547261636b65723a67617465776179352c2054696d657374616d703a382f31302f3230313820383a31353a343520504d40 +0ms
rhea:frames [connection-1]:0 <- close#18 {"error":{"condition":"amqp:connection:forced","description":"The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM"}} +0ms
rhea:events [connection-1] Connection got event: connection_error +0ms
rhea:events [3f74cb13-1678-d240-8577-5bbf9a6921d5] Container got event: connection_error +0ms
rhea:events [connection-1] Connection got event: connection_close +0ms
azure:event-hubs:connectionContext [connection-1] connection close happened. +5m
rhea:frames [connection-1]:0 -> detach#16 {"handle":1,"closed":true} +1ms
rhea:raw [connection-1] SENT: 23 0000001702000000005316d00000000700000002520141 +1ms
rhea:frames [connection-1]:0 -> end#17 {} +0ms
rhea:raw [connection-1] SENT: 12 0000000c0200000000531745 +0ms
rhea:frames [connection-1]:0 -> close#18 {} +0ms
rhea:raw [connection-1] SENT: 12 0000000c0200000000531845 +0ms
rhea:events [connection-1] Connection got event: disconnected +317ms
azure:event-hubs:error [connection-1] Error (context.connection.error) occurred on the amqp connection: c {
azure:event-hubs:error value:
azure:event-hubs:error [ Typed { type: [Object], value: 'amqp:connection:forced' },
azure:event-hubs:error Typed {
azure:event-hubs:error type: [Object],
azure:event-hubs:error value: 'The connection was inactive for more than the allowed 300000 milliseconds and is closed by container \'LinkTracker\'. TrackingId:60a8d61bde3d41968e877b6726a0b49f_G7, SystemTracker:gateway5, Timestamp:8/10/2018 8:15:45 PM' },
azure:event-hubs:error Typed { type: [Object], value: null } ] } +5m
azure:event-hubs:error [connection-1] Error (context.error) occurred on the amqp connection: { Error: read ECONNRESET
azure:event-hubs:error at _errnoException (util.js:992:11)
azure:event-hubs:error at TLSWrap.onread (net.js:618:25) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' } +1ms
still doing something after 5 minutes. We should get here even if the connection goes down.
. . . .
. . . .
### Actual message (2): [object Object]
### Actual message (3): [object Object],10,20,some string
### Actual message (4): true
### Actual message (5): Hello EH 0
### Actual message (6): Hello EH 1
### Actual message (7): Hello EH 2
### Actual message (8): Hello EH 3
### Actual message (9): Hello EH 4
### Actual message (10): [object Object]
Amars-MacBook-Pro:client amarz$
As you can see, the log statement from the .then() did get executed. So the web app shouldn't go down because of this. The next call to client.receiveBatch() did open the connection and it received messages again.
Thanks @amarzavery for the example. I tested it with version 0.2.5 of 'azure-event-hubs' from the npm repository, but it doesn't work as expected. Retrying client.receiveBatch(…) errors out with the same 'ConnectionForced' error.
But the example works with the latest code in the master branch. Will there be a release to the npm repo anytime soon?
@arulselvam - version "0.2.6" of "azure-event-hubs" has been published to npm.
Thanks a lot @amarzavery.
You are welcome. Thank you for filing issues. It helped me fix a bug in the lower level library.
The issue has been fixed. Please install the 0.2.8 version of the sdk. Feel free to reopen the issue, if the problem persists.
We are using await client.receiveBatch(...) in an Azure WebJob to periodically fetch and process messages from Event Hubs. But we are frequently getting the below error. We have to restart the WebJob to resolve the issue temporarily.

To Reproduce
Call await client.receiveBatch('0', 1000, 1) periodically. I am not able to reproduce this issue on my local machine, but I get the error from the Azure WebJob every day.

Expected behavior
await client.receiveBatch(...) should successfully fetch the messages from Event Hubs.

Package-name: azure-event-hubs
Package-version: 0.2.5
node.js version: 10.0.0
OS name and version: Azure App Service - Windows