keras-team / keras

Deep Learning for humans
http://keras.io/

Exception when using threads #9424

Closed Jackaljkdan closed 6 years ago

Jackaljkdan commented 6 years ago

I have a problem using Keras models in threads. I have been able to reproduce the issue in the MWE reported below. Both here and in the real code, I create the networks outside the threads and then, inside each thread, use one network for predictions and fitting.

To make it work with as few as 2 threads, I had to use the workarounds described in https://github.com/keras-team/keras/issues/5896 and https://github.com/keras-team/keras/issues/6124 (they amount to wrapping the thread code in with graph.as_default(): and calling net._make_train_function() before starting the threads).

The program executes without any problem with 32 threads; however, when that number is increased to 64, exceptions are raised (see below).


import threading
import numpy as np
from keras.layers import Input, LSTM
from keras.models import Model

import tensorflow as tf

n_threads = 64

# get reference to graph as per issue #5896
graph = tf.get_default_graph()

def make_rnet():
    inp = Input(batch_shape=(1,1,5))
    out = LSTM(1, stateful=True)(inp)
    return Model(inputs=inp, outputs=out)

def thread_fn(index, global_net):
    """
    :param index:
    :type global_net: Model
    """

    print("thread-%s" % index)

    net = nets[index]

    # use with statement as per issue #5896
    with graph.as_default():

        net.reset_states()

        # sync weights with global network
        gw = global_net.get_weights()
        net.set_weights(gw)

        in_shape = [int(d) for d in net.input.shape]
        out_shape = [int(d) for d in net.output.shape]

        # test prediction
        net.predict(np.ones(shape=in_shape))

        # test fit on random data
        x = np.random.random(size=in_shape)
        y = np.ones(shape=out_shape)
        w = net.get_weights()
        net.fit(x,y, verbose=0, batch_size=1)
        new_w = net.get_weights()

        # update global network weights
        upd = [n - o for n,o in zip(new_w, w)]
        new_gw = [w + u for w,u in zip(global_net.get_weights(), upd)]
        global_net.set_weights(new_gw)

global_net = make_rnet()
nets = [make_rnet() for i in range(n_threads)]
threads = [threading.Thread(target=thread_fn, args=(i, global_net)) for i in range(n_threads)]

for i, net in enumerate([global_net] + nets):
    print("init net", i)
    net.compile(optimizer='rmsprop', loss='mse')
    # init as per issue #6124
    net._make_train_function()

print("saving global_net weights")

old_gw = global_net.get_weights()

print("starting %s threads..." % n_threads)

for t in threads:
    t.start()

for t in threads:
    t.join()

print("threads terminated.")

# the following code is not very important
new_gw = global_net.get_weights()

changed = False
for o,n in zip(old_gw, new_gw):
    if not np.array_equal(o,n):
        changed = True
        break

if changed:
    print('global w changed!')
else:
    print('global w unaltered')

Some of the exceptions raised with 64 threads are:

Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/Daniele/Desktop/univ/tesi/codice/prove/mwe_thread_problem.py", line 43, in thread_fn
    net.predict(np.ones(shape=in_shape))
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\engine\training.py", line 1787, in predict
    self._make_predict_function()
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\engine\training.py", line 1029, in _make_predict_function
    **kwargs)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\backend\tensorflow_backend.py", line 2381, in function
    return Function(inputs, outputs, updates=updates, **kwargs)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\backend\tensorflow_backend.py", line 2333, in __init__
    self.updates_op = tf.group(*updates_ops)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\ops\control_flow_ops.py", line 2898, in group
    return _GroupControlDeps(dev, deps, name=name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\ops\control_flow_ops.py", line 2855, in _GroupControlDeps
    return no_op(name=name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\contextlib.py", line 66, in __exit__
    next(self.gen)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\framework\ops.py", line 3240, in device
    self._device_function_stack.pop()
IndexError: pop from empty list

Exception in thread Thread-63:
Traceback (most recent call last):
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 1139, in _do_call
    return fn(*args)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 1121, in _run_fn
    status, run_metadata)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\contextlib.py", line 66, in __exit__
    next(self.gen)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\framework\errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status))
tensorflow.python.framework.errors_impl.InvalidArgumentError: You must feed a value for placeholder tensor 'input_54' with dtype float and shape [1,1,5]
     [[Node: input_54 = Placeholder[dtype=DT_FLOAT, shape=[1,1,5], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:/Users/Daniele/Desktop/univ/tesi/codice/prove/mwe_thread_problem.py", line 38, in thread_fn
    net.set_weights(gw)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\engine\topology.py", line 2007, in set_weights
    K.batch_set_value(tuples)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\backend\tensorflow_backend.py", line 2252, in batch_set_value
    get_session().run(assign_ops, feed_dict=feed_dict)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 789, in run
    run_metadata_ptr)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 997, in _run
    feed_dict_string, options, run_metadata)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 1132, in _do_run
    target_list, options, run_metadata)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\client\session.py", line 1152, in _do_call
    raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.InvalidArgumentError: You must feed a value for placeholder tensor 'input_54' with dtype float and shape [1,1,5]
     [[Node: input_54 = Placeholder[dtype=DT_FLOAT, shape=[1,1,5], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]

Caused by op 'input_54', defined at:
  File "C:/Users/Daniele/Desktop/univ/tesi/codice/prove/mwe_thread_problem.py", line 57, in <module>
    nets = [make_rnet() for i in range(n_threads)]
  File "C:/Users/Daniele/Desktop/univ/tesi/codice/prove/mwe_thread_problem.py", line 57, in <listcomp>
    nets = [make_rnet() for i in range(n_threads)]
  File "C:/Users/Daniele/Desktop/univ/tesi/codice/prove/mwe_thread_problem.py", line 16, in make_rnet
    inp = Input(batch_shape=(1,1,5))
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\engine\topology.py", line 1439, in Input
    input_tensor=tensor)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\engine\topology.py", line 1348, in __init__
    name=self.name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\keras\backend\tensorflow_backend.py", line 497, in placeholder
    x = tf.placeholder(dtype, shape=shape, name=name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\ops\array_ops.py", line 1530, in placeholder
    return gen_array_ops._placeholder(dtype=dtype, shape=shape, name=name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\ops\gen_array_ops.py", line 1954, in _placeholder
    name=name)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\framework\op_def_library.py", line 767, in apply_op
    op_def=op_def)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\framework\ops.py", line 2506, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "C:\WinPython-64bit-3.5.4.0Qt5-r2017-02\python-3.5.4.amd64\lib\site-packages\tensorflow\python\framework\ops.py", line 1269, in __init__
    self._traceback = _extract_stack()

InvalidArgumentError (see above for traceback): You must feed a value for placeholder tensor 'input_54' with dtype float and shape [1,1,5]
     [[Node: input_54 = Placeholder[dtype=DT_FLOAT, shape=[1,1,5], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]

In the real code far fewer threads (8) are needed for this behaviour to occur, maybe because each thread uses several networks (6) and they have far more parameters (~3 million). Why is this happening?

DomHudson commented 6 years ago

I believe this is happening because you are using the default graph in each thread. Try creating a graph in the thread and using that instead of the default graph. On a side note, 64 threads sounds excessive; you are unlikely to get additional throughput at that level.
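
For example, something along these lines (a rough sketch of the idea rather than a drop-in fix for your code; note the model is built inside the thread so that all of its tensors live in that thread's own graph):

import threading
import numpy as np
import tensorflow as tf
from keras.layers import Input, LSTM
from keras.models import Model

def thread_fn(index):
    # Give this thread its own graph and session, and make them the
    # defaults here, so the model built below never touches the graph
    # shared with the other threads.
    graph = tf.Graph()
    with graph.as_default(), tf.Session(graph=graph) as sess:
        inp = Input(batch_shape=(1, 1, 5))
        out = LSTM(1, stateful=True)(inp)
        net = Model(inputs=inp, outputs=out)
        net.compile(optimizer='rmsprop', loss='mse')
        print(index, net.predict(np.ones(shape=(1, 1, 5))))

threads = [threading.Thread(target=thread_fn, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

The key point is that a tensor can only be used with the graph it was created in, so a per-thread graph only helps if the model is also created (or reloaded from its config) inside that thread.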

Jackaljkdan commented 6 years ago

If I create a graph for each thread, simply by using with tf.Graph().as_default(): instead of the previous with statement, the following exception is raised for any number of threads:

ValueError: Tensor("Placeholder:0", shape=(1, 1), dtype=float32) must be from the same graph as Tensor("lstm_2/Variable:0", shape=(1, 1), dtype=float32_ref).

on line net.reset_states() in thread_fn.

I think this may be because the networks are created outside the threads, in a different graph. However, if I try creating them inside the threads, each in its own graph, i.e.

def thread_fn(index, global_net):
    """
    :param index:
    :type global_net: Model
    """

    print("thread-%s" % index)

    local_graph = tf.Graph()

    # get global network weights
    with global_graph.as_default():
        gw = global_net.get_weights()

    with local_graph.as_default():
        net = make_rnet()
        net.reset_states()

        # sync weights with global network
        net.set_weights(gw)
        ...

The following exception is raised, again on line net.reset_states() and for any number of threads:

TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder:0", shape=(1, 1), dtype=float32) is not an element of this graph.

Also, I realize 64 is an excessive number; that value is only needed to reproduce the issue I experience in my real code, where I have 16 threads, each with 6 networks, one of which is selected based on some condition and used to make a prediction at every iteration.

DomHudson commented 6 years ago

The exact use case is confusing me a little. I don't quite understand the purpose of immediately setting weights on a global network from within the threads. Are you trying to train one global network and then have concurrent threads predict with weights based on the global network? Or are you trying to train a global network and then modify its weights from within the threads? I think there may be an easier way to achieve what you're after, but it's hard to say without knowing this.

I modified your code. I may not have carried over the functionality you were after regarding the global network, but this trains one global model and then uses its weights to predict in threads.

import threading
import numpy as np
from keras.layers import Input, LSTM
from keras.models import Model, model_from_json
import tensorflow as tf

n_threads = 64

def make_rnet():
    inp = Input(batch_shape=(1,1,5))
    out = LSTM(1)(inp)
    return Model(
        inputs=inp,
        outputs=out
    )

def thread_fn(index, architecture, weights):
    """
    :param index:
    :type global_net: Model
    """
    print("thread-%s" % index)

    with tf.Session(graph = tf.Graph()) as sess:

        # Build model.
        net = model_from_json(architecture)
        net.set_weights(weights)
        net.compile(optimizer='rmsprop', loss='mse')

        in_shape = [int(d) for d in net.input.shape]
        out_shape = [int(d) for d in net.output.shape]

        # Test prediction.
        predictions = net.predict(np.ones(shape=in_shape))
        print(predictions)

        # Test fit on random data.
        x = np.random.random(size=in_shape)
        y = np.ones(shape=out_shape)

        net.fit(x,y, verbose=0, batch_size=1)

# Train a global network.
global_net = make_rnet()
global_net.compile(optimizer='rmsprop', loss='mse')
x = np.random.random(
    size=[int(d) for d in global_net.input.shape]
)
y = np.ones(
    shape=[int(d) for d in global_net.output.shape]
)
global_net.fit(x, y)

# Get the network in a portable format.
architecture = global_net.to_json()
weights = global_net.get_weights()

# Generate threads.
threads = [
    threading.Thread(target=thread_fn, args=(i, architecture, weights)) for i in range(n_threads)
]

print("starting %s threads..." % n_threads)

for t in threads:
    t.start()

for t in threads:
    t.join()

print("threads terminated.")

Alternatively, if you do need to get data from the threads back out, perhaps a queue implementation would work?

import threading
import queue
import numpy as np
from keras.layers import Input, LSTM
from keras.models import Model, model_from_json
import tensorflow as tf

queue = queue.Queue()
n_threads = 32

def make_rnet():
    inp = Input(batch_shape=(1,1,5))
    out = LSTM(1)(inp)
    return Model(
        inputs=inp,
        outputs=out
    )

def thread_fn(index, queue, architecture, weights):
    """
    :param index:
    :type global_net: Model
    """

    print("thread-%s" % index)

    with tf.Session(graph = tf.Graph()) as sess:

        # Build model.
        net = model_from_json(architecture)
        net.set_weights(weights)
        net.compile(optimizer='rmsprop', loss='mse')

        in_shape = [int(d) for d in net.input.shape]
        out_shape = [int(d) for d in net.output.shape]

        # Test prediction.
        predictions = net.predict(np.ones(shape=in_shape))
        print(predictions)

        # Test fit on random data.
        x = np.random.random(size=in_shape)
        y = np.ones(shape=out_shape)

        net.fit(x,y, verbose=0, batch_size=1)

        # Enqueue new weights.
        queue.put({
            'weights': net.get_weights(),
            'index': index
        })

# Train a global network.
global_net = make_rnet()
global_net.compile(optimizer='rmsprop', loss='mse')
x = np.random.random(
    size=[int(d) for d in global_net.input.shape]
)
y = np.ones(
    shape=[int(d) for d in global_net.output.shape]
)
global_net.fit(x, y)

# Get the network in a portable format.
architecture = global_net.to_json()
weights = global_net.get_weights()

# Generate threads.
threads = [
    threading.Thread(target=thread_fn, args=(i, queue, architecture, weights)) for i in range(n_threads)
]

print("starting %s threads..." % n_threads)

for t in threads:
    t.start()

# Apply the updated weights sent back by each thread.
for _ in range(n_threads):
    item = queue.get()

    print("Callback from thread-%s" % item['index'])
    global_net.set_weights(item['weights'])

    queue.task_done()

for t in threads:
    t.join()

Jackaljkdan commented 6 years ago

Thank you for your help. I'm trying to implement A3C reinforcement learning (https://arxiv.org/abs/1602.01783) in Keras; the pseudocode in the paper is Algorithm S3. Each thread should run a loop in which it first synchronizes its weights with the global ones, uses them to build an episode (using .predict() in Keras), then trains on it and applies the gradients to the global network, and finally begins a new iteration. The algorithm is intentionally not thread safe in order to maximize throughput.

I like your queue implementation. I was also thinking I don't really need a global network, only global weights for the threads to synchronize to.
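
Something like this is what I have in mind (a rough, untested sketch; sync_and_push, global_weights and weights_lock are just illustrative names, and the lock could even be dropped for the lock-free updates the paper uses):

import threading

# One template model in the main thread (make_rnet() as defined above),
# used only for its architecture and initial weights.
template = make_rnet()
architecture = template.to_json()        # for model_from_json() in each thread
global_weights = template.get_weights()  # plain numpy arrays, no shared model
weights_lock = threading.Lock()

def sync_and_push(net):
    # Pull a snapshot of the shared weights into the thread-local model.
    with weights_lock:
        snapshot = [w.copy() for w in global_weights]
    net.set_weights(snapshot)

    # ... build an episode with net.predict() and train with net.fit() ...

    # Add this thread's weight change back onto the shared arrays.
    new_w = net.get_weights()
    with weights_lock:
        for gw, old, new in zip(global_weights, snapshot, new_w):
            gw += new - old

Each thread would still build its own local model in its own graph (e.g. from architecture with model_from_json), as in your example.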

I'm not sure I understand why Keras is not throwing any exception in your thread implementation: I see you create a new session with a new graph in each thread, yet the session doesn't appear to be used in the thread code. Still, if I remove that with statement, exceptions are raised about tensors not being elements of the correct graph. Is Keras using it automatically?

DomHudson commented 6 years ago

Hi, okay thank you for the context.

Yes, agreed, I don't think you need a global network then; just keep the main weights in the main thread.

The reason it is not erroring within the thread is this line: https://github.com/keras-team/keras/blob/master/keras/backend/tensorflow_backend.py#L166

From TensorFlow's documentation:

Returns the default session for the current thread. The returned Session will be the innermost session on which a Session or Session.as_default() context has been entered. NOTE: The default session is a property of the current thread. If you create a new thread, and wish to use the default session in that thread, you must explicitly add a with sess.as_default(): in that thread's function.

Without that with statement, tf.get_default_session() returns None inside the thread.
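
You can see the thread-local behaviour with a small standalone demo (nothing Keras-specific, just TensorFlow's default-session mechanics):

import threading
import tensorflow as tf

sess = tf.Session()

def worker():
    # The default session set in the main thread is not visible here...
    print("in thread before:", tf.get_default_session())             # None
    # ...until the thread enters the context itself.
    with sess.as_default():
        print("in thread after:", tf.get_default_session() is sess)  # True

with sess.as_default():
    print("in main thread:", tf.get_default_session() is sess)       # True
    t = threading.Thread(target=worker)
    t.start()
    t.join()

Without the with block, Keras falls back to its own global session, which is tied to a different graph; presumably that is why you see the "not an element of this graph" errors when you remove it.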

Jackaljkdan commented 6 years ago

Thank you!

FrancisYizhang commented 6 years ago

Good! Thanks! @DomHudson