MarkEdmondson1234 / googleCloudRunner

Easy R scripts on Google Cloud Platform via Cloud Run, Cloud Build and Cloud Scheduler
https://code.markedmondson.me/googleCloudRunner/

Question about Pub/Sub subscriptions, cr_plumber_pubsub() #119

Closed: evanbiederstedt closed this issue 2 years ago

evanbiederstedt commented 3 years ago

Hi @MarkEdmondson1234

Thanks for the excellent package!

I was hoping you could clarify the documentation somewhat: I'm essentially trying to find an R equivalent to the SDKs/client libraries that GCP provides for Pub/Sub.

For instance, the Python client library can subscribe to Pub/Sub messages with a streaming pull (see https://cloud.google.com/pubsub/docs/pull and https://googleapis.dev/python/pubsub/latest/index.html):

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer): replace these placeholder values
project_id = "your-project-id"
subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

I have set up Pub/Sub to listen for the OBJECT_FINALIZE event, which fires when a new object is uploaded to a GCS bucket. I would like to pull this message and run a callback function within R.
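For context, Cloud Storage Pub/Sub notifications carry the event type in the message attributes (`eventType`, alongside `bucketId` and `objectId`), and with the `JSON_API_V1` payload format the message data is the object's metadata as JSON. A minimal Python sketch of a callback that only acts on `OBJECT_FINALIZE` might look like this; the name `handle_gcs_notification` and its plain-dict/bytes inputs are hypothetical, chosen so the logic can be checked without a live subscription.

```python
import json
from typing import Optional


def handle_gcs_notification(attributes: dict, data: bytes) -> Optional[str]:
    """Return 'gs://bucket/object' for new uploads, or None for other events.

    Hypothetical helper: `attributes` and `data` stand in for the
    message.attributes and message.data of a GCS notification message.
    """
    # GCS notifications put the event type in the message attributes.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    # With the JSON_API_V1 payload format, data is the object metadata JSON;
    # if the payload is empty (payload format NONE), fall back to attributes.
    metadata = json.loads(data) if data else {}
    bucket = metadata.get("bucket", attributes.get("bucketId"))
    name = metadata.get("name", attributes.get("objectId"))
    return f"gs://{bucket}/{name}"
```

Inside a streaming-pull callback like the one above, you would call it with the message's (dict-like) attributes and its data before calling `message.ack()`.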

From the examples, the function cr_plumber_pubsub() can accept Pub/Sub messages: https://code.markedmondson.me/googleCloudRunner/reference/cr_plumber_pubsub.html

# example function echoes back the pub/sub message
pub <- function(x){
  paste("Echo:", x)
}

#' Receive pub/sub message
#' @post /pubsub
#' @param message a pub/sub message
function(message=NULL){
  googleCloudRunner::cr_plumber_pubsub(message, pub)
}

Would this function work equivalently to the Python code above? I'm not using it within a Cloud Run setup.

Thank you for any insight, and I appreciate the help. Best, Evan

MarkEdmondson1234 commented 3 years ago

Good question. cr_plumber_pubsub() parses the format that Pub/Sub sends when it's triggered via a push subscription. See the code here: https://github.com/MarkEdmondson1234/googleCloudRunner/blob/7f5a980ffc5de1372bd9871bfa24a41b5a1fd279/R/plumber.R#L40-L53

That message could include the event name OBJECT_FINALIZE, and your callback function would then be triggered; I think that would be the function passed as the second argument (pub in the example above).
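As a rough illustration of what this kind of parsing involves (a Python sketch of the general idea, not the package's R code): a push subscription POSTs a JSON envelope whose `message.data` field is base64-encoded, so the handler decodes that field and hands it to a callback. For GCS notifications the event type travels in `message.attributes`, so this sketch passes those along too. `parse_push_body` and `on_message` are hypothetical names.

```python
import base64
import json
from typing import Any, Callable


def parse_push_body(body: bytes, on_message: Callable[[str, dict], Any]) -> Any:
    """Decode a Pub/Sub push request body and pass its contents to on_message."""
    envelope = json.loads(body)
    message = envelope["message"]
    # message.data is base64-encoded; it may be absent for attribute-only messages.
    data = base64.b64decode(message.get("data", "")).decode("utf-8")
    # For a GCS notification, attributes["eventType"] would be e.g. OBJECT_FINALIZE.
    attributes = message.get("attributes", {})
    return on_message(data, attributes)
```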

Subscribing via streaming pull is beyond my R skills (I think it uses gRPC), but the JSON API is what all the R libraries wrap; see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull
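For reference, a synchronous pull over the JSON API comes down to two calls: POST `https://pubsub.googleapis.com/v1/{subscription}:pull` with a `maxMessages` body, then POST `.../{subscription}:acknowledge` with the `ackIds` you processed. The Python sketch below only builds the URLs and request bodies and processes a pull response; it leaves out the authenticated HTTP calls themselves (which need an OAuth2 access token). All helper names are hypothetical.

```python
import base64
from typing import Callable, List

PUBSUB_API = "https://pubsub.googleapis.com/v1"


def pull_url(subscription: str) -> str:
    # subscription is the full path: projects/{project}/subscriptions/{sub}
    return f"{PUBSUB_API}/{subscription}:pull"


def ack_url(subscription: str) -> str:
    return f"{PUBSUB_API}/{subscription}:acknowledge"


def pull_body(max_messages: int = 10) -> dict:
    return {"maxMessages": max_messages}


def process_pull_response(
    response: dict, callback: Callable[[str, dict], None]
) -> dict:
    """Run callback on each received message; return the acknowledge body."""
    ack_ids: List[str] = []
    # An empty pull response may omit receivedMessages entirely.
    for received in response.get("receivedMessages", []):
        message = received["message"]
        data = base64.b64decode(message.get("data", "")).decode("utf-8")
        callback(data, message.get("attributes", {}))
        # Collect ackIds so the batch can be acknowledged after processing.
        ack_ids.append(received["ackId"])
    return {"ackIds": ack_ids}
```

A polling loop would POST `pull_body()` to `pull_url(...)`, pass the JSON reply to `process_pull_response`, and POST the returned body to `ack_url(...)` when it has any ack IDs. Each pull is a separate request, which is the main difference from the streaming pull the Python client uses.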

I think that is actually sufficient for the majority of Pub/Sub cases. I don't know, but perhaps the Python client simply starts an HTTP server that receives the Pub/Sub message and parses it, much like the Cloud Run example.
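If you take the HTTP-server route (a push subscription pointed at your own endpoint), a minimal standard-library Python sketch of a receiver is below. Assumptions: `PushHandler`, `echo` and `serve` are hypothetical names; a real push subscription needs a publicly reachable HTTPS URL, which this local sketch ignores; and Pub/Sub treats a 2xx reply as an acknowledgement, while other statuses lead to redelivery.

```python
import base64
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def echo(data: str) -> str:
    # Hypothetical callback, mirroring the R `pub` example above.
    return f"Echo: {data}"


class PushHandler(BaseHTTPRequestHandler):
    callback = staticmethod(echo)

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            envelope = json.loads(self.rfile.read(length))
            encoded = envelope["message"].get("data", "")
            data = base64.b64decode(encoded).decode("utf-8")
        except (ValueError, KeyError, TypeError, AttributeError):
            # Malformed request: a non-2xx reply counts as a failed delivery.
            self.send_response(400)
            self.end_headers()
            return
        body = json.dumps({"result": self.callback(data)}).encode("utf-8")
        # Any 2xx status acknowledges the message.
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int = 8080) -> None:
    # Local testing only; Pub/Sub needs a public HTTPS URL to push to.
    HTTPServer(("127.0.0.1", port), PushHandler).serve_forever()
```

Calling `serve()` blocks and handles POSTs on port 8080; swapping `callback` for your own function is the equivalent of passing a different function to cr_plumber_pubsub().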

A full R library for Pub/Sub would perhaps be useful, and could be built with the googleAuthR/gargle function generators, e.g. this auto-generated one: https://github.com/MarkEdmondson1234/autoGoogleAPI/tree/master/googlepubsubv1.auto

evanbiederstedt commented 3 years ago

Thanks for the help, @MarkEdmondson1234

> Subscribing via streaming pull is beyond my R skills (I think it uses gRPC), but the JSON API is what all the R libraries wrap; see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull

Thanks, that makes sense. My question about R libraries that wrap the REST API would be whether they have performance issues that the streaming feature avoids. I haven't tried it out, so I don't have any priors here.

> I don't know, but perhaps the Python client simply starts an HTTP server that receives the Pub/Sub message and parses it, much like the Cloud Run example.

It could be worth trying out; I'll update you if I check this. In my case, perhaps it's easier to create an HTTP server and parse the messages using an approach similar to the one above.

> A full R library for Pub/Sub would perhaps be useful, and could be built with the googleAuthR/gargle function generators, e.g. this auto-generated one: https://github.com/MarkEdmondson1234/autoGoogleAPI/tree/master/googlepubsubv1.auto

This does look promising; it might be possible to structure this into a more formal package.

MarkEdmondson1234 commented 2 years ago

There is now an R Pub/Sub library by Andrea! https://CRAN.R-project.org/package=googlePubsubR

GitHub: https://github.com/andodet/googlePubsubR/issues