ATTX-project / provenance-service

ATTX Provenance service for exposing provenance related information.
1 stars 1 forks source link

ProvenanceService: Consume messages from the provenance queue #4

Closed jkesanie closed 7 years ago

jkesanie commented 7 years ago

Description

Connect to the queue as a durable consumer using STOMP(?), receive messages, and log them.

DoD

Messages are being consumed.

Testing

blankdots commented 7 years ago

Used http://nikipore.github.io/stompest/async.html and the ActiveMQ image from https://github.com/ATTX-project/distribution-component/issues/12. In order to test, we designed a producer:

import json
import logging

from twisted.internet import defer, task

from stompest.config import StompConfig

from stompest.async import Stomp
from stompest.async.listener import ReceiptListener

# Sample message that the test producer serializes to JSON and sends to the
# queue. It has two top-level sections: "provenance" and "payload".
data_object = {
    "provenance": {
        "context": {
            "workflowID": "ingestionwf",
            "activityID": 1,
            "step": "describeExternalDS",
        },
        "agent": {
            "ID": "UV",
            "role": "ETL",
        },
        "activity": {
            "title": "Ingestion workflow",
            "type": "DescribeStepExecution",
            "startTime": "2017-08-02T13:52:29+02:00",
            "endTime": "2017-08-02T13:52:29+02:00",
        },
        "output": {
            # The key matches the "outputDataset" entry in "payload" below.
            "outputDataset": {
                "role": "Dataset",
            },
        },
    },
    "payload": {
        "outputDataset": {
            "uri": "attx://ds/1",
            "title": "Harvested dataset",
            "description": "",
            "publisher": "UH",
            "license": "http://cc/0",
        },
    },
}

class Producer(object):
    """Test producer that sends the sample provenance message to a STOMP queue.

    Each run connects to the broker, sends ``MESSAGE_COUNT`` copies of
    ``data_object`` (JSON-encoded) to ``QUEUE`` with a receipt requested for
    every message, and then disconnects.
    """

    # STOMP destination the messages are sent to.
    QUEUE = '/queue/provenance.inbox'
    # Number of messages sent per run; override on a subclass or instance
    # to send a different amount.
    MESSAGE_COUNT = 1000

    def __init__(self, config=None):
        """Store the broker configuration.

        :param config: a ``StompConfig`` instance. When ``None``, a config
            pointing at ``tcp://localhost:61613`` is created.
        """
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self, _):
        """Connect, send all messages, and disconnect.

        :param _: unused; ``task.react`` calls this with the reactor as the
            first argument.
        :returns: a Deferred (via ``inlineCallbacks``) that fires once the
            client has disconnected.
        """
        # The payload is identical for every message, so serialize it once
        # instead of once per iteration. Doing it before connecting also means
        # a serialization error cannot leave an open connection behind.
        body = json.dumps(data_object).encode()

        client = Stomp(self.config)
        yield client.connect()
        # NOTE(review): 1.0 is presumably the receipt timeout in seconds -
        # confirm against the stompest ReceiptListener documentation.
        client.add(ReceiptListener(1.0))
        for j in range(self.MESSAGE_COUNT):
            yield client.send(self.QUEUE, body, receipt='message-%d' % j)
        client.disconnect(receipt='bye')
        yield client.disconnected  # graceful disconnect: waits until all receipts have arrived

def main():
    """Enable debug logging and run a default ``Producer`` under the reactor."""
    logging.basicConfig(level=logging.DEBUG)
    producer = Producer()
    # task.react starts the reactor and calls producer.run with it.
    task.react(producer.run)


if __name__ == '__main__':
    main()