labrad / pylabrad

python interface for labrad

Threading changes behavior #279

Open roboguy222 opened 8 years ago

roboguy222 commented 8 years ago

We are working on a basic monitoring system in which several threads each query a different device through a single server about once a second. Without threading, we get the values we expect. With threading, the correct device does not seem to be consistently selected, and the values bounce around between threads. Any idea what might be happening here? The issue can be reproduced with the following script:

import labrad
import threading

# New labrad connection
cxn = labrad.connect()
# Get two new contexts
ctx1 = cxn.context()
ctx2 = cxn.context()

# Create instance of device server
m = cxn.mks_pdr2000_server
# Select the device in the contexts we defined
m.select_device(0, context = ctx1)
m.select_device(1, context = ctx2)

# Store the outputs in text files
m1txt = open("m1.txt", "wb")
m2txt = open("m2.txt", "wb")

def askm1():
    for i in range(10):
        m1txt.write(str(i)+' '+str(m.get_pressure(context = ctx1))+'\n')
    threading.Timer(0.1, askm1).start()

def askm2():
    for i in range(10):
        m2txt.write(str(i)+' '+str(m.get_pressure(context = ctx2))+'\n')
    threading.Timer(0.1, askm2).start()

m1thread = threading.Thread(target = askm1, args=[])
# If the main thread stops, stop the child thread
m1thread.daemon = True

m2thread = threading.Thread(target = askm2, args=[])
# If the main thread stops, stop the child thread
m2thread.daemon = True

m1thread.start()
m2thread.start()

roboguy222 commented 8 years ago

@nmGit

DanielSank commented 8 years ago

Could you write a very simple mock-up server which reproduces this problem so that we can diagnose without needing specific hardware?

maffoo commented 8 years ago

I would suggest using separate connections in each thread. We haven't done any work to make the connection objects threadsafe.
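A minimal sketch of the one-connection-per-thread pattern follows. `FakeConnection` and `FakeServer` are hypothetical stand-ins so that the example runs without a LabRAD manager or hardware; with a real setup one would presumably pass `labrad.connect` as the `connect` argument, on the assumption that each connection's default context then keeps its own device selection. The helper names `poll_device` and `run_pollers` are illustrative and not part of pylabrad.

```python
import threading


class FakeServer:
    """Hypothetical stand-in for cxn.mks_pdr2000_server (no hardware needed)."""

    def __init__(self):
        self._device = None

    def select_device(self, index):
        self._device = index

    def get_pressure(self):
        # Pretend the pressure is simply the selected device index.
        return float(self._device)


class FakeConnection:
    """Hypothetical stand-in for the object returned by labrad.connect()."""

    def __init__(self):
        self.mks_pdr2000_server = FakeServer()


def poll_device(connect, device_index, n_reads, results):
    """Read one device n_reads times over a connection owned by this thread."""
    cxn = connect()  # opened inside the thread and never shared
    server = cxn.mks_pdr2000_server
    server.select_device(device_index)
    results[device_index] = [server.get_pressure() for _ in range(n_reads)]


def run_pollers(connect, device_indices, n_reads=10):
    """Start one polling thread per device and wait for all of them."""
    results = {}
    threads = [
        threading.Thread(target=poll_device, args=(connect, i, n_reads, results))
        for i in device_indices
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


readings = run_pollers(FakeConnection, [0, 1])
```

Because no connection object crosses a thread boundary, nothing in the client needs to be threadsafe. This does not protect against interleaving inside the server itself.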

roboguy222 commented 8 years ago

Hmm, it appears to be working now with the following test server. I'll look further into what @nmGit saw and get back to you.

from labrad.server import LabradServer, setting
from labrad import units, util

class ThreadingTestServer(LabradServer):
    name = 'Threading Test Server'

    @setting(1, 'Select Device', device='v')
    def select_device(self, c, device):
        c['device'] = device

    @setting(2, 'Get Device', returns='v')
    def get_device(self, c):
        return c['device']

__server__ = ThreadingTestServer()

if __name__ == '__main__':
    from labrad import util
    util.runServer(__server__)

ejeffrey commented 8 years ago

I agree with Matthew that the connection objects are not necessarily threadsafe. Use one connection per thread, use locks, or use asynchronous client connections and run everything in one thread.

My guess is that mks_pdr2000_server is not "concurrent safe". Any time you "yield" in a server, any other context can execute pending requests. If there is any shared state in the server (including shared hardware), that can be problematic.

In particular, the following code is unsafe:

@setting(ID=5, channel="w")
def read_pressure(self, c, channel):
    yield serial_server.write("READ? %d" % channel)
    result = yield serial_server.read_line()
    return float(result)

If two contexts read different channels through the same physical port, their writes and reads may interleave, so one context can receive the reply meant for the other.
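The interleaving can be reproduced without hardware. The sketch below uses `asyncio` from the standard library as a stand-in for Twisted's generator-based style (each `await` plays the role of a `yield` in the server), and `FakePort` is a hypothetical port whose `read_line` answers whichever command was written last. It illustrates the failure mode only; it is not pylabrad code.

```python
import asyncio


class FakePort:
    """Hypothetical shared port: read_line answers the most recent write."""

    def __init__(self):
        self._last_command = None

    async def write(self, command):
        await asyncio.sleep(0)  # yield point: other requests may run here
        self._last_command = command

    async def read_line(self):
        await asyncio.sleep(0)  # another yield point
        # Echo back the channel number of whichever command was written last.
        return self._last_command.split()[-1]


async def read_pressure(port, channel):
    # Same shape as the unsafe setting: write, yield, read, yield.
    await port.write("READ? %d" % channel)
    result = await port.read_line()
    return float(result)


async def main():
    port = FakePort()
    # Two "contexts" ask for different channels at the same time.
    return await asyncio.gather(read_pressure(port, 1), read_pressure(port, 2))


answers = asyncio.run(main())
```

Both writes complete before either read, so the request for channel 1 receives the reply for channel 2 and `answers` is `[2.0, 2.0]`.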

This can be fixed with locking in the usual way, but labrad provides a couple of guarantees to make this easier:

1) Multiple requests for the same context will be executed in the order received, even if they come in separate packets.
2) Multiple requests in the same packet will be executed without interruption as long as the requests don't yield.

Since most hardware access servers (serial, GPIB) do not use asynchronous IO, a write() followed by a read() in the same packet will be executed without interruption. This is not the best behavior to rely on, though: if the server is later modified so that read() or write() can yield, your code will break.

nmGit commented 8 years ago

Hello everyone, I managed to fix the problem. Thanks for your comments. @ejeffrey, thank you in particular for mentioning locking; this is exactly what I was looking for. I just needed to make a few changes to the deviceWrapper in my labrad server.

The original serial read/write code:

    @inlineCallbacks
    def write_line(self, code):
        """Write data value to the rate monitor."""
        yield self.server.write_line(code, context=self.ctx)

    @inlineCallbacks
    def read_line(self):
        """Read data value from the rate monitor."""
        ans = yield self.server.read(context=self.ctx)
        returnValue(ans)

After I got rid of read_line and write_line, and added a function that did both, the code was:

def rw_line(self, code):
    # Don't allow two concurrent read/write calls.
    self._lock = DeferredLock()
    return self._lock.run(partial(self._rw_line, code))

@inlineCallbacks
def _rw_line(self, code):
    '''Write data to the device.'''
    yield self.server.write_line(code, context=self.ctx)
    time.sleep(0.2)
    ans = yield self.server.read(context=self.ctx)
    returnValue(ans)

Calling the rw_line function sets up locking and allows my server to work.
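One detail worth checking in the code as posted: `rw_line` assigns a new `DeferredLock` on every call, so two concurrent calls each acquire a different lock and do not wait for each other. A lock only serializes callers that share the same lock object. The sketch below shows the difference, again using `asyncio` from the standard library as a stand-in for Twisted; `FakePort`, both wrapper classes, and `SETTLE_DELAY` are hypothetical names for illustration. In Twisted, the equivalent change would presumably be to create `self._lock = DeferredLock()` once when the device wrapper is set up.

```python
import asyncio

SETTLE_DELAY = 0.01  # hypothetical pause between write and read, in seconds


class FakePort:
    """Hypothetical shared port: read_line answers the most recent write."""

    def __init__(self):
        self._last_command = None

    async def write(self, command):
        await asyncio.sleep(0)  # yield point
        self._last_command = command

    async def read_line(self):
        await asyncio.sleep(0)  # yield point
        return self._last_command.split()[-1]


class PerCallLockWrapper:
    def __init__(self, port):
        self.port = port

    async def rw_line(self, code):
        lock = asyncio.Lock()  # new lock on every call: nothing ever waits on it
        async with lock:
            await self.port.write(code)
            await asyncio.sleep(SETTLE_DELAY)
            return await self.port.read_line()


class SharedLockWrapper:
    def __init__(self, port):
        self.port = port
        self._lock = asyncio.Lock()  # created once, shared by all calls

    async def rw_line(self, code):
        async with self._lock:  # a second caller waits here until the first is done
            await self.port.write(code)
            await asyncio.sleep(SETTLE_DELAY)  # non-blocking wait
            return await self.port.read_line()


async def run_two(wrapper_cls):
    wrapper = wrapper_cls(FakePort())
    return await asyncio.gather(
        wrapper.rw_line("READ? 1"), wrapper.rw_line("READ? 2")
    )


per_call = asyncio.run(run_two(PerCallLockWrapper))
shared = asyncio.run(run_two(SharedLockWrapper))
```

With a lock per call the two requests still interleave and both receive the reply for channel 2; with the shared lock each request gets its own reply. The sketch also waits with a non-blocking sleep, since `time.sleep` inside an `inlineCallbacks` function blocks the whole reactor for its duration.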

Thanks for your time