PipedreamHQ / pipedream

Connect APIs, remarkably fast. Free for developers.
https://pipedream.com
8.32k stars, 5.27k forks

[FEATURE] I'd like to schedule a workflow to run at a specific time #404

Closed: dylburger closed this issue 2 years ago

dylburger commented 3 years ago

We frequently see use cases for scheduling one-time jobs at a specific timestamp. For example, you may want to send your users an email exactly 2 hours after they sign up: you need a way to schedule a specific message for delivery at a specific timestamp. You could send a message to such a task scheduling service using a format like:

{
  "timestamp": "2020-08-21T04:29:00.951Z",
  "message": { "name": "Luke", "email": "luke@jedi.com" }
}
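A minimal sketch of how a caller might build that payload for the "2 hours after signup" example. The helper name `buildScheduledMessage` and the default 2-hour delay are illustrative assumptions, not part of any Pipedream API:

```javascript
// Hypothetical helper: builds a scheduling payload in the format above.
const TWO_HOURS_MS = 2 * 60 * 60 * 1000;

function buildScheduledMessage(signupDate, message, delayMs = TWO_HOURS_MS) {
  // Delivery time is computed relative to the signup time
  const deliveryDate = new Date(signupDate.getTime() + delayMs);
  return {
    // toISOString() always yields a UTC ISO 8601 string with milliseconds
    timestamp: deliveryDate.toISOString(),
    message,
  };
}

const payload = buildScheduledMessage(
  new Date("2020-08-21T02:29:00.951Z"),
  { name: "Luke", email: "luke@jedi.com" }
);
console.log(JSON.stringify(payload));
```

Emitting the timestamp in UTC avoids ambiguity about which time zone the scheduler should assume.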

Note: you can run a task scheduling service like this in your own AWS account, and have it trigger a Pipedream workflow, using this Pipedream event source.

jverce commented 3 years ago

GCP also has a similar product that we can integrate. But would it be worth supporting this natively? Sounds like a common-enough use case.

dylburger commented 3 years ago

Yeah @jverce I'm actually working on a similar GCP version of the AWS solution (using Cloud Tasks). Significantly cheaper than the Step Functions solution. The only tradeoff is that you can only schedule tasks up to 30 days in the future, which I imagine is fine for most use cases.
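A minimal sketch of a pre-flight check that a Cloud Tasks-backed scheduler might run before accepting a task. Only the 30-day bound comes from the comment above; the function name and its return shape are hypothetical, and this sketch enforces only the upper bound:

```javascript
// Hypothetical validation for a scheduler with a 30-day horizon.
const MAX_SCHEDULE_MS = 30 * 24 * 60 * 60 * 1000;

function validateDeliveryTime(isoTimestamp, now = new Date()) {
  const deliveryMs = Date.parse(isoTimestamp);
  if (Number.isNaN(deliveryMs)) {
    return { ok: false, reason: "timestamp is not a valid ISO 8601 date" };
  }
  // Reject anything further out than the 30-day limit
  if (deliveryMs - now.getTime() > MAX_SCHEDULE_MS) {
    return { ok: false, reason: "timestamp is more than 30 days in the future" };
  }
  return { ok: true, deliveryMs };
}

const now = new Date("2020-09-01T00:00:00Z");
console.log(validateDeliveryTime("2020-09-30T00:00:00Z", now)); // 29 days out
console.log(validateDeliveryTime("2020-10-02T00:00:00Z", now)); // 31 days out
```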

And we have heard this use case enough to merit it being a first-class Pipedream feature, so we're planning to add this at some point.

nachocab commented 3 years ago

@dylburger Just curious whether there's an ETA for this, and what the current recommendation is in the meantime:

  1. Posthook
  2. AWS Step Functions
  3. The AWS task scheduler event source

dylburger commented 3 years ago

@nachocab no current ETA on the in-product solution.

I haven’t actually used Posthook, so I can’t compare it accurately to the other offerings. The AWS Task Scheduler event source is relatively easy to configure as long as you have an AWS account. I’m happy to help you set it up or modify it to suit your needs, so you have the benefit of working 1:1 with the author on that one 😉.

Let me know what your exact use case is - our recommendation will depend on that.

dylburger commented 3 years ago

We've released an alpha version of this functionality in the product, and we're looking for feedback if y'all would like to help us test.

You can now pass a delivery_ts to the metadata of this.$emit() within an event source:

this.$emit(event, { delivery_ts: epoch_ms })

This will emit an event from your source at the epoch ms timestamp you pass to delivery_ts.
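A minimal sketch of converting an ISO 8601 timestamp into the epoch-millisecond `delivery_ts` that `$emit` expects. Here `component` stands in for `this` inside a Pipedream source; the helper name `emitAt` and the mock component are hypothetical, used only so the logic can run outside Pipedream:

```javascript
// Hypothetical helper: defer an emit until the given ISO 8601 time.
function emitAt(component, event, isoTimestamp) {
  const epochMs = Date.parse(isoTimestamp); // ISO 8601 -> epoch milliseconds
  if (Number.isNaN(epochMs)) {
    throw new Error(`Invalid timestamp: ${isoTimestamp}`);
  }
  component.$emit(event, { delivery_ts: epochMs });
  return epochMs;
}

// Minimal mock that records what $emit receives
const emitted = [];
const mockComponent = {
  $emit(event, metadata) {
    emitted.push({ event, metadata });
  },
};

emitAt(mockComponent, { message: "foo" }, "2020-09-18T04:40:59Z");
console.log(emitted[0].metadata.delivery_ts);
```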

You can also create this event source, which exposes an HTTP API for scheduling new tasks. Once you create the source, take its HTTP endpoint and send a request of the following format to schedule a new task:

curl -X POST \
  -H 'Content-Type: application/json' \
  -H 'x-pd-secret: 123' \
  -d '{ "timestamp": "2020-09-18T04:40:59Z", "message": "foo" }' \
  https://endpoint.m.pipedream.net/schedule

The event source will emit the message at the specified ISO 8601 timestamp passed in the timestamp param.
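A minimal sketch that builds the same request as the curl example, for callers working in Node.js. The endpoint and secret are the placeholders from the example, and `buildScheduleRequest` is a hypothetical helper; the returned pair can be passed to `fetch()` (built into Node 18+):

```javascript
// Hypothetical helper mirroring the curl request to /schedule.
function buildScheduleRequest(endpoint, secret, timestamp, message) {
  const url = new URL("/schedule", endpoint).toString();
  const init = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "x-pd-secret": secret,
    },
    body: JSON.stringify({ timestamp, message }),
  };
  return { url, init };
}

const { url, init } = buildScheduleRequest(
  "https://endpoint.m.pipedream.net",
  "123",
  "2020-09-18T04:40:59Z",
  "foo"
);
// To actually send it: const res = await fetch(url, init);
console.log(url, init.body);
```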

Let us know if you have any feedback or questions!

MuaazTeladia commented 3 years ago

This is an awesome feature. Some thoughts: it would be great if we could cancel a scheduled execution that hasn't run yet, and retrieve a list of pending executions for a specific endpoint. One idea would be for the source to respond with a unique id, which we could then use to query a specific execution. I have tested this out and it seems to be working perfectly; I'm doing some more testing with longer time periods. So far so good, keep up all the awesome work!!!

dylburger commented 3 years ago

Thank you for the feedback, @Muaazt! Noted, we're thinking about this now.

dylburger commented 3 years ago

@Muaazt if you create a new Pipedream Task Scheduler source, v0.0.2 now returns an id from the /schedule endpoint. You can make an HTTP POST request with that id to the /cancel endpoint to cancel an outstanding task. See the docs for more info.
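A minimal sketch of the schedule-then-cancel flow: keep the `id` returned by `/schedule`, then POST it to `/cancel`. The request body shape `{ id }` and reuse of the `x-pd-secret` header are assumptions here, not confirmed above; check the source's docs for the exact format. `buildCancelRequest` is a hypothetical helper:

```javascript
// Hypothetical helper: assumes /cancel accepts a JSON body of the form { id }.
function buildCancelRequest(endpoint, secret, taskId) {
  return {
    url: new URL("/cancel", endpoint).toString(),
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json", "x-pd-secret": secret },
      body: JSON.stringify({ id: taskId }),
    },
  };
}

// Placeholder for the JSON returned by a successful /schedule call
const scheduleResponse = { id: "task-id-from-schedule" };
const cancel = buildCancelRequest(
  "https://endpoint.m.pipedream.net",
  "123",
  scheduleResponse.id
);
// To send it: await fetch(cancel.url, cancel.init);
console.log(cancel.url, cancel.init.body);
```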

nachocab commented 3 years ago

@dylburger This feature works great. I just have two comments:

  1. Is the idea to create a source for every type of scheduled event? For example: scheduled-deletion-zoom-recordings, scheduled-tweets, scheduled-youtube-videos. That's fine. An alternative approach would be to include the workflow id (or even the step id) that should be triggered at the specified time.

  2. It would be great to get a list of scheduled events. If I trigger twenty of them, there's no easy way to get their corresponding ids other than going to each workflow instance, finding the step that scheduled it, and copying and pasting the step-exported id.

dylburger commented 3 years ago

@nachocab definitely makes sense.

On (1), we support the ability for you to emit to different "named channels" via the name property of the emit metadata:

this.$emit(event, {
  name: "scheduled-zoom-deletion",
  id,
  delivery_ts,
})

You also have the ability, via the API, to subscribe another source or workflow to events emitted only on that named channel. So if you created the task scheduler source and wanted to emit different types of scheduled tasks to three different workflows, you could emit them to three different channels, then each workflow could subscribe to the respective channel like so:

curl "https://api.pipedream.com/v1/subscriptions?emitter_id=dc_your_task_schedule_id&listener_id=p_your_workflow_id&event_name=scheduled-zoom-deletion" \
  -X POST \
  -H "Authorization: Bearer <api_key>" \
  -H "Content-Type: application/json"

That tells the workflow to listen for emitted events from your source on the scheduled-zoom-deletion channel.
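A minimal sketch that builds the same subscription request in Node.js, letting `URLSearchParams` handle query-string encoding instead of hand-writing it. The emitter id, listener id, and API key are the placeholders from the curl example; `buildSubscriptionRequest` is a hypothetical helper:

```javascript
// Hypothetical helper mirroring the curl call to /v1/subscriptions.
function buildSubscriptionRequest(emitterId, listenerId, eventName, apiKey) {
  const url = new URL("https://api.pipedream.com/v1/subscriptions");
  url.searchParams.set("emitter_id", emitterId);
  url.searchParams.set("listener_id", listenerId);
  url.searchParams.set("event_name", eventName); // the named channel
  return {
    url: url.toString(),
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
      },
    },
  };
}

const sub = buildSubscriptionRequest(
  "dc_your_task_schedule_id",
  "p_your_workflow_id",
  "scheduled-zoom-deletion",
  "<api_key>"
);
console.log(sub.url);
```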

Right now you can't list those subscriptions or see them in the UI as a workflow trigger, but if you think something like that would work for you, I could add this functionality into the task scheduler, such that you could specify the channel where you'd like to schedule the message:

{
  "timestamp": "2020-08-21T04:29:00.951Z", // timestamp: an ISO 8601 timestamp
  "message": { "name": "Luke" }, // message: any object or string
  "channel": "scheduled-zoom-deletion" // <- this is new
}
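A minimal sketch of how the scheduler might honor the proposed `channel` field when a task comes due. The helper `deliverTask` and the mock component are hypothetical; the sketch emits with a `name` only when a non-empty channel was supplied, presumably leaving other tasks on the source's default channel:

```javascript
// Hypothetical delivery step for the proposed "channel" field.
function deliverTask(component, task) {
  // Use the requested channel only if it is a non-empty string
  const channel =
    typeof task.channel === "string" && task.channel.length > 0 ? task.channel : null;
  // No channel: emit without a name
  const metadata = channel ? { name: channel } : {};
  component.$emit(task.message, metadata);
  return channel;
}

const emitted = [];
const mockComponent = {
  $emit(event, metadata) {
    emitted.push({ event, metadata });
  },
};

deliverTask(mockComponent, {
  timestamp: "2020-08-21T04:29:00.951Z",
  message: { name: "Luke" },
  channel: "scheduled-zoom-deletion",
});
deliverTask(mockComponent, { timestamp: "2020-08-21T04:29:00.951Z", message: "foo" });
console.log(emitted);
```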

Then we could experiment, get your feedback, and figure out how to iterate to make this functionality more native over time.

Re (2), messages that have been scheduled, but haven't yet reached their delivery_ts, should be in a special $in queue. I'll also work on abstracting this via some endpoint (maybe GET /tasks/<channel>), but for now you can get the component ID of your task scheduler event source and run:

curl -H "Authorization: Bearer <api key>" https://api.pipedream.com/v1/sources/<source ID>/event_summaries/\?event_name\=$in
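A minimal sketch of building the same `event_summaries` URL in code. In an unquoted or double-quoted shell string, `$in` is expanded as a shell variable, so constructing the URL programmatically sidesteps shell quoting; `URLSearchParams` encodes `$` as `%24`, which the server should decode back to `$in`. `buildPendingTasksUrl` is a hypothetical helper:

```javascript
// Hypothetical helper for listing pending tasks in the $in queue.
function buildPendingTasksUrl(sourceId) {
  const url = new URL(
    `https://api.pipedream.com/v1/sources/${encodeURIComponent(sourceId)}/event_summaries/`
  );
  url.searchParams.set("event_name", "$in"); // literal "$in", not a variable
  return url.toString();
}

const pendingUrl = buildPendingTasksUrl("dc_AjuzN0");
// Then: await fetch(pendingUrl, { headers: { Authorization: "Bearer <api key>" } });
console.log(pendingUrl);
```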
nachocab commented 3 years ago

Thanks, Dylan.

I think being able to select the "Scheduled Tasks" source as a trigger, with a dropdown to also select the channel, would make a lot of sense in the UI. That, together with the new parameter to select the channel on $emit, would cover most of the use cases I can think of.

nachocab commented 3 years ago

@dylburger Also, there seems to be an issue when creating new scheduled sources. I'm getting "Error in workflow" when sending the test event:

curl -d '{"message":"May the force be with you."}' \
  -H "Content-Type: application/json" \
  "https://a4c48ce164c715cf29273fe389133a3d.m.pipedream.net"

Error in workflow
dylburger commented 3 years ago

@nachocab currently test events for event sources are hardcoded globally, and aren't source-specific. Since that payload isn't of the format the task scheduler expects, and since the HTTP request is made to the / path and not the /schedule path, that test event isn't expected to work.

Just to confirm, if you send a request of the right format to the /schedule path, does that work?

We want to support test events that are bespoke to a given source, which should make this experience better in the future.

nachocab commented 3 years ago

@dylburger Yes, it works. I had just forgotten about the /schedule part because it wasn't in the URL. 😅

Also, I noticed that HTTP sources save the payload in event.body, but scheduler sources save it in event.message. Wouldn't it be better if both used event.body?

dylburger commented 3 years ago

@nachocab I’ll think about that more. I’m partial to something generic like “message” because that’s the term used in other pub/sub services and task schedulers (e.g. AWS SNS and Google Cloud Tasks).

The HTTP sources use “body” because that holds special meaning in HTTP. So in general we prefer the terms that make sense for the protocol / context instead of trying to fit all sources into a consistent format. But this isn’t dogma.

nachocab commented 3 years ago

That makes sense. No need to change it if it follows existing conventions. 👍

nachocab commented 3 years ago

@dylburger I tried listing my scheduled events with curl -H "Authorization: Bearer <api key>" https://api.pipedream.com/v1/sources/<source ID>/event_summaries/\?event_name\=$in for my source dc_AjuzN0, but I only got 5 old events like this one (from September 25):

{
  "id": "1601094784024-0",
  "indexed_at_ms": 1601094784024,
  "metadata": {
    "emit_id": "1i29O7rQuN4VTh2nw1eU8QUzRFt",
    "emitter_id": "dc_AjuzN0",
    "id": "dc92d910-eac0-4da2-a233-ef276ae92dc1",
    "name": "",
    "summary": "{\"timestamp\":\"2020-09-25T17:24:00+02:00\",\"message\":{\"email\":\"testing@gmail.com\"}}",
    "ts": 1601094784003
  }
}

nachocab commented 3 years ago

@dylburger Ah, the shell was interpreting $in as a variable. Escaping it as \$in works.

That said, this format isn't very intuitive: I have to convert the epoch timestamps into ISO timestamps, and I can't search by message (all I have is the task id, e.g. 589ee897-cda5-44e0-90d0-a68b1300cbba, that I get back when I schedule).

I can imagine that canceling a task that I scheduled several days ago might prove challenging if the queue has several entries.

{
  "id": "1601634093084-0",
  "indexed_at_ms": 1601634093084,
  "metadata": {
    "delivery_ts": "1604071928000",
    "dispatch_id": "1601634093078-1",
    "emit_id": "1iJmVeS5cjJ7SzJO374bRFtk3Ff",
    "emitter_id": "dc_AjuzN0",
    "id": "589ee897-cda5-44e0-90d0-a68b1300cbba",
    "name": "self",
    "v": "1"
  }
}

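A minimal sketch of the lookup described above: find a pending task in the `data` array returned by `/event_summaries` using the id from `/schedule`, and convert its `delivery_ts` (epoch milliseconds, serialized as a string) into an ISO 8601 timestamp. `findPendingTask` is a hypothetical helper; the sample entry is trimmed from the summary shown above:

```javascript
// Hypothetical helper over the `data` array from /event_summaries.
function findPendingTask(summaries, taskId) {
  const entry = summaries.find((s) => s.metadata && s.metadata.id === taskId);
  if (!entry) return null;
  return {
    id: entry.metadata.id,
    // delivery_ts arrives as a string, so convert to a number first
    deliveryIso: new Date(Number(entry.metadata.delivery_ts)).toISOString(),
  };
}

const summaries = [
  {
    id: "1601634093084-0",
    metadata: {
      delivery_ts: "1604071928000",
      id: "589ee897-cda5-44e0-90d0-a68b1300cbba",
      name: "self",
    },
  },
];
console.log(findPendingTask(summaries, "589ee897-cda5-44e0-90d0-a68b1300cbba"));
```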
nachocab commented 3 years ago

Also, I just canceled task 589ee897-cda5-44e0-90d0-a68b1300cbba and got a 200 response, but when I check back in the $in queue, it's still there.

If I try to cancel it again, I get a 404. So I'm assuming this worked (but I won't really know until October 31st), but it's a bit confusing 😅

nachocab commented 3 years ago

@dylburger I was thinking that a useful workaround would be the ability to add my own fields to the output I get from /event_summaries. I tried adding them to the metadata in $emit, but they're not showing up:

      // Scheduled tasks are emitted to the self channel, which is delivered
      // to this same deployed component at the specified delivery_ts
      const $id = uuid();
      this.$emit(
        { ...body, $channel: selfChannel, $id },
        {
          name: selfChannel,
          id: $id,
          delivery_ts: epoch,
          timestamp, // ISO date
          ...body.message, // message contents
        }
      );
dylburger commented 3 years ago

@nachocab btw, once we launch paid plans, I'll look more closely into the cancellation issues you observed.

Re: the emit metadata, we have considered supporting arbitrary metadata, but as you mentioned, that won't work right now. /event_summaries should return the full message payload if you pass the expand=event query string parameter (see the docs). Does including the metadata in the message itself, and retrieving it from /event_summaries, work OK?

nachocab commented 3 years ago

@dylburger Thanks, Dylan. I wasn't aware I could use expand=event. Now I can clean it up with jq and get all I need:

http https://api.pipedream.com/v1/sources/$SOURCE_ID/event_summaries/\?event_name\=\$in\&expand\=event \
  Authorization:"Bearer $PIPEDREAM_KEY" | jq '[.data[].event] | map({timestamp, email:.message.email, id: ."$id"}) | sort_by(.timestamp)'
[
  {
    "timestamp": "2020-10-11T20:26:43+00:00",
    "email": "EMAIL1",
    "id": "aa8a5b87-b4ce-4f65-ab6e-9fd4051d39ec"
  },
  {
    "timestamp": "2020-10-18T22:31:55+00:00",
    "email": "EMAIL2",
    "id": "f9ac76bd-a2d0-4152-a770-4c53cb1171cf"
  },
  ...
]
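For anyone without jq, a minimal JavaScript sketch of the same transformation. It assumes the `{ data: [{ event }] }` response shape that the jq filter relies on; `summarizePending` is a hypothetical helper, and `email` comes from the `message` contents used in this particular workflow:

```javascript
// Equivalent of: [.data[].event] | map({timestamp, email: .message.email, id: ."$id"}) | sort_by(.timestamp)
function summarizePending(response) {
  return response.data
    .map((entry) => entry.event)
    .map((event) => ({
      timestamp: event.timestamp,
      email: (event.message && event.message.email) ?? null, // jq yields null when absent
      id: event.$id,
    }))
    // Lexicographic sort, like jq's sort_by on strings; this orders correctly
    // only when all timestamps share the same format and UTC offset
    .sort((a, b) => (a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0));
}

const response = {
  data: [
    { event: { timestamp: "2020-10-18T22:31:55+00:00", message: { email: "EMAIL2" }, $id: "id-2" } },
    { event: { timestamp: "2020-10-11T20:26:43+00:00", message: { email: "EMAIL1" }, $id: "id-1" } },
  ],
};
console.log(summarizePending(response));
```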

Also, I don't know if I'm doing something wrong, but I just tested again and cancelling events doesn't seem to be working. For example:

I schedule a task two minutes from now and get this confirmation:

{
    "id": "abe84406-a638-440b-8c4e-e2d1d419dd52",
    "msg": "Successfully scheduled task"
}

I then do a POST request to the /cancel endpoint and get this confirmation:

{
    "msg": "Cancelled scheduled task for event abe84406-a638-440b-8c4e-e2d1d419dd52"
}

I still see the event in the queue, and when 2 minutes pass, it gets emitted anyway. 🤷‍♂️

dylburger commented 3 years ago

@nachocab I can confirm and we’re looking into it.

nachocab commented 3 years ago

@dylburger Perfect. Let me know. I have a few things on hold until cancelling works 😁

nachocab commented 3 years ago

I don't know if it's relevant, but I've noticed that when I schedule an event and query /event_summaries, I see 4 copies of the same event. The only differences between the copies are the id, the dispatch_id, and the indexed_at_ms.

This is one of the 4 copies:

{
  "event": {
    "$channel": "self",
    "$id": "af1532d7-e4e1-4710-b342-4d3b379d6ad4",
    "message": {
      "topic_id": 4490
    },
    "timestamp": "2020-11-01T14:00:00"
  },
  "id": "1604225253485-0", // THIS CHANGES
  "indexed_at_ms": 1604225253485, // THIS CHANGES
  "metadata": {
    "delivery_ts": "1604239200000",
    "dispatch_id": "1604225253466-1", // THIS CHANGES
    "emit_id": "1jgUVlh7XBQTws5DpJJAS16Z5ZK",
    "emitter_id": "MY_SOURCE_ID",
    "id": "af1532d7-e4e1-4710-b342-4d3b379d6ad4",
    "name": "self",
    "v": "1"
  }
}

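Until the duplicates are fixed, a client-side workaround sketch: since the copies share the same task id (`metadata.id`) and differ only in the fields marked above, they can be collapsed by that id, keeping the copy with the smallest `indexed_at_ms`. `dedupeByTaskId` is a hypothetical helper, and keeping the earliest copy is an arbitrary choice:

```javascript
// Hypothetical workaround: collapse summaries that share a task id.
function dedupeByTaskId(summaries) {
  const byId = new Map();
  for (const s of summaries) {
    const taskId = s.metadata.id;
    const existing = byId.get(taskId);
    // Keep the earliest-indexed copy of each task
    if (!existing || s.indexed_at_ms < existing.indexed_at_ms) {
      byId.set(taskId, s);
    }
  }
  return [...byId.values()];
}

const copies = [
  { id: "3-0", indexed_at_ms: 3, metadata: { id: "task-a" } },
  { id: "1-0", indexed_at_ms: 1, metadata: { id: "task-a" } },
  { id: "2-0", indexed_at_ms: 2, metadata: { id: "task-a" } },
  { id: "5-0", indexed_at_ms: 5, metadata: { id: "task-b" } },
];
console.log(dedupeByTaskId(copies));
```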
dylburger commented 3 years ago

@nachocab a couple of questions:

I couldn't replicate this on a new source, so this'll help me dig in.

nachocab commented 3 years ago

Thanks for looking into this @dylburger

I created a 0.0.3 source 7 days ago (dc_bPuEVB) by clicking on the first link in the new README. When I send an event there, I see 3 copies in the report.

This is the command I'm using:

http https://api.pipedream.com/v1/sources/dc_bPuEVB/event_summaries/\?event_name\=\$in\&expand\=event \
  Authorization:"Bearer $PIPEDREAM_KEY"

Feel free to send any test events there. This is what I use: http https://a5525b62449c39e9cb4285f59fb98406.m.pipedream.net/schedule timestamp="2020-12-01T23:00:00Z" message:='{"topic_id":"whatever"}'

I just tried creating a new test source (dc_jku4kD6), and this issue doesn't appear to happen there. 🤷‍♂️

Also, in terms of priorities, I care much more about having the /cancel option work 😅

dylburger commented 3 years ago

We also need to return a 400 error from the task scheduler HTTP API when a user has reached their limit on outstanding scheduled tasks (currently 100).
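A minimal sketch of that limit check. The limit of 100 comes from the comment above; the helper name, the response shape, and how the outstanding count is obtained are all hypothetical:

```javascript
// Hypothetical guard run before scheduling a new task.
const MAX_OUTSTANDING_TASKS = 100;

function checkTaskLimit(outstandingCount, limit = MAX_OUTSTANDING_TASKS) {
  if (outstandingCount >= limit) {
    return {
      status: 400,
      body: { msg: `Limit of ${limit} outstanding scheduled tasks reached` },
    };
  }
  return null; // under the limit: proceed with scheduling
}

console.log(checkTaskLimit(99));  // null
console.log(checkTaskLimit(100)); // 400 response
```

Returning `null` under the limit keeps the happy path simple: the caller only builds an error response when one is returned.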

wittawasw commented 2 years ago

The link in the README for creating a new source is now unusable. It requires the "Connect a Pipedream account" field, but the input doesn't respond to clicks, so I can't create the source.

dylburger commented 2 years ago

@wittawasw That issue has been fixed. Try visiting https://pipedream.com/sources/new?key=pipedream-new-scheduled-tasks one more time.

@nachocab I believe we've resolved all of the outstanding issues above. Can you try creating the newest version of the source and check out the docs, and let me know if that works? If it doesn't, please open up a new issue with the specific errors.