danielduarte / flowed

A fast and reliable flow engine for orchestration and more uses in Node.js, Deno and the browser
https://danielduarte.github.io/flowed
MIT License

When using an async resolver which uses the fetch method to access an external endpoint the running flow is immediately interrupted #42

Open andreadipalma opened 1 year ago

andreadipalma commented 1 year ago

When using an async resolver that calls fetch to access an external endpoint, the running flow is immediately interrupted and returns control to the caller without completing the remaining tasks.

export async function query_similar(query: string, filters: string): Promise<any> {
    // Call the similarity API (note: `filters` is accepted but not sent in the request)
    const similar = "https://query-data.azurewebsites.net/api/similar?" + new URLSearchParams({
        'op': "query",
        'query': query,
        "max_results": "10"
    });

    const response = await fetch(similar, {
        method: "GET",
        headers: {
            "Content-Type": "application/json",
        }
    });

    // fetch only rejects on network errors, so HTTP errors must be checked explicitly
    if (!response.ok) {
        throw new Error(`Failed to call function: ${response.statusText}`);
    }

    // Any error propagates to the caller as a rejected promise
    return response.json();
}

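export async function nlp_rank(item_id_field: string, records: any[], user_message: string) {
  // isNotNull is a helper from the original project (not shown)
  if (isNotNull(records) && records.length > 0 && isNotNull(user_message)) {
    try {
      const id_list: string = records.map(record => record[item_id_field]).join(",");
      // call the API to retrieve the similarity ranking
      const ranked_records = await query_similar(user_message, id_list);
      // merge arrays by document id
      // (as written, Object.assign copies all of ranked_records' properties into every record; no id matching happens)
      const out_records = records.map(record => Object.assign({}, record, ranked_records));
      return out_records;
    } catch (error) {
      // Log the error instead of swallowing it; the caller still receives undefined
      console.error(error);
      return;
    }
  }
}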

// This is the resolver class
export class ContentRank {
  async exec(params: ValueMap, context:ValueMap) {
    // simplified code

    const records = await nlp_rank(item_id_field, input_records.records, userProfile.message);
    return { results: { records: records } };
  }
}

let results = await FlowManager.run(
        // The flow spec
        ret.strategy,          
        // The parameters
        ret.params,
        // The expected results
        [ out ],          
        // The resolvers mapping
        {
          load: Load as unknown as TaskResolverExecutor,
          filter: Filter as unknown as TaskResolverExecutor,
          contentrank: ContentRank as unknown as TaskResolverExecutor,
          similarrank: SimilarRank as unknown as TaskResolverExecutor,
          top: Top as unknown as TaskResolverExecutor,
          sort: Sort as unknown as TaskResolverExecutor,
          merge: Merge as unknown as TaskResolverExecutor
        },
        { webAPI: this.context, registry: this.registry, userManager},
      );

Without the async resolver, the workflow completes correctly. I suspect I'm making a mistake with async/await somewhere in the chain of function calls, but I'm really struggling to find where.
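A side note on nlp_rank: its comment says "merge arrays by document id", but Object.assign({}, record, ranked_records) copies all of ranked_records into every record without matching ids. Below is a minimal sketch of a by-id merge. It assumes (hypothetically, since the API response isn't shown) that the similarity endpoint returns an array of objects carrying the same id field as the input records. mergeById is an illustrative name, not part of the original code or of Flowed.

```typescript
type Row = Record<string, unknown>;

// Hypothetical helper: merge each record with the ranked entry sharing its id.
// Assumes `ranked` is an array of objects keyed by the same id field as `records`.
function mergeById(idField: string, records: Row[], ranked: Row[]): Row[] {
  // Index ranked entries by id for constant-time lookup
  const byId = new Map<unknown, Row>();
  for (const entry of ranked) {
    byId.set(entry[idField], entry);
  }
  // Records without a matching ranked entry are returned unchanged (as copies)
  return records.map(record => ({ ...record, ...(byId.get(record[idField]) ?? {}) }));
}

// Usage with made-up sample data
const merged = mergeById(
  "id",
  [{ id: "a", name: "first" }, { id: "b", name: "second" }],
  [{ id: "b", score: 0.9 }],
);
console.log(merged); // the record with id "b" now also has score: 0.9
```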

danielduarte commented 1 year ago

Hi @andreadipalma, and thanks for reporting.

The first "weird" thing I noticed in your code is that the resolver ContentRank returns an object with the key results, which means you have one return value named "results" that needs to be returned or provided to another task. Maybe you meant to return just { records: records } (and not { results: { records: records } }). If you are requiring the value "results" in another task (or as an expected flow result value), that's fine. But if your result was meant to be named "records", that could be the mistake. I cannot confirm, since I don't have your flow spec.
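To make the naming point concrete: in a task spec, resolver.results maps names returned by the resolver (keys) to flow values the task provides (values), as in "results": { "results": "var_node[...]out_0" } in the flow descriptor later in this thread. The sketch below is a simplified, hypothetical model of that mapping, not Flowed's actual code. "out_0" is an illustrative flow value name. It shows why a resolver returning { records } against that spec provides nothing.

```typescript
type ValueMap = Record<string, unknown>;

// Simplified model of applying a task's resolver.results mapping (illustration only).
function mapResults(resolverOutput: ValueMap, resultsMapping: Record<string, string>): ValueMap {
  const provided: ValueMap = {};
  // Key = name returned by the resolver, value = flow value the task provides
  for (const [resolverName, flowValueName] of Object.entries(resultsMapping)) {
    if (resolverName in resolverOutput) {
      provided[flowValueName] = resolverOutput[resolverName];
    }
  }
  return provided;
}

// Equivalent of the spec fragment: "results": { "results": "out_0" }
const mapping = { results: "out_0" };

const withResultsKey = mapResults({ results: { records: [1, 2] } }, mapping);
const withRecordsKey = mapResults({ records: [1, 2] }, mapping);

console.log(withResultsKey); // out_0 is provided
console.log(withRecordsKey); // {}: out_0 is missing, so tasks requiring it never run
```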

For debugging, it's also a good idea to run the flow with the environment variable DEBUG=flowed:*, which prints detailed step-by-step information to standard output.

If it doesn't compromise your project's privacy, you can share your flow spec and debugging information (obtained with the DEBUG env var) and I'll be able to help in more detail.

Also, here are a couple of general things to double-check when the flow runs without executing one or more tasks:

andreadipalma commented 1 year ago

Hi @danielduarte, thank you for your great suggestions. I'm going through them and will post the info later. Thanks!

andreadipalma commented 1 year ago

@danielduarte, below is the flow descriptor. I see a potential flaw with the "results" return value; I remember it was done to return multiple variables in a single JSON object. On the other hand, I'm not sure how to set the environment variable through process.env, since the code runs in the browser and I can't use the @types/node library. Can I enable debugging through a parameter when running the FlowManager instead?

I ran a further test. I removed "async" from the resolver class's exec method and ran another rank type that doesn't require an asynchronous call, and the workflow completes correctly. If, in the code below, I just add async to exec while still using sync_rank (so there is no actual asynchronous call), the workflow exits without completing the last task. I'm more and more convinced it depends on the flow definition.

// This is the resolver class
export class ContentRank {
  exec(params: ValueMap, context:ValueMap) {
    // simplified code

    const records = sync_rank(item_id_field, input_records.records, userProfile.message);
    return { results: { records: records } };
  }
}
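One fact relevant to this test: marking exec as async makes it return a Promise even when its body is fully synchronous, so whatever runs the resolver must await the returned value. The sketch below contrasts two hypothetical runners, one that reads result names straight from the return value and one that awaits first. It does not claim Flowed behaves like the naive runner; it only shows how adding async alone can turn a populated result into one with no visible keys.

```typescript
type ValueMap = Record<string, unknown>;

class SyncRank {
  exec(): ValueMap {
    return { results: { records: ["r1"] } };
  }
}

class AsyncRank {
  // Identical body: `async` alone wraps the returned object in a Promise
  async exec(): Promise<ValueMap> {
    return { results: { records: ["r1"] } };
  }
}

// Hypothetical runner that reads result names without awaiting (wrong for async resolvers)
function naiveKeys(value: unknown): string[] {
  return Object.keys(value as object);
}

// Hypothetical runner that awaits first; awaiting a non-Promise just yields the value
async function awaitedKeys(value: unknown): Promise<string[]> {
  return Object.keys((await value) as object);
}

async function main(): Promise<void> {
  console.log(naiveKeys(new SyncRank().exec()));          // [ 'results' ]
  console.log(naiveKeys(new AsyncRank().exec()));         // [] (a Promise has no own keys)
  console.log(await awaitedKeys(new AsyncRank().exec())); // [ 'results' ]
}

void main();
```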

The flow descriptor:

{
    "tasks": {
        "Load Suggestions": {
            "requires": [
                "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider",
                "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria"
            ],
            "provides": [
                "var_node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0"
            ],
            "resolver": {
                "name": "load",
                "params": {
                    "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider": "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider",
                    "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria": "node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria"
                },
                "results": {
                    "results": "var_node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0"
                }
            }
        },
        "TagRank": {
            "requires": [
                "var_node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0",
                "node[81339727-0140-49f8-a2b8-7be2f611245c]_data_provider",
                "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_strategy",
                "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_data_provider",
                "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_field",
                "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_rank_field"
            ],
            "provides": [
                "var_node[81339727-0140-49f8-a2b8-7be2f611245c]out_0"
            ],
            "resolver": {
                "name": "contentrank",
                "params": {
                    "var_node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0": "var_node[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0",
                    "node[81339727-0140-49f8-a2b8-7be2f611245c]_data_provider": "node[81339727-0140-49f8-a2b8-7be2f611245c]_data_provider",
                    "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_strategy": "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_strategy",
                    "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_data_provider": "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_data_provider",
                    "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_field": "node[81339727-0140-49f8-a2b8-7be2f611245c]_rank_field",
                    "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_rank_field": "node[81339727-0140-49f8-a2b8-7be2f611245c]_user_rank_field"
                },
                "results": {
                    "results": "var_node[81339727-0140-49f8-a2b8-7be2f611245c]out_0"
                }
            }
        },
        "Top3": {
            "requires": [
                "var_node[81339727-0140-49f8-a2b8-7be2f611245c]out_0",
                "node[02487f60-a52a-4931-bd21-9124b895a2bc]_data_provider",
                "node[02487f60-a52a-4931-bd21-9124b895a2bc]_top_records"
            ],
            "provides": [
                "var_node[02487f60-a52a-4931-bd21-9124b895a2bc]out_0"
            ],
            "resolver": {
                "name": "top",
                "params": {
                    "var_node[81339727-0140-49f8-a2b8-7be2f611245c]out_0": "var_node[81339727-0140-49f8-a2b8-7be2f611245c]out_0",
                    "node[02487f60-a52a-4931-bd21-9124b895a2bc]_data_provider": "node[02487f60-a52a-4931-bd21-9124b895a2bc]_data_provider",
                    "node[02487f60-a52a-4931-bd21-9124b895a2bc]_top_records": "node[02487f60-a52a-4931-bd21-9124b895a2bc]_top_records"
                },
                "results": {
                    "results": "var_node[02487f60-a52a-4931-bd21-9124b895a2bc]out_0"
                }
            }
        }
    }
}
danielduarte commented 1 year ago

Hi @andreadipalma, you can debug and see how the flow runs in the browser by doing this:

Once that is done, run your flows and you should see something like this: [screenshot of the debug output]

There you have the details about every executed task with its parameters and results.

danielduarte commented 1 year ago

For the example in my previous comment, I wrote this small piece of code to try to reproduce your issue, but I wasn't able to. It uses a simple async/await resolver with a timer that resolves the promise after a couple of seconds, and it ran without issues.

Maybe you could try this example to check whether the issue comes from the browser version, the Flowed package version (I'd recommend using the latest), your bundler, or any other component.

Please let me know if you were able to see the debugging logs, and what you found. I'll be glad to help.

<html>
<head>
  <script src="https://cdn.jsdelivr.net/npm/flowed@latest/dist/lib/flowed.js" charset="utf-8"></script>
</head>
<body>
  <script>
    // First of all:
    // In order to see debug logs, open the browser developer console and run
    //    localStorage.debug = 'flowed:*'
    // Also, if you use Chrome (or other Chromium-based browser such as Brave, Edge, etc.),
    // please enable the Verbose level in the developer tools.

    // The flow specification
    const flow = {
      tasks: {
        T1: {
          provides: ['a'],
          resolver: {
            name: 'r1',
            results: {
              a: 'a'
            }
          }
        }
      }
    };
    const params = {};
    const expectedResults = ['a'];

    class R1 {
      async exec() {
        const timerPromise = new Promise((resolve, reject) => {
          // Wait for 2 secs and resolve the promise
          setTimeout(() => {
            resolve({ a: '123456' })
          }, 2000);
        });
        const results = await timerPromise;
        return results;
      }
    }

    const resolvers = {
      r1: R1,
    };

    // Run the flow and log results
    Flowed.FlowManager.run(flow, params, expectedResults, resolvers)
      .then(results => console.log('results', results));

  </script>
</body>
</html>
andreadipalma commented 1 year ago

Hi @danielduarte, following your instructions I ran the test in the HTML page and it worked (not that I had any doubt it would!). Running my flows with the debug information enabled, this is what I get. The first log is from the case "without the async resolver", so it completes normally.

flowed:flow [1] ▶ Flow started with params: {adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider: 'PromotionTable', adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria: {…}, adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_data_provider: 'PromotionTable', adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_rank_strategy: 1, adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_user_data_provider: 'UnknownUserProfile', …} +0ms
flowed:flow [1]   ⓘ Changed flow state from 'Ready' to 'Running' +1ms [undefined]
flowed:flow [1]   ‣ Task 'Load Suggestions(load)' started, params: {adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider: "PromotionTable", adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria: {odata: "?$select=cr57f_adpcardofferingid,cr57f_name,cra2c_…url&$filter=((contains(cr57f_name, 'Sicurezza')))", jsonlogic: {…}}} +2s
flowed:flow [1]   ✓ Finished task 'Load Suggestions', results: {var_adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0: []} +1ms
flowed:flow [1]   ‣ Task 'TagRank(contentrank)' started, params: {adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_data_provider: "PromotionTable", adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_rank_field: "cra2c_tags", adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_rank_strategy: 1, adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_user_data_provider: "UnknownUserProfile", adpnode[81339727-0140-49f8-a2b8-7be2f611245c]_user_rank_field: "cra2c_topics", var_adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0: []} +1s
flowed:flow [1]   ✓ Finished task 'TagRank', results: {var_adpnode[81339727-0140-49f8-a2b8-7be2f611245c]out_0: []} +1ms
flowed:flow [1]   ‣ Task 'Top3(top)' started, params: {adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]_data_provider: "PromotionTable", adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]_top_records: "3", var_adpnode[81339727-0140-49f8-a2b8-7be2f611245c]out_0: []} +1s
flowed:flow [1]   ✓ Finished task 'Top3', results: {var_adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]out_0: [{…}]} +1ms
flowed:flow [1]   ⓘ Changed flow state from 'Running' to 'Finished' +1ms [undefined]
flowed:flow [1] ✔ Flow finished with results: {var_adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]out_0: [{cr57f_adpcardofferingid: 'eef1ecf5-11ed-ed11-8849-002248999338', cr57f_discountlevel: '12', cr57f_name: 'XXXXXXXXXX', cr57f_priority: '2', cr57f_promotionenddate: '11/1/2022', …}]} +0ms

The second log is from the case "with the async resolver". Here the execution is interrupted and never reaches the third component (Top). There is a warning from the second component (the async one) saying that it doesn't produce the expected output. When I debug it, I can see that it does produce the output, but by that point the flow's execution has been lost somewhere. I've also made the suggested modifications/simplifications to the resolver's output value.

flowed:flow [3] ▶ Flow started with params: {adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider: 'PromotionTable', adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria: {…}, adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]_data_provider: 'PromotionTable', adpnode[02487f60-a52a-4931-bd21-9124b895a2bc]_top_records: '3', adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_data_provider: 'PromotionTable', …} +18s
flowed:flow [3]   ⓘ Changed flow state from 'Ready' to 'Running' +0ms [undefined]
flowed:flow [3]   ‣ Task 'Load Suggestions(load)' started, params: {adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_data_provider: "PromotionTable", adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]_filter_criteria: {odata: "?$select=cr57f_adpcardofferingid,cr57f_name,cra2c_…url&$filter=((contains(cr57f_name, 'XXXXXX')))", jsonlogic: {…}}} +2s
flowed:flow [3]   ✓ Finished task 'Load Suggestions', results: {var_adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0: [{…}]} +1ms
flowed:flow [3]   ‣ Task 'ContentRank(contentrank)' started, params: {adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_data_provider: "PromotionTable", adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_rank_field: "cra2c_document", adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_rank_strategy: 2, adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_user_data_provider: "UnknownUserProfile", adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]_user_rank_field: "cra2c_message", var_adpnode[af636e50-5d2d-4692-9a26-e6c3f2887cc1]out_0: [{…}]} +3s
flowed:flow [3]   ✓ Finished task 'ContentRank', results: {} +32s
flowed:flow [3] ⚠️ Expected value 'var_adpnode[ce09efe9-89ec-4d89-a9ee-eb70c38c08ba]out_0' was not provided by task 'ContentRank' with resolver 'contentrank'. Consider using the task field 'defaultResult' to provide values by default. +0ms [undefined]
flowed:flow [3]   ⓘ Changed flow state from 'Running' to 'Finished' +0ms [undefined]
flowed:flow [3] ✔ Flow finished with results: {} +1ms
danielduarte commented 1 year ago

Hi again @andreadipalma, glad to see that you were able to set up the flow logs.

I see a couple of details to check there:

First, the flow specs in these two cases seem to be different: in the first case the resolver contentrank is associated with a task called TagRank, while in the second the same resolver is used by a task named ContentRank. That doesn't necessarily mean that the second flow spec is incorrect, but it may be a good idea to double-check the differences to see if something is missing.

Second, in the second test the logs show that the resolver doesn't return any value, which is why the next task in the flow (Top) is not executed.

From the 1st case (one result returned, required by the "Top" task):

flowed:flow [1]   ✓ Finished task 'TagRank', results: {var_adpnode[81339727-0140-49f8-a2b8-7be2f611245c]out_0: []} +1ms

From the 2nd case (no results returned, so no dependent task is executed):

flowed:flow [3]   ✓ Finished task 'ContentRank', results: {} +32s

So you'd need to make sure the required value (or a promise that resolves to it) is returned. Other than that, I suspect the builder/bundler you may be using in your project is transpiling async/await incorrectly. The same thing could also be happening in the bundler I use to make Flowed available for the browser. Even though I have tests that check async resolvers, I'll double-check more cases like this one, specifically in the browser, to see if I find any issue.
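One way to make a missing return value fail loudly while debugging is to wrap the resolver class so that it awaits exec and checks that every expected result name is present. withExpectedResults below is a hypothetical helper, not part of Flowed's API. It forwards only params and context to the wrapped exec (Flowed may pass more arguments). The expected names would be the keys of the task's resolver.results mapping, for example contentrank: withExpectedResults(ContentRank, ["results"]) in the resolvers map.

```typescript
type ValueMap = Record<string, unknown>;

// Minimal structural type for a resolver class (an assumption for this sketch)
interface ResolverClass {
  new (): { exec(params: ValueMap, context: ValueMap): unknown };
}

// Hypothetical debugging helper, not part of Flowed: wraps a resolver class so that
// a missing result name raises an error instead of silently leaving a value unset.
function withExpectedResults(Resolver: ResolverClass, expected: string[]): ResolverClass {
  return class {
    async exec(params: ValueMap, context: ValueMap): Promise<ValueMap> {
      // `await` works for both sync and async exec implementations
      const output = await new Resolver().exec(params, context);
      const result: ValueMap =
        typeof output === "object" && output !== null ? (output as ValueMap) : {};
      const missing = expected.filter(name => !(name in result));
      if (missing.length > 0) {
        throw new Error(`Resolver did not return: ${missing.join(", ")}`);
      }
      return result;
    }
  };
}

// Usage: mimic the empty {} result seen in the second log
class EmptyRank {
  async exec(): Promise<ValueMap> {
    return {};
  }
}

const GuardedRank = withExpectedResults(EmptyRank, ["results"]);
new GuardedRank()
  .exec({}, {})
  .catch((err: Error) => console.error(err.message)); // Resolver did not return: results
```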

I hope my comment helps. And please let me know if I can help with further debugging.

andreadipalma commented 1 year ago

@danielduarte thank you so much for your time and the helpful insights, really appreciated!

I've double-checked point 1 about the different names and tested again with the same name... apparently I just captured the flows after some modifications, including the task name (don't ask me why :-), but it doesn't seem to have any effect.

I'm starting to think that your point about the transpiler could be right. Below are my configurations, in case you spot something strange. As a further step, I'm also going to run the same logic in a Node.js server-based component (so no browser involved). I'll let you know. Thank you

{
  "compilerOptions": {
    "jsx": "react",
    "target": "es5",
    "module": "commonjs",
    "lib": ["ES6", "DOM"],
    "strict": true,
    "strictPropertyInitialization": false
  },
  "exclude": [
    "./node_modules"
  ]
}
{
    "extends": "./node_modules/pcf-scripts/tsconfig_base.json",
    "compilerOptions": {
        "typeRoots": ["node_modules/@types"],
        "esModuleInterop": true,
        "target": "ES6",
        "downlevelIteration": true
    }
}
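Both configurations above set "target" below ES2017 ("es5" in the first, "ES6" in the second). With those targets TypeScript does not emit native async functions; it rewrites async/await into helper-based code (the __awaiter helper, plus __generator for ES5). This thread doesn't pin down exactly what goes wrong in the transpiled build, so treat the snippet below as a diagnostic sketch: it reports whether async functions in the current build are native. If they aren't, raising "target" to "ES2017" or later (assuming the PCF build honors this tsconfig setting) makes TypeScript emit native async/await.

```typescript
// Native async functions carry the "AsyncFunction" toStringTag; async functions that
// TypeScript downleveled (target below ES2017) are plain functions returning a Promise.
function isNativeAsync(fn: Function): boolean {
  return Object.prototype.toString.call(fn) === "[object AsyncFunction]";
}

async function probe(): Promise<number> {
  return 1;
}

console.log(
  isNativeAsync(probe)
    ? "async/await is native in this build"
    : "async/await was downleveled in this build (e.g. via the __awaiter helper)",
);
```

Inside the PCF bundle, calling isNativeAsync(ContentRank.prototype.exec) would show whether the exec method handed to Flowed is a native async function.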
andreadipalma commented 1 year ago

Hi @danielduarte, I just tested it on a server-side Node.js server in JavaScript. The identical flow definition, now using the async call, works correctly and doesn't disrupt the flow execution. I think you were absolutely right in pointing to the TypeScript transpiling process as the cause of this symptom when running async calls in the flow. Please let me know if I can do more verification on the TypeScript side (see the info provided in my previous comment), but for the moment I can move on with this. Thank you

danielduarte commented 1 year ago

Hi @andreadipalma, great that you could move on after these issues, and good to know that it is indeed a transpiling issue. As soon as I have time, I'll review the logs from your previous comments in detail. Thanks.