nengo / nengo-extras

Extra utilities and add-ons for Nengo
https://www.nengo.ai/nengo-extras
Other
5 stars 8 forks source link

Added function to optimize ensemble parameters #26

Open tbekolay opened 8 years ago

tbekolay commented 8 years ago

Migrated from https://github.com/nengo/nengo/pull/871.

I added a small test, which revealed a small bug -- the rng wasn't getting passed to the sample (now get_samples) function. Fixed that, now works and gives deterministic results when rng is set!

Note that this should wait until https://github.com/nengo/nengo/pull/1181 is merged, in case the get_samples functions undergoes any other changes.

tbekolay commented 8 years ago

Here are two other snippets from @studywolf for optimizing ensembles to compute certain functions, migrated from https://github.com/nengo/nengo/issues/440 ... before merging, it would be good to compare these approaches to the one in this PR:

import numpy as np

import nengo

def get_intercepts_for_function(function, num_samples=500, 
                                threshold=5e-3, eval_range=[-1, 1],
                                plot_results=False):

    # generate random set of initial eval_points, encoders, and intercepts
    num_neurons = 10
    eval_points = [np.random.random() for ii in range(num_neurons)] 
    encoders = [np.random.choice([-1,1],1)[0] for ii in range(num_neurons)]
    intercepts = [np.random.random()*encoders[ii] for ii in range(num_neurons)] 

    def build(intercepts, eval_points, values, build_test=False):
        m = nengo.Network()
        with m:
            if values is not None:
                # input function returns time signal
                input1 = nengo.Node(output=values) 
            else:
                input1 = nengo.Node(lambda t: np.sin(t / 4))

            m.ens1 = nengo.Ensemble(num_neurons, 1, 
                                    encoders=np.array(encoders)[:,None],
                                    intercepts=intercepts)
            ens2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            output1 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
            output2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            nengo.Connection(input1, m.ens1)
            nengo.Connection(input1, ens2)
            nengo.Connection(m.ens1, output1, function=fun0, 
                             eval_points=eval_points)
            nengo.Connection(ens2, output2, function=fun0)

            if build_test == True:
                ens3 = nengo.Ensemble(num_neurons, 1)
                output3 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
                nengo.Connection(input1, ens3)
                nengo.Connection(ens3, output3, function=fun0)
                m.probe_output3 = nengo.Probe(output3, synapse=.01)

            m.probe_input1 = nengo.Probe(input1)
            m.probe_output1 = nengo.Probe(output1, synapse=.01)
            m.probe_output2 = nengo.Probe(output2)
        return m

    for ii in range(num_samples):

        m = build(intercepts, eval_points, eval_points[-1])

        sim = nengo.Simulator(m)
        sim.run(.1)

        # randomly add some sample between 0 and 1
        error = np.abs(sim.data[m.probe_output2][-1] - 
                       sim.data[m.probe_output1][-1]) 

        if error > threshold: 
            encoders.append(-1)
            encoders.append(1)

            epsilon = .001
            intercepts.append((eval_points[-1] + epsilon)*-1)
            intercepts.append(eval_points[-1] - epsilon)
            num_neurons += 2

        eval_points.append(np.random.random() * \
                          (eval_range[1] - eval_range[0]) + eval_range[0])

        if ii % 100 == 0:
            print 'ens1.n_neurons=%i'%num_neurons
            print 'num eval_points: ', len(eval_points)

    m = build(intercepts, eval_points, values=None, build_test=True)
    sim = nengo.Simulator(m)
    run_time = 20
    sim.run(run_time)

    print 'error of trained: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output1])) / (run_time / .001)
    print 'error of control: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output3])) / (run_time / .001)

    if plot_results == True:
        import matplotlib.pyplot as plt
        plt.plot(sim.trange(), sim.data[m.probe_input1])
        plt.plot(sim.trange(), sim.data[m.probe_output1])
        plt.plot(sim.trange(), sim.data[m.probe_output2])
        plt.plot(sim.trange(), sim.data[m.probe_output3])
        plt.legend(['input', 'approx', 'answer', 'control'])

        plt.figure()
        from nengo.utils.ensemble import tuning_curves as tc
        eps, a = tc(m.ens1, sim)
        plt.plot(eps, a)
        plt.show()

    return intercepts, eval_points, encoders, num_neurons

################################################################################

if __name__ == '__main__':

    # generate a random target signal to follow
    n1 = np.zeros((10000,), dtype=complex)
    n2 = np.zeros((10000,), dtype=complex)
    n1[:5] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    n2[20:25] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    s1 = np.fft.ifft(n1)*1000
    s2 = np.fft.ifft(n2)*1000
    s2[:5000] = 0
    s2[6000:] = 0
    s = s1 + s2

    def fun0(x):
        return s.real[int(np.floor((x+1)*5000))]

    get_intercepts_for_function(fun0, num_samples=500, plot_results=True)

and

import numpy as np
from scipy.interpolate import interp1d

import nengo

