ushahidi / SwiftRiver-Core

SwiftRiver Core Applications

Filters dev #16

Closed ekala closed 11 years ago

ekala commented 12 years ago

Post-processor for the second-level filters (ushahidi/Swiftriver_v2#188).

The drop processing pipeline has been adjusted to work as follows:

69mb commented 12 years ago

Looking good, but I have not been able to test this end to end yet because there is an issue with the filters modal on the web front end.

I would suggest that we use RabbitMQ's RPC pattern (http://bit.ly/LBa7qS) for the drop queue between semantics, media extract and the post processor:

  1. When a drop comes to the dropqueue processor through the DROPLET_QUEUE, we assign it an internal id (could be a sequential number, a GUID...) and use this as a correlation id. We then store that drop in an in-memory dict and publish it to the metadata exchange immediately, with an anonymous queue in the reply_to field.
  2. We do not put the drop's delivery_tag onto the confirm_queue yet, so it remains pending for now. We can keep the delivery_tag with the drop in the in-memory dict.
  3. The media extractor, semantics... will do their work as usual, send the drop to the reply_to queue specified by the drop queue processor, and preserve the correlation id. The main change here is that the metadata processors do not put drops back on the DROPLET_QUEUE as they do currently, but on a separate callback queue specified by the drop queue processor.
  4. The drop queue processor will get the drop back on the callback queue and, using the correlation_id in the drop, get the stored drop from the in-memory dict and update it with the drop that has just come in.
  5. When semantics_complete and media_complete are both set, put the drop onto the POST_PROCESS_QUEUE with the same correlation id and a separate callback queue. The post processor does its job and puts the drop back onto that callback queue.
  6. The drop queue processor gets the drop on the callback queue, puts it on the publish queue for publishing to the drop API, and puts the delivery_tag on the confirm_queue for the DROPLET_QUEUE.
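The bookkeeping behind these steps could look roughly like the sketch below. It only models the in-memory dict, not the broker calls; the class name `PendingDrops`, its method names and the `river_id` key are made up for illustration, while `semantics_complete`, `media_complete` and `delivery_tag` are the names used above.

```python
import uuid


class PendingDrops:
    """Hypothetical tracker for drops still unconfirmed on the DROPLET_QUEUE."""

    def __init__(self):
        # correlation_id -> {"drop": dict, "delivery_tag": int}
        self._pending = {}

    def register(self, drop, delivery_tag):
        """Steps 1-2: assign a correlation id and hold on to the delivery tag."""
        correlation_id = uuid.uuid4().hex
        self._pending[correlation_id] = {
            "drop": dict(drop),
            "delivery_tag": delivery_tag,
        }
        return correlation_id

    def merge_reply(self, correlation_id, reply):
        """Step 4: fold a metadata processor's reply into the stored drop.

        Returns True once both flags are set, i.e. the drop is ready to be
        sent to the POST_PROCESS_QUEUE (step 5).
        """
        drop = self._pending[correlation_id]["drop"]
        drop.update(reply)
        return bool(drop.get("semantics_complete") and drop.get("media_complete"))

    def finish(self, correlation_id, processed_drop):
        """Step 6: return the final drop plus the delivery tag to confirm."""
        entry = self._pending.pop(correlation_id)
        entry["drop"].update(processed_drop)
        return entry["drop"], entry["delivery_tag"]


pending = PendingDrops()
cid = pending.register({"title": "example"}, delivery_tag=7)
ready_after_semantics = pending.merge_reply(cid, {"semantics_complete": True})
ready_after_media = pending.merge_reply(cid, {"media_complete": True})
final_drop, tag_to_confirm = pending.finish(cid, {"river_id": [1]})
```

Because the delivery tag is only released in `finish`, a crash before that point leaves the message unconfirmed on the durable queue, which is what allows the dict to live in memory only.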

I think this way we only post once to the droplet API, when the drop is completely processed. We can stop using the droplet_processed field (bad for the river query) in the droplets table because only processed drops get posted.

Also, because we keep the drop pending in the DROPLET_QUEUE until all processing is complete, we can use a simple in-memory dict to keep track of the drop's processing stage and only confirm once all processing is done. DROPLET_QUEUE is already durable, so there is no need for Redis to store the drop while it's being processed.

ekala commented 12 years ago

I switched the workflow to use normal persistent queues. The workflow is as follows:

  1. Drops arrive from the various channels on the DROPLET_QUEUE. We assign each drop a correlation_id and publish it on the metadata exchange.
  2. The metadata processors do their work and post the drop to a POST_PROCESS_STAGING_QUEUE. This queue consolidates drops as they arrive from the metadata extractors. Once a drop has been received from all of the extractors, it is posted to the POST_PROCESS_QUEUE.
  3. The post processor posts the drops on the DROPLET_QUEUE once the filters are applied.
  4. The droplet queue processor receives the drops arriving from the post processor and queues them up for posting to the web application. At this stage we confirm all drops, but we discard those that do not have a destination river id (those that did not match the filters for their respective rivers).
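A rough sketch of the consolidation in step 2 and the discard rule in step 4, leaving out the queue plumbing. The names `StagingConsolidator`, `drops_to_publish` and the `river_id` key are hypothetical, and the sketch assumes the extractors mark their work with the `semantics_complete` and `media_complete` flags mentioned earlier in this thread.

```python
REQUIRED_FLAGS = ("semantics_complete", "media_complete")


class StagingConsolidator:
    """Hypothetical consumer of the POST_PROCESS_STAGING_QUEUE."""

    def __init__(self):
        # correlation_id -> partially consolidated drop
        self._partial = {}

    def receive(self, drop):
        """Merge one extractor's copy; return the full drop once complete."""
        cid = drop["correlation_id"]
        merged = self._partial.setdefault(cid, {})
        for key, value in drop.items():
            if key in REQUIRED_FLAGS:
                # A flag set by one extractor must not be cleared by another.
                merged[key] = bool(merged.get(key)) or bool(value)
            else:
                merged[key] = value
        if all(merged.get(flag) for flag in REQUIRED_FLAGS):
            return self._partial.pop(cid)  # ready for the POST_PROCESS_QUEUE
        return None


def drops_to_publish(drops):
    """Step 4: keep only drops that have a destination river id."""
    return [d for d in drops if d.get("river_id")]


staging = StagingConsolidator()
first = staging.receive(
    {"correlation_id": "abc", "semantics_complete": True, "tags": ["flood"]}
)
second = staging.receive(
    {"correlation_id": "abc", "media_complete": True, "semantics_complete": False}
)
kept = drops_to_publish([{"id": 1, "river_id": [4]}, {"id": 2, "river_id": []}])
```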

69mb commented 12 years ago

The DropFilter needs to use a lock whenever the in-memory drop_filters dict is referenced.
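Something along these lines, as a minimal sketch only: the real DropFilter has more to it, and the method names and the shape of `drop_filters` (river id mapped to a list of predicate callables) are assumptions made for the example.

```python
import threading


class DropFilter:
    """Illustrative stand-in; only the locking around drop_filters matters."""

    def __init__(self):
        self._lock = threading.Lock()
        # Assumed shape: river_id -> list of predicates taking a drop dict
        self.drop_filters = {}

    def set_filters(self, river_id, filters):
        with self._lock:
            self.drop_filters[river_id] = list(filters)

    def remove_filters(self, river_id):
        with self._lock:
            self.drop_filters.pop(river_id, None)

    def matching_rivers(self, drop):
        # Copy under the lock, then evaluate outside it so slow predicates
        # do not block threads that are updating the filters.
        with self._lock:
            snapshot = {rid: list(fs) for rid, fs in self.drop_filters.items()}
        return [rid for rid, fs in snapshot.items() if all(f(drop) for f in fs)]


drop_filter = DropFilter()
drop_filter.set_filters(1, [lambda d: "flood" in d["text"]])
drop_filter.set_filters(2, [lambda d: "fire" in d["text"]])
matched = drop_filter.matching_rivers({"text": "flood warning"})
drop_filter.remove_filters(1)
matched_after_removal = drop_filter.matching_rivers({"text": "flood warning"})
```

Reads take the lock too, since iterating the dict while another thread adds or removes a river can raise a RuntimeError.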

Also a couple of changes in #17 for this.

ekala commented 12 years ago

@69mb I've merged the instrumentation changes from master and added support for the named filters.