LD4P / sinopia_server

[Deprecated - switching to MongoDB] Sinopia Back-end CRUD Service. LDP-inspired, HTTP Server taking JSON-LD resources & administrative metadata.
Apache License 2.0
1 stars 1 forks source link

Write app for Fargate task that generates JSON for ElasticSearch indexing #46

Closed jmartin-sul closed 5 years ago

jmartin-sul commented 5 years ago

queue to be worked by indexing pipeline deployed in AWS as a Fargate service that:

  1. listens to a queue (AmazonMQ);
  2. parses resource URIs out of messages posted by Trellis;
  3. dereferences resource URIs, pulling back information about the resource from Trellis (in JSON-LD form); and
  4. indexes the JSON-LD in ElasticSearch

codebase can be whatever we're comfortable using, as long as we can turn our indexing app into a Docker container that we can deploy using Fargate. we already have an indexing pipeline codebase and container.

Alternative approach would be a Lambda, but Fargate tasks are not subject to the same timeout restrictions, and allow for more control over how the queue is worked by default (i.e., a Lambda task won't spin up by default for every SQS entry, though that also means scaling workers under heavy load would be more manual). since we're going with the daemonized approach, a Fargate service is more appropriate than a Lambda.

blocked by #55

mjgiarlo commented 5 years ago

Here's an interesting architecture that might work for us: https://github.com/aws-samples/aws-fargate-pipeline-lambda-autoscaling#background

TL;DR: It uses CloudWatch and Lambda together to monitor SQS and spawn an appropriate number of Fargate tasks.

Update: This may not be as simple as expected. See https://github.com/LD4P/sinopia_server/issues/47#issuecomment-462444654

mjgiarlo commented 5 years ago

Given https://github.com/LD4P/sinopia_server/issues/47#issuecomment-462466333, this suggests the work on #46 could take the form of (e.g.) a Ruby- or Node-based stomp client (example usage) that subscribes to a named queue—which I believe to be /queue/trellis by default, but this needs verification/confirmation—and loops over queue entries. Note that AmazonMQ/JMS queue entries may be read by a single client once and only once. There are numerous ways to go about doing this:

Wrote up the implementation options I had in mind, @jmartin-sul @jermnelson

jermnelson commented 5 years ago

If we go the Stomp route (instead of a Java client), we should consider a node.js stomp client as well.

mjgiarlo commented 5 years ago

@jermnelson I don't have a strong preference in this case. Node and Ruby are both fine choices for this component. Given all the new technologies our recent projects have been introducing into the Infrastructure team portfolio, I was thinking Ruby would be a defensible choice (since it's our lingua franca), but we've also already got a good amount of Javascript in our portfolio.

mjgiarlo commented 5 years ago

I can now spin up Trellis and ActiveMQ and subscribe to the /queue/trellis queue (in this using the Ruby stomp client). When I create a new child container (/repository/child2) within another container, the stomp client sees the following messages:

<Stomp::Message headers={"expires"=>"0", "destination"=>"/queue/trellis", "subscription"=>"b5bdf3ad5e839987336d21fcd8b6f0815d8cc678", "priority"=>"4", "message-id"=>"ID:31b622c80706-33549-1549921260931-1:1:1:1:3", "persistent"=>"true", "timestamp"=>"1549921646265", "Content-Type"=>"application/ld+json"} body='{"@context":"https://www.w3.org/ns/activitystreams","id":"urn:uuid:0a382608-0103-44b0-b0a7-4e7355277aca","type":["http://www.w3.org/ns/prov#Activity","Create"],"actor":["http://www.trellisldp.org/ns/trellis#AnonymousAgent"],"object":{"id":"http://localhost:8080/repository/child2","type":["http://www.w3.org/ns/ldp#BasicContainer"]},"published":"2019-02-11T21:47:26.264Z"}' command='MESSAGE' >
<Stomp::Message headers={"expires"=>"0", "destination"=>"/queue/trellis", "subscription"=>"b5bdf3ad5e839987336d21fcd8b6f0815d8cc678", "priority"=>"4", "message-id"=>"ID:31b622c80706-33549-1549921260931-1:1:1:1:4", "persistent"=>"true", "timestamp"=>"1549921646276", "Content-Type"=>"application/ld+json"} body='{"@context":"https://www.w3.org/ns/activitystreams","id":"urn:uuid:bfb24beb-839f-4f97-9c39-e12352501d42","type":["http://www.w3.org/ns/prov#Activity","Update"],"actor":["http://www.trellisldp.org/ns/trellis#AnonymousAgent"],"object":{"id":"http://localhost:8080/repository","type":["http://www.w3.org/ns/ldp#BasicContainer"]},"published":"2019-02-11T21:47:26.276Z"}' command='MESSAGE' >
mjgiarlo commented 5 years ago

