nodefluent / kafka-streams

equivalent to kafka-streams :octopus: for nodejs :sparkles::turtle::rocket::sparkles:
https://nodefluent.github.io/kafka-streams/
MIT License
830 stars 111 forks source link

trying to generage 2 msgs to output stream from 1 incoming msg #205

Closed dragonfriend0013 closed 3 years ago

dragonfriend0013 commented 3 years ago

I am trying to generate more than 1 message from 1 incoming message. I need to call an API/Redis to determine how may configs the original msg has, and attach those configs to the original message before writing to the next topic. So far, everything i have tried has failed to write to the output topic (if i send a single value, everything works fine)


import { KafkaStreams } from 'kafka-streams';
import { SpanStatusCode } from '@opentelemetry/api';

import getConfig from './config.mjs';
import getCache from './utils/redis.mjs';
import logger from './utils/logger.mjs';
import convertHrtime from './utils/convertHRTime.mjs';
import { tracer, getTraceId, traceHeaders } from './utils/telemetry.mjs';
import formatData from './formatData.mjs';
import GetEvtQuery from './getEvents.mjs';
import preCache from './preCache.mjs';
import GenerateData from './generateData.mjs';

const config = getConfig();
const primaryTopic = process.env.KAFKA_INGRESS_TOPIC;
const mfRetryTopic = `${primaryTopic}_RETRY`;
const fmtTopic = `${primaryTopic}_FMT`;
const gqlRetryTopic = `${fmtTopic}_RETRY`;
const dataTopic = `${primaryTopic}_DATA`;
const MAX_RETRIES = parseInt(process.env.MAX_RETRIES, 10);
const errTopic = process.env.KAFKA_ERROR_TOPIC;
const dlqTopic = process.env.KAFKA_DLQ_TOPIC;

const cache = getCache();
cache.on('ready', () => {
  logger.info('Redis is ready');
});
cache.on('error', (err) => {
  logger.error(`Redis error: ${err.code} ${err.syscall} ${err.address} ${err.port}`);
});

const getKafkaStyledMessage = (key = null, payload, topic = undefined, partition = undefined) => ({
  key, // (required) just to not be undefined, keys will otherwise receive random uuids
  value: JSON.stringify(payload), // required, ensure this is a string or a buffer!
  topic,
  partition,
});

const kafkaStreams = new KafkaStreams(config);
kafkaStreams.on('error', (error) => {
  logger.error(`Kafka streams error: ${error.message}`);
});

const consume = async () => {
  const MFStream = kafkaStreams.getKStream();
  MFStream
    .from(primaryTopic)
    .from(mfRetryTopic)
    .mapJSONConvenience()
    .map((message) => tracer.startActiveSpan('message', async (span) => {
      try {
        const start = process.hrtime();
        const msg = await formatData(message);
        msg.traceId = getTraceId(span);
        msg.traceHeaders = traceHeaders(span);
        const evtConfig = await GetEvtQuery(msg.evtType, cache);
        if (!evtConfig) {
          return getKafkaStyledMessage(
            null,
            {
              ...message.value,
              errMsg: 'No matching configs',
            },
            dlqTopic,
          );
        }
        // filter out configs that dont match filter
        // i.value will always be a string, so need to cast incoming data to string
        const configs = evtConfig.filter((f) => f.keyFields
          .every((i) => msg.newMsg[i.field].toString() === i.value));

        const results = configs.map((cfg) => {
          msg.config = cfg;
          msg.durationFmtOpt = convertHrtime(process.hrtime(start)).milliseconds;
          return getKafkaStyledMessage(msg.key, msg);
        });

        const duration = convertHrtime(process.hrtime(start)).milliseconds;
        span.setAttribute('duration', duration);
        span.end();
        return results; // ??? not being converted to a stream ???
      } catch (e) {
        const thisMsg = {
          ...message.value,
          errMsg: e.message,
        };
        let retryCnt = !thisMsg.retryCnt ? 0 : parseInt(thisMsg.retryCnt, 10);
        retryCnt += 1;
        thisMsg.retryCnt = retryCnt;
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: e.message,
          willRetry: thisMsg.retryCnt <= MAX_RETRIES,
        });
        logger.error({
          msg: `${e.message.replace(/"/g, "'")}`,
          uuid: thisMsg.key,
          traceId: getTraceId(span),
          retryCnt: thisMsg.retryCnt,
        });
        if (thisMsg.retryCnt <= MAX_RETRIES) {
          return getKafkaStyledMessage(null, thisMsg, mfRetryTopic);
        }
        return getKafkaStyledMessage(null, thisMsg, errTopic);
      }
    }))
    .awaitPromises()
    .tap((message) => {
      console.log(message);
    })
    .to(fmtTopic, 'auto', 'send');

  const GQLStream = kafkaStreams.getKStream();
  GQLStream
    .from(fmtTopic)
    .from(gqlRetryTopic)
    .mapJSONConvenience()
    .map((message) => tracer.startActiveSpan('gql', async (span) => {
      const msg = {
        ...message.value,
      };
      await preCache(msg.newMsg, cache);
      try {
        const start = process.hrtime();
        const { data, gqlTime } = await GenerateData(msg.config, msg);
        msg.gqlData = data;
        const duration = convertHrtime(process.hrtime(start)).milliseconds;
        msg.durationGQL = duration;
        msg.gqlTime = gqlTime;
        span.setAttribute('duration', duration);
        span.end();
        // throw new Error('test error');
        return getKafkaStyledMessage(msg.key, msg);
      } catch (e) {
        const thisMsg = {
          ...msg,
          errMsg: e.message,
        };
        let retryCnt = !thisMsg.retryCnt ? 0 : parseInt(thisMsg.retryCnt, 10);
        retryCnt += 1;
        thisMsg.retryCnt = retryCnt;
        span.setStatus({
          code: SpanStatusCode.ERROR,
          message: e.message,
          willRetry: thisMsg.retryCnt <= MAX_RETRIES,
        });
        logger.error({
          msg: `${e.message.replace(/"/g, "'")}`,
          uuid: thisMsg.key,
          traceId: getTraceId(span),
          retryCnt: thisMsg.retryCnt,
        });
        if (thisMsg.retryCnt <= MAX_RETRIES) {
          return getKafkaStyledMessage(null, thisMsg, gqlRetryTopic);
        }
        return getKafkaStyledMessage(null, thisMsg, errTopic);
      }
    }))
    .awaitPromises()
    .tap((f) => {
      console.log(f);
    })
    .to(dataTopic, 'auto', 'send');

  MFStream
    .start()
    .then(() => {
      logger.info('MFStream started.');
    }, (error) => {
      logger.error(`MFStream failed to start: ${error}`);
    });

  GQLStream
    .start()
    .then(() => {
      logger.info('GQLStream started.');
    }, (error) => {
      logger.error(`GQLStream failed to start: ${error}`);
    });
};

consume().catch((e) => {
  // eslint-disable-next-line no-console
  console.error(e);
  logger.error(e);
  process.exit(1);
});
dragonfriend0013 commented 3 years ago

Corrected by adding

.awaitPromises()
.concatMap((message) => MFStream.getNewMostFrom(message.map((m) => m)))
.to(fmtTopic, 'auto', 'send');