resonatehq / resonate

a dead simple programming model for the cloud
https://www.resonatehq.io
Apache License 2.0

Bug on pending promise. Resonate server just completely fails (to stop the `serve` process and to process the promise) #299

Closed nicolasmelo1 closed 4 months ago

nicolasmelo1 commented 4 months ago

Expected Behavior

  1. Start the worker, start resonate, and start the server.
  2. If there are pending promises, the resonate server calls the worker.
  3. The worker calls the resonate server to claim the task. This alone should just work.

Actual Behavior

  1. Start the worker, start resonate, and start the server.
  2. If there are pending promises, the resonate server calls the worker.
  3. The worker calls the resonate server to claim the task.
  4. The fetch request for claiming the task fails and everything freezes.

To Reproduce

  1. $ pnpm init
  2. $ pnpm i @resonatehq/sdk express
    $ pnpm install --save-dev @types/express @types/node ts-node typescript
  3. Create src folder, add both an index.ts and a worker.ts

    • Contents of index.ts:

      import express, { Request, Response } from "express";
      import { Resonate } from "@resonatehq/sdk/dist/async";
      import { Context } from "@resonatehq/sdk";

      export async function downloadAndSummarize(context: Context, url: string) {
        // Summarize the content on a node with a gpu
        console.log("url", url);
        let summary = await context.run(`/gpu/summarize/summarize-${url}`, url);
        // Return the summary of the content
        return summary;
      }

      // Initialize a Resonate application.
      const resonate = new Resonate({ url: "http://localhost:8001" });

      // Register a function as a Resonate function
      resonate.register(
        "downloadAndSummarize",
        downloadAndSummarize,
        resonate.options({ timeout: Number.MAX_SAFE_INTEGER })
      );

      // Start the Resonate application
      resonate.start();

      // Initialize an Express application.
      const app = express().use(express.json());

      // Register a function as an Express endpoint
      app.post("/summarize", async (req: Request, res: Response) => {
        const url = req.body?.url;
        try {
          // Call the resonate function
          let summary = await resonate.run(
            "downloadAndSummarize",
            /* id */ `summarize-${url}`,
            /* param */ url
          );
          res.send(summary);
        } catch (e) {
          res.status(500).send("An error occurred.");
        }
      });

      // Start the Express application
      app.listen(3000, () => {
        console.log("Listening on port 3000");
      });

  - Contents of *worker.ts*:
```typescript
import express, { Request, Response } from "express";

const app = express();

app.use(express.json());

app.post("/", (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch((error) =>
      console.error("Error while processing task", error)
    );
    res.status(200);
  } catch (e) {
    console.error("Error", e);
    res.status(500);
  }
});

async function process(request: any): Promise<void> {
  const {
    taskId,
    counter,
    links: { claim: claimUrl, complete: completeUrl },
  } = request;

  console.log("Claiming task", taskId, counter);

  // Claim the task
  let response = await fetch(claimUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      taskId,
      counter,
      processId: "process-id",
      executionId: "execution-id",
      expiryInMilliseconds: 60,
    }),
  });

  if (!response.ok) {
    throw new Error(`Error claiming task: ${response}`);
  }

  let data = await response.json();

  console.log("Task claimed", response);

  // BUSINESS LOGIC
  let summary = "This is a summary of the text";
  // BUSINESS LOGIC

  // Complete the task

  console.log("Completing task", taskId, counter);

  response = await fetch(completeUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      taskId,
      counter,
      executionId: "execution-id",
      state: "resolved",
      value: {
        headers: { "Content-Type": "application/json" },
        data: Buffer.from(JSON.stringify(summary)).toString("base64"),
      },
    }),
  });

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }

  data = await response.json();

  console.log("Task completed", taskId, counter);
}

const PORT = 5001;
app.listen(PORT, () => {
  console.log(`Starting worker on port ${PORT}\n`);
});
```

  4. Add this to your package.json:

    {
      "scripts": {
        "dev": "ts-node src/index.ts",
        "worker": "ts-node src/worker.ts"
      }
    }
  5. Run each command in a separate terminal instance, or use something like concurrently:

    $ pnpm run dev
    $ pnpm run worker
    $ resonate serve
  6. Press Ctrl+C in the pnpm run dev terminal instance to fully stop it, and leave the resonate server and the worker running. Then change these two lines in index.ts:

    export async function downloadAndSummarize(context: Context, url: string) {
    // Summarize the content on a node with a gpu
    console.log("url", url);
    - let summary = await context.run(`/gpu/summarize/summarize-${url}`, url);
    + let summary = await context.run(`/gpu/summarize/summarize1-${url}`, url);
    // Return the summary of the content
    return summary;
    }

and

    app.post("/summarize", async (req: Request, res: Response) => {
      const url = req.body?.url;
      try {
        // Call the resonate function
        let summary = await resonate.run(
          "downloadAndSummarize",
    -     /* id */ `summarize-${url}`,
    +     /* id */ `summarize1-${url}`,
          /* param */ url
        );
        res.send(summary);
      } catch (e) {
        res.status(500).send("An error occurred.");
      }
    });
  7. Rerun pnpm run dev.

  8. Curl the API again with:

    $ curl \
    -X POST \
    -H 'Content-Type: application/json' \
    -d '{"url": "http://example.com"}' \
    http://localhost:3000/summarize
  9. Repeat steps 6, 7 and 8, changing summarize1 to summarize2, ..., summarizeN each time. Eventually it freezes completely: the server stops sending tasks to the worker, so they pile up.

  10. Once it no longer sends anything to the worker, fully stop the resonate server and the worker.

  11. Run them again: first the worker, then the server.

  12. You'll get the following on the worker console:

    Task received {
    queue: 'analytics',
    taskId: '/gpu/summarize/summarize4-http://example.com',
    counter: 763,
    links: {
    claim: 'http://localhost:8001/tasks/claim',
    complete: 'http://localhost:8001/tasks/complete'
    }
    }
    Claiming task /gpu/summarize/summarize4-http://example.com 763
    Error while processing task TypeError: fetch failed
    at Object.fetch (node:internal/deps/undici/undici:11576:11)
    at async process (/Users/nicolasmelo/workspace/resonate-world/src/worker.ts:30:18) {
    cause: SocketError: other side closed
      at Socket.onSocketEnd (node:internal/deps/undici/undici:9790:26)
      at Socket.emit (node:events:526:35)
      at Socket.emit (node:domain:489:12)
      at endReadableNT (node:internal/streams/readable:1376:12)
      at processTicksAndRejections (node:internal/process/task_queues:82:21) {
    code: 'UND_ERR_SOCKET',
    socket: {
      localAddress: '::1',
      localPort: 50692,
      remoteAddress: '::1',
      remotePort: 8001,
      remoteFamily: 'IPv6',
      timeout: undefined,
      bytesWritten: 387,
      bytesRead: 0
    }
    }
    }

And on the resonate server you will see:

time=2024-04-25T10:43:20.807-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.858-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.860-03:00 level=ERROR msg="connection http::http://localhost:5001 failed to complete task with error: \"Post \\\"http://localhost:5001\\\": dial tcp [::1]:5001: connect: connection refused\""
time=2024-04-25T10:43:20.860-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.906-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.908-03:00 level=ERROR msg="connection http::http://localhost:5001 failed to complete task with error: \"Post \\\"http://localhost:5001\\\": dial tcp [::1]:5001: connect: connection refused\""
time=2024-04-25T10:43:20.908-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.951-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:21.009-03:00 level=INFO msg="http: panic serving [::1]:50692: unknown status code 4035\ngoroutine 8546 [running]:\nnet/http.(*conn).serve.func1()\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:1868 +0xb0\npanic({0x102c90680?, 0x14000131c90?})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/runtime/panic.go:920 +0x26c\ngithub.com/resonatehq/resonate/internal/kernel/t_api.ResponseStatus.String(0x30?)\n\t/Users/runner/work/resonate/resonate/internal/kernel/t_api/status.go:70 +0x228\ngithub.com/resonatehq/resonate/internal/api.HandleRequestError(0x14000298500?)\n\t/Users/runner/work/resonate/resonate/internal/api/errors.go:97 +0x28\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/service.(*Service).ClaimTask(0x140003561a0, 0x1400079fdd0, 0x1400016e040)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/service/task.go:53 +0x274\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).claimTask(0x140000b84e0, 0x140007ea300)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/task.go:27 +0xac\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).log(0x140003dac60?, 0x140007ea300)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/http.go:95 +0x48\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/gin-gonic/gin.(*Engine).handleHTTPRequest(0x140004eed00, 0x140007ea300)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:620 +0x524\ngithub.com/gin-gonic/gin.(*Engine).ServeHTTP(0x140004eed00, {0x102df6190?, 0x140000ce2a0}, 0x1400013ef00)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:576 +0x1a0\nnet/http.serverHandler.ServeHTTP({0x102df3740?}, {0x102df6190?, 0x140000ce2a0?}, 
0x6?)\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2938 +0xbc\nnet/http.(*conn).serve(0x14000174900, {0x102df8040, 0x140006181b0})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2009 +0x518\ncreated by net/http.(*Server).Serve in goroutine 50\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:3086 +0x4cc"
  13. The resonate server freezes completely. When you then press Ctrl+C, it logs:
    time=2024-04-25T10:46:30.856-03:00 level=INFO msg="http: panic serving [::1]:50901: system is shutting down\ngoroutine 98520 [running]:\nnet/http.(*conn).serve.func1()\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:1868 +0xb0\npanic({0x102d1ef60?, 0x140000e6b40?})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/runtime/panic.go:920 +0x26c\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).searchPromises(0x140000b84e0, 0x140007ea000)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/promise.go:65 +0x358\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).log(0x140003daa00?, 0x140007ea000)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/http.go:95 +0x48\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/gin-gonic/gin.(*Engine).handleHTTPRequest(0x140004eed00, 0x140007ea000)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:620 +0x524\ngithub.com/gin-gonic/gin.(*Engine).ServeHTTP(0x140004eed00, {0x102df6190?, 0x14000228620}, 0x14000258500)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:576 +0x1a0\nnet/http.serverHandler.ServeHTTP({0x140007bc090?}, {0x102df6190?, 0x14000228620?}, 0x6?)\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2938 +0xbc\nnet/http.(*conn).serve(0x14000422000, {0x102df8040, 0x140006181b0})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2009 +0x518\ncreated by net/http.(*Server).Serve in goroutine 50\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:3086 +0x4cc"

but never really stops the process.

resonate.yml contents

# api:
#   baseUrl: http://example.com

aio:
  subsystems:
    queuing:
      config:
        connections:
          - kind: http
            name: summarize
            metadata:
              properties:
                url: http://localhost:5001

        routes:
          - kind: pattern
            name: default
            target:
              connection: summarize
              queue: analytics
            metadata:
              properties:
                pattern: /gpu/summarize/*
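
As an illustration of how a route like this could select tasks, here is a minimal sketch. It assumes that `*` matches any run of characters, including `/`, which is consistent with the task id `/gpu/summarize/summarize4-http://example.com` landing on the `analytics` queue in the worker log above. The server's actual matching rules are not shown in this issue, and `Route`, `matchesPattern`, and `findRoute` are hypothetical names, not part of Resonate.

```typescript
// Hypothetical shape mirroring one entry under `routes` in resonate.yml.
interface Route {
  pattern: string;
  connection: string;
  queue: string;
}

// Assumption: `*` matches any sequence of characters (including `/`).
function matchesPattern(pattern: string, taskId: string): boolean {
  const escaped = pattern
    .split("*")
    // Escape regex metacharacters in the literal parts of the pattern.
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join(".*");
  return new RegExp(`^${escaped}$`).test(taskId);
}

// Return the first route whose pattern matches the task id, if any.
function findRoute(routes: Route[], taskId: string): Route | undefined {
  return routes.find((r) => matchesPattern(r.pattern, taskId));
}

const routes: Route[] = [
  { pattern: "/gpu/summarize/*", connection: "summarize", queue: "analytics" },
];

const route = findRoute(routes, "/gpu/summarize/summarize4-http://example.com");
console.log(route?.connection, route?.queue);
```

Under these assumptions, any promise id that starts with `/gpu/summarize/` would be sent to the worker at http://localhost:5001.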

dfarr commented 4 months ago

Thank you so much for the detailed report Nicolas! We dug into this and what we discovered is there are actually two bugs that worked together to make a bad situation worse :D

The first bug (and the quickfix) was a bug in our quickstart code, specifically in the worker.

app.post('/', (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch(error => console.error("Error while processing task", error));
    res.status(200);
  } catch (e) {
    console.error("Error", e);
    res.status(500);
  }
});

The problem is that res.status only sets the status code; it does not actually complete the HTTP request. We're missing send. If you change the code to the following, your example should work 🤞

app.post('/', (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch(error => console.error("Error while processing task", error));
    res.status(200).send();
  } catch (e) {
    console.error("Error", e);
    res.status(500).send();
  }
});
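
To make the difference concrete, here is a tiny model of the two calls. `FakeResponse` is a hypothetical stand-in written for this sketch, not Express's real response class; it only mimics the relevant behavior, namely that `status()` records the code and returns the response for chaining, while `send()` is the call that finishes the response.

```typescript
// Hypothetical stand-in for an Express response (illustration only).
class FakeResponse {
  statusCode = 200;
  ended = false;

  // Like res.status(): records the code and returns the response for chaining.
  status(code: number): this {
    this.statusCode = code;
    return this;
  }

  // Like res.send(): this is what actually completes the response.
  send(): this {
    this.ended = true;
    return this;
  }
}

const withoutSend = new FakeResponse();
withoutSend.status(200); // status is set, but the caller is still waiting

const withSend = new FakeResponse();
withSend.status(200).send(); // status is set and the response is completed

console.log(withoutSend.ended, withSend.ended); // prints: false true
```

In the original worker, the server's request to the worker was in the first state: a status had been chosen, but nothing was ever sent back.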

The second bug was in our server: we forgot to set a timeout for these HTTP requests. If no timeout is specified, Go defaults to waiting forever. Furthermore, by default our server has only a single worker to handle these HTTP requests (this is configurable), and if that worker is waiting forever, no subsequent tasks will be sent to workers (please note that other parts of our server were unaffected). This waiting is also the reason the server would not shut down. We will remedy this by adding a timeout. Thanks for helping us find this!
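
One way to picture why a single stuck request blocks everything behind it is a toy model of a dispatcher that sends requests strictly one at a time. This is only a sketch of the reasoning, not the server's Go code: `dispatchTimes`, its parameters, and the numbers are hypothetical, and time is simulated rather than measured.

```typescript
// Toy model: one dispatcher sends requests sequentially.
// responseTimes[i] is how long request i takes to get a response
// (Infinity means the other side never responds).
// timeoutMs is the per-request timeout (Infinity means no timeout).
// Returns the simulated start time of each request, or null if it is never sent.
function dispatchTimes(
  responseTimes: number[],
  timeoutMs: number = Infinity
): (number | null)[] {
  const starts: (number | null)[] = [];
  let now = 0;
  for (const responseTime of responseTimes) {
    if (!Number.isFinite(now)) {
      // The dispatcher is still stuck on an earlier request.
      starts.push(null);
      continue;
    }
    starts.push(now);
    // The dispatcher is busy until the response arrives or the timeout fires.
    now += Math.min(responseTime, timeoutMs);
  }
  return starts;
}

// No timeout: the second request never completes, so the third is never sent.
const noTimeout = dispatchTimes([10, Infinity, 10]); // → [0, 10, null]

// With a 1000 ms timeout, the dispatcher gives up and moves on.
const withTimeout = dispatchTimes([10, Infinity, 10], 1000); // → [0, 10, 1010]

console.log(noTimeout, withTimeout);
```

In this model, adding a timeout bounds how long one unresponsive worker can hold up the queue, which is the effect the planned fix is aiming for.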