Steps to Reproduce
An error is reported every few minutes after the program starts; after each error the program continues to run.
Can you tell me why this happens and how to solve it?
[2019-12-26 14:49:40.889] [WARN] info - userlog kafka error: { [Error: Local: Timed out]
origin: 'local',
message: 'timed out',
code: -1,
errno: -1,
stack: 'Error: Local: Timed out' }
[2019-12-26 14:50:41.644] [WARN] info - userlog kafka error: { [Error: Local: Timed out]
origin: 'local',
message: 'timed out',
code: -1,
errno: -1,
stack: 'Error: Local: Timed out' }
[2019-12-26 14:51:43.046] [WARN] info - userlog kafka error: { [Error: Local: Timed out]
origin: 'local',
message: 'timed out',
code: -1,
errno: -1,
stack: 'Error: Local: Timed out' }
[2019-12-26 14:52:45.061] [WARN] info - userlog kafka error: { [Error: Local: Timed out]
origin: 'local',
message: 'timed out',
code: -1,
errno: -1,
stack: 'Error: Local: Timed out' }
node-rdkafka Configuration Settings
// Create a readable stream backed by a Kafka consumer. The three object
// arguments are, in order: consumer configuration, topic configuration, and
// stream options (per node-rdkafka's createReadStream signature).
let streamUserlog = Kafka.KafkaConsumer.createReadStream(
{
// NOTE(review): 'kafaka1' is spelled differently from 'kafka2' — confirm this hostname is intended.
'metadata.broker.list': 'kafaka1:19092,kafka2:19092',
'group.id': 'productions1223',
'socket.keepalive.enable': true,
// Auto-commit is disabled; the 'data' handler further down calls consumer.commit() itself.
'enable.auto.commit': false
},
{
// Presumably starts from the newest messages when no committed offset exists — confirm against the librdkafka docs.
'auto.offset.reset': 'latest'
},
{
// Topics to subscribe to; 'xxxx' looks like a redacted placeholder — TODO confirm.
'topics': ['xxxx']
}
)
streamUserlog.on('data', async function (m) {
streamUserlog.pause()
counter++
let userLog = JSON.parse(m.value)
let actType = userLog.ul_actType
let articleId = userLog.ul_targetId
let addTime = userLog.ul_addTime
let userId = userLog.ul_userId
let appName = userLog.ul_appName
let ip = userLog.ul_ip
let strategy = userLog.ul_strategyId
if (actType === 1000) { // view
await stock.increaseStock(articleId, actType, addTime, userId)
}
if (counter % numCommit === 0) {
streamUserlog.consumer.commit()
}
Environment Information
Steps to Reproduce An error will be reported every few minutes after the program starts, after which the program will continue to run can you tell me why ? how to solve ? [2019-12-26 14:49:40.889] [WARN] info - userlog kafka error: { [Error: Local: Timed out] origin: 'local', message: 'timed out', code: -1, errno: -1, stack: 'Error: Local: Timed out' } [2019-12-26 14:50:41.644] [WARN] info - userlog kafka error: { [Error: Local: Timed out] origin: 'local', message: 'timed out', code: -1, errno: -1, stack: 'Error: Local: Timed out' } [2019-12-26 14:51:43.046] [WARN] info - userlog kafka error: { [Error: Local: Timed out] origin: 'local', message: 'timed out', code: -1, errno: -1, stack: 'Error: Local: Timed out' } [2019-12-26 14:52:45.061] [WARN] info - userlog kafka error: { [Error: Local: Timed out] origin: 'local', message: 'timed out', code: -1, errno: -1, stack: 'Error: Local: Timed out' } node-rdkafka Configuration Settings let streamUserlog = Kafka.KafkaConsumer.createReadStream( { 'metadata.broker.list': 'kafaka1:19092,kafka2:19092', 'group.id': 'productions1223', 'socket.keepalive.enable': true, 'enable.auto.commit': false }, { 'auto.offset.reset': 'latest' }, { 'topics': ['xxxx'] } ) streamUserlog.on('data', async function (m) { streamUserlog.pause() counter++
let userLog = JSON.parse(m.value) let actType = userLog.ul_actType let articleId = userLog.ul_targetId let addTime = userLog.ul_addTime let userId = userLog.ul_userId let appName = userLog.ul_appName let ip = userLog.ul_ip let strategy = userLog.ul_strategyId if (actType === 1000) { // view await stock.increaseStock(articleId, actType, addTime, userId) }
if (counter % numCommit === 0) { streamUserlog.consumer.commit() }
if (counter % numLog === 0) { __logger.info('userlog kafka:', 'consume message ok', 'articleId:', articleId, 'actType:', actType, 'addTime:', addTime) counter = 0 }
streamUserlog.resume() })
// Stream-level errors are treated as fatal: log the error, then exit the
// process with status 1.
streamUserlog.on('error', function (err) {
  __logger.error(err);
  process.exit(1);
});
// Consumer 'event.error' notifications are only logged as warnings; this
// handler takes no other action.
streamUserlog.consumer.on('event.error', (kafkaErr) => {
  __logger.warn('userlog kafka error:', kafkaErr);
});
Additional context