retejs / rete

JavaScript framework for visual programming
https://retejs.org
MIT License
10.17k stars 653 forks source link

Modifying data in loops (Tasks) #514

Closed skroink closed 1 year ago

skroink commented 3 years ago

Hi,

I'm trying to write an ETL flow editor using Rete.js. So far I've got a flow where I can upload a file, iterate over each line in the file, and output each line as a Vue event.

image

But I run into a problem when I want to modify or fetch a single column from each line. In the example, I feed the data to a component (Loop -> Modify -> Log) whose purpose is to extract a single column, but it only gets the value from the last index of the loop every time it's run.

CsvLoader

/**
 * Rete component that parses an uploaded CSV file with Papa Parse and
 * publishes the parsed rows on its "file" output.
 *
 * NOTE(review): this class refers to the socket registry as `Socket`, while
 * the other components in this snippet use lowercase `socket` — confirm which
 * identifier is actually in scope.
 */
class CsvLoader extends Rete.Component {
  constructor() {
    super("CsvLoader");
    this.task = {
      outputs: {
        file: "output",
        action: "option"
      },
      // Start a task run every time a "fileload" event arrives on the
      // EventBus, forwarding the event payload as the run data.
      init(task) {
        EventBus.$on("fileload", data => {
          task.run(data);
        });
      }
    };
  }

  /**
   * Declares the node's two outputs: the parsed file and an action output.
   *
   * @param {object} node - Node being built; receives both outputs.
   */
  builder(node) {
    const fileOutput = new Rete.Output("file", "File", Socket.File);
    const actionOutput = new Rete.Output("action", "action", Socket.Action);

    node.addOutput(fileOutput);
    node.addOutput(actionOutput);
  }

  /**
   * Parses `data.file` as CSV (first row treated as the header).
   *
   * @param {object} node - Unused.
   * @param {object} inputs - Unused.
   * @param {object} data - Run data; `data.file` must be a `File`.
   * @returns {Promise<{file: *}>} `results.data` as delivered by Papa Parse.
   * @throws {Error} "No File" when `data.file` is not a `File` instance; the
   *   returned promise also rejects with whatever Papa Parse hands to its
   *   `error` callback.
   */
  async worker(node, inputs, data) {
    const { file } = data;

    // Fail with a real Error (not a bare string) so callers get a stack trace.
    if (!(file instanceof File)) {
      throw new Error("No File");
    }

    const rows = await new Promise((resolve, reject) => {
      Papa.parse(file, {
        header: true,
        complete: results => {
          resolve(results.data);
        },
        // Without this, a file read failure would leave the promise pending
        // forever and stall the task.
        error: err => {
          reject(err);
        }
      });
    });

    return {
      file: rows
    };
  }
}

Loop

/**
 * Rete component that takes a list on its "input" and starts one cloned task
 * run per element, publishing each element on the "data" output.
 */
class Loop extends Rete.Component {
  constructor() {
    super("Loop");

    const outputs = {
      data: "output",
      event: "option"
    };

    this.task = {
      outputs,
      // Nothing to set up for this component.
      init(task) {}
    };
  }

  /**
   * Declares the file/action inputs and the action/data outputs.
   *
   * @param {object} node - Node being built.
   * @returns {*} Result of the final `addOutput` call in the chain.
   */
  builder(node) {
    const fileInput = new Rete.Input("input", "File", socket.File, true);
    const triggerInput = new Rete.Input("action", "action", socket.Action, true);

    const dataOutput = new Rete.Output("data", "output", socket.Text);
    const eventOutput = new Rete.Output("event", "action", socket.Action);

    const withInputs = node.addInput(fileInput).addInput(triggerInput);

    // The action output is registered before the data output.
    return withInputs.addOutput(eventOutput).addOutput(dataOutput);
  }

  /**
   * Runs in one of two modes depending on whether the run data carries `line`.
   *
   * - With `line`: returns it on the "data" output.
   * - Without `line`: emits "clearlog", starts one cloned run per element of
   *   the first "input" value, waits for all of them, then sets
   *   `this.closed = ["event"]`.
   *
   * NOTE(review): `this` is used as the task object here (`clone()`, `closed`);
   * presumably the task plugin binds it when invoking the worker — confirm.
   *
   * @param {object} node - Unused.
   * @param {object} inputs - `inputs.input[0]` must provide `map`.
   * @param {{line: *}} data - Run data; `line` is set only for cloned runs.
   * @returns {Promise<{data: *}|undefined>}
   */
  async worker(node, inputs, { line }) {
    // Per-element run: just hand the element on.
    if (line !== undefined) {
      return { data: line };
    }

    EventBus.$emit("clearlog");

    const items = inputs.input[0];
    const pendingRuns = items.map(item => this.clone().run({ line: item }));
    await Promise.all(pendingRuns);

    this.closed = ["event"];
  }
}

Log

/**
 * Rete component that emits a "log" event on the EventBus carrying the first
 * value of its "id" and "description" inputs.
 */
class Log extends Rete.Component {
  constructor() {
    super("Log");
    this.task = {
      outputs: {},
      // Nothing to set up for this component.
      init(task) {}
    };
  }

  /**
   * Declares the action input plus the "id" and "description" text inputs.
   *
   * @param {object} node - Node being built.
   * @returns {*} Result of the final `addInput` call in the chain.
   */
  builder(node) {
    const actionInput = new Rete.Input("act", "Action", socket.Action, true);

    const idInput = new Rete.Input("id", "Id", socket.Text);
    const descriptionInput = new Rete.Input(
      "description",
      "Description",
      socket.Text
    );

    const withAction = node.addInput(actionInput);
    return withAction.addInput(idInput).addInput(descriptionInput);
  }

  /**
   * Emits `{ id, description }` as a "log" event.
   *
   * @param {object} node - Unused.
   * @param {object} inputs - Each of `inputs.id` / `inputs.description` is
   *   either `undefined` or indexable; only element 0 is used.
   * @returns {Promise<void>}
   */
  async worker(node, inputs) {
    // An input that is `undefined` is reported as `null`.
    const firstOrNull = values => (values === undefined ? null : values[0]);

    const id = firstOrNull(inputs.id);
    const description = firstOrNull(inputs.description);

    EventBus.$emit("log", { id, description });
  }
}

Modify

/**
 * Rete component in the "Line" group that forwards whatever arrives on its
 * "line" input to its "data" output without changing it.
 */
class Modify extends Rete.Component {
  constructor() {
    super("Modify");
    this.group = "Line";

    const outputs = { data: "output" };

    this.task = {
      outputs,
      // Resets the task as soon as it is initialised.
      init(task) {
        task.reset();
      }
    };
  }

  /**
   * Declares the "line" input and the "data" output.
   *
   * @param {object} node - Node being built.
   * @returns {*} Result of the `addOutput` call.
   */
  builder(node) {
    const lineInput = new Rete.Input("line", "Line", socket.Text, true);
    const dataOutput = new Rete.Output("data", "output", socket.Text);

    const withInput = node.addInput(lineInput);
    return withInput.addOutput(dataOutput);
  }

  /**
   * Passes `input.line` through as the "data" output.
   *
   * NOTE(review): the entire `input.line` value is forwarded, not its first
   * element (contrast with Log.worker, which reads `[0]`) — confirm this is
   * intended.
   *
   * @param {object} node - Unused.
   * @param {object} input - Input values keyed by input name.
   * @returns {Promise<{data: *}>}
   */
  async worker(node, input) {
    const { line } = input;
    return { data: line };
  }
}
rete-js[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 10 days.