Started playing with connecting these pieces together.

First, I create via curl a root container in Trellis with curl -i -X POST -H 'Content-Type: text/turtle; charset=UTF-8' -H 'Link: <http://www.w3.org/ns/ldp#BasicContainer>; rel="type"' -H 'Slug: repository' -d "@prefix dcterms: <http://purl.org/dc/terms/> .\n@prefix ldp: <http://www.w3.org/ns/ldp#> .\n<> a ldp:Container, ldp:BasicContainer;\n dcterms:title 'Repository Root' ." http://localhost:8080/

Then I create a bunch of child containers using curl commands similar to above.

And then I can process those messages and grab updated JSON from each object id:

require 'elasticsearch'
require 'faraday'
require 'stomp'

# A place to temporarily hold messages once popped off the queue
messages = []

# Create a client and subscribe to the default Trellis JMS queue, 
#   popping messages into the `messages` array
client = Stomp::Client.new('stomp+ssl://localhost:61616')
client.subscribe("/queue/trellis") { |msg| messages << msg }

# Loop over `Stomp::Message` instances, parse the body as JSON, 
#   and grab the object ids (these are Trellis URIs) and uniquify results.
# Note: these are ordered because both queues and arrays are ordered.
uris = messages.map { |message| JSON.parse(message.body)['object']['id'] }.uniq
# => ["http://localhost:8080/repository/child5",
#     "http://localhost:8080/repository",
#     "http://localhost:8080/repository/child6",
#     "http://localhost:8080/repository/child7",
#     "http://localhost:8080/repository/child8"]

# Loop over Trellis URIs and resolve them, requesting JSON format
uris.each do |uri|
  json = Faraday.get(uri, nil, { 'Accept' => 'application/ld+json' }).body
  # => {"@id":"http://localhost:8080/repository","contains":["http://localhost:8080/repository/child1","http://localhost:8080/repository/child2","http://localhost:8080/repository/child3","http://localhost:8080/repository/child4","http://localhost:8080/repository/child5","http://localhost:8080/repository/child6","http://localhost:8080/repository/child7","http://localhost:8080/repository/child8"],"@context":{"contains":{"@id":"http://www.w3.org/ns/ldp#contains","@type":"@id"},"schema":"http://schema.org/","owl":"http://www.w3.org/2002/07/owl#","xsd":"http://www.w3.org/2001/XMLSchema#","skos":"http://www.w3.org/2004/02/skos/core#","memento":"http://mementoweb.org/ns#","rdfs":"http://www.w3.org/2000/01/rdf-schema#","acl":"http://www.w3.org/ns/auth/acl#","geo":"http://www.w3.org/2003/01/geo/wgs84_pos#","dc11":"http://purl.org/dc/elements/1.1/","as":"https://www.w3.org/ns/activitystreams#","rdf":"http://www.w3.org/1999/02/22-rdf-syntax-ns#","ldp":"http://www.w3.org/ns/ldp#","time":"http://www.w3.org/2006/time#","prov":"http://www.w3.org/ns/prov#","dc":"http://purl.org/dc/terms/"}}

  # We have an updated Trellis JSON document; post it to ElasticSearch
  Elasticsearch::Client.new(log: true).index(index: 'sinopia', type: 'record', body: json)
end
mjgiarlo commented 5 years ago

I believe this issue is now closeable. We have the app and now just need to deploy it and the various components to AWS (for which there are other issues). Closing.