hudon / spike

Brain Simulator Parallelization
http://nengo.ca/
1 stars 1 forks source link

Consolidate: Step 1 #10

Closed hudon closed 11 years ago

hudon commented 11 years ago

Merge these things:

gretac commented 11 years ago

This step of Consolidation will include the part of New Theano code that deals with Spiking ensembles (the original model we were working with). The other types of ensemble connections (ex. directly connected) will not be touched in this story.

gretac commented 11 years ago

Consolidation step 1 is done (implemented + correct output) on the consolidation-step1 branch.

hudon commented 11 years ago

merged in master. let's address the review I sent out first. pasted here for reference:

 src/matrix_multiplication.py      |   3 +-
 src/nef_theano/ensemble.py        | 146 +++++++++++++++++++++++++++++++-------
 src/nef_theano/ensemble_origin.py |   4 +-
 src/nef_theano/filter.py          |  10 ++-
 src/nef_theano/input.py           |  31 ++++++--
 src/nef_theano/network.py         |  96 +++++++++++++++++++------
 src/nef_theano/origin.py          |  21 ++++++
 src/nef_theano/zmq_utils.py       |  33 +++++++++
 8 files changed, 289 insertions(+), 55 deletions(-)

diff --git a/src/matrix_multiplication.py b/src/matrix_multiplication.py
index 27c60e1..b2ef261 100644
--- a/src/matrix_multiplication.py
+++ b/src/matrix_multiplication.py
@@ -1,6 +1,6 @@
 # Perform matrix multiplication on arbitrary matrices

-import nengo.nef_theano as nef
+import nef_theano as nef

 net=nef.Network('Matrix Multiplication') #Create the network object

@@ -64,3 +64,4 @@ def product(x):
 net.connect('C','D',index_post=[i/D2 for i in range(D1*D2*D3)],func=product)

 net.run(1) # run for 1 second
+net.clean_up()
diff --git a/src/nef_theano/ensemble.py b/src/nef_theano/ensemble.py
index ef7eb48..aeb4162 100644
--- a/src/nef_theano/ensemble.py
+++ b/src/nef_theano/ensemble.py
@@ -12,12 +12,15 @@ from . import filter
 from .hPES_termination import hPESTermination
 from .helpers import map_gemv

+import zmq
+import zmq_utils
+
 class Ensemble:
     """An ensemble is a collection of neurons representing a vector space.

     """

