jina-ai / serve

☁️ Build multimodal AI applications with cloud-native stack
https://jina.ai/serve
Apache License 2.0
21.13k stars 2.22k forks source link

Hide Driver from regular Jina users #2342

Closed maximilianwerk closed 3 years ago

maximilianwerk commented 3 years ago

Describe the feature

The Driver is a Jina component that should be hidden away from an user. I suggest, it evolves into a combination of a request dispatcher and compounding multiple Executors. I envision the following yaml syntax on pod level:

executors:
  my_idx:
    - jtype: MyIndexer
      with:
        metric: euclidean
on_request:
  SearchRequest:
    - name: my_idx
      arguments:
        traversal_paths: ['c']
    - jtype: ExcludeQL   <-- inline Executor!
      with:
        constructor_parameter: test
      arguments:
        traversal_paths: ['c']
        fields:
          - buffer
  IndexRequest:
    - name: my_idx
      arguments:
        traversal_paths: ['c']

The following would be rough psydo code of how the GenericExecutorDriver handles the parsed YAML:

from collections import namedtuple

from foobar import MyIndexer, ExcludeQL, new_uuid

ExecutorWrapper = namedtuple(
    'ExecutorWrapper', ['name', 'additional_call_arguments', 'executor']
)

class GenericExecutorDriver:
    def what_exists_after_yaml_parsing(self):
        self.executors = {
            'my_idx': MyIndexer(metric='euclidean'),
        }

        self.on_request = {
            'search': [
                ExecutorWrapper('my_idx', {'traversal_paths': ['c']}, None),
                ExecutorWrapper(
                    None,
                    {'traversal_paths': ['c'], 'fields': ['buffer']},
                    ExcludeQL(constructor_parameter='test'),
                ),
            ],
            'index': [ExecutorWrapper('my_idx', {'traversal_paths': ['c']}, None)],
        }
        # Beware: Each callable should only have either name or executor defined after yaml parsing!

    def _post_init(self):

        for request_flow in self.on_request.values():
            for jina_callable in request_flow:
                if (
                    jina_callable.name is not None
                    and jina_callable.executor is not None
                ) or (jina_callable.name is None and jina_callable.executor is None):
                    raise Exception(
                        'You must define exactly one of "name" or "jtype" for an executor'
                    )

                if jina_callable.name is not None:
                    jina_callable.executor = self.executors[jina_callable.name]
                else:
                    """Not sure, whether the following is needed, but it would be nice for logging
                    and completeness of the self.executors attribute.
                    """
                    self.executors[new_uuid] = jina_callable.executor

    def __call__(self, *args, **kwargs):
        """call all executors for the current requests with the additional call arguments."""
        call_flow = self.call_flow[self.request.type]
        for executor in call_flow:
            arguments = kwargs.deepcopy()
            arguments.update(executor.additional_call_arguments)
            executor.apply(*args, **arguments)

Beware of the following features:

This solution should be extended, such that

...
on_requests:
  SearchRequest:
    ABC
  IndexRequest:
    ABC

and

...
on_requests:
  [SearchRequest, IndexRequest]:
    ABC

results in the same in memory objects and the second is just a syntax sugar for the first.

cristianmtr commented 3 years ago

So basically what we're doing is "hiding" away the driver, right? I like it! It always seemed a bit too much upfront work to customize this.

Random thoughts on this:

    def __call__(self, *args, **kwargs):
        """call all executors for the current requests with the additional call arguments."""
        call_flow = self.call_flow[self.request.type]
        for executor in call_flow:
            arguments = kwargs.deepcopy()
            arguments.update(executor.additional_call_arguments)
            executor.apply(*args, **arguments)

where drivers actually behave differently. E.g. CacheDriver expects a return and then calls another method on the executor depending on the value received

    def _apply_all(self, docs: 'DocumentSet', *args, **kwargs) -> None:
        if self._method_name == 'update':
            values = [BaseCacheDriver.hash_doc(d, self.exec.fields) for d in docs]
            self.exec_fn([d.id for d in docs], values)
        else:
            for d in docs:
                value = BaseCacheDriver.hash_doc(d, self.exec.fields)
                result = self.exec[value]
                if result:
                    self.on_hit(d, result)
                else:
                    self.on_miss(d, value)

Just saying so that we are aware of where the complexity will move.

FionnD commented 3 years ago

Hi @maximilianwerk 👋

Is this issue still relevant given all discussions with 2.0? Just felt maybe decision about it had already been made.