def get_intercepts_for_function_derivative(function, num_samples=500, 
                                           threshold=5e-3, eval_range=[-1, 1],
                                           plot_results=False):

    # evaluate the function, find areas with large derivatives
    num_evals = 1000
    x = np.linspace(eval_range[0], eval_range[1], num_evals)
    vals = np.zeros(num_evals)
    for ii in range(num_evals-1):
        vals[ii] = function(x[ii])
    # get derivatives
    dvals = np.hstack([0, np.diff(vals)])
    ddvals = np.hstack([0, np.diff(dvals)])

    # find the parts that are unusually large, to focus on
    high = np.zeros(num_evals)
    # high[dvals > np.mean(dvals) + np.var(dvals)] = 1
    high[ddvals > np.mean(ddvals) + np.var(ddvals)] = 1
    # convolve with Gaussian to get our probability dist for sampling
    gauss = np.exp(-np.linspace(-1, 1, 10)**2 / 2)
    dist = np.convolve(high, gauss, mode='same')
    # normalize distribution 
    dist /= np.sum(dist)
    # also add in a term to make sure everything has some probability
    dist += np.ones(dist.shape[0]) / dist.shape[0]
    # normalize distribution 
    dist /= np.sum(dist)
    # get cumulative sum, making sure that 0 is the first value
    # which is important for the interpolation below
    cdf = np.hstack([0, np.cumsum(dist)])

    # generate interpolation function
    pdist = interp1d(cdf, np.linspace(-1, 1, cdf.shape[0]))

    import matplotlib.pyplot as plt
    # test pdist
    a = [pdist(np.random.random()) for ii in range(1000)]
    plt.subplot(211)
    plt.plot(a, np.ones(len(a)), 'rx')
    plt.plot(np.linspace(-1, 1, vals.shape[0]), vals)
    # plt.plot(np.linspace(-1, 1, vals.shape[0]), dvals)
    plt.plot(np.linspace(-1, 1, vals.shape[0]), ddvals)
    plt.legend(['samples', 'function', 'function derivative'])
    plt.subplot(212)
    plt.plot(np.linspace(-1, 1, vals.shape[0]), high*max(dist))
    plt.plot(np.linspace(-1, 1, vals.shape[0]), dist)
    plt.legend(['areas with high derivative', 'probability dist'])
    plt.show()

    # generate random set of initial eval_points, encoders, and intercepts
    num_neurons = 10
    eval_points = [np.random.random() for ii in range(num_neurons)] 
    encoders = [np.random.choice([-1,1],1)[0] for ii in range(num_neurons)]
    intercepts = [np.random.random()*encoders[ii] for ii in range(num_neurons)] 

    def build(intercepts, eval_points, values, build_test=False):
        m = nengo.Network()
        with m:
            if values is not None:
                # input function returns time signal
                input1 = nengo.Node(output=values) 
            else:
                input1 = nengo.Node(lambda t: np.sin(t / 4))

            m.ens1 = nengo.Ensemble(num_neurons, 1, 
                                    encoders=np.array(encoders)[:,None],
                                    intercepts=intercepts)
            ens2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            output1 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
            output2 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())

            nengo.Connection(input1, m.ens1)
            nengo.Connection(input1, ens2)
            nengo.Connection(m.ens1, output1, function=fun0, 
                             eval_points=eval_points)
            nengo.Connection(ens2, output2, function=fun0)

            if build_test == True:
                ens3 = nengo.Ensemble(num_neurons, 1)
                output3 = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
                nengo.Connection(input1, ens3)
                nengo.Connection(ens3, output3, function=fun0)
                m.probe_output3 = nengo.Probe(output3, synapse=.01)

            m.probe_input1 = nengo.Probe(input1)
            m.probe_output1 = nengo.Probe(output1, synapse=.01)
            m.probe_output2 = nengo.Probe(output2)
        return m

    for ii in range(num_samples):

        m = build(intercepts, eval_points, eval_points[-1])

        sim = nengo.Simulator(m)
        sim.run(.1)

        # randomly add some sample between 0 and 1
        error = np.abs(sim.data[m.probe_output2][-1] - 
                       sim.data[m.probe_output1][-1]) 

        if error > threshold: 
            encoders.append(-1)
            encoders.append(1)

            epsilon = .001
            intercepts.append((eval_points[-1] + epsilon)*-1)
            intercepts.append(eval_points[-1] - epsilon)
            num_neurons += 2

        eval_points.append(pdist(np.random.random()))

        if ii % 100 == 0:
            print 'ens1.n_neurons=%i'%num_neurons
            print 'num eval_points: ', len(eval_points)

    m = build(intercepts, eval_points, values=None, build_test=True)
    sim = nengo.Simulator(m)
    run_time = 20
    sim.run(run_time)

    print 'error of trained: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output1])) / (run_time / .001)
    print 'error of control: ', np.sum(np.abs(sim.data[m.probe_output2] - \
                                sim.data[m.probe_output3])) / (run_time / .001)

    if plot_results == True:
        import matplotlib.pyplot as plt
        plt.plot(sim.trange(), sim.data[m.probe_input1])
        plt.plot(sim.trange(), sim.data[m.probe_output1])
        plt.plot(sim.trange(), sim.data[m.probe_output2])
        plt.plot(sim.trange(), sim.data[m.probe_output3])
        plt.legend(['input', 'approx', 'answer', 'control'])

        plt.figure()
        from nengo.utils.ensemble import tuning_curves as tc
        eps, a = tc(m.ens1, sim)
        plt.plot(eps, a)
        plt.show()

    return intercepts, eval_points, encoders, num_neurons

################################################################################

if __name__ == '__main__':

    # generate a random target signal to follow
    n1 = np.zeros((10000,), dtype=complex)
    n2 = np.zeros((10000,), dtype=complex)
    n1[:5] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    n2[20:25] = np.exp(1j*np.random.uniform(0, 50*np.pi, (5,)))
    s1 = np.fft.ifft(n1)*1000
    s2 = np.fft.ifft(n2)*1000
    s2[:5000] = 0
    s2[6000:] = 0
    s = s1 + s2

    def fun0(x):
        return s.real[int(np.floor((x+1)*5000))]

    get_intercepts_for_function_derivative(fun0, num_samples=200, plot_results=True)