jamesthompson opened this issue 7 years ago
Hi @jamesthompson. Sorry for asking a question unrelated to this issue, but it's hard to find any information about gogol, and I'm trying to evaluate whether to use it for my project. I'm curious whether you're using gogol-pubsub in production and whether it has worked for you.
@shinzui I'm not using gogol-pubsub in production and never persevered with this issue. I have, however, built and used the gogol core library to borrow its auth credential machinery (https://gist.github.com/jamesthompson/8c2cc62df78fa08e03d9828b84354cf4) and adapted that for use with http2-grpc-client. I haven't used any gogol sub-library other than gogol-bigquery (this was a couple of years ago, though: https://github.com/jamesthompson/bq-lib/blob/72c1a59a252b04cabfac6887a6f27b0290657d95/lib/bq-lib.cabal#L25-L27).
Thank you. I'm glad I asked because your gist is exactly what I'm looking for. I figured that I would need to write the client by hand for any serious production usage. How far did you get with your gist?
@shinzui FYI, I am using gogol-pubsub in prod with no issues.
Thank you for your reply, @AlistairB. I'm not sure I'm comfortable using the generated library in our event-driven architecture, where async messages are crucial for some business workflows. Official clients support backoff and other features that make subscribing to topics more reliable. I'm trying to figure out whether I can build a client on top of parts of gogol-pubsub or go @jamesthompson's route and use gogol only for authentication. A completely different route is writing Haskell bindings to the official GCP C++ client, but I don't have any experience writing bindings to a C++ library.
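The backoff that official clients provide can be approximated in a few lines of base Haskell. This is a minimal sketch, assuming every exception is worth retrying: `backoffDelays` and `retryWithBackoff` are hypothetical helpers, not part of gogol or of any official client, and the delay values are illustrative only.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, try)

-- | Successive retry delays in microseconds: start at 'base', double each
-- time, never exceed 'cap'. Doubling the previous (already capped) delay
-- avoids Int overflow on long retry sequences.
backoffDelays :: Int -> Int -> [Int]
backoffDelays base cap = iterate (\d -> min cap (d * 2)) (min cap base)

-- | Run an action, retrying after any exception up to 'maxRetries' times and
-- sleeping for the next backoff delay between attempts. Returns the last
-- exception if every attempt fails.
-- NOTE: catching SomeException also swallows asynchronous exceptions; a real
-- client would only retry errors it knows are transient.
retryWithBackoff :: Int -> Int -> Int -> IO a -> IO (Either SomeException a)
retryWithBackoff maxRetries base cap action =
  go (take maxRetries (backoffDelays base cap))
  where
    go delays = do
      result <- try action
      case (result, delays) of
        (Right a, _) -> pure (Right a)
        (Left err, []) -> pure (Left err)
        (Left _, d : ds) -> threadDelay d >> go ds
```

For example, `retryWithBackoff 5 100000 10000000 action` makes up to six attempts, waiting 0.1 s, 0.2 s, 0.4 s, 0.8 s and 1.6 s between them. Production retry policies usually also add random jitter, which is omitted here.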
Ah I see. Yes, I think that is fair. The generated library mirrors the REST API and doesn't have the additional functionality around async etc. that the other official libraries you mention provide.
I guess your options are:
Re 2 + 3:
I don't really understand why you would do 2 over 3. You still ultimately need to make the required REST API calls to Google Pub/Sub. gogol-pubsub should get you closer, and then you can just write the logic around async etc.
In my case I am doing option 4. I have Pub/Sub push subscriptions that call Google Cloud Run services. In that case Pub/Sub handles the backoff and other things for you. I think it is the simplest possible solution, but I believe an async pull technique has greater throughput if you need that.
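With a push subscription, the HTTP status the endpoint returns is the acknowledgement, which is why the service itself needs no retry logic. A minimal sketch of that contract: `Outcome`, `isAck` and `responseStatus` are hypothetical names, and the list of acknowledging status codes (102, 200, 201, 202, 204) reflects my understanding of Google's push subscription documentation, so check it against the current docs.

```haskell
-- | Hypothetical result of handling one pushed message in the service.
data Outcome = Processed | Failed
  deriving (Eq, Show)

-- | HTTP status codes that Pub/Sub treats as acknowledging a push delivery
-- (assumed from the push subscription docs).
ackStatuses :: [Int]
ackStatuses = [102, 200, 201, 202, 204]

-- | Does this response status acknowledge the message?
isAck :: Int -> Bool
isAck status = status `elem` ackStatuses

-- | Status the push endpoint returns for each outcome. Any status outside
-- 'ackStatuses' leaves the message unacknowledged, so Pub/Sub redelivers it
-- later with its own backoff; the handler never retries by itself.
responseStatus :: Outcome -> Int
responseStatus Processed = 204
responseStatus Failed = 503
```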
I have another use case where I want to reprocess a series of pull subscriptions and then shut down. In that case I am using conduit. Possibly a bit hard to read out of context, but this code pulls from a dead letter queue, then writes the messages to the regular queue. The nice thing about conduit here is that it streams batches of at most 50 messages and doesn't fetch more until they are processed. As I understand it, though, it doesn't run these tasks in parallel.
(I am using fused-effects, which is where the `Has AppEventEmit sig m` constraint and the logging bits come from, but you can probably just ignore that.)
```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

-- Assumes a project prelude that provides Text, MonadIO, lift, liftIO, for,
-- genericLength and a string-polymorphic show, plus the app's own AppEventEmit
-- effect (emitAppEventInfo, emitAppEventInfoA, AppEventMessage, AppEventAdditional).
import Conduit ((.|))
import qualified Conduit as C
import Control.Algebra (Has)
import Control.Lens (_Just, (&), (.~), (<&>), (?~), (^.))
import qualified Data.Conduit.Combinators as C
import qualified Network.Google as G
import qualified Network.Google.Auth.Scope as G
import qualified Network.Google.PubSub.Types as G
import qualified Network.Google.Resource.PubSub.Projects.Subscriptions.Acknowledge as G
import qualified Network.Google.Resource.PubSub.Projects.Subscriptions.Pull as G
import qualified Network.Google.Resource.PubSub.Projects.Topics.Publish as G

-- Drain a subscription: republish each batch to the regular topic, then ack
-- that batch on the source subscription.
streamMessages ::
  (G.AllowScopes s, G.HasScope' s PubSubRequiredScopes ~ 'True, MonadIO m, Has AppEventEmit sig m) =>
  G.Env s ->
  Text -> -- source subscription id (the dead letter subscription)
  Text -> -- topic id to republish to
  m ()
streamMessages env withSubId toTopicId =
  C.runConduit $
    allMessagesStream env withSubId
      .| C.iterM (writeMessagesToRegularTopic env toTopicId)
      .| C.iterM (ackMessages env withSubId)
      .| C.sinkNull

-- Pull one batch at a time and yield it downstream; stop at the first empty pull.
-- can this instead use `yieldMany` and use some kind of batching mechanism? tried but failed
allMessagesStream ::
  (G.AllowScopes s, G.HasScope' s PubSubRequiredScopes ~ 'True, MonadIO m, Has AppEventEmit sig m) =>
  G.Env s ->
  Text ->
  C.ConduitT () [G.ReceivedMessage] m ()
allMessagesStream env subId = do
  lift $ emitAppEventInfo (AppEventMessage "Fetch messages")
  messages <- liftIO $ pollForMessages env subId
  lift $ emitAppEventInfo (AppEventMessage $ "Fetched message count " <> show (genericLength @Int messages))
  case messages of
    [] -> pure ()
    xs -> C.yield xs *> allMessagesStream env subId

-- Publish the payloads of a batch to the target topic.
writeMessagesToRegularTopic ::
  (G.AllowScopes s, G.HasScope' s PubSubRequiredScopes ~ 'True, MonadIO m, Has AppEventEmit sig m) =>
  G.Env s ->
  Text ->
  [G.ReceivedMessage] ->
  m ()
writeMessagesToRegularTopic env topicId messages = do
  emitAppEventInfo (AppEventMessage "Write messages")
  let request = createPublishRequest topicId messages
  result <- liftIO $ (G.runResourceT . G.runGoogle env . G.send) request
  emitAppEventInfoA (AppEventMessage "Wrote messages") (AppEventAdditional result)

-- Acknowledge a batch on the source subscription so it is not redelivered.
ackMessages ::
  (G.AllowScopes s, G.HasScope' s PubSubRequiredScopes ~ 'True, MonadIO m, Has AppEventEmit sig m) =>
  G.Env s ->
  Text ->
  [G.ReceivedMessage] ->
  m ()
ackMessages env subId messages = do
  emitAppEventInfo (AppEventMessage "Ack messages")
  let request = createAckRequest subId messages
  result <- liftIO $ (G.runResourceT . G.runGoogle env . G.send) request
  emitAppEventInfoA (AppEventMessage "Acked messages") (AppEventAdditional result)

-- One synchronous pull; returns the received messages (empty when none are left).
pollForMessages ::
  (G.AllowScopes s, G.HasScope' s PubSubRequiredScopes ~ 'True) =>
  G.Env s ->
  Text ->
  IO [G.ReceivedMessage]
pollForMessages env subId =
  let request = G.projectsSubscriptionsPull createPullRequest subId
      result = (G.runResourceT . G.runGoogle env . G.send) request
   in result <&> \r -> r ^. G.prReceivedMessages

-- maxMessages must be set: the API rejects pulls without it,
-- even though gogol types the field as Maybe Int32.
createPullRequest :: G.PullRequest
createPullRequest =
  G.pullRequest
    & G.prMaxMessages ?~ 50

-- Republish only the payloads; attributes of the original messages are not copied.
createPublishRequest :: Text -> [G.ReceivedMessage] -> G.ProjectsTopicsPublish
createPublishRequest pubsubQueueTopicId messages = G.projectsTopicsPublish request pubsubQueueTopicId
  where
    -- payload of every received message; fail loudly if any is missing
    messagesData =
      case for messages (\m -> m ^. (G.rmMessage . _Just . G.pmData)) of
        Just result -> result
        Nothing -> error "unexpected message missing data"
    -- wrap each payload in a fresh PubsubMessage
    asPubsubMessages =
      messagesData <&> \md ->
        G.pubsubMessage
          & G.pmData ?~ md
    request = G.publishRequest & G.prMessages .~ asPubsubMessages

-- Collect the ack ids of a batch; fail loudly if any is missing.
createAckRequest :: Text -> [G.ReceivedMessage] -> G.ProjectsSubscriptionsAcknowledge
createAckRequest subId messages = G.projectsSubscriptionsAcknowledge request subId
  where
    ackIds =
      case for messages (^. G.rmAckId) of
        Just result -> result
        Nothing -> error "unexpected message missing ack ids"
    request = G.acknowledgeRequest & G.arAckIds .~ ackIds

-- Pub/Sub OAuth scopes referenced by the HasScope' constraints above.
type PubSubRequiredScopes =
  '[ "https://www.googleapis.com/auth/cloud-platform",
     "https://www.googleapis.com/auth/pubsub"
   ]
```
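Since the conduit pipeline processes one batch at a time sequentially, here is a dependency-free sketch of the same drain loop (fetch a bounded batch, process it, ack, repeat until a pull comes back empty) that processes each batch concurrently. `drainSubscription` and `forConcurrently'` are hypothetical helpers built on `forkIO` and `MVar` from base; `fetch`, `process` and `ack` stand in for the gogol pull, republish and acknowledge calls. Unlike the snippet, it acks only the messages that were processed successfully; failed ones stay unacked so Pub/Sub can redeliver them after the ack deadline, which means a real version would need a stopping rule that tolerates redelivered messages.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, try)

-- | Run 'f' on every element in its own thread and wait for all of them.
-- Exceptions are captured per element instead of being rethrown.
forConcurrently' :: [a] -> (a -> IO b) -> IO [Either SomeException b]
forConcurrently' xs f = do
  vars <- mapM spawn xs
  mapM takeMVar vars
  where
    spawn x = do
      var <- newEmptyMVar
      _ <- forkIO (try (f x) >>= putMVar var)
      pure var

-- | Pull batches with 'fetch' until an empty batch comes back. Each batch is
-- processed concurrently, then only the successful messages are acked; the
-- next batch is not fetched until the current one has finished.
-- Returns the total number of messages acked.
drainSubscription ::
  IO [msg] ->         -- fetch the next batch (e.g. a pull with maxMessages = 50)
  (msg -> IO ()) ->   -- process one message
  ([msg] -> IO ()) -> -- acknowledge a list of messages
  IO Int
drainSubscription fetch process ack = go 0
  where
    go total = do
      batch <- fetch
      if null batch
        then pure total
        else do
          results <- forConcurrently' batch process
          let succeeded = [m | (m, Right ()) <- zip batch results]
          ack succeeded
          go (total + length succeeded)
```

In a real project, `forConcurrently` from the `async` package, ideally with a bound on the number of threads, would replace the hand-rolled helper.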
Anyway, bit of a random brain dump, but hopefully some of this is useful :)
> Use pubsub push where the cloud infrastructure handles this for you.
I really need to benchmark that option. I didn't even consider it, since throughput and performance are important in our architecture. Some user-visible features depend on async messages propagating through our system and on a message being pushed back to the web app over a WebSocket, so using push seems wrong.
> I don't really understand why you would do 2 over 3. You still ultimately need to make the required REST API calls to Google Pub/Sub. gogol-pubsub should get you closer, and then you can just write the logic around async etc.
Google, unfortunately, treats REST as second class, and I think the REST calls go through a proxy to their gRPC servers. I figured I might as well use gRPC (GCP's preferred method) if I'm going to handle the calls myself. A side benefit of going the gRPC route is that supporting newer features becomes easier, since gogol seems to be updated infrequently.
> Anyway, bit of a random brain dump, but hopefully some of this is useful :)
I appreciate you sharing the snippet; it might turn out to be useful, since we're also using fused-effects.
OIC, didn't realise there was a gRPC-based API. Good to know. Anyway, good luck!
I've just made my first subscription pull request to PubSub for an existing subscription.
I noticed that the `prMaxMessages` field of my `Network.Google.PubSub.Types:PullRequest`, which is typed as a `Maybe Int32`, is apparently required to be a non-optional value: https://github.com/brendanhay/gogol/blob/master/gogol-pubsub/gen/Network/Google/PubSub/Types/Product.hs#L559
Here's the ServiceError that got returned:
If there's anything I can do to help fix this, I'd be happy to submit a PR. Thank you so much for this incredible work, @brendanhay. I've said it before, but this and amazonka are a tour de force in Haskell.
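The underlying problem is that the generated type admits a request the API rejects. A minimal sketch, not gogol's API, of how the request could be typed so that a missing `maxMessages` becomes a compile-time error: `MaxMessages`, `mkMaxMessages`, `StrictPullRequest` and `strictPullRequest` are hypothetical names, and the positivity check assumes the Pub/Sub REST reference's requirement that `maxMessages` be a positive integer.

```haskell
import Data.Int (Int32)

-- | Hypothetical wrapper: a maxMessages value known to be positive.
newtype MaxMessages = MaxMessages Int32
  deriving (Eq, Show)

-- | Smart constructor: reject non-positive values before any request is sent.
mkMaxMessages :: Int32 -> Maybe MaxMessages
mkMaxMessages n
  | n > 0 = Just (MaxMessages n)
  | otherwise = Nothing

-- | Hypothetical pull request in which maxMessages is mandatory, unlike the
-- generated type where the field is a Maybe Int32.
data StrictPullRequest = StrictPullRequest
  { maxMessages :: MaxMessages,
    returnImmediately :: Maybe Bool
  }
  deriving (Eq, Show)

-- | The only way to build a request is to supply maxMessages up front.
strictPullRequest :: MaxMessages -> StrictPullRequest
strictPullRequest n = StrictPullRequest {maxMessages = n, returnImmediately = Nothing}
```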