-    def __init__(self, neurons, dimensions, dt, tau_ref=0.002, tau_rc=0.02,
+    def __init__(self, name, neurons, dimensions, dt, tau_ref=0.002, tau_rc=0.02,
                  max_rate=(200, 300), intercept=(-1.0, 1.0), radius=1.0,
                  encoders=None, seed=None, neuron_type='lif',
                  array_size=1, eval_points=None, decoder_noise=0.1,
@@ -63,6 +66,9 @@ class Ensemble:
             If noise_type = gaussian, this is the variance.

         """
+        self.name = name
+        self.dt = dt
+
         if seed is None:
             seed = np.random.randint(1000)
         self.seed = seed
@@ -73,6 +79,7 @@ class Ensemble:
         self.noise = noise
         self.noise_type = noise_type
         self.decoder_noise = decoder_noise
+
         self.mode = mode

         # make sure that eval_points is the right shape
@@ -135,14 +142,29 @@ class Ensemble:
             # make default origin
             self.add_origin('X', func=None, dt=dt, eval_points=self.eval_points) 

-        elif self.mode == 'direct':
+            self.ticker_conn = None
+            self.input_socket_definitions = []
+            self.input_sockets = []
|| Hudon - self.input_sockets can be an array of InputSocket objects, where Socket
  is class that wraps around a pyzmq socket and contains both the definition
  and the socket once it's bound
+            # ONLY decoded_input/ filter names, same order as input_sockets
+            # TODO: improve this
|| Hudon - the InputSocket can also contain the name of the corresponding
input
+            self.inputs = []
+            self.poller = zmq.Poller()

+        elif self.mode == 'direct':
+            raise Exception("ERROR", "The 'direct' ensemble mode should not be used.")
             # make default origin
             self.add_origin('X', func=None, dimensions=self.dimensions*self.array_size) 
             # reset neurons_num to 0
             self.neurons_num = 0

-    def add_termination(self, name, pstc, decoded_input=None, encoded_input=None):
+    def __del__(self):
+        for socket in self.input_sockets:
+            socket.close()
+
+        self.ticker_conn.close()
+
+    def add_termination(self, name, pstc, decoded_input=None, 
+        encoded_input=None, input_socket=None, transform=None):
         """Accounts for a new termination that takes the given input
         (a theano object) and filters it with the given pstc.

@@ -172,19 +194,36 @@ class Ensemble:
         if decoded_input is not None: assert (encoded_input is None)
         elif encoded_input is not None: assert (decoded_input is None) 
         else: assert False
+        

-        if decoded_input: 
-            if self.mode is not 'direct': 
-                # rescale decoded_input by this neuron's radius
-                source = TT.true_div(decoded_input, self.radius)
-            # ignore radius in direct mode
-            else: source = decoded_input
+        if decoded_input is not None and self.mode is 'direct':
+            raise Exception("ERROR", "Not using the 'direct' mode of ensembles")
+            source = TT.true_div(decoded_input, self.radius)
             name = self.get_unique_name(name, self.decoded_input)
             self.decoded_input[name] = filter.Filter(
-                name=name, pstc=pstc, source=source, 
+                name=name, pstc=pstc, source=source,
                 shape=(self.array_size, self.dimensions))
-        elif encoded_input: 
+
+        # decoded_input in this case will be the output of pre node
+        elif decoded_input is not None and self.mode is 'spiking':
+            # decoded_input is NOT the shared variable of the origin
+            pre_output = theano.shared(decoded_input)
+            source = TT.dot(transform, pre_output)
+            self.input_socket_definitions.append(input_socket)
+            name = self.get_unique_name(name, self.decoded_input)
+
+            self.decoded_input[name] = filter.Filter(
+                name=name, pstc=pstc, source=source,
+                shape=(self.array_size, self.dimensions),
+                pre_output=pre_output)
+
+            # Assumption: the ith socket represents the ith value
+            self.inputs.append(name)
+
+        elif encoded_input:
+            raise Exception("ERROR", "Just deal with decoded_input for ensembles") 
             name = self.get_unique_name(name, self.encoded_input)
+
             self.encoded_input[name] = filter.Filter(
                 name=name, pstc=pstc, source=encoded_input, 
                 shape=(self.array_size, self.neurons_num))
@@ -204,6 +243,8 @@ class Ensemble:
         :param float pstc:
         :param learned_termination_class:
         """
+        raise Exception("ERRPR", "Learned connections are not usable yet.")
+
         #TODO: is there ever a case we wouldn't want this?
         assert error.dimensions == self.dimensions * self.array_size

@@ -253,8 +294,7 @@ class Ensemble:
             specific set of points to optimize decoders over for this origin
         """

-        # if we're in spiking mode create an ensemble_origin with decoders 
-        # and the whole shebang for interpreting the neural activity
+        # Create an ensemble_origin with decoders
         if self.mode == 'spiking':
             if 'eval_points' not in kwargs.keys():
                 kwargs['eval_points'] = self.eval_points
@@ -264,6 +304,7 @@ class Ensemble:
         # if we're in direct mode then this population is just directly 
         # performing the specified function, use a basic origin
         elif self.mode == 'direct':
+            raise Exception("ERROR", "The 'direct' ensemble mode is not being used.")
             if func is not None:
                 if 'initial_value' not in kwargs.keys():
                     # [func(np.zeros(self.dimensions)) for i in range(self.array_size)]
@@ -320,8 +361,18 @@ class Ensemble:

         return theano.function([], encoders)()

-    def theano_tick(self):
+    def make_tick(self):
+        updates = {}
+        updates.update(self.update())
+        self.theano_tick = theano.function([], [], updates = updates)
|| Hudon - no spaces for named arguments in PEP8

+        # introduce 1-time-tick delay
+        self.theano_tick()
+        for o in self.origin.values():
+            o.tick()
+
+    def direct_tick(self):
|| Hudon - rename to direct_mode_tick
+        raise Exception("ERROR", "Not using 'direct' mode of ensembles")
         if self.mode == 'direct':
             # set up matrix to store accumulated decoded input
             X = np.zeros((self.array_size, self.dimensions))
@@ -337,7 +388,54 @@ class Ensemble:
                     val = np.float32([o.func(X[i]) for i in range(len(X))])
                     o.decoded_output.set_value(val.flatten())

-    def update(self, dt):
+    def run(self):
+        self.bind_sockets()
+        self.make_tick()
+
+        while True:
+            self.ticker_conn.recv()
+            self.tick()
+            self.ticker_conn.send("")
+
+    # Receive the outputs of pre - decoded output - and pass it to filters
+    def tick(self):
+        responses = dict(self.poller.poll(1))
+
+        # poll for all inputs, do not receive unless all inputs are available
+        for i, socket in enumerate(self.input_sockets):
+            if socket not in responses or responses[socket] != zmq.POLLIN:
+                return
+
+        for i, socket in enumerate(self.input_sockets):
+            val = socket.recv_pyobj()
+            name = self.inputs[i]
|| Hudon - InputSocket class would make this more robust
+            self.decoded_input[name].set_pre_output(val)
+
+        # should be the compiled theano function for this ensemble
+        # includes the accumulators, ensemble, and origins updates
+        self.theano_tick()
+
+        # continue the tick in the origins
+        for o in self.origin.values():
+            o.tick()
+
+    def bind_sockets(self):
+        # create socket connections for inputs
+        for defn in self.input_socket_definitions:
|| Hudon - should we have 1 context per ensemble or multiple contexts?
create_socket() also creates a new context for the socket
from guide:
"ØMQ applications always start by creating a context, and then using that for
creating sockets. In C, it's the zmq_ctx_new() call. You should create and use
exactly one context in your process. Technically, the context is the container
for all sockets in a single process, and acts as the transport for inproc
sockets, which are the fastest way to connect threads in one process. If at
runtime a process has two contexts, these are like separate ØMQ instances. If
that's explicitly what you want, OK, but otherwise remember:

Do one zmq_ctx_new() at the start of your main line code, and one
zmq_ctx_destroy() at the end.

If you're using the fork() system call, each process needs its own context. If
you do zmq_ctx_new() in the main process before calling fork(), the child
processes get their own contexts. In general, you want to do the interesting
stuff in the child processes and just manage these from the parent process."
+            socket = defn.create_socket()
+            self.input_sockets.append(socket)
+            self.poller.register(socket, zmq.POLLIN)
+
+        for o in self.origin.values():
+            o.bind_sockets()
+
+        # zmq.REP strictly enforces alternating recv/send ordering
+        zmq_context = zmq.Context()
+        self.ticker_conn = zmq_context.socket(zmq.REP)
+        self.ticker_conn.connect(zmq_utils.TICKER_SOCKET_LOCAL_NAME)
+
+    # Using the dt that was passed to the ensemble at construction time
+    def update(self):
         """Compute the set of theano updates needed for this ensemble.

         Returns a dictionary with new neuron state,
@@ -360,7 +458,7 @@ class Ensemble:
             else:
                 X += di.value

-            updates.update(di.update(dt))
+            updates.update(di.update(self.dt))

         # if we're in spiking mode, then look at the input current and 
         # calculate new neuron activities for output
@@ -372,7 +470,7 @@ class Ensemble:
             for ei in self.encoded_input.values():
                 # add its values directly to the input current
                 J += (ei.value.T * self.alpha.T).T
-                updates.update(ei.update(dt))
+                updates.update(ei.update(self.dt))

             # only do this if there is decoded_input
             if X is not None:
@@ -388,27 +486,27 @@ class Ensemble:
                 # sqrt(dt) instead of dt. Hence, we divide the std by sqrt(dt).
                 if self.noise_type.lower() == 'gaussian':
                     J += self.srng.normal(
-                        size=self.bias.shape, std=np.sqrt(self.noise/dt))
+                        size=self.bias.shape, std=np.sqrt(self.noise/self.dt))
                 elif self.noise_type.lower() == 'uniform':
                     J += self.srng.uniform(
                         size=self.bias.shape, 
-                        low=-self.noise/np.sqrt(dt), 
-                        high=self.noise/np.sqrt(dt))
+                        low=-self.noise/np.sqrt(self.dt), 
+                        high=self.noise/np.sqrt(self.dt))

             # pass that total into the neuron model to produce
             # the main theano computation
-            updates.update(self.neurons.update(J, dt))
+            updates.update(self.neurons.update(J, self.dt))

             for l in self.learned_terminations:
                 # also update the weight matrices on learned terminations
-                updates.update(l.update(dt))
+                updates.update(l.update(self.dt))

             # and compute the decoded origin decoded_input from the neuron output
             for o in self.origin.values():
-                updates.update(o.update(dt, updates[self.neurons.output]))
+                updates.update(o.update(self.dt, updates[self.neurons.output]))

         if self.mode == 'direct': 
-
+            raise Exception("ERROR", "The 'direct' ensemble connections not used")
             # if we're in direct mode then just directly pass the decoded_input 
             # to the origins for decoded_output
             for o in self.origin.values(): 
diff --git a/src/nef_theano/ensemble_origin.py b/src/nef_theano/ensemble_origin.py
index 78a9028..fc74f26 100644
--- a/src/nef_theano/ensemble_origin.py
+++ b/src/nef_theano/ensemble_origin.py
@@ -31,7 +31,7 @@ class EnsembleOrigin(Origin):
         initial_value = np.zeros(self.ensemble.array_size * func_size) 
         Origin.__init__(self, func=func, initial_value=initial_value)
         self.func_size = func_size
-    
+
     def compute_decoders(self, func, dt, eval_points=None):     
         """Compute decoding weights.

@@ -197,7 +197,7 @@ class EnsembleOrigin(Origin):
         # scale sample points
         samples = samples.T * scale 

-        return theano.function([], samples)()
+        return theano.function([], samples)()            

     def update(self, dt, spikes):
         """the theano computation for converting neuron output
diff --git a/src/nef_theano/filter.py b/src/nef_theano/filter.py
index af2eae5..d4fa4af 100644
--- a/src/nef_theano/filter.py
+++ b/src/nef_theano/filter.py
@@ -7,7 +7,8 @@ import theano.tensor as TT
 class Filter:
     """Filter an arbitrary theano.shared"""

-    def __init__(self, pstc, name=None, source=None, shape=None):
+    def __init__(self, pstc, name=None, source=None, shape=None, 
+        pre_output=None):
         """
         :param float pstc:
         :param string name:
@@ -18,6 +19,9 @@ class Filter:
         self.pstc = pstc
         self.source = source

+        # has to be a Theano shared variable
+        self.pre_output = pre_output
|| Hudon - Filter has a data member that it doesn't directly use. It's here so
that it can be useful to functions that use a Filter, but not not useful for
Filter itself. We might want to rethink who's responsible for updating
"pre_output"
+
         ### try to find the shape from the given parameters (source and shape)
         if source is not None and hasattr(source, 'get_value'):
             value = source.get_value()
@@ -34,6 +38,10 @@ class Filter:
         self.value = theano.shared(value, name=name)
         self.name = name

+    # TODO: will set_value always work in this case?
+    def set_pre_output(self, output):
+        self.pre_output.set_value(output)
+
     def set_source(self, source):
         """Set the source of data for this filter

diff --git a/src/nef_theano/input.py b/src/nef_theano/input.py
index 8c056df..54ed633 100644
--- a/src/nef_theano/input.py
+++ b/src/nef_theano/input.py
@@ -6,6 +6,8 @@ import numpy as np

 from . import origin

+import zmq
+import zmq_utils

 class Input(object):
     """Inputs are objects that provide real-valued input to ensembles.
@@ -22,6 +24,8 @@ class Input(object):
             time after which to set function output = 0 (s)

         """
+        self.ticker_conn = None
+
         self.name = name
         self.t = 0
         self.function = None
@@ -52,10 +56,8 @@ class Input(object):
         """
         self.zeroed = False

-    def theano_tick(self):
-        """Move function input forward in time.
-        
-        """
+    def tick(self):
+        """Move function input forward in time."""
         if self.zeroed:
             return

@@ -85,3 +87,24 @@ class Input(object):
             # cast as float32 for consistency / speed,
             # but _after_ it's been made a list
             self.origin['X'].decoded_output.set_value(np.float32(value)) 
+
+        for o in self.origin.values():
+            o.tick()
+
+    def run(self):
+        self.bind_sockets()
+
+        while True:
+            self.t = float(self.ticker_conn.recv())
+            self.tick()
+            self.ticker_conn.send("")
+
+    def bind_sockets(self):
+        for o in self.origin.values():
+            o.bind_sockets()
+
+        # zmq.REP strictly enforces alternating recv/send ordering
+        zmq_context = zmq.Context()
+        self.ticker_conn = zmq_context.socket(zmq.REP)
+        self.ticker_conn.connect(zmq_utils.TICKER_SOCKET_LOCAL_NAME)
+
diff --git a/src/nef_theano/network.py b/src/nef_theano/network.py
index f283324..2ea5470 100644
--- a/src/nef_theano/network.py
+++ b/src/nef_theano/network.py
@@ -14,6 +14,10 @@ from . import input
 from . import subnetwork
 from . import connection

+from multiprocessing import Process
+import zmq
+from . import zmq_utils
+
 class Network(object):
     def __init__(self, name, seed=None, fixed_seed=None, dt=.001):
         """Wraps an NEF network with a set of helper functions
@@ -35,14 +39,23 @@ class Network(object):
         self.fixed_seed = fixed_seed
         # all the nodes in the network, indexed by name
         self.nodes = {}
+        self.processes = []
+
+        self.setup = False
+        self.ticker_conn = zmq.Context().socket(zmq.DEALER)
+        self.ticker_conn.bind(zmq_utils.TICKER_SOCKET_LOCAL_NAME)
+
         # the function call to run the theano portions of the model
-        self.theano_tick = None
+        # self.theano_tick = None
         # the list of nodes that have non-theano code
-        self.tick_nodes = [] 
+        # self.tick_nodes = [] 
|| Hudon - add a TODO to remove these commented out vars
         self.random = random.Random()
         if seed is not None:
             self.random.seed(seed)

+    def __del__(self):
+        self.ticker_conn.close()
+
     def add(self, node):
         """Add an arbitrary non-theano node to the network.

@@ -56,9 +69,11 @@ class Network(object):
         """
         # remake theano_tick function, in case the node has Theano updates 
         self.theano_tick = None 
-        self.tick_nodes.append(node)
+        #self.tick_nodes.append(node)
         self.nodes[node.name] = node

+        p = Process(target=node.run, name=node.name)
+        self.processes.append(p)

     def connect(self, pre, post, transform=None, weight=1,
                 index_pre=None, index_post=None, pstc=0.01, 
@@ -136,6 +151,7 @@ class Network(object):
             instead of creating a new one.

         """
+
         # reset timer in case the model has been run,
         # as adding a new node requires rebuilding the theano function 
         self.theano_tick = None  
@@ -143,16 +159,17 @@ class Network(object):
         # get post Node object from node dictionary
         post = self.get_object(post)

-        # get the origin from the pre Node
-        pre_origin = self.get_origin(pre, func)
         # get pre Node object from node dictionary
         pre_name = pre
         pre = self.get_object(pre)
-
-        # get decoded_output from specified origin
+        # get the origin from the pre Node, CREATE one if does not exist
+        pre_origin = self.get_origin(pre_name, func)
         pre_output = pre_origin.decoded_output
         dim_pre = pre_origin.dimensions 

+        origin_socket, destination_socket = \
+            zmq_utils.create_local_socket_definition_pair(pre, post)
+
         if transform is not None: 

             # there are 3 cases
@@ -180,6 +197,8 @@ class Network(object):
             if transform.shape[0] != post.dimensions * post.array_size \
                                                 or len(transform.shape) > 2:

+                raise Exception("ERROR", "Case 1 and 2 should NOT be reached.")
+
                 if transform.shape[0] == post.array_size * post.neurons_num:
                     transform = transform.reshape(
                                       [post.array_size, post.neurons_num] +\
@@ -231,8 +250,11 @@ class Network(object):

                     # pass in the pre population encoded output function
                     # to the post population, connecting them for theano
+                    # I.E. adding an Accumulator
                     post.add_termination(name=pre_name, pstc=pstc, 
-                        encoded_input=encoded_output)
+                        encoded_input=encoded_output, input_socket=dest_socket)
+
+                    pre_origin.add_output(origin_socket)

                     return

@@ -249,12 +271,19 @@ class Network(object):

         # apply transform matrix, directing pre dimensions
         # to specific post dimensions
-        decoded_output = TT.dot(transform, pre_output)

-        # pass in the pre population decoded output function
-        # to the post population, connecting them for theano
|| Hudon - remove or add TODO these commented out statements
+        # decoded_output = TT.dot(transform, pre_output)
+
+        # decoded input = decoded_out * transform
|| Hudon - remove|clarify
+        # decoded_out (pre output) needs to be replaced using IPC
+        # so pass both + calculate dot product in accumulator
+
+        # passing in the VALUE of pre output
         post.add_termination(name=pre_name, pstc=pstc, 
-            decoded_input=decoded_output) 
+            decoded_input=pre_output.get_value(), 
+            input_socket=destination_socket, transform=transform) 
+
+        pre_origin.add_output(origin_socket)

     def get_object(self, name):
         """This is a method for parsing input to return the proper object.
@@ -363,12 +392,17 @@ class Network(object):
         self.theano_tick = None

         kwargs['dt'] = self.dt
-        e = ensemble.Ensemble(*args, **kwargs) 
+        e = ensemble.Ensemble(name, *args, **kwargs) 
+        self.nodes[name] = e
+
+        p = Process(target=e.run, name=name)
+        self.processes.append(p)

         # store created ensemble in node dictionary
         if kwargs.get('mode', None) == 'direct':
+            raise Exception("ERROR", "Do not support 'direct' communication mode")
             self.tick_nodes.append(e)
-        self.nodes[name] = e
+        
         return e

     def make_array(self, name, neurons, array_size, dimensions=1, **kwargs):
@@ -397,7 +431,6 @@ class Network(object):
         """
         return subnetwork.SubNetwork(name, self)

-
     def make_probe(self, target, name=None, dt_sample=0.01, 
                    data_type='decoded', **kwargs):
         """Add a probe to measure the given target.
@@ -462,25 +495,42 @@ class Network(object):
         :param float dt: the timestep of the update
         """
         # if theano graph hasn't been calculated yet, retrieve it
-        if self.theano_tick is None:
-            self.theano_tick = self.make_theano_tick() 
+        # if self.theano_tick is None:
+        #     self.theano_tick = self.make_theano_tick() 
+
+        if not self.setup:
+            for proc in self.processes:
+                if not proc.is_alive():
+                    proc.start()
+            self.setup = True
+
         for i in range(int(time / self.dt)):
             # get current time step
             t = self.run_time + i * self.dt

-            # run the non-theano nodes
-            for node in self.tick_nodes:
-                node.t = t
-                node.theano_tick()
+            num_processes = len(self.processes)
+
+            for i in xrange(num_processes):
+                self.ticker_conn.send("", zmq.SNDMORE) #This is the Delimiter
+                self.ticker_conn.send(str(t))

-            # run the theano nodes
-            self.theano_tick()
+            for i in xrange(num_processes):
+                self.ticker_conn.recv() # This is the delimiter (discard it)
+                self.ticker_conn.recv()

         # update run_time variable
         self.run_time += time

         ## TODO spike: run cleanup here

+    # called when the user is all done (otherwise, procs hang :) )
|| Hudon - remove smiley. This is srz bsns
+    def clean_up(self):
+        self.ticker_conn.close()
+
+        # force kill
+        for proc in self.processes:
+            proc.terminate()
+
     def write_data_to_hdf5(self, filename='data'):
         """This is a function to call after simulation that writes the 
         data of all probes to filename using the Neo HDF5 IO module.
diff --git a/src/nef_theano/origin.py b/src/nef_theano/origin.py
index aed3bc0..8709deb 100644
--- a/src/nef_theano/origin.py
+++ b/src/nef_theano/origin.py
@@ -4,6 +4,9 @@ import collections
 import numpy as np
 import theano

+import zmq
+import zmq_utils
+
 class Origin(object):
     """An origin is an object that provides a signal. Origins project
     to terminations.
@@ -45,3 +48,21 @@ class Origin(object):
         # find number of parameters of the projected value
         if dimensions is None: dimensions = len(initial_value)
         self.dimensions = dimensions
+

|| Hudon - would these benefit from our socket class wrapper?
+        self.output_socket_definitions = []
+        self.output_sockets = []
+
+    def __del__(self):
+        for socket in self.output_sockets:
+            socket.close()
+
+    def add_output(self, output_socket_def):
+        self.output_socket_definitions.append(output_socket_def)
+
+    def bind_sockets(self):
+        for defn in self.output_socket_definitions:
+            self.output_sockets.append(defn.create_socket())
+
+    def tick(self):
+        for socket in self.output_sockets:
+            socket.send_pyobj(self.decoded_output.get_value())
\ No newline at end of file
diff --git a/src/nef_theano/zmq_utils.py b/src/nef_theano/zmq_utils.py
new file mode 100644
index 0000000..136f4f0
--- /dev/null
+++ b/src/nef_theano/zmq_utils.py
@@ -0,0 +1,33 @@
+import zmq
+
+# Straight-up IPC connection between processes (Does NOT work on Windows)
+TICKER_SOCKET_LOCAL_NAME = "ipc:///tmp/spike.tick_timer_connection"
+
+# TCP Connection (For network communication)
+TICKER_SOCKET_GLOBAL_NAME = "tcp://*:10000"
+
+class SocketDefinition(object):
+    def __init__(self, endpoint, socket_type, is_server = False):
+        self.endpoint = endpoint
+        self.socket_type = socket_type
+        self.is_server = is_server
+
+    def create_socket(self):
|| Hudon - change to pass in a context so we can have 1 ctx per proc
+        ctx = zmq.Context()
+        socket = ctx.socket(self.socket_type)
+        if self.is_server:
+            socket.bind(self.endpoint)
+        else:
+            socket.connect(self.endpoint)
+
+        return socket
+
+def create_local_socket_definition_pair(origin_node, destination_node):
+    socket_name = "ipc:///tmp/spike.node_connection.{origin_name}-to-{destination_name}".format(
+        origin_name=origin_node.name,
+        destination_name=destination_node.name)
+
+    origin_socket_type = zmq.PUSH
+    destination_socket_type = zmq.PULL
+
+    return SocketDefinition(socket_name, origin_socket_type, is_server=True), SocketDefinition(socket_name, destination_socket_type, is_server=False)
gretac commented 11 years ago

Other comments addressed in 1657ee18ed4c8844b9eb49b3e3eea39f57129c0a and 9b2ec4f5520b568859677a8fc4ad6b8c5ee8079d.