unbit / uwsgi

uWSGI application server container
http://projects.unbit.it/uwsgi

[RFC] Channels/Queue Subsystem (internal pubsub) #338

Open rugginoso opened 11 years ago

rugginoso commented 11 years ago

I'm new to the uwsgi internals, so be patient :D

We need an internal way to substitute external tools like redis pubsub. A dedicated thread on the master should manage the routing of these messages.

The idea is to separate the data, and the notifications.

For the data: the master has a queue; cores push messages into it. Each core has its own queue; the master pushes messages into them.

For the notifications: each core has a socket connected to the master. Each core notifies the master that it has pushed something. The master notifies each core that it has something to read.

Subscription: each time a core comes up, it must connect to the master and create its own queue.

Unsubscription: each time a core goes down, it must disconnect from the master and delete its own queue.

Roundtrip:

|CORE                         |MASTER                                |
|-----------------------------|--------------------------------------|
|- subscribe to master        |                                      |
|  - connect to master        |- add socket to FD_SET and select (?) |
|  - create queue             |                                      |
|-----------------------------|--------------------------------------|
|- send message               |- wait for messages                   |
|  - lock master queue        |  - select returns                    |
|  - write packet             |  - consume notify                    |
|  - unlock master queue      |  - read core queue                   |
|  - write notify on socket   |  - for each subscribed core push     |
|                             |    message                           | 
|                             |    - if has only one message, write  |
|                             |      on the socket to notify         |
|-----------------------------|--------------------------------------|
|- wait for messages          |                                      |
|  - read on socket returns   |                                      |
|  - lock core queue          |                                      |
|  - read packet              |                                      |
|  - unlock core queue        |                                      |
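The roundtrip above can be modeled as a minimal single-process sketch. All names here are illustrative: a real implementation would live in shared memory across processes, with `deque`s and a `socketpair` standing in for the shared queues and the per-core notification sockets.

```python
import socket
import threading
from collections import deque

class Core:
    def __init__(self):
        self.queue = deque()                 # per-core queue, filled by master
        self.lock = threading.Lock()
        # notification channel: master writes, core reads
        self.notify_r, self.notify_w = socket.socketpair()

class Master:
    def __init__(self):
        self.queue = deque()                 # master queue, filled by cores
        self.lock = threading.Lock()
        self.cores = []                      # subscribed cores

    def subscribe(self, core):
        self.cores.append(core)

    def drain(self):
        """Consume the master queue and fan each message out to every core."""
        while True:
            with self.lock:
                if not self.queue:
                    return
                msg = self.queue.popleft()
            for core in self.cores:
                with core.lock:
                    was_empty = not core.queue
                    core.queue.append(msg)
                if was_empty:                # notify only on empty -> non-empty
                    core.notify_w.send(b"\x01")

def core_send(master, msg):
    """Core side: lock the master queue, push the message."""
    with master.lock:
        master.queue.append(msg)

def core_recv(core):
    """Core side: wait for the notify byte, then pop one message."""
    core.notify_r.recv(1)
    with core.lock:
        return core.queue.popleft()
```

The "notify only when the queue was empty" check is what keeps the notification socket from filling up with one byte per message.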

I'm sure I've missed many points about the uwsgi infrastructure, performance, etc. As said earlier, please be patient :D

unbit commented 11 years ago

Some sparse notes (lots of pieces missing).

The api:

uwsgi.channel_join(channel_name)
uwsgi.channel_leave(channel_name)
uwsgi.channel_send(channel_name, msg)
fd = uwsgi.channel_fd()
msg = uwsgi.channel_recv()

uwsgi.channel_recv() is pretty special: it returns message bodies, but to know which channel a message comes from you have to include that information in the message body (otherwise you have no way to tell).

uwsgi.channel_recv() returns None/undefined/nil/... when no messages are available

To async manage the reception (events are edge triggered):

fd = uwsgi.channel_fd()
uwsgi.wait_fd_read(fd)
for(;;) {
    msg = uwsgi.channel_recv()
    if not msg: break
}

Edge triggered notifications:

Each core has an "ack" field (initialized to 1). If "ack" is 1, a byte is written to the notification socket and "ack" is set to 0. When (in the core) uwsgi.channel_recv() returns None/undefined/nil..., the core writes to the notification socket and "ack" is set to 1 again (by the master; read: only the master manages the "ack" field).
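A toy model of the edge-triggered "ack" protocol described above: one notify per empty-to-busy transition, instead of one per message. The class and field names are illustrative, and the notification socket is reduced to a counter.

```python
from collections import deque

class CoreState:
    def __init__(self):
        self.queue = deque()
        self.ack = 1          # 1 = a notify byte may be sent
        self.notified = 0     # stand-in for bytes on the notification socket

def master_push(core, msg):
    core.queue.append(msg)
    if core.ack == 1:         # edge trigger: notify only once per burst
        core.notified += 1
        core.ack = 0

def core_drain(core):
    """channel_recv() loop: read until empty, then re-arm the ack."""
    out = []
    while core.queue:
        out.append(core.queue.popleft())
    core.ack = 1              # in the proposal, the master re-arms this field
    return out
```

Three pushes in a burst produce a single notify; only after the core drains and re-arms does the next push notify again.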

Message format:

{
    uint8_t opcode;
    uint64_t timestamp; /* ??? will require a syscall per-message :( */
    char channel[255];
    ... body ...
}

opcode[0] = join
opcode[1] = leave
opcode[2] = msg
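One possible wire layout for that header, sketched with Python's `struct` module. The endianness and lack of padding are assumptions, not part of the proposal; the point is that the fixed header is 1 + 8 + 255 bytes, followed by the raw body.

```python
import struct

OP_JOIN, OP_LEAVE, OP_MSG = 0, 1, 2

# "<" = little-endian, no padding; B = uint8 opcode,
# Q = uint64 timestamp, 255s = fixed-size channel name.
HEADER = struct.Struct("<BQ255s")

def pack_msg(opcode, timestamp, channel, body=b""):
    return HEADER.pack(opcode, timestamp, channel.encode()) + body

def unpack_msg(data):
    opcode, ts, raw_channel = HEADER.unpack_from(data)
    channel = raw_channel.rstrip(b"\x00").decode()   # strip NUL padding
    return opcode, ts, channel, data[HEADER.size:]
```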

Locking:

we need a shared lock for the master queue (cores write directly into it):

lock_the_master_queue
append the message structure
notify the master (if needed, the master is edge triggered too)
unlock_the_master_queue

for(;;) {
    lock_the_master_queue
    get_the_oldest_message
    make_a_tmp_copy_of_the_msg
    unlock_the_master_queue
    get_the_list_of_subscribed_cores
    foreach(subscribed_core) {
        some_form_of_locking
        copy_the_tmp_msg_to_the_core_queue
        some_form_of_unlocking
    }
    free_the_tmp_msg
}

Can we avoid (in some way) locking for the core queue? It will be pretty hard: we have the master writing into it, and the core reading and removing from it. Having a lock for each core would be overkill, while having a single lock for all of the cores could hurt performance.

unbit commented 11 years ago

Channels management:

Whenever a core joins a channel, a linked list in the master thread is checked. If the channel does not exist, a new item is added to the list. Each item in the list has an associated (fixed-size) bitmap. Each bit is mapped to a core, if the bit is set the core is subscribed.
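A toy model of that registry, with the master's C linked list replaced by a dict and the fixed-size bitmap by an integer used as a bit field, one bit per core (all names illustrative):

```python
class ChannelRegistry:
    def __init__(self, max_cores):
        self.max_cores = max_cores
        self.channels = {}               # channel name -> subscriber bitmap

    def join(self, name, core_id):
        bitmap = self.channels.setdefault(name, 0)  # create on first join
        self.channels[name] = bitmap | (1 << core_id)

    def leave(self, name, core_id):
        if name in self.channels:
            self.channels[name] &= ~(1 << core_id)

    def subscribers(self, name):
        """List the core ids whose bit is set for this channel."""
        bitmap = self.channels.get(name, 0)
        return [c for c in range(self.max_cores) if bitmap & (1 << c)]
```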

unbit commented 11 years ago

Core end:

When a core exits from a request (uwsgi_close_request), it has to be unsubscribed from all of the channels. Maybe we could add an opcode for this special message.

prymitive commented 11 years ago

What if harakiri kills a worker? Or worse, all workers keep dying soon after joining a channel, then they respawn and die again.

rugginoso commented 11 years ago

I'm sorry, I thought defining the API and the internals would be done in a second step.

For the queue synchronization we can avoid a lock by using atomic primitives for the circular buffer indexes (on the platforms which support them).
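The idea can be sketched as a single-producer/single-consumer ring buffer where the writer only ever advances `head` and the reader only ever advances `tail`. In C those index updates would be atomic loads/stores (e.g. C11 atomics); plain Python ints here only illustrate the index discipline, and the names are illustrative.

```python
class SPSCRing:
    def __init__(self, size):
        self.buf = [None] * size
        self.size = size
        self.head = 0    # written only by the producer
        self.tail = 0    # written only by the consumer

    def push(self, item):
        nxt = (self.head + 1) % self.size
        if nxt == self.tail:         # full: one slot is kept empty
            return False
        self.buf[self.head] = item
        self.head = nxt              # publish only after the slot is written
        return True

    def pop(self):
        if self.tail == self.head:   # empty
            return None
        item = self.buf[self.tail]
        self.tail = (self.tail + 1) % self.size
        return item
```

Because each index has exactly one writer, no lock is needed for one master/one core pairs; the fan-out to many cores still needs one ring per core.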

rugginoso commented 11 years ago

@prymitive how is that situation handled by the subscription server? I think we can compare the two cases.

prymitive commented 11 years ago

It is different: with subscriptions we have two independent uWSGI instances, and the master on the backend sends subscription packets to uWSGI *router workers. Here we are allocating something for each worker, so if that worker dies without cleaning up after itself, someone needs to do the cleanup, probably the master. We can store the worker ID, and if that worker dies we clean up all the resources it allocated for subscriptions/whatever.

unbit commented 11 years ago

@prymitive i think when we reinitialize a worker we need to clear all of the memory areas associated with it. It should be enough

prymitive commented 11 years ago

yep, it's just worth remembering that we do need to clean up stuff, since we cannot always rely on the worker doing it by itself

rugginoso commented 11 years ago

@unbit: a newbie question about channel management: how are multiprocess and multithreaded configurations handled? In other words: is the bitmap sufficient in that kind of configuration?

unbit commented 11 years ago

yes, it is sufficient: the max number of processes and threads is always known, so you can see it as a bidimensional array:

bitmap[worker][core]
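Since both maxima are known, the bidimensional bitmap flattens to a single bit field, with `bitmap[worker][core]` mapping to one bit index. The numbers below are illustrative:

```python
MAX_WORKERS, MAX_CORES = 4, 8    # illustrative limits, fixed at startup

def bit_index(worker, core):
    """Flatten the 2D coordinate into a single bit position."""
    return worker * MAX_CORES + core

def set_bit(bitmap, worker, core):
    return bitmap | (1 << bit_index(worker, core))

def is_set(bitmap, worker, core):
    return bool(bitmap & (1 << bit_index(worker, core)))
```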

rugginoso commented 10 years ago

About @unbit's comment on core queue locking: we can use something like a double buffer. In other words, the core has 2 queues: one "active" queue filled by the workers, and one "not active" queue being processed by the core. When the core finishes processing the not-active queue, it switches the queues: workers fill the new one, and the core processes the other.
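A minimal sketch of that double-buffered queue (names illustrative): the producer side appends to the active list while the consumer drains the inactive one, and only the swap itself would need synchronization in a real implementation.

```python
class DoubleQueue:
    def __init__(self):
        self.active = []      # filled by the producer (master/workers)
        self.inactive = []    # drained by the core

    def push(self, msg):
        """Producer side: always append to the active queue."""
        self.active.append(msg)

    def drain(self):
        """Consumer side: swap the queues, then process the old batch."""
        self.active, self.inactive = self.inactive, self.active
        out = self.inactive   # the batch that was being filled until now
        self.inactive = []
        return out
```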

xrmx commented 9 years ago

Is this the "we implemented it but redis is faster" thing? If so, can we close it?

jpic commented 8 years ago

@xrmx i was wondering the same thing. I'd love to hear what the uWSGI core devs' take would be on a django-channels backend, which is kind of a hot topic these days