celery / jumpstarter

Actor #1

Open thedrow opened 3 years ago

thedrow commented 3 years ago

Jumpstarter is based on the Actor model. It aims to help define self-contained pieces of business logic and facilitate communication between them while maintaining a separation of concerns.

The Actor class, along with its domain specific language, is the main public API of this package.

This issue serves as a milestone and as a basic draft for the documentation and the CEP.

The State Machine

At the heart of each Actor lies a state machine, and not just any ordinary state machine: it is a Hierarchical State Machine :smile:.

In summary, the difference is that an ordinary state machine consists of states and the transitions between them, whereas in a hierarchical state machine a state can also contain its own sub-state machine.
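
To make the distinction concrete, here is a minimal pure-Python sketch of a state that owns nested sub-states. It does not use the transitions library and is not Jumpstarter's implementation; the `State` class and its `enter` method are hypothetical, and only the state names `starting`, `resources_acquired` and `tasks_running` come from this issue.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class State:
    """A state that may own a nested sub-state machine (hypothetical type)."""

    name: str
    # Child states keyed by name; empty for a leaf state.
    children: dict[str, State] = field(default_factory=dict)
    # Name of the child entered by default when this state is entered.
    initial: str | None = None

    def enter(self) -> list[str]:
        """Return the names of the states entered, descending into defaults."""
        path = [self.name]
        if self.initial is not None:
            path += self.children[self.initial].enter()
        return path


# In a flat machine "starting" would be a single state. Here it has its own
# sub-states, which is what makes the machine hierarchical.
starting = State(
    "starting",
    children={
        "resources_acquired": State("resources_acquired"),
        "tasks_running": State("tasks_running"),
    },
    initial="resources_acquired",
)

print(" -> ".join(starting.enter()))
```

Entering `starting` also enters its default sub-state, so the printed path is `starting -> resources_acquired`.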

I picked this design to model the lifecycle of the Actor, but we can, and we will use this design to facilitate further features, as I will later demonstrate.

Since implementing a state machine library with hierarchical states support and async callbacks is way too hard for this project to succeed in a timely manner, I picked the rather mature transitions library. However, since transitions only supported asyncio as the event loop for its async variant, together with @aleneum, we created transitions-anyio. He did most of the technical work as he is more familiar with the codebase and the theory. I fixed a few bugs, provided feedback, and took care of productizing it.

So thank you @aleneum, you will be given credit for this project.

For a description of the Actor's lifecycle states and transitions, please refer to #3.

Event Loop

Jumpstarter supports both the asyncio and the trio event loops using the anyio package. There's no apparent design limitation that prevents us from supporting both at the moment, so I think we should meet as many use-cases as possible to reach a broader audience.

Of course, Celery can still only support trio if we finalize that decision (as currently agreed in https://github.com/celery/ceps/blob/bad1c9efeb866ddb225b9ee2674dd15db8051674/draft/high-level-architecture.rst). Still, for the time being, there is no reason to do so: we haven't written any of the clients, so we can't know for certain whether they require trio directly.

In any case, this seems to me like a good decision. If you disagree, please open a separate issue about it.

Dependencies

Actors may depend on other actors to run before starting themselves. In some cases the dependency is mandatory: an actor that consumes messages from another actor's stream must depend on that actor.

The suggested public API is as follows:

from jumpstarter import Actor, depends_on

class AccountBalanceActor(Actor):
  def __init__(self, user_id: int):
    self.user_id = user_id

class AccountBookkeepingActor(Actor):
  def __init__(self, user_id: int, account_balance_actor: AccountBalanceActor):
    self._account_balance_actor = account_balance_actor

  @depends_on
  def account_balance_actor(self):
    return self._account_balance_actor

In this example, the AccountBalanceActor maintains the balance in a single user ID's account. The AccountBookkeepingActor is responsible for logging and auditing withdrawals and income, possibly passing these audit logs to another actor responsible for detecting fraud.

You can also use a factory method or initialize a brand new AccountBalanceActor inside the body of the account_balance_actor method. You may also return a subclass of Actor, and it will be initialized for you, provided all the arguments available for that actor. Please refer to #4 for more details on inversion of control.
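
As a rough illustration of what declaring a dependency could imply for start-up order, the sketch below resolves dependencies depth-first so that every dependency is started before the actor that needs it. Everything here is an assumption made for illustration: the `depends_on` descriptor, the stand-in `Actor` base class and the `start_order` helper are hypothetical and are not Jumpstarter's implementation. Dependency cycles are not handled.

```python
class depends_on:
    """Hypothetical marker: wraps a method that returns the actor depended on."""

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return self.func(instance)


class Actor:
    """Stand-in base class, not Jumpstarter's Actor."""

    def dependencies(self):
        """Resolve every attribute declared with @depends_on on this class."""
        found = []
        for klass in type(self).__mro__:
            for name, attr in vars(klass).items():
                if isinstance(attr, depends_on):
                    found.append(getattr(self, name))
        return found


def start_order(actor, seen=None):
    """Return actors in an order where dependencies come before dependents."""
    seen = [] if seen is None else seen
    for dependency in actor.dependencies():
        start_order(dependency, seen)
    if actor not in seen:
        seen.append(actor)
    return seen


class AccountBalanceActor(Actor):
    def __init__(self, user_id):
        self.user_id = user_id


class AccountBookkeepingActor(Actor):
    def __init__(self, user_id, account_balance_actor):
        self.user_id = user_id
        self._account_balance_actor = account_balance_actor

    @depends_on
    def account_balance_actor(self):
        return self._account_balance_actor


balance = AccountBalanceActor(user_id=1)
bookkeeping = AccountBookkeepingActor(user_id=1, account_balance_actor=balance)
print([type(a).__name__ for a in start_order(bookkeeping)])
```

The balance actor is listed first because the bookkeeping actor depends on it.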

For more information on actor dependencies, please refer to #5.

Resources

Actors have resources they manage during their lifetime, such as:

There may be more use cases, but these are enough to warrant implementing this feature. In fact, Jumpstarter relies on this feature to manage the Actor itself, as we will soon demonstrate.

A resource is a context manager or an asynchronous context manager. It is entered whenever the Actor is starting, specifically just before the state machine transitions to the starting -> resources_acquired state. It is exited whenever the Actor is stopping, specifically just before the state machine transitions to the stopping -> resources_released state (see #3 for details regarding the lifecycle states and transitions).

The cancel scope, which is crucial for cold-shutting the worker, is itself an asynchronous context manager, as is the anyio.TaskGroup used to run the Actor's background tasks. These are some of the internal resources the actor uses. Therefore, we need to support this feature.

The proposed (and already implemented) public API is:

from pathlib import Path

from jumpstarter import Actor, resource

class FileHeadActor(Actor):
  def __init__(self, file_path: Path):
    self.file_path = file_path

  @resource
  def log_file(self):
    return open(self.file_path)
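
The enter-on-start, exit-on-stop behaviour described above can be sketched with the standard library's `contextlib.ExitStack`. This is only an illustration under stated assumptions: the `resource` marker, the stand-in `Actor` class, its `start`/`stop` methods and the `resources` dictionary are all hypothetical, and only synchronous context managers are covered (`contextlib.AsyncExitStack` would play the same role for asynchronous ones).

```python
import tempfile
from contextlib import ExitStack
from pathlib import Path


def resource(func):
    """Hypothetical marker for methods that return a context manager."""
    func.is_resource = True
    return func


class Actor:
    """Stand-in base class, not Jumpstarter's Actor."""

    def __init__(self):
        self._exit_stack = ExitStack()
        self.resources = {}

    def start(self):
        # Enter every marked resource before the actor counts as started.
        for name in dir(type(self)):
            method = getattr(type(self), name)
            if getattr(method, "is_resource", False):
                self.resources[name] = self._exit_stack.enter_context(method(self))

    def stop(self):
        # Exit the resources in reverse order of acquisition.
        self._exit_stack.close()
        self.resources.clear()


class FileHeadActor(Actor):
    def __init__(self, file_path: Path):
        super().__init__()
        self.file_path = file_path

    @resource
    def log_file(self):
        return open(self.file_path)


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "app.log"
    path.write_text("first line\n")

    actor = FileHeadActor(path)
    actor.start()
    log_file = actor.resources["log_file"]
    first_line = log_file.readline()
    actor.stop()

print(first_line, end="")
print(log_file.closed)
```

Stopping the actor closes the file without the actor's own code ever calling `close()`.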

For more information on resources, please refer to #6.

Tasks

An actor repeatedly runs tasks to fulfill its purpose. Using tasks, the user implements the business logic of the Actor. A task can be synchronous or asynchronous. If the task is synchronous, it is run in a thread pool. If it is asynchronous, it runs on the event loop.
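
The dispatch rule can be sketched as follows. To stay dependency-free, the sketch uses the standard library's asyncio as a stand-in for anyio, which is an assumption on my part; `run_task` and the two sample tasks are hypothetical names, not Jumpstarter's API.

```python
import asyncio
import inspect
import threading


async def run_task(func, *args):
    """Await coroutine functions directly; run plain functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, func, *args)


def sync_task():
    return threading.get_ident()


async def async_task():
    return threading.get_ident()


async def main():
    loop_thread = threading.get_ident()
    sync_thread = await run_task(sync_task)
    async_thread = await run_task(async_task)
    # The synchronous task ran elsewhere; the asynchronous one stayed on the loop.
    return sync_thread != loop_thread, async_thread == loop_thread


moved_to_pool, stayed_on_loop = asyncio.run(main())
print(moved_to_pool, stayed_on_loop)
```

Running synchronous tasks in a thread pool keeps a blocking call from stalling the event loop that drives the asynchronous tasks.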

The proposed (and prototyped in the actor-tasks branch) public API is:

from jumpstarter import Actor, task
from jumpstarter.tasks import Success

class CountingActor(Actor):
  def __init__(self):
    self.i: int = 0

  @task
  def count_to_ten(self):
    self.i += 1
    print(self.i)

    if self.i == 10:
      return Success()

When you start the actor, specifically before the transition to starting -> tasks_running, the count_to_ten method is repeatedly called until you stop the actor (which in turn triggers the cancel scope). This actor counts to 10 and prints the current count. When it reaches 10, the task stops running as it was successful.
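
The "call repeatedly until success" behaviour can be sketched in a few lines. The `Success` class below is a stand-in for `jumpstarter.tasks.Success`, and `run_until_success` with its `max_iterations` guard is a hypothetical helper; the real actor stops through its cancel scope, which this sketch leaves out.

```python
class Success:
    """Stand-in for jumpstarter.tasks.Success: signals that the task is done."""


def run_until_success(task, max_iterations=1_000):
    """Call `task` repeatedly until it returns Success; return the call count."""
    for calls in range(1, max_iterations + 1):
        if isinstance(task(), Success):
            return calls
    raise RuntimeError("task did not finish within max_iterations")


class CountingActor:
    def __init__(self):
        self.i = 0

    def count_to_ten(self):
        self.i += 1
        if self.i == 10:
            return Success()


actor = CountingActor()
calls = run_until_success(actor.count_to_ten)
print(calls, actor.i)
```

The task is called ten times: nine calls return nothing and the tenth returns `Success()`, which ends the loop.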

There are two types of tasks: continuous and periodic. There may be more types in the future.

For more information on tasks, please refer to #7.

Producers

Tasks can produce results for other Actors to consume. They do so by returning a value other than Success(), Failure(), Retry() or Skip().

For example:

from pathlib import Path

from jumpstarter import Actor, resource, task
from jumpstarter.tasks import Success

class FileHeadActor(Actor):
  def __init__(self, file_path: Path, number_of_lines: int = 250):
    self.file_path = file_path
    self.i = 0
    self.number_of_lines = number_of_lines

  @resource
  def log_file(self):
    return open(self.file_path)

  @task
  def lines(self):
    # Stop once the requested number of lines has been produced.
    if self.i == self.number_of_lines:
      return Success()

    line = self.log_file.readline()
    if line:
      self.i += 1
      return line

This actor opens a file and reads the first 250 lines from it.
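
One way a task runner could tell produced results apart from control signals is sketched below. The four signal classes are stand-ins for the ones named above, `publish_if_result` is a hypothetical helper, and treating `None` (a task that returns nothing) as "no result" is my assumption rather than something this issue specifies.

```python
class Success: ...
class Failure: ...
class Retry: ...
class Skip: ...


# Return values of these types steer the task instead of being published.
CONTROL_SIGNALS = (Success, Failure, Retry, Skip)


def publish_if_result(value, stream):
    """Append `value` to `stream` unless it is a control signal or None."""
    if value is None or isinstance(value, CONTROL_SIGNALS):
        return False
    stream.append(value)
    return True


stream = []
for returned in ["line 1\n", None, "line 2\n", Success()]:
    publish_if_result(returned, stream)

print(stream)
```

Only the two lines end up in the stream; the `None` and the `Success()` are filtered out.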

For more information on producers, please refer to #9.

Consumers

Tasks can consume results from other actors' tasks.

from jumpstarter import Actor, task
from special_cryptography_library import encrypt

from myproject import FileHeadActor

class LogEncryptionActor(Actor):
  @task(consume_from=FileHeadActor.lines)
  def encrypt(self, line):
    return encrypt(line)

When you consume from an actor, you must also depend on it, so the code above must produce an error. Declaring the dependency fixes it:

from jumpstarter import Actor, depends_on, task
from special_cryptography_library import encrypt

from myproject import FileHeadActor

class LogEncryptionActor(Actor):
  @depends_on
  def file_head_actor(self):
    return FileHeadActor

  @task(consume_from=FileHeadActor.lines)
  def encrypt(self, line):
    return encrypt(line)

Now the actor should work correctly and produce its own stream. Note that the encrypt task is both a consumer and a producer.
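
The idea of a task that is both a consumer and a producer can be sketched with two queues. This is an illustration only: it uses asyncio queues from the standard library rather than whatever stream type Jumpstarter ends up with, the `DONE` marker and the `produce`/`consume` functions are hypothetical, and string reversal is a placeholder for a real encryption routine.

```python
import asyncio

DONE = object()  # Hypothetical end-of-stream marker.


async def produce(lines, stream):
    """Producer: publish every value to the stream, then signal the end."""
    for line in lines:
        await stream.put(line)
    await stream.put(DONE)


async def consume(source, sink, transform):
    """Consumer and producer: read one stream, publish results to another."""
    while True:
        item = await source.get()
        if item is DONE:
            await sink.put(DONE)
            return
        await sink.put(transform(item))


def reverse(line):
    # Placeholder transform; NOT encryption.
    return line[::-1]


async def main():
    lines, transformed = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(
        produce(["alpha", "beta"], lines),
        consume(lines, transformed, reverse),
    )
    # Drain the second stream to see what the consumer produced.
    results = []
    while True:
        item = transformed.get_nowait()
        if item is DONE:
            return results
        results.append(item)


results = asyncio.run(main())
print(results)
```

A third actor could consume from the second queue in the same way, which is how longer pipelines would be chained.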

Potentially, both actors could pass messages back & forth, each doing its part.

For more information on consumers, please refer to #10.

auvipy commented 3 years ago

actor for celery?

thedrow commented 3 years ago

No, this is something else. I'll provide more details on the issue's body soon.

auvipy commented 3 years ago

OK. Just curious: is there anything we can get from cell? Or is it just a standalone actor framework?

thedrow commented 3 years ago

I think we should abandon & archive it. It wasn't very widely used or developed.

auvipy commented 3 years ago

I don't see any problem with anyio, as it supports both asyncio and trio. Also, choosing a mature library is a wise choice.

MicahLyle commented 3 years ago

An actor repeatedly runs tasks to fulfill its purpose. Using tasks, the user implements the business logic of the Actor. A task can be synchronous or asynchronous. If the task is synchronous, it is run in a thread pool. If it is asynchronous, it runs on the event loop.

Listened to a podcast from, I think, a year or two back where Ask was talking about https://github.com/robinhood/faust. It was interesting to hear that faust can be embedded, for example, directly into an asyncio-backed fastapi or flask app (I believe both of those could be relevant examples), because it's just running in the event loop anyway. Do you imagine next-gen Celery, with jumpstarter, could have similar capabilities? I.e., currently there needs to be a background worker (assuming task_always_eager = False), but with async/await integration, could a Celery worker also be running "in the foreground", so to speak? Not that most folks would want to do that; I'm asking it as a conceptual/theoretical question. What sort of other enhancements like that do jumpstarter and its task model enable?

thedrow commented 3 years ago

Listened to a podcast from, I think, a year or two back where Ask was talking about https://github.com/robinhood/faust. It was interesting to hear that faust can be embedded, for example, directly into an asyncio-backed fastapi or flask app (I believe both of those could be relevant examples), because it's just running in the event loop anyway. Do you imagine next-gen Celery, with jumpstarter, could have similar capabilities? I.e., currently there needs to be a background worker (assuming task_always_eager = False), but with async/await integration, could a Celery worker also be running "in the foreground", so to speak? Not that most folks would want to do that; I'm asking it as a conceptual/theoretical question.

Yes, Celery NextGen can be embedded as well.

What sort of other enhancements like that does jumpstarter and its task model enable?

Many. celery/ceps#31 is the place to discuss them. This issue describes the implementation details as well as a rough guideline for designing this library.