keras-team / keras

Deep Learning for humans
http://keras.io/
Apache License 2.0

Problem running multiple CNN models in individual threads with Keras #6938

Closed SunnerLi closed 7 years ago

SunnerLi commented 7 years ago

I want to run different models at the same time. However, I do not get the correct result on every run. My code is below. I generate MNIST-like data and build two CNNs to do the training and testing.

from keras.layers import Conv2D, Dense, Flatten, Input, MaxPool2D
from keras.models import Model, Sequential
from keras import backend as K
import tensorflow as tf
import numpy as np
import threading
import time
import os

K.set_image_dim_ordering('tf')
x, y = np.random.random([4, 27, 27, 1]), np.random.random([4, 10])

# Model objects
small_model = None
big_model = None

class Has1ConvLayerCNN(object):
    small_graph = tf.get_default_graph()
    sess = None
    def __init__(self, model_name='small_model.h5'):
        with self.small_graph.as_default():
            self.sess = tf.Session(graph=self.small_graph)
            self.model = Sequential()
            self.model.add(Conv2D(8, (4, 4), activation='relu', input_shape=(27, 27, 1)))
            self.model.add(MaxPool2D())
            self.model.add(Flatten())
            self.model.add(Dense(10, activation='sigmoid'))
            self.sess.run(tf.global_variables_initializer())
            if os.path.exists(model_name):
                self.model.load_weights(model_name)          

    def compile(self):
        self.model.compile(loss='mse', optimizer='adam')

    def train(self, x, y, model_name='small_model.h5'):
        with self.small_graph.as_default():
            self.model.fit(x, y, epochs=1)
            self.model.save_weights(model_name)

    def test(self, x):
        with self.small_graph.as_default():
            return self.model.predict(x, batch_size=1)

class Has2ConvLayerCNN(object):
    big_graph = tf.get_default_graph()
    sess = None
    def __init__(self, model_name='big_model.h5'):
        with self.big_graph.as_default():
            self.sess = tf.Session(graph=self.big_graph)
            self.model = Sequential()
            self.model.add(Conv2D(8, (4, 4), activation='relu', input_shape=(27, 27, 1)))
            self.model.add(Conv2D(16, (2, 2), activation='relu'))
            self.model.add(MaxPool2D())
            self.model.add(Flatten())
            self.model.add(Dense(10, activation='sigmoid'))
            self.sess.run(tf.global_variables_initializer())
            if os.path.exists(model_name):
                self.model.load_weights(model_name)          

    def compile(self):
        with self.big_graph.as_default():
            self.model.compile(loss='mse', optimizer='adam')

    def train(self, x, y, model_name='big_model.h5'):
        with self.big_graph.as_default():
            self.model.fit(x, y, epochs=1)
            self.model.save_weights(model_name)

    def test(self, x, model_name='big_model.h5'):
        with self.big_graph.as_default():
            return self.model.predict(x, batch_size=1)

def smallThread():
    global small_model
    global x
    res = small_model.test(x)
    print "small result: ", res

def bigThread():
    global big_model
    global x
    res = big_model.test(x)
    print 'big result: ', res

if __name__ == '__main__':
    # Train
    small_model = Has1ConvLayerCNN()
    small_model.compile()
    big_model = Has2ConvLayerCNN()
    big_model.compile()
    print "----- Train 1 conv layer model -----"
    small_model.train(x, y)
    print "----- Train 2 conv layer model -----"
    big_model.train(x, y)

    # Clear the model objects
    small_model = None
    big_model = None

    # Load model and Test
    small_model = Has1ConvLayerCNN()
    small_model.compile()
    big_model = Has2ConvLayerCNN()
    big_model.compile()
    threading.Thread(target=smallThread).start()
    threading.Thread(target=bigThread).start()

I create an individual session and graph object for each model. Sometimes I get the correct answer, like the following:

Using TensorFlow backend.
2017-06-11 17:45:01.548647: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:45:01.548673: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:45:01.548678: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:45:01.548683: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:45:01.548699: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
----- Train 1 conv layer model -----
Epoch 1/1
4/4 [==============================] - 0s - loss: 0.1300
----- Train 2 conv layer model -----
Epoch 1/1
4/4 [==============================] - 0s - loss: 0.0883
big result:  [[ 0.51761824  0.49175     0.50416106  0.43220499  0.37879816  0.3602626
   0.33406499  0.41810563  0.39064723  0.46406019]
 [ 0.51863891  0.51891971  0.49938035  0.46237913  0.38975871  0.3768242
   0.33751875  0.46174175  0.38691592  0.44897127]
 [ 0.51071256  0.50159967  0.50252235  0.44284269  0.39376053  0.36824402
   0.33978003  0.44148278  0.41221312  0.45410269]
 [ 0.50967991  0.49973318  0.50594532  0.45762312  0.4220691   0.37480459
   0.34047222  0.44234774  0.38909501  0.44603276]]
small result:  [[ 0.54407543  0.54171276  0.46328813  0.2413315   0.27085894  0.33460364
   0.22706869  0.30919045  0.22811837  0.49186015]
 [ 0.57744735  0.53639919  0.44922575  0.25701386  0.27335384  0.35873184
   0.21858442  0.30952871  0.2886436   0.47201097]
 [ 0.55285573  0.50686091  0.43080822  0.23862234  0.28417778  0.38735572
   0.2486203   0.3243944   0.2822212   0.42979807]
 [ 0.50508142  0.51574749  0.502177    0.25125584  0.26323137  0.35078073
   0.23977536  0.35091829  0.24196538  0.46264389]]

However, sometimes I get the following error:

Using TensorFlow backend.
2017-06-11 17:52:38.162244: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:52:38.162269: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:52:38.162274: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:52:38.162278: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-11 17:52:38.162284: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
----- Train 1 conv layer model -----
Epoch 1/1
4/4 [==============================] - 0s - loss: 0.1469
----- Train 2 conv layer model -----
Epoch 1/1
4/4 [==============================] - 0s - loss: 0.1124
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "test1.py", line 76, in smallThread
    res = small_model.test(x)
  File "test1.py", line 42, in test
    return self.model.predict(x, batch_size=1)
  File "/usr/local/lib/python2.7/dist-packages/keras/models.py", line 902, in predict
    return self.model.predict(x, batch_size=batch_size, verbose=verbose)
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 1582, in predict
    self._make_predict_function()
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 1049, in _make_predict_function
    **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 2251, in function
    return Function(inputs, outputs, updates=updates)
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 2214, in __init__
    self.updates_op = tf.group(*updates_ops)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 3168, in __exit__
    self._graph._pop_control_dependencies_controller(self)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 3188, in _pop_control_dependencies_controller
    assert self._control_dependencies_stack[-1] is controller
AssertionError

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "test1.py", line 82, in bigThread
    res = big_model.test(x)
  File "test1.py", line 71, in test
    return self.model.predict(x, batch_size=1)
  File "/usr/local/lib/python2.7/dist-packages/keras/models.py", line 902, in predict
    return self.model.predict(x, batch_size=batch_size, verbose=verbose)
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 1585, in predict
    batch_size=batch_size, verbose=verbose)
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/training.py", line 1212, in _predict_loop
    batch_outs = f(ins_batch)
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 2227, in __call__
    session = get_session()
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 164, in get_session
    _initialize_variables()
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 337, in _initialize_variables
    sess.run(tf.variables_initializer(uninitialized_variables))
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 778, in run
    run_metadata_ptr)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 982, in _run
    feed_dict_string, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1032, in _do_run
    target_list, options, run_metadata)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/client/session.py", line 1052, in _do_call
    raise type(e)(node_def, op, message)
InvalidArgumentError: You must feed a value for placeholder tensor 'conv2d_4_input' with dtype float
     [[Node: conv2d_4_input = Placeholder[dtype=DT_FLOAT, shape=[], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]

Caused by op u'conv2d_4_input', defined at:
  File "test1.py", line 101, in <module>
    small_model = Has1ConvLayerCNN()
  File "test1.py", line 24, in __init__
    self.model.add(Conv2D(8, (4, 4), activation='relu', input_shape=(27, 27, 1)))
  File "/usr/local/lib/python2.7/dist-packages/keras/models.py", line 429, in add
    dtype=layer.dtype, name=layer.name + '_input')
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/topology.py", line 1414, in Input
    input_tensor=tensor)
  File "/usr/local/lib/python2.7/dist-packages/keras/legacy/interfaces.py", line 88, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/keras/engine/topology.py", line 1325, in __init__
    name=self.name)
  File "/usr/local/lib/python2.7/dist-packages/keras/backend/tensorflow_backend.py", line 391, in placeholder
    x = tf.placeholder(dtype, shape=shape, name=name)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/array_ops.py", line 1507, in placeholder
    name=name)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/gen_array_ops.py", line 1997, in _placeholder
    name=name)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/op_def_library.py", line 768, in apply_op
    op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 2336, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/framework/ops.py", line 1228, in __init__
    self._traceback = _extract_stack()

InvalidArgumentError (see above for traceback): You must feed a value for placeholder tensor 'conv2d_4_input' with dtype float
     [[Node: conv2d_4_input = Placeholder[dtype=DT_FLOAT, shape=[], _device="/job:localhost/replica:0/task:0/cpu:0"]()]]

Is there anything wrong with my implementation? Or is there something I should be aware of when doing multi-threaded work?
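One possible reading of the first traceback, offered as an assumption rather than a confirmed diagnosis: `predict` builds its prediction function lazily (the `_make_predict_function` frame), so both threads may be constructing it at the same moment. The sketch below does not use Keras at all. It uses a hypothetical stand-in class, `LazyModel`, to show the general pattern of guarding a one-time lazy build with a lock so that only one thread performs it. It is written for Python 3.

```python
import threading


class LazyModel(object):
    """Hypothetical stand-in for a model whose predict function is built lazily."""

    def __init__(self, scale):
        self.scale = scale
        self.build_count = 0          # how many times the build step ran
        self._predict_fn = None       # created on first use
        self._build_lock = threading.Lock()

    def _build(self):
        # Stand-in for an expensive construction step that is not thread-safe.
        self.build_count += 1
        return lambda values: [v * self.scale for v in values]

    def predict(self, values):
        # Only one thread may run the build; later callers reuse the result.
        with self._build_lock:
            if self._predict_fn is None:
                self._predict_fn = self._build()
        return self._predict_fn(values)


def run_in_threads(model, values, n_threads=4):
    """Call model.predict from several threads and collect the results."""
    results = [None] * n_threads

    def worker(index):
        results[index] = model.predict(values)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == '__main__':
    model = LazyModel(scale=2)
    print(run_in_threads(model, [1, 2, 3]))
    print("builds:", model.build_count)
```

An alternative with the same effect is to trigger the build once in the main thread before any worker thread starts, so the threads only ever reuse an already built function.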

SunnerLi commented 7 years ago

I got some hints from Stack Overflow. I split the code into two parts: the first contains the Keras models, and the second is the main program. I named the model code model.py. The contents of model.py are shown below:

from keras.layers import Conv2D, Dense, Flatten, Input, MaxPool2D
from keras.models import Sequential
from keras import backend as K
import tensorflow as tf
import numpy as np
import threading
import os

class Has1ConvLayerCNN(object):
    graph = None
    model = None
    model_name = None
    def __init__(self):
        pass

    def createModelAndOp(self, model_name='small_model.h5'):
        self.graph = tf.Graph()
        self.model_name = model_name

        with self.graph.as_default():
            self.model = Sequential()
            self.model.add(Conv2D(5, (2, 2), activation='relu', input_shape=(27, 27, 1)))
            self.model.add(MaxPool2D())
            self.model.add(Conv2D(10, (2, 2), activation='relu'))
            self.model.add(MaxPool2D())
            self.model.add(Conv2D(15, (2, 2), activation='relu'))
            self.model.add(Conv2D(20, (2, 2), activation='relu'))
            self.model.add(Flatten())
            self.model.add(Dense(100, activation='relu'))
            self.model.add(Dense(10, activation='sigmoid'))
            self.model.compile(optimizer='sgd', loss='mse')

        def train(x, y):
            with tf.Session(graph=self.graph) as sess:
                sess.run(tf.global_variables_initializer())
                self.model.fit(x, y, verbose=0)
                self.model.save_weights(self.model_name)

        def test(x):
            with tf.Session(graph=self.graph) as sess:
                sess.run(tf.global_variables_initializer())
                if os.path.exists(model_name):
                    self.model.load_weights(model_name)
                return self.model.predict(x)

        # Use closure to return the function
        return train, test

class Has2ConvLayerCNN(object):
    graph = None
    model = None
    model_name = None
    def __init__(self):
        pass

    def createModelAndOp(self, model_name='big_model.h5'):
        self.graph = tf.Graph()
        self.model_name = model_name
        with self.graph.as_default():
            self.model = Sequential()
            self.model.add(Conv2D(8, (2, 2), activation='relu', input_shape=(27, 27, 1)))
            self.model.add(MaxPool2D())
            self.model.add(Conv2D(16, (2, 2), activation='relu'))
            self.model.add(MaxPool2D())
            self.model.add(Conv2D(32, (2, 2), activation='relu'))
            self.model.add(Conv2D(64, (2, 2), activation='relu'))
            self.model.add(Flatten())
            self.model.add(Dense(128, activation='relu'))
            self.model.add(Dense(10, activation='sigmoid'))
            self.model.compile(optimizer='sgd', loss='mse')

        def train(x, y):
            with tf.Session(graph=self.graph) as sess:
                sess.run(tf.global_variables_initializer())
                self.model.fit(x, y, verbose=0)
                self.model.save_weights(self.model_name)

        def test(x):
            with tf.Session(graph=self.graph) as sess:
                sess.run(tf.global_variables_initializer())
                if os.path.exists(model_name):
                    self.model.load_weights(model_name)
                return self.model.predict(x)

        # Use closure to return the function
        return train, test

As you can see, the train and test functions are written as closures; otherwise the error occurs. I don't really know the reason, but I haven't encountered the problem since I started using closures.
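The closure pattern itself can be shown without Keras. In this minimal sketch, `make_ops` and its `state` dictionary are hypothetical stand-ins: each call creates private state, and the returned `train` and `test` functions capture that state, so two models never touch each other's data even when they run in different threads. It does not explain why closures avoided the error in the real code.

```python
import threading


def make_ops(name):
    """Return (train, test) closures that share private, per-model state."""
    state = {'name': name, 'weights': None}   # captured by both closures

    def train(x, y):
        # Stand-in for fitting: store the mean of the targets as the "weights".
        state['weights'] = sum(y) / float(len(y))

    def test(x):
        # Stand-in for predicting: return the stored value for every input.
        return [state['weights'] for _ in x]

    return train, test


if __name__ == '__main__':
    small_train, small_test = make_ops('small')
    big_train, big_test = make_ops('big')

    threads = [
        threading.Thread(target=small_train, args=([0, 1], [1.0, 3.0])),
        threading.Thread(target=big_train, args=([0, 1], [10.0, 30.0])),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(small_test([0, 1]))
    print(big_test([0]))
```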

Next, the main program is this:

from keras.layers import Conv2D, Dense, Flatten, Input, MaxPool2D
from model import Has1ConvLayerCNN, Has2ConvLayerCNN
from keras.models import Sequential
from keras import backend as K
import tensorflow as tf
import numpy as np
import threading
import time
import os

if __name__ == '__main__':
    # Create data and model instance
    x, y = np.random.random([4, 27, 27, 1]), np.random.random([4, 10])
    small_cnn = Has1ConvLayerCNN()
    big_cnn = Has2ConvLayerCNN()

    # Form operations list 
    small_cnn_train, small_cnn_test = small_cnn.createModelAndOp()
    big_cnn_train, big_cnn_test = big_cnn.createModelAndOp()
    train_ops = [small_cnn_train, big_cnn_train]    
    test_ops = [small_cnn_test, big_cnn_test]

    # Create the list of threads (train operations)
    threads = []
    for i in range(len(train_ops)):
        threads.append(threading.Thread(target=train_ops[i], args=(x, y)))

    print "------------ Train --------------------"
    # Show the time of multithreading
    _time = time.time()
    for i in range(len(threads)):
        threads[i].start()
    print "<<      Start whole thread     >> time spend: ", time.time() - _time
    for i in range(len(threads)):
        threads[i].join(5)
    print "<<   Wait whole threads join   >> time spend: ", time.time() - _time

    # Show the time of working with order
    _time = time.time()
    train_ops[0](x, y)
    train_ops[1](x, y)
    print "<<  Main thread work in order  >> time spend: ", time.time() - _time

    # Create the list of threads (test operations)
    threads = []
    for i in range(len(test_ops)):
        threads.append(threading.Thread(target=test_ops[i], args=(x,)))

    print "------------ Test --------------------"
    # Show the time of multithreading
    _time = time.time()
    for i in range(len(threads)):
        threads[i].start()
    print "<<      Start whole thread     >> time spend: ", time.time() - _time
    for i in range(len(threads)):
        threads[i].join(5)
    print "<<   Wait whole threads join   >> time spend: ", time.time() - _time

    # Show the time of working with order
    _time = time.time()
    for i in range(len(test_ops)):
        _x = np.expand_dims(x[0], 0)
        test_ops[i](_x)
    print "<<  Main thread work in order  >> time spend: ", time.time() - _time

And the following is the result:

Using TensorFlow backend.
------------ Train --------------------
2017-06-18 17:23:28.364516: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-18 17:23:28.364537: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-18 17:23:28.364542: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations.
2017-06-18 17:23:28.364546: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations.
2017-06-18 17:23:28.364550: W tensorflow/core/platform/cpu_feature_guard.cc:45] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations.
<<      Start whole thread     >> time spend:  0.000718832015991
<<   Wait whole threads join   >> time spend:  0.377538919449
<<  Main thread work in order  >> time spend:  0.152100801468
------------ Test --------------------
<<      Start whole thread     >> time spend:  0.00042986869812
<<   Wait whole threads join   >> time spend:  0.079735994339
<<  Main thread work in order  >> time spend:  0.0590391159058

Based on this test, I do not recommend using multithreading to run different models in individual threads. As you can see, the program spends more time in both training and testing.

If there is an error or misunderstanding here, feel free to point it out.
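The timing comparison could be made more like-for-like. In the main program, the threaded test run predicts on all four samples while the sequential run uses only one, and `join(5)` may return before a thread has finished. Below is a minimal sketch of a measurement that gives both paths the same arguments and waits for completion. The helpers `time_threaded`, `time_sequential`, and `placeholder_work` are hypothetical and stand in for the real model calls. It is written for Python 3.

```python
import threading
import time


def time_threaded(ops, args):
    """Run every op in its own thread with the same args; return elapsed seconds."""
    threads = [threading.Thread(target=op, args=args) for op in ops]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()              # no timeout: wait until the work is really done
    return time.perf_counter() - start


def time_sequential(ops, args):
    """Run every op one after another with the same args; return elapsed seconds."""
    start = time.perf_counter()
    for op in ops:
        op(*args)
    return time.perf_counter() - start


def placeholder_work(n):
    # Stand-in for a model's train or test call.
    total = 0
    for i in range(n):
        total += i * i
    return total


if __name__ == '__main__':
    ops = [placeholder_work, placeholder_work]
    print("threaded:  ", time_threaded(ops, (200000,)))
    print("sequential:", time_sequential(ops, (200000,)))
```

The numbers from this placeholder say nothing about the real models. The point is only that both measurements should cover the same inputs and the full duration of the work.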