GoogleCloudPlatform / workflows-demos

A collection of Workflows samples for various use cases
Apache License 2.0
130 stars 36 forks source link

Handling `application/protobuf` Payloads in Eventarc Events #128

Open yamankatby opened 5 months ago

yamankatby commented 5 months ago

Eventarc events, such as those from Cloud Firestore, utilize application/protobuf for their payloads. Could you provide guidance or examples on decoding and processing this format effectively? A sample or demonstration would be greatly appreciated.

ivanmartinvalle commented 2 months ago

Finally figured this out after hours of Google searching.

background

node, typescript, and express implementation

Below is a proof-of-concept implementation. DO NOT USE THIS IN PRODUCTION AS-IS. You will probably need to optimize how the protos the application needs are loaded (for example, by loading them once or by bundling them), since this version reloads them on every request. It probably also misses some steps in hydrating the CloudEvent object, and it is neither very pretty nor very reusable.

import path from 'path'
import protobuf from 'protobufjs'
import { toDocumentEventData } from '@google/events/cloud/firestore/v1/DocumentEventData'

/**
 * Express handler for Firestore events whose payload is protobuf-encoded.
 *
 * Decodes the request into a CloudEvent carrying `DocumentEventData`, hands it
 * to `processFirestoreEvent`, and replies with an empty JSON object on success.
 *
 * @param req - Incoming request; the body is expected to be the raw protobuf
 *   payload as a `Buffer` (assumes a raw body parser is installed upstream —
 *   TODO confirm against the app setup).
 * @param res - Response used to acknowledge the event.
 * @param next - Optional Express `next` callback. When present, any failure
 *   is forwarded to it so the application's error handling produces the
 *   response instead of leaving the request hanging.
 * @throws Re-throws the original error when `next` is not supplied, so direct
 *   callers observe the same rejection as before.
 */
export const firestore = async (
  req: Request<never, never, Buffer>,
  res: Response,
  next?: (err?: unknown) => void,
): Promise<void> => {
  try {
    const cloudEvent = await decodeCloudEventFromProto(
      req,
      '../../proto',
      ['google/events/cloud/firestore/v1/data.proto', 'google/type/latlng.proto'],
      'google.events.cloud.firestore.v1.DocumentEventData',
      toDocumentEventData,
    )

    await processFirestoreEvent(cloudEvent)

    res.json({})
  } catch (err: unknown) {
    // Express 4 does not observe promises returned by handlers, so a rejection
    // here would otherwise become an unhandled rejection with no response sent.
    if (next === undefined) {
      throw err
    }
    next(err)
  }
}

/**
 * Builds a CloudEvent from an HTTP request whose body is a protobuf message.
 *
 * Every request header whose name starts with `ce-` becomes an attribute of
 * the event, with that prefix removed. The body is decoded separately and
 * attached as the event's `data` through `cloneWith`.
 *
 * @param req - Request providing `headers` and the raw `body`.
 * @param protoBaseDirectory - Directory (relative to this module) holding the
 *   `.proto` files.
 * @param protos - Proto files, relative to `protoBaseDirectory`, to load.
 * @param type - Fully qualified name of the message type to decode.
 * @param toFunction - Converter applied to the decoded message to produce `T`.
 * @returns A CloudEvent whose `data` is the converted payload.
 */
const decodeCloudEventFromProto = async <T>(
  req: Request,
  protoBaseDirectory: string,
  protos: string[],
  type: string,
  toFunction: (json: object) => T,
): Promise<CloudEvent<T>> => {
  const prefix = 'ce-'

  // Keep only the prefixed headers and strip the prefix in a single pass.
  const attributeEntries = Object.entries(req.headers).flatMap(
    ([name, value]) =>
      name.startsWith(prefix) ? [[name.slice(prefix.length), value]] : [],
  )
  const attributes = Object.fromEntries(attributeEntries)

  // NOTE(review): the `false` argument presumably turns off strict validation
  // in the CloudEvents SDK — confirm against the SDK version in use.
  const envelope = new CloudEvent<T>(attributes, false)

  const data = await decodeProtoFromBuffer(
    protoBaseDirectory,
    protos,
    type,
    req.body,
    toFunction,
  )

  return envelope.cloneWith({ data })
}

/**
 * Decodes a protobuf-encoded buffer into `T`.
 *
 * A fresh protobuf root is created and the given proto files are loaded on
 * every call; the message named by `type` is then looked up, used to decode
 * the buffer, and the decoded message is passed through `toFunction`.
 *
 * @param protoBaseDirectory - Directory (relative to this module) holding the
 *   `.proto` files.
 * @param protos - Proto files, relative to `protoBaseDirectory`, to load.
 * @param type - Fully qualified name of the message type to decode.
 * @param buffer - Raw protobuf bytes.
 * @param toFunction - Converter applied to the decoded message.
 * @returns Whatever `toFunction` returns for the decoded message.
 */
const decodeProtoFromBuffer = async <T>(
  protoBaseDirectory: string,
  protos: string[],
  type: string,
  buffer: Buffer,
  toFunction: (json: object) => T,
): Promise<T> => {
  const toAbsolute = (relative: string): string =>
    path.resolve(__dirname, protoBaseDirectory, relative)

  const root = new protobuf.Root({})

  // Imports under `google/type` are redirected into the local proto directory;
  // every other import target is returned unchanged.
  root.resolvePath = (_origin, target): string | null =>
    target.startsWith('google/type') ? toAbsolute(target) : target

  await root.load(protos.map((proto) => toAbsolute(proto)))

  const messageType = root.lookupType(type)
  const message = messageType.decode(new Uint8Array(buffer))

  return toFunction(message)
}