lvh / thimble

A Twisted thread-pool based wrapper for blocking APIs.

Thread affinity support #2

Open lvh opened 10 years ago

lvh commented 10 years ago

I think this actually means adding thread affinity to Twisted thread pools.

glyph commented 8 years ago

The right way to do this in Twisted would be to make twisted._threads a public API. You can already see its documentation here: https://twistedmatrix.com/documents/15.5.0/api/twisted._threads.html

glyph commented 8 years ago

"thread affinity" would not be a special thunk you passed to your threadpool, but rather, the thing you asked to do your threadpool work. With twisted._threads, you can just create an individual ThreadWorker now, like:

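from queue import Queue
from threading import Thread
from twisted._threads import ThreadWorker

def affinitized(reactor):
    # ThreadWorker takes a thread-starting callable plus the queue it pulls tasks from.
    worker = ThreadWorker(lambda target: Thread(target=target).start(), Queue())
    reactor.addSystemEventTrigger('before', 'shutdown', worker.quit)
    return worker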

"..."

resourceProxy = affinitized(reactor)
resourceProxy.do(step1)
resourceProxy.do(step2)
resourceProxy.do(step3)

This ensures that all access to resourceProxy is asynchronous, but serialized. Obviously a more sophisticated implementation might want to do some kind of management around automatically quitting the affinitized worker when its job is done and not double-quitting at shutdown, but the general idea here is that you can just create an individual worker for as long as you need it.
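To make the "asynchronous, but serialized" property concrete, here is a minimal standard-library sketch of a dedicated worker. It is not Twisted's ThreadWorker; `AffinityWorker` and its methods are hypothetical names that only mimic the do()/quit() shape, with an idempotent quit() so that an explicit quit plus a shutdown hook would not collide.

```python
import queue
import threading
import traceback

_STOP = object()  # sentinel that tells the worker loop to exit


class AffinityWorker:
    """Hypothetical stand-in for a single-thread worker with do()/quit()."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._lock = threading.Lock()
        self._hasQuit = False
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        # Tasks run one at a time, in submission order, always on this thread.
        for task in iter(self._tasks.get, _STOP):
            try:
                task()
            except Exception:
                traceback.print_exc()  # keep the loop alive if a task fails

    def do(self, task):
        with self._lock:
            if self._hasQuit:
                raise RuntimeError("worker has already quit")
            self._tasks.put(task)

    def quit(self):
        # Idempotent: a second call is a no-op instead of an error.
        with self._lock:
            if self._hasQuit:
                return
            self._hasQuit = True
            self._tasks.put(_STOP)
        if threading.current_thread() is not self._thread:
            self._thread.join()  # wait for already-queued tasks to finish

    def __enter__(self):
        return self

    def __exit__(self, *excInfo):
        self.quit()


seen = []
with AffinityWorker() as worker:
    for step in range(3):
        worker.do(lambda step=step: seen.append((step, threading.get_ident())))
# Leaving the with-block quits the worker after the queued steps have run.
```

Using the worker as a context manager is one way to get the "quit when the job is done" behaviour; whether that fits depends on how long the affinitized resource needs to live.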

For a more efficient implementation (to actually pool the dedicated threads so that you aren't paying to spin them up), you could do something like:

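from queue import Queue
from twisted._threads import ThreadWorker, pool as makePool

pool = makePool(someLimit) # someLimit: callable returning the thread cap; shutdown left as an exercise for the reader
def affinitized():
    return ThreadWorker(lambda target: pool.do(target), Queue())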

Okay, wait a second. On second thought, that implementation is awesome.
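The same idea can be sketched with only the standard library, swapping in `concurrent.futures.ThreadPoolExecutor` for the Twisted pool. This is an illustration under that assumption, not the Twisted code path; `PooledAffinityWorker` is a hypothetical name. Each worker parks its private task loop on a borrowed pool thread, so tasks keep their affinity while threads are reused across workers.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_STOP = object()  # sentinel that ends a worker's task loop


class PooledAffinityWorker:
    """Hypothetical: run a private task loop on a thread borrowed from an executor."""

    def __init__(self, executor):
        self._tasks = queue.Queue()
        # The loop occupies one pool thread until quit() is called.
        self._loopDone = executor.submit(self._loop)

    def _loop(self):
        for task in iter(self._tasks.get, _STOP):
            task()

    def do(self, task):
        self._tasks.put(task)

    def quit(self):
        self._tasks.put(_STOP)
        self._loopDone.result()  # block until the loop has drained and exited


# With a single pool thread, two successive workers must share that thread.
executor = ThreadPoolExecutor(max_workers=1)
threadsUsed = []

for _ in range(2):
    worker = PooledAffinityWorker(executor)
    worker.do(lambda: threadsUsed.append(threading.get_ident()))
    worker.quit()

executor.shutdown()
```

One consequence of this design is that every live worker holds a pool thread for its whole lifetime, so the pool limit also caps how many affinitized workers can make progress at once.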

glyph commented 8 years ago

Then any work you want to perform goes like this:

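from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

def deferToWorker(inReactor, inWorker, work):
    # Both arguments are workers exposing do(task). A bare reactor has no do(),
    # so inReactor must be a wrapper that schedules tasks on the reactor thread.
    result = Deferred()
    @inWorker.do
    def workWrapper():
        try:
            output = work()
        except Exception:
            failure = Failure()  # captured here, while the exception is still active
            inReactor.do(lambda: result.errback(failure))
        else:
            inReactor.do(lambda: result.callback(output))
    return result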

inWorker can then be "pool" if you don't care about affinity, or a worker returned by "affinitized" if you do.
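The snippet above calls inReactor.do(...), but a Twisted reactor does not itself provide a do() method. One possible adapter, assuming the reactor offers callFromThread (the usual thread-safe way to schedule a call on the reactor thread), is sketched below. `ReactorWorker` and `FakeReactor` are hypothetical names; the fake exists only so the sketch runs without an event loop.

```python
class ReactorWorker:
    """Hypothetical adapter giving a reactor the do(task) shape used above."""

    def __init__(self, reactor):
        self._reactor = reactor

    def do(self, task):
        # Hand the task to the reactor thread; safe to call from any thread.
        self._reactor.callFromThread(task)


class FakeReactor:
    """Demonstration stand-in: queues calls instead of running an event loop."""

    def __init__(self):
        self.pending = []

    def callFromThread(self, f, *args, **kwargs):
        self.pending.append(lambda: f(*args, **kwargs))

    def runPending(self):
        # Run everything queued so far, as one reactor iteration would.
        pending, self.pending = self.pending, []
        for call in pending:
            call()


reactor = FakeReactor()
inReactor = ReactorWorker(reactor)
results = []
inReactor.do(lambda: results.append("relayed"))
# Nothing has run yet; the task executes when the (fake) reactor gets a turn.
reactor.runPending()
```

With a real reactor, `ReactorWorker(reactor)` would be passed as the inReactor argument so that the Deferred only ever fires on the reactor thread.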

glyph commented 8 years ago

(Sorry for the comment storm, just really high on having finally actually conclusively understood the bug in https://github.com/rackerlabs/otter - #4 - that I've been hearing about for almost 2 years now)

lvh commented 8 years ago

Cool! I should probably note that I do not have the resources to work on this project at all. I wonder if perhaps it belongs in Twisted instead.

glyph commented 8 years ago

@lvh - any chance I could get commit and PyPI access on this then, in case future fixes are required?

glyph commented 8 years ago

Probably it does belong in Twisted though.