Closed dragonfriend0013 closed 3 years ago
I am trying to process a stream, and if the input processing errors out, i would like to send the data + error to a separate topic. good messages would be sent to another topic. Am i approaching this wrong?
import { KafkaStreams } from 'kafka-streams'; import getConfig from './config.mjs'; import getCache from './utils/redis.mjs'; import logger from './utils/logger.mjs'; import convertHrtime from './utils/convertHRTime.mjs'; import { tracer, getTraceId } from './utils/telemetry.mjs'; import formatData from './formatData.mjs'; import GetEvtQuery from './getEvents.mjs'; const config = getConfig(); const primaryTopic = process.env.KAFKA_INGRESS_TOPIC; const retryTopic = process.env.KAFKA_RETRY_TOPIC; const fmtTopic = `${primaryTopic}_FMT`; const cache = getCache(); cache.on('ready', () => { logger.info('Redis is ready'); }); cache.on('error', (err) => { logger.error(`Redis error: ${err.code} ${err.syscall} ${err.address} ${err.port}`); }); const kafkaStreams = new KafkaStreams(config); kafkaStreams.on('error', (error) => { logger.error(`Kafka streams error: ${error.message}`); }); const consume = async () => { const MFStream = kafkaStreams.getKStream(); MFStream .from(primaryTopic) .from(retryTopic) .mapJSONConvenience() // eslint-disable-next-line consistent-return .map((message) => tracer.startActiveSpan('message', async (span) => { try { const start = process.hrtime(); const msg = await formatData(message); msg.traceId = getTraceId(span); const evtConfig = await GetEvtQuery(msg.evtType, cache); if (!evtConfig) { return null; } // filter out configs that dont match filter // value will always be a string, so need to cast incoming data to string const events = evtConfig.filter((f) => f.keyFields .every((i) => msg.newMsg[i.field].toString() === i.value)); msg.events = events; const duration = await convertHrtime(process.hrtime(start)).milliseconds; msg.durationFmtOpt = duration; span.setAttribute('duration', duration); span.end(); return msg; } catch (e) { // check retryCnt, if >= 10, send to errortopic, otherwise send to retrytopic const thisMsg = { ...message, errMsg: e.message, }; let retryCnt = !thisMsg.retryCnt ? 0 : parseInt(thisMsg.retryCnt, 10); retryCnt += 1; thisMsg.retryCnt = retryCnt; if (thisMsg.retryCnt <= parseInt(process.env.MAX_RETRIES, 10)) { // await retryProducer(thisMsg); } else { // await errorProducer(thisMsg); } span.setStatus({ code: SpanStatusCode.ERROR, message: e.message, willRetry: thisMsg.retryCnt <= parseInt(process.env.MAX_RETRIES, 10), }); logger.error({ msg: `${e.message.replace(/"/g, "'")}`, uuid: thisMsg.uuid, traceId: getTraceId(span), retryCnt: thisMsg.retryCnt, }); } })) .awaitPromises() .wrapAsKafkaValue() .to(fmtTopic, 'auto', 'buffer'); MFStream .start() .then(() => { logger.info('MFStream started.'); }, (error) => { logger.error(`MFStream failed to start: ${error}`); }); }; consume().catch((e) => { // eslint-disable-next-line no-console console.error(e); logger.error(e); process.exit(1); });
closing, used an example using getKafkaStyledMessage to override topic specified in stream.to
I am trying to process a stream, and if the input processing errors out, i would like to send the data + error to a separate topic. good messages would be sent to another topic. Am i approaching this wrong?