cylc / cylc-uiserver

A Jupyter Server extension that serves the cylc-ui web application for monitoring and controlling Cylc workflows.
https://cylc.org
GNU General Public License v3.0

protobuf: dynamic subscriptions #568

Open oliver-sanders opened 7 months ago

oliver-sanders commented 7 months ago

Background:

We use GraphQL as an API, but we do not use it as a "store". The data goes into a Protobuf serialisation; we then "resolve" GraphQL requests off of this serialisation, re-serialising it into JSON format for transfer to the UI.

So GraphQL is the external API for UI-UIS comms, Protobuf is both the store and the internal API for UIS-Flow comms.
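As a toy illustration of that flow (plain dicts standing in for the Protobuf store; all names here are made up for illustration, not the real UIS code):

```python
import json

# Toy stand-in for the UIS data store: in reality this is a Protobuf
# serialisation keyed by workflow ID; dicts are used purely for
# illustration.
store = {
    '~user/a': {'id': '~user/a', 'status': 'running'},
}


def resolve_workflow(w_id, fields):
    """Resolve a GraphQL-style field selection off the store."""
    entry = store[w_id]
    return {field: entry[field] for field in fields}


# "Resolve" a request off the store, then re-serialise to JSON for the UI.
payload = json.dumps(resolve_workflow('~user/a', ['status']))
```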

Presently, the UIS subscribes to all topics for all active workflows (running, paused, stopping). This is vastly surplus to requirements as (at present) users can only look at one workflow at a time in the UI. This may cause excess load on the UIS and, potentially, high memory usage.

In order to field requests, we only need to subscribe to the subset of topics required to satisfy the active GraphQL subscriptions. E.g., to satisfy the requirements of the UI when looking at the tree view for workflow "a", we would need:

  1. A subscription to the WORKFLOW topic for all active workflows. This is required for workflow listing (for the GScan view).
  2. A subscription to the TASK_PROXIES, FAMILY_PROXIES, TASKS and JOBS topics for workflow "a" only (for the tree view).
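A helper for computing this minimal per-workflow topic set might look something like the following (a sketch only; the view-to-topic mapping, topic names and function name are all hypothetical, not the real UIS API):

```python
# Hypothetical mapping from UI view to the topics it needs. The gscan
# entry would apply to every active workflow, the tree entry only to
# the workflow being viewed.
VIEW_TOPICS = {
    'gscan': {b'workflow'},
    'tree': {
        b'workflow', b'task_proxies', b'family_proxies', b'tasks', b'jobs'
    },
}


def required_topics(active_views):
    """Return {workflow_id: topics} for a list of (view, workflow_id) pairs.

    The result is the union of the topics needed by every view that is
    currently looking at each workflow.
    """
    topics = {}
    for view, w_id in active_views:
        topics.setdefault(w_id, set()).update(VIEW_TOPICS[view])
    return topics
```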

To get started, it might be easier to consider just two subscription levels, e.g. a "workflow" level (the WORKFLOW topic only) and an "all" level (all deltas plus shutdown).

Getting the data store to handle two different subscription levels is straightforward:

Subscription Level Patch:

Cylc UIS:

```diff
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 101fc36..578dcac 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -36,7 +36,7 @@ from concurrent.futures import ThreadPoolExecutor
 from copy import deepcopy
 from pathlib import Path
 import time
-from typing import Dict, Optional, Set
+from typing import Dict, Optional, Set, List

 from cylc.flow.exceptions import WorkflowStopped
 from cylc.flow.id import Tokens
@@ -80,16 +80,31 @@ class DataStoreMgr:
     RECONCILE_TIMEOUT = 5.  # seconds
     PENDING_DELTA_CHECK_INTERVAL = 0.5

+    SUBSCRIPTION_LEVELS = {
+        'all': (
+            'pb_entire_workflow',
+            {ALL_DELTAS.encode('utf-8'), b'shutdown'},
+        ),
+        'workflow': (
+            'pb_workflow_only',
+            {WORKFLOW},
+        ),
+    }
+
     def __init__(self, workflows_mgr, log):
         self.workflows_mgr = workflows_mgr
         self.log = log
         self.data = {}
         self.w_subs: Dict[str, WorkflowSubscriber] = {}
-        self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
+        # self.all_topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
+        # self.workflow_topics = {WORKFLOW}
         self.loop = None
-        self.executor = ThreadPoolExecutor()
+        self.executor = ThreadPoolExecutor(max_workers=32)
         self.delta_queues = {}

+        # last known contact data
+        self._contact_data = {}
+
     @log_call
     async def register_workflow(self, w_id: str, is_active: bool) -> None:
         """Register a new workflow with the data store.
@@ -126,7 +141,7 @@ class DataStoreMgr:
             self._purge_workflow(w_id)

     @log_call
-    async def connect_workflow(self, w_id, contact_data):
+    async def connect_workflow(self, w_id, contact_data, sub_level='workflow'):
         """Initiate workflow subscriptions.

         Call this when a workflow has started.
@@ -134,8 +149,10 @@ class DataStoreMgr:
         Subscriptions and sync management is instantiated and run in
         a separate thread for each workflow. This is to avoid
         the sync loop blocking the main loop.
-
         """
+        # update last known contact data
+        self._contact_data[w_id] = contact_data
+
         if self.loop is None:
             self.loop = asyncio.get_running_loop()
@@ -152,9 +169,13 @@ class DataStoreMgr:
             w_id,
             contact_data['name'],
             contact_data[CFF.HOST],
-            contact_data[CFF.PUBLISH_PORT]
+            contact_data[CFF.PUBLISH_PORT],
+            self.SUBSCRIPTION_LEVELS[sub_level][1],
+        )
+        successful_updates = await self._workflow_update(
+            [w_id],
+            self.SUBSCRIPTION_LEVELS[sub_level][0],
         )
-        successful_updates = await self._entire_workflow_update(ids=[w_id])

         if w_id not in successful_updates:
             # something went wrong, undo any changes to allow for subsequent
@@ -192,6 +213,33 @@ class DataStoreMgr:
             self.w_subs[w_id].stop()
             del self.w_subs[w_id]

+    async def _update_subscription_level(self, w_id, sub_level):
+        sub = self.w_subs.get(w_id)
+        topics = self.SUBSCRIPTION_LEVELS[sub_level][1]
+        if sub:
+            if sub.topics == topics:
+                # workflow already subscribed to all topics
+                return True
+            else:
+                self.disconnect_workflow(w_id, update_contact=False)
+                await self.connect_workflow(
+                    w_id,
+                    self._contact_data[w_id],
+                    sub_level,
+                )
+                return True
+        else:
+            # we have to wait for this workflow to be detected before we can
+            # connect to it
+            # TODO: consider awaiting a scan here?
+            return False
+
+    async def premote_subscription_full(self, w_id):
+        await self._update_subscription_level(w_id, 'all')
+
+    async def demote_subscription_workflow(self, w_id):
+        await self._update_subscription_level(w_id, 'workflow')
+
     def get_workflows(self):
         """Return all workflows the data store is currently tracking.
@@ -221,7 +269,7 @@ class DataStoreMgr:
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]

-    def _start_subscription(self, w_id, reg, host, port):
+    def _start_subscription(self, w_id, reg, host, port, topics):
         """Instantiate and run subscriber data-store sync.

         Args:
@@ -236,7 +284,7 @@
             host=host,
             port=port,
             context=self.workflows_mgr.context,
-            topics=self.topics
+            topics=topics
         )
         self.w_subs[w_id].loop.run_until_complete(
             self.w_subs[w_id].subscribe(
@@ -333,7 +381,8 @@
                     workflow_request(
                         self.workflows_mgr.workflows[w_id]['req_client'],
                         'pb_data_elements',
-                        args={'element_type': topic}
+                        args={'element_type': topic},
+                        timeout=self.RECONCILE_TIMEOUT,
                     ),
                     self.loop
                 )
@@ -352,8 +401,9 @@
         except Exception as exc:
             self.log.exception(exc)

-    async def _entire_workflow_update(
-        self, ids: Optional[list] = None
+    @log_call
+    async def _workflow_update(
+        self, ids: List[str], req_method: str,
     ) -> Set[str]:
         """Update entire local data-store of workflow(s).
@@ -364,9 +414,6 @@
         if ids is None:
             ids = []

-        # Request new data
-        req_method = 'pb_entire_workflow'
-
         requests = {
             w_id: workflow_request(
                 client=info['req_client'], command=req_method, log=self.log
```

Cylc Flow:

```diff
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 744daeb4d..3c519468d 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2699,6 +2699,29 @@ class DataStoreMgr:

         return workflow_msg

+    def get_workflow_only(self):
+        """Gather workflow-level data elements into single Protobuf message.
+
+        No tasks / cycles, etc, just workflow stuff.
+
+        Returns:
+            cylc.flow.data_messages_pb2.PbEntireWorkflow
+
+        """
+        data = self.data[self.workflow_id]
+
+        workflow_msg = PbEntireWorkflow()
+        workflow_msg.workflow.CopyFrom(data[WORKFLOW])
+        # workflow_msg.tasks.extend(data[TASKS].values())
+        # workflow_msg.task_proxies.extend(data[TASK_PROXIES].values())
+        # workflow_msg.jobs.extend(data[JOBS].values())
+        # workflow_msg.families.extend(data[FAMILIES].values())
+        # workflow_msg.family_proxies.extend(data[FAMILY_PROXIES].values())
+        # workflow_msg.edges.extend(data[EDGES].values())
+
+        return workflow_msg
+
     def get_publish_deltas(self):
         """Return deltas for publishing."""
         all_deltas = DELTAS_MAP[ALL_DELTAS]()
diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py
index 5c0704720..10903fe5d 100644
--- a/cylc/flow/network/server.py
+++ b/cylc/flow/network/server.py
@@ -46,6 +46,7 @@ if TYPE_CHECKING:

 # maps server methods to the protobuf message (for client/UIS import)
 PB_METHOD_MAP = {
     'pb_entire_workflow': PbEntireWorkflow,
+    'pb_workflow_only': PbEntireWorkflow,
     'pb_data_elements': DELTAS_MAP
 }
@@ -411,11 +412,20 @@ class WorkflowRuntimeServer:
         """Send the entire data-store in a single Protobuf message.

         Returns serialised Protobuf message
-
         """
         pb_msg = self.schd.data_store_mgr.get_entire_workflow()
         return pb_msg.SerializeToString()

+    @authorise()
+    @expose
+    def pb_workflow_only(self, **_kwargs) -> bytes:
+        """Send only the workflow data, not tasks etc.
+
+        Returns serialised Protobuf message
+        """
+        pb_msg = self.schd.data_store_mgr.get_workflow_only()
+        return pb_msg.SerializeToString()
+
     @authorise()
     @expose
     def pb_data_elements(self, element_type: str, **_kwargs) -> bytes:
```

However, determining the set of workflows for which we require ALL subscriptions (by looking at the GraphQL subs) is somewhat more difficult.

Middleware allows you to work out what fields queries are looking at; however, I don't think you can monitor whether a subscription is still active from middleware?

The Tornado integration layer has access to the subscription lifecycle, but does not appear to have access to the parsed request, making it hard to determine what the subscription is looking at. The best I could come up with after a few mins is:

Tornado Patch:

```diff
diff --git a/cylc/uiserver/websockets/tornado.py b/cylc/uiserver/websockets/tornado.py
index aaf2e28..5b8579f 100644
--- a/cylc/uiserver/websockets/tornado.py
+++ b/cylc/uiserver/websockets/tornado.py
@@ -63,12 +63,14 @@ class TornadoSubscriptionServer(BaseAsyncSubscriptionServer):
         loop=None,
         backend=None,
         middleware=None,
-        auth=None
+        auth=None,
+        data_store_mgr=None,
     ):
         self.loop = loop
         self.backend = backend or None
         self.middleware = middleware
         self.auth = auth
+        self.data_store_mgr = data_store_mgr
         super().__init__(schema, keep_alive)

     @staticmethod
@@ -131,6 +133,25 @@
             # with this id.
             await connection_context.unsubscribe(op_id)

+        # Work out what workflows this query is subscribing to by looking at
+        # the request parameters. This is a flawed method which assumes that
+        # the GraphQL variable that controls which workflows we are
+        # subscribing to is called "WorkflowId" or "WorkflowIds", but of
+        # course, this variable could be named anything, its value could be
+        # a pattern and it doesn't have to be an argument at all, it could
+        # be hardcoded in the request.
+        _workflow_ids = []
+        for key, value in params['variable_values'].items():
+            if key.lower() == 'workflowid':
+                _workflow_ids.append(value)
+            elif key.lower() == 'workflowids':
+                _workflow_ids.extend(value)
+        for workflow_id in _workflow_ids:
+            import asyncio
+            while not await self.data_store_mgr.premote_subscription_full(workflow_id):
+                # workflow not in the store yet, wait
+                await asyncio.sleep(1)
+
         params['root_value'] = op_id
         execution_result = self.execute(params)

         try:
@@ -158,7 +179,6 @@
             with suppress(KeyError):
                 connection_context.request_context['sub_statuses'].pop(op_id)

-
     async def send_execution_result(self, connection_context, op_id, execution_result):
         # Resolve any pending promises
         if execution_result.data and 'logs' not in execution_result.data:
```
dwsutherland commented 5 months ago

> Presently, the UIS subscribes to all topics for all active workflows

Well, no, just two topics: `all` (which is all data) and `shutdown` (also, `shutdown` should always be a topic). But the `all` data topic can be reduced based on the data level.
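For context, these topics are ZeroMQ prefix filters set on the SUB socket, so the subscriber side alone decides which published topics it actually receives. A minimal pyzmq sketch (unrelated to the real cylc subscriber classes; topic names are illustrative):

```python
import time
import zmq

ctx = zmq.Context.instance()

# The "scheduler" side: publishes messages tagged with a topic.
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://demo")

# The "UIS" side: only receives topics it has explicitly subscribed to;
# ZeroMQ drops everything else at the prefix-matching stage.
sub = ctx.socket(zmq.SUB)
sub.connect("inproc://demo")
sub.setsockopt(zmq.SUBSCRIBE, b"workflow")
sub.setsockopt(zmq.SUBSCRIBE, b"shutdown")
sub.setsockopt(zmq.RCVTIMEO, 2000)  # don't block forever in this demo

time.sleep(0.1)  # allow the subscriptions to propagate

pub.send_multipart([b"all_deltas", b"payload-1"])  # filtered out
pub.send_multipart([b"workflow", b"payload-2"])    # delivered

topic, payload = sub.recv_multipart()

sub.close()
pub.close()
```

Changing subscription level is then just a matter of adding or removing `SUBSCRIBE` filters (or reconnecting with a different topic set, as the patch above does).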

The way to intercept all the required info would be to modify the async generator (the subscriber).

I hadn't actually read this before my first attempt, but I might adopt some things from it (the levels) and the topic adjustments.

One thing we need to be able to handle is general queries hitting the UIS, as we'll want CLI-via-UIS at some point and will need to handle arbitrary queries. For this I think we can use that timer concept, keeping a full sync of any queried workflow for some amount of time.
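That timer concept could be sketched roughly as follows (a hypothetical class: the promote/demote method names are taken from the draft patch earlier in this thread, everything else is assumed):

```python
import asyncio
import time


class SubscriptionTimer:
    """Hypothetical sketch: after a query hits the UIS, keep the workflow
    at the full "all" subscription level for TTL seconds, then demote it
    back to the cheap "workflow" level."""

    TTL = 60  # seconds

    def __init__(self, data_store_mgr):
        self.data_store_mgr = data_store_mgr
        self.expiry = {}  # w_id -> monotonic expiry time

    async def touch(self, w_id):
        """Call on every query for w_id; promotes on first touch and
        pushes the expiry back on every subsequent touch."""
        new = w_id not in self.expiry
        self.expiry[w_id] = time.monotonic() + self.TTL
        if new:
            # method name from the draft patch above
            await self.data_store_mgr.premote_subscription_full(w_id)
            asyncio.create_task(self._expire(w_id))

    async def _expire(self, w_id):
        # keep waiting while fresh queries extend the expiry time
        while time.monotonic() < self.expiry[w_id]:
            await asyncio.sleep(self.expiry[w_id] - time.monotonic())
        del self.expiry[w_id]
        await self.data_store_mgr.demote_subscription_workflow(w_id)
```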