redpanda-data / connect

Inconsistent behavior with Javascript processing #2622

Open · lunacrafts opened this issue 4 months ago

lunacrafts commented 4 months ago

I am encountering an issue when running a JavaScript processor in Benthos. The processor is meant to delete a specific property (xxx) from incoming JSON objects. The script also stores the number of keys in num_keys before deleting xxx. However, the deletion behaves inconsistently, which makes the processing unreliable: the logs show that xxx is sometimes deleted and sometimes not, with no apparent pattern. The snippet of log output below demonstrates the inconsistent behavior:


logs

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.24.0 path=/benthos.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a benthos instance, use CTRL+C to close" @service=benthos
{"id":"128bb85b-e2ff-4cc7-9da4-ec0d25b7ad94","message":"Heyxx!!!","num_keys":4,"timestamp":"2024-05-31T20:28:36.157267343Z"}
{"id":"497b2eca-04d1-4887-ac8a-84eac84471c2","message":"Heyxx!!!","num_keys":4,"timestamp":"2024-05-31T20:28:37.157583968Z"}
{"id":"87e12f20-af9f-4b23-bfcd-ad584455c45a","message":"Heyxx!!!","num_keys":4,"timestamp":"2024-05-31T20:28:38.157381469Z"}
{"id":"ea7d745c-3cc8-4a3c-a379-76441d3c4bf6","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:39.156897886Z","xxx":"XXX"}
{"id":"dd6ad1ec-3ec9-4e3b-9c7d-b05f3fc6813b","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:40.158598803Z","xxx":"XXX"}
{"id":"fbb11f2b-5d6b-4423-a2f8-fd1ccbbc8416","message":"Heyxx!!!","num_keys":4,"timestamp":"2024-05-31T20:28:41.15743972Z"}
{"id":"148d1ec5-947f-49d5-b5bb-9a7e4c68b0f0","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:42.157098262Z","xxx":"XXX"}
{"id":"a8052553-d485-4db3-80dc-dc9065de1d13","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:43.156823721Z","xxx":"XXX"}
{"id":"478a8957-e3f3-4360-9b42-8635ea171bcb","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:44.157223305Z","xxx":"XXX"}
{"id":"6d0e72be-99e1-4768-892f-71c56265b46a","message":"Heyxx!!!","timestamp":"2024-05-31T20:28:45.15662668Z","xxx":"XXX"}

benthos.yaml

http:
  address: 0.0.0.0:4195
  cors:
    enabled: false
  debug_endpoints: false
  enabled: true
  root_path: /benthos
input:
  generate:
    count: 0
    interval: 1s
    mapping: |
      root.id = uuid_v4()
      root.message = "Hey!!!"
      root.timestamp = now()
output:
  stdout:
    codec: lines
pipeline:
  processors:
  - bloblang: |
      root = this
      root.xxx = "XXX"
  - javascript:
      code: |
        let thing = benthos.v0_msg_as_structured();
        thing.num_keys = Object.keys(thing).length;
        delete thing["xxx"];
        benthos.v0_msg_set_structured(thing);
    label: asyncjs

The code is deployed to minikube via the Helm chart and Pulumi. Memory and other resource limits are not exceeded.

helm & pulumi

import * as pulumi from '@pulumi/pulumi';
import * as k8s from '@pulumi/kubernetes';
import { Namespace } from '@mirrorboards/namespace';

type BenthosPipelineArgs = {
  namespace: string;
  metadata: {
    name: string
  }
};

export class BenthosPipeline extends pulumi.ComponentResource {
  public readonly pipeline: k8s.helm.v3.Release;

  constructor(name: string, args: BenthosPipelineArgs, opts?: pulumi.ComponentResourceOptions) {
    super('mirrorboards:platform:benthos:BenthosPipeline', name, args, opts);

    const namespace = new Namespace('benthos');

    this.pipeline = new k8s.helm.v3.Release(
      namespace.get('pipeline'),
      {
        namespace: args.namespace,
        name: args.metadata.name,
        createNamespace: true,
        repositoryOpts: {
          repo: 'https://benthosdev.github.io/charts/',
        },
        chart: 'benthos',
        version: '2.2.0',
        values: {
          config: {
            input: {
              generate: {
                interval: '1s',
                count: 0,
                mapping: `
                  root.id = uuid_v4()
                  root.message = "Hey!!!"
                  root.timestamp = now()
                `
              }
            },
            pipeline: {
              processors: [
                {
                  bloblang: `
                    root = this
                    root.xxx = "XXX"
                  `
                },
                {
                  label: 'asyncjs',
                  javascript: {
                    code: `
                      let thing = benthos.v0_msg_as_structured();
                      thing.num_keys = Object.keys(thing).length;
                      delete thing["xxx"];
                      benthos.v0_msg_set_structured(thing);
                    `
                  }
                }
              ]
            },
            output: {
              stdout: {
                codec: 'lines'
              }
            }
          }
        }
      },
      {
        parent: this,
      },
    );

    this.registerOutputs({
      pipeline: this.pipeline,
    });
  }
}
lunacrafts commented 4 months ago

These two tickets are not related, but I am linking the other one here because I assume both can be fixed in the same area of the code.

https://github.com/redpanda-data/connect/issues/2623

mihaitodor commented 1 month ago

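It turns out this issue is caused by Goja runtime reuse: https://github.com/dop251/goja/issues/205. The javascript processor takes runtimes from a pool, and a reused runtime still holds the top-level let/const declarations from the script's previous run, so running the script on it again fails. I'll try to find a way to address this, or at least document it.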

A simple way to reproduce it:

input:
  generate:
    count: 0
    interval: 250ms
    mapping: |
      root = {}

  processors:
    - javascript:
        code: |
          let foobar = benthos.v0_msg_as_structured();
          benthos.v0_msg_set_structured(foobar);
    - log:
        message: ${! error() }

output:
  stdout: {}

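This will start printing SyntaxError: Identifier 'foobar' has already been declared at main.js:1:1(0) as soon as runtimes from the pool get reused. The original config hits the same error with thing: on a reused runtime the javascript processor fails and the message passes through unchanged. That is why the log lines that still contain xxx are also missing num_keys, while the lines with num_keys come from runs where the script completed.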
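
The redeclaration failure can be reproduced outside Benthos with Node's vm module, used here as a stand-in for Goja: V8 applies the same ECMAScript rule that a top-level let may not be declared again in a global scope that already holds it. This sketch runs the reporter's script three times on one long-lived context. The benthos object is a hypothetical stub that only mimics v0_msg_as_structured and v0_msg_set_structured, makeRuntime and runProcessor are illustrative names rather than Redpanda Connect code, and returning the original message on error is an assumption modeled on a failed processor letting the message through.

```typescript
import * as vm from "node:vm";

// Node's V8 stands in for Goja: both keep a script's top-level `let`
// bindings in the context's global scope after the script finishes.

// The processor script from the issue, unchanged.
const script = `
let thing = benthos.v0_msg_as_structured();
thing.num_keys = Object.keys(thing).length;
delete thing["xxx"];
benthos.v0_msg_set_structured(thing);
`;

type Msg = Record<string, unknown>;

// Hypothetical stand-in for one pooled runtime: a long-lived context whose
// `benthos` stub reads and writes the message currently being processed.
function makeRuntime() {
  const slot: { current: Msg } = { current: {} };
  const context = vm.createContext({
    benthos: {
      v0_msg_as_structured: (): Msg => ({ ...slot.current }),
      v0_msg_set_structured: (value: Msg): void => {
        slot.current = value;
      },
    },
  });
  return { slot, context };
}

type Runtime = ReturnType<typeof makeRuntime>;

// Hypothetical processor step: if the script throws, keep the original
// message and record the error (assumed pass-through-on-failure behavior).
function runProcessor(rt: Runtime, msg: Msg): { out: Msg; error?: string } {
  rt.slot.current = msg;
  try {
    vm.runInContext(script, rt.context);
    return { out: rt.slot.current };
  } catch (err) {
    const e = err as Error;
    return { out: msg, error: `${e.name}: ${e.message}` };
  }
}

// Reuse the same runtime for every message, as a pool eventually does.
const runtime = makeRuntime();
const results = [1, 2, 3].map((id) =>
  runProcessor(runtime, { id, message: "Hey!!!", xxx: "XXX" }),
);

for (const r of results) {
  console.log(JSON.stringify(r.out), r.error ?? "");
}
```

Under these assumptions the first message comes out with num_keys and without xxx, while the second and third keep xxx and carry a SyntaxError about thing, the same two shapes that appear in the reporter's log.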

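One workaround is to scope the script by wrapping it in an immediately invoked function expression, so that its variables are local to the function instead of living in the runtime's global scope: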

(function() {
  let foobar = benthos.v0_msg_as_structured();
  benthos.v0_msg_set_structured(foobar);
})();

According to this blog post, creating all these anonymous functions should be fine, since the Goja runtime's garbage collector can clean them up afterwards. We might want to consider applying this wrapping by default internally.
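
Doing the wrapping internally could look roughly like this sketch, again using Node's vm module in place of Goja. wrapInIIFE is a hypothetical helper, not an existing Redpanda Connect or Goja function, and the benthos object is a minimal stub of the two calls used in this thread. The wrapped script is compiled once and then run three times on one context without the redeclaration error.

```typescript
import * as vm from "node:vm";

type Msg = Record<string, unknown>;

// Hypothetical helper, not a Redpanda Connect or Goja API: wrap the user's
// code in an IIFE so its top-level declarations are function-local on each run.
// The newline before "})" stops a trailing `// comment` in the user's code
// from commenting out the closing parenthesis.
function wrapInIIFE(userCode: string): string {
  return `(function () {\n${userCode}\n})();`;
}

// A variant of the reproduction script that counts how often it has run.
const userCode = `
let foobar = benthos.v0_msg_as_structured();
foobar.runs = (foobar.runs || 0) + 1;
benthos.v0_msg_set_structured(foobar);
`;

// Minimal stub of the two benthos calls, backed by one stored message.
let stored: Msg = {};
const context = vm.createContext({
  benthos: {
    v0_msg_as_structured: (): Msg => ({ ...stored }),
    v0_msg_set_structured: (value: Msg): void => {
      stored = value;
    },
  },
});

// Compile once, then run repeatedly on the same (reused) context.
const wrapped = new vm.Script(wrapInIIFE(userCode));
for (let i = 0; i < 3; i++) {
  wrapped.runInContext(context); // no "already declared" error on reuse
}

console.log(stored);
```

Wrapping does change semantics slightly: var and function declarations no longer land in the global scope between runs (usually what is wanted here), and a top-level return becomes legal inside the script.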