wellcometrust / reach

Wellcome tool to parse references scraped from policy documents using machine learning

Reach Architecture Proposal #419

Closed · jdu closed this 4 years ago

jdu commented 4 years ago

The following is an overarching architecture proposal for Reach. It does not cover application architecture; it mainly addresses how applications are deployed and how data is managed within that deployment architecture.

The overarching idea is to resolve a number of problems we either have now or are approaching, through architectural changes that address those problems directly or support the changes we need to make in order to address them.

Identified Problems

  1. Scaling is hard for tasks in the Airflow system. We can scale the worker encapsulating all tasks (more workers, increased resources), but we lack the ability to scale more granularly at the task level, which is more appropriate in Reach's case.
  2. Change is transactional: we rely on large bulk updates, including wiping whole indexes while we re-import, which is resource-intensive and write-heavy.
  3. Airflow, with one large overarching DAG, is proving unstable and hard to maintain in a lot of circumstances, and developing and debugging individual tasks in isolation is cumbersome and becoming hard to test locally.
  4. Deployments require all-or-nothing builds and deployments of the entire Reach system topology.
  5. Heavy interdependence: a single failing service can bring down the entire system, resulting in a high level of fragility. Logical components are not sufficiently isolated.
  6. Container sizes are getting massive, to the point where rolling out a deployment lags badly behind a change while the cluster downloads large images onto nodes.
  7. Elastic doesn't support the complexity of querying we want in order to power the frontend, and the workarounds to achieve it are resource-intensive.
  8. The DAG is an all-or-nothing run: if any one task has issues, the entire sub-tree for that DAG is orphaned. Recovering from this is immensely hard; the DAG cannot easily recover from large overarching errors in order to continue processing data.
  9. Data models (as in entities, not model files) are embedded in the DAG workflow, and implementing a model change requires refactoring every task in the pipeline.
  10. Integrating metrics into individual components is tricky, and we don't get granular insight into how an individual task runs in terms of resources (is it I/O-heavy, memory-heavy, CPU-intensive?); we only see the utilization of the worker that encapsulates the sub-process.
  11. Log aggregation of critical processes inside the workflow in k8s is complicated, and a hard problem to solve that shouldn't need solving.
  12. The current setup lacks composability: we cannot reconfigure how we deploy when better options come along.
  13. Models and resource-intensive data science work are constrained by the context they run within and require jerry-rigging to get working in some instances.
  14. Using JSON as a backing store for some of the workflow tasks is proving problematic (high resource usage, manipulation of static files) now that we need more complex querying into the available data.
  15. Recoverability of a process is lacking: a failure in a DAG can require a complete re-run of the DAG from start to finish. There is no pipeline state that can be resumed after a fix has been implemented and deployed for a given process.
  16. Loss of data stores at points in the data workflow can break data, or create inconsistencies, throughout.
  17. Dependency management is becoming a kludge, as everything required is bundled into a single container image, resulting in a massive image and potentially dependency-resolution problems further down the road when aspects of the system become more complex.

Proposed Solution

The following should have minimal effect on the actual code as implemented and can be looked on more as a re-organisation of the code that we have, in order to provide a more composable, maintainable and scalable deployment architecture. It will mainly require code changes around the passing of data, not changes in how that data is operated on or produced.

There are two major changes within this proposal:

  1. Move to Postgres as a backing store
  2. Strip out most of the intensive tasks from Airflow and isolate them as Consumers behind a Kafka message bus

These two overarching changes gain us the following:

  1. The ability to deploy targeted updates to individual tasks (ExtractRefs, Matching, etc.) without having to refactor entire DAG pipelines
  2. The ability to granularly scale individual processes which require more resources than others, both horizontally and vertically
  3. The ability to A/B test changes to a given process
  4. The ability to easily add new pipeline steps without refactoring the DAG pipeline
  5. The ability to isolate scraper targets into individual DAGs in order to isolate and recover from failures
  6. A move towards an eventually consistent system model, where our data set grows and becomes more robust over time instead of being rebuilt on each run
  7. An overall reduction in resource consumption
  8. Reduced overall container image sizes
  9. Making it easy to work on and debug a single sub-process or logical section of the application in local and remote environments
  10. More granular collection of metrics about resource usage and context-specific metrics
  11. Removing the requirement for a full-scale drop and re-insert of the indexes
  12. Reduced reliance on S3 and JSON as a backing store, which are I/O- and network-intensive
  13. The ability to push data from Reach into other systems (RoRi) very easily
  14. The ability to ingest data from other systems easily
  15. Better fault-tolerance: if no daemons are available to consume a specific topic's messages, Kafka will backlog the messages until a Consumer becomes available. This limits data loss and allows us to address issues in the pipeline without the overarching process restarting from scratch.

[Diagram: Wellcome / Reach deployment]

New / Updated Components

Kafka

Kafka is a distributed message queue / bus that uses topics and a pub/sub model to distribute messages from "Producers" to "Consumers". A given Producer sends data to a specific Topic, and that data is distributed to one of any number of Consumers for that topic. A Consumer can be a background service, database, third-party system, etc., and can be a stop-point or send its results on to another topic to continue a larger process.

Kafka is well known for its ability to scale for large data pipeline systems and is in use across a variety of organizations for their data pipelines. I (JDU) have personally used it, and the similar service RabbitMQ, in a number of projects, with very positive effect towards the goals outlined above.
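To make the model concrete, here is a minimal sketch of the Producer/Consumer pattern, assuming the kafka-python client; the broker address, message fields, and handler are illustrative placeholders rather than Reach's actual schema:

```python
# Minimal pub/sub sketch using the kafka-python client. Broker address,
# message fields, and the handler are illustrative placeholders.
import json
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# A scraper (Producer) pushes a policy document onto a topic.
producer.send("PolicyTopic", {"doc_id": "abc123", "pdf": "s3://bucket/abc123.pdf"})
producer.flush()

def process(doc):
    # Stand-in for a real task (ExtractRefs, Matching, ...).
    print("received", doc["doc_id"])

# A downstream daemon (Consumer) subscribes to the same topic; Kafka
# backlogs messages for the group while no consumer is running.
consumer = KafkaConsumer(
    "PolicyTopic",
    bootstrap_servers="kafka:9092",
    group_id="extract-refs",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    process(message.value)
```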

Postgres

Postgres is a relational database system whose SQL implementation extends the standard with its own features. It has a number of features which make it a great platform for data analysis and data warehousing, as well as querying and search.

Postgres has tsvector support for full-text search operations and powers some extremely large search systems across the globe. Its replication is easily managed, and the system can be scaled horizontally and vertically by adding read replicas and advanced partitioning.

Postgres can scale to a large number of highly concurrent write operations against a master and replicate in real time to any read replicas attached to it. It also has highly performant export operations through COPY TO / FROM, and supports embedded PL/Python triggers, allowing you to write SQL triggers in Python, including importing external packages such as pandas and numpy.
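As an illustration of the COPY path, a bulk export can be driven from Python with psycopg2; the connection string and table name here are hypothetical:

```python
# Bulk-export a table to CSV through Postgres COPY, driven via psycopg2.
# Connection string and table name are hypothetical.
import psycopg2

conn = psycopg2.connect("dbname=reach user=reach host=localhost")
with conn, conn.cursor() as cur, open("citations.csv", "w") as f:
    cur.copy_expert("COPY citations TO STDOUT WITH (FORMAT csv, HEADER)", f)
```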

ONNX and ONNX Runtime

Note: ONNX Runtime is not currently production-ready.

These are under consideration for further down the line. The ONNX team is working on a runtime to ease the deployment of models in production environments. This would allow the DS team to update models by deploying them to ONNX.

The idea in this case would be that, for heavy model-based tasks/processes, we replace the daemon with a facade task which calls into ONNX for the specific model, instead of processing the data against the model inside the daemon. These facade services could be generalized and based on a single set of code, with the topics to consume from / push to, the model to query, and any formatting constraints declared as configuration.
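Sketched out, such a facade is just a thin consumer whose model call goes out over HTTP. The serving endpoint URL, payload shape, and topic names below are hypothetical, since the ONNX Runtime serving interface is still settling:

```python
# Facade daemon: pull items off a topic, delegate inference to an
# external model-serving endpoint, push results onward. The endpoint
# URL, payload shape, and topic names are hypothetical placeholders.
import json
import requests
from kafka import KafkaConsumer, KafkaProducer

MODEL_URL = "http://model-serving:8001/v1/models/extract_refs:predict"

consumer = KafkaConsumer(
    "PolicyTopic",
    bootstrap_servers="kafka:9092",
    group_id="extract-refs-facade",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    # Which endpoint to call and where to route results would be
    # configuration, making the wrapper generic across tasks.
    resp = requests.post(MODEL_URL, json=message.value)
    producer.send("ExtractedRefsTopic", resp.json())
```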

Additional note: ONNX seems to have a lot of industry momentum behind it, including AWS and Azure, but there may be alternatives out there which make more sense.

Dedicated API service

Splitting the API endpoint out from the web application will allow us the following:

  1. More granular resource management between the web application and the API
  2. Ability for the API syntax to diverge from the web application's querying syntax
  3. Ability to scale the service by deploying additional pods behind a load balancer

Changes to Airflow

The main change in Airflow would be splitting up the current DAG, removing the tasks after the NameNormalization task for each process. We would then split the individual scraper target tasks into individual DAGs, one for each target. This gives us the ability to selectively disable targets which are being problematic, and to isolate debugging for a given target, without having to run an overarching, all-encompassing DAG.

The individual scraper DAGs would be tasked with scraping the targets, downloading PDFs into S3, and pushing out a stream of policy documents to be processed to the PolicyTopic in Kafka. All stages after TitleNormalization would occur as orchestrated tasks, with Kafka routing individual results to the daemon services subscribed to those topics.

In addition to this, the importing and managing of EPMC metadata would be moved to the default-namespace Airflow in order to reduce duplication of effort. The EPMC metadata DAG in the default Airflow would be tasked with getting the latest EPMC metadata and UPSERTing it into Postgres, as well as storing it in S3 in the common formats needed for different processes, available to the entire cluster.
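A sketch of that UPSERT step, assuming a hypothetical epmc_metadata table keyed on pmid; the fetch function is a stub for the real download/parse step:

```python
# UPSERT the latest EPMC metadata into Postgres with INSERT ... ON CONFLICT.
# Table, columns, connection string, and the fetch stub are hypothetical.
import psycopg2

def fetch_latest_epmc():
    # Stand-in for the real EPMC download/parse step.
    return [{"pmid": "12345", "title": "Example", "abstract": "..."}]

conn = psycopg2.connect("dbname=reach user=reach host=localhost")
with conn, conn.cursor() as cur:
    for record in fetch_latest_epmc():
        cur.execute(
            """
            INSERT INTO epmc_metadata (pmid, title, abstract)
            VALUES (%(pmid)s, %(title)s, %(abstract)s)
            ON CONFLICT (pmid) DO UPDATE
               SET title = EXCLUDED.title,
                   abstract = EXCLUDED.abstract
            """,
            record,
        )
```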

jdu commented 4 years ago

On the A/B testing side of this, what I mean is that, for instance, let's say we have an updated Matcher that we want to test out. Instead of copying the entire DAG or retrofitting the task into the current DAG as a secondary task, we can encapsulate that matcher in a daemon, deploy it, and have it subscribe to the PolicyTopic in Kafka; it will then start to receive policy documents as they come into the system, alongside the current production matcher daemon. So we can test the new matcher on live data coming into the system.
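Mechanically this falls out of Kafka's consumer groups: consumers with different group_ids on the same topic each receive the full stream, so a candidate matcher can shadow production traffic without touching the production daemon. A minimal sketch, with hypothetical group names:

```python
# Two consumer groups on one topic each receive every message, so a new
# matcher can shadow the production one on live data. Names hypothetical.
from kafka import KafkaConsumer

production = KafkaConsumer(
    "PolicyTopic", bootstrap_servers="kafka:9092", group_id="matcher-v1"
)
candidate = KafkaConsumer(
    "PolicyTopic", bootstrap_servers="kafka:9092", group_id="matcher-v2-shadow"
)
# Each group tracks its own offsets; neither takes messages from the other.
```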

jdu commented 4 years ago

Yet another bonus to this: if we can get metrics feeding properly into Prometheus, as well as getting Grafana connected to Postgres, Kafka, Airflow, etc., we can do a fair level of validation from Grafana, for instance querying and visualizing data from the Postgres tables, from Kafka's message bus metrics, and from any metrics we collect from the individual daemons.

nsorros commented 4 years ago

I am not very sure about ONNX; note that not all our models are neural networks, or big. We can more easily deploy our models behind an API than support deploying on ONNX, I think, so why not just do that? AWS has a way to do that via SageMaker as far as I know, but similarly we can package models behind Flask and ship them into a custom pod or Elastic Beanstalk etc.

jdu commented 4 years ago

@nsorros The ONNX piece is more a "something we can evaluate to see if it's worthwhile once it's more stable", but you've described the general gist of what's happening above; instead of being an HTTP API service, it's a daemon which has data passed to it through Kafka.

This is what I mean by composability: with that logic isolated, it can be deployed in a pod to the cluster, to SageMaker, Lambda, wherever, and Kafka can route data to it.

nsorros commented 4 years ago

> @nsorros The ONNX piece is more a "something we can evaluate to see if it's worthwhile once it's more stable", but you've described the general gist of what's happening above; instead of being an HTTP API service, it's a daemon which has data passed to it through Kafka.
>
> This is what I mean by composability: with that logic isolated, it can be deployed in a pod to the cluster, to SageMaker, Lambda, wherever, and Kafka can route data to it.

That sounds good from a data science perspective and it will definitely ease things. I am assuming we'll still get support for deploying our models in this way but, as always, we are happy to do most of the work.

nsorros commented 4 years ago

In regards to Postgres, what is the main problem we are addressing? Is it to start creating consistent identifiers in the documents, etc.? In regards to full-text search, we are using Elasticsearch, which is better, and S3 is really mainly used for writing, while Elastic serves most of the data to the app, right?

SamDepardieu commented 4 years ago

I think moving from our current architecture to an independently deployed one is just a logical step we need to take.

Deploying overly large containers, even if supported to some extent by our current architecture, is not something we should keep doing, as it brings a lot of issues with it. By moving to this we bring more stability to both the self-contained projects and the entire pipeline.

Not to mention that reducing build/deployment time is something we should always look forward to.

SamDepardieu commented 4 years ago

> In regards to Postgres, what is the main problem we are addressing? Is it to start creating consistent identifiers in the documents, etc.? In regards to full-text search, we are using Elasticsearch, which is better, and S3 is really mainly used for writing, while Elastic serves most of the data to the app, right?

One of those is how the web application for Reach is actually searching for things: the actual query is getting overly complicated for things a simple relational database would do in a very simple way. Not sure about completely getting rid of Elastic, though; I think some data science projects depend on it?

jdu commented 4 years ago

> That sounds good from a data science perspective and it will definitely ease things. I am assuming we'll still get support for deploying our models in this way but, as always, we are happy to do most of the work.

@nsorros Of course! Basically, your code for a specific process, for instance ExtractRefs, would have its own container image; there would be a light wrapper around the extractor code which runs a background daemon in the pod and opens a persistent TCP socket to the Kafka topic(s) it needs to subscribe to. Incoming data would pass into your code, and the results would be routed to another topic for onward chained processes, or to wherever they need to reside. We can deploy the model files to S3 and have the service periodically check whether a new model file is available, downloading it before proceeding with the next item off the topic (I believe this is being worked on anyway).
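A rough sketch of that wrapper, assuming kafka-python and boto3; the topics, bucket, key, and the extract_refs stub are all placeholders for the real task code:

```python
# Light daemon wrapper around existing task code: consume, process,
# publish onward, and periodically pick up a refreshed model from S3.
# Topics, bucket/key, and extract_refs are illustrative placeholders.
import json
import boto3
from kafka import KafkaConsumer, KafkaProducer

BUCKET, MODEL_KEY = "reach-models", "extract_refs/model.pkl"
s3 = boto3.client("s3")
last_etag = None

def maybe_refresh_model():
    # Re-download the model file if a new version has appeared in S3.
    global last_etag
    etag = s3.head_object(Bucket=BUCKET, Key=MODEL_KEY)["ETag"]
    if etag != last_etag:
        s3.download_file(BUCKET, MODEL_KEY, "/tmp/model.pkl")
        last_etag = etag

def extract_refs(doc):
    # Stand-in for the DS team's actual extractor code.
    return {"doc_id": doc.get("doc_id"), "refs": []}

consumer = KafkaConsumer(
    "PolicyTopic",
    bootstrap_servers="kafka:9092",
    group_id="extract-refs",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    maybe_refresh_model()
    producer.send("ExtractedRefsTopic", extract_refs(message.value))
```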

> In regards to Postgres, what is the main problem we are addressing? Is it to start creating consistent identifiers in the documents, etc.? In regards to full-text search, we are using Elasticsearch, which is better, and S3 is really mainly used for writing, while Elastic serves most of the data to the app, right?

The problems we're having are around the querying and joining of data for the actual Reach application frontend: the kinds of data JOINs and pagination that we want to execute are extremely resource-intensive in Elastic and require a few workarounds. Things which are easily achievable in Postgres are turning out to be harder in Elastic from a frontend querying perspective. Search, in terms of the frontend, can easily be addressed with some Postgres trigger functions which update a tsvector column when data changes in the system, using Postgres's search functionality to facilitate the search (we're doing this in RoRi now). We'll also gain views, a more robust set of data types, indexing, partitioning, etc. And depending on where we go with this, we may need additional application-level persistence further down the road if we increase the user-facing functionality of the application (for instance saving searches, alerts when a search changes, etc.).
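A minimal sketch of that trigger approach, with hypothetical table and column names, using the built-in tsvector_update_trigger and a GIN index:

```python
# One-time setup of a tsvector column, trigger, and index, plus a sample
# full-text query. Table and column names are hypothetical.
import psycopg2

SETUP = """
ALTER TABLE policy_docs ADD COLUMN IF NOT EXISTS tsv tsvector;
CREATE TRIGGER policy_docs_tsv_update
  BEFORE INSERT OR UPDATE ON policy_docs
  FOR EACH ROW EXECUTE PROCEDURE
    tsvector_update_trigger(tsv, 'pg_catalog.english', title, abstract);
CREATE INDEX IF NOT EXISTS policy_docs_tsv_idx
  ON policy_docs USING gin (tsv);
"""

SEARCH = """
SELECT doc_id, title
  FROM policy_docs
 WHERE tsv @@ plainto_tsquery('english', %s)
 ORDER BY ts_rank(tsv, plainto_tsquery('english', %s)) DESC
 LIMIT 20;
"""

conn = psycopg2.connect("dbname=reach user=reach host=localhost")
with conn, conn.cursor() as cur:
    cur.execute(SETUP)
    cur.execute(SEARCH, ("malaria", "malaria"))
    print(cur.fetchall())
```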

As well, for things like the citations index, the size of that index is very large in comparison to what it would be in Postgres or another relational database using JOINs or views. Because of the lack of proper joins in Elastic, we need to embed citing policies as sub-properties of the rows in the citations index, which means that each and every citation entry has 0-n policy docs embedded in it as well.
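Under a relational model that embedding collapses into a link table joined at query time; the schema below is hypothetical:

```python
# The denormalized Elastic citations index becomes normalized tables
# joined at query time. Schema and names are hypothetical.
CITING_POLICIES = """
SELECT c.citation_id, c.title, p.doc_id, p.title AS policy_title
  FROM citations c
  JOIN citation_policy cp ON cp.citation_id = c.citation_id
  JOIN policy_docs p      ON p.doc_id = cp.doc_id
 WHERE c.citation_id = %s;
"""
```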

On top of this, moving to Postgres as the backing store for the frontend means we step out of Kubernetes and can use AWS RDS to implement read replication and load balancing more easily.

jdu commented 4 years ago

@nsorros I know we use Elastic for searching in MatchRefs; if we can't get the same level of search functionality through Postgres that those processes require, we can leave Elastic intact, as we'll more than likely need it anyway for log aggregation. We can have the DAGs associated with things like EPMC metadata mirror the policy data from Postgres into Elastic to facilitate that, or implement some logical / WAL replication code to push changes from Postgres to Elastic on a trigger.

ivyleavedtoadflax commented 4 years ago

I think this makes sense to me. Concerns from my side are:

Ah, looks like the second point is addressed above in @nsorros' question.

jdu commented 4 years ago

@ivyleavedtoadflax Elastic will stay in; I just haven't updated the model.

ONNX was put in there as something to evaluate further down the line once the runtime is more stable. It's not the ONNX format: ONNX has a sub-project called ONNX Runtime, which is a model hosting and serving daemon that you can deploy your models to and push data to through an API. So instead of having a dedicated daemon, there would just be a facade task which accepts some data from Kafka and calls the ONNX endpoint to process it and get a result. It was only included in the diagram because it might fit our needs once the runtime is stable, which will be much further in the future, I think.

kristinenielsen commented 4 years ago

@jdu to document on Notion and close