MNIST with spark standalone cluster mode #37

Closed nagamanojk closed 6 years ago

nagamanojk commented 6 years ago


Hi Joeri,

I tried to run one of my experiment with pysprak standalone cluster mode, with 3 workers. I'm getting an connectionRefused. error to the worker. Is this expected?

ee207437@pcg-ee207437-1:/usr/lib/spark$ ./bin/spark-submit --master spark:// examples/src/main/python/ gtzan.parquet Using TensorFlow backend. 17/10/11 14:35:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable root |-- features_normalized: vector (nullable = true) |-- label_index: double (nullable = true) |-- label: array (nullable = true) | |-- element: double (containsNull = true)

Number of training instances: 887
Number of testing instances: 113 2017-10-11 14:36:03.929908: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 14:36:03.929928: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 14:36:03.929934: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 14:36:03.929938: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 14:36:03.929943: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations. Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/", line 801, in __bootstrap_inner File "/usr/lib/python2.7/", line 754, in run*self.args, *self.__kwargs) File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 466, in service self.parameter_server.initialize() File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 111, in initialize file_descriptor.bind(('', self.master_port)) File "/usr/lib/python2.7/", line 228, in meth return getattr(self._sock,name)(args) error: [Errno 98] Address already in use

[Stage 9:> (0 + 3) / 3]17/10/11 14:36:10 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TID 656,, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark/python/lib/", line 177, in main process() File "/usr/lib/spark/python/lib/", line 172, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 261, in train self.connect() File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 197, in connect self.socket = connect(self.master_host, self.master_port, self.disable_nagle) File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 97, in connect fd.connect((host, port)) File "/usr/lib/python2.7/", line 228, in meth return getattr(self._sock,name)(*args) error: [Errno 111] Connection refused

Thanks, Manoj

JoeriHermans commented 6 years ago

Hi Manoj,

The error says the parameter server port is already allocated, could you check this?

File "/usr/lib/python2.7/", line 228, in meth return getattr(self._sock,name)(*args) error: [Errno 98] Address already in use


nagamanojk commented 6 years ago


Hi Joeri, I tried to re-run, now I'm not get error: [Errno 98] Address already in use, but still, its not able to connect to worker machines. Please find the attached worker1.log & master.log from SparkUI. It look like the code runs only on master, and not on worker. I have tried the same example with Spark's mlLib API's it works fine. Please let me know, if you need any additional information. worker1.log master.log

./bin/spark-submit --master spark:// examples/src/main/python/ gtzan.parquet Using TensorFlow backend. 17/10/11 17:15:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable root |-- features_normalized: vector (nullable = true) |-- label_index: double (nullable = true) |-- label: array (nullable = true) | |-- element: double (containsNull = true)

Number of training instances: 897
Number of testing instances: 103 2017-10-11 17:15:14.950629: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use SSE4.1 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 17:15:14.950654: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use SSE4.2 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 17:15:14.950660: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use AVX instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 17:15:14.950665: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use AVX2 instructions, but these are available on your machine and could speed up CPU computations. 2017-10-11 17:15:14.950669: W tensorflow/core/platform/] The TensorFlow library wasn't compiled to use FMA instructions, but these are available on your machine and could speed up CPU computations. [Stage 9:> (0 + 3) / 3]17/10/11 17:15:20 WARN TaskSetManager: Lost task 2.0 in stage 9.0 (TID 669,, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/lib/spark/python/lib/", line 177, in main process() File "/usr/lib/spark/python/lib/", line 172, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 261, in train self.connect() File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 197, in connect self.socket = connect(self.master_host, self.master_port, self.disable_nagle) File "/usr/local/lib/python2.7/dist-packages/distkeras/", line 97, in connect fd.connect((host, port)) File "/usr/lib/python2.7/", line 228, in meth return getattr(self._sock,name)(*args) error: [Errno 111] Connection refused

Training time: 27.7298340797
Train Accuracy: 0.952062430323
Test Accuracy: 0.78640776699

Thanks, Manoj

JoeriHermans commented 6 years ago

Hi Manoj,

This shouldn't happen. My guess is that there is an other process occupying the default port (5000). Or, the OS has a firewall enabled. What is the output of netstat -l4pn?


nagamanojk commented 6 years ago

Hi Joeri, It looks like there is no process running on port 5000 and firewall is inactive.

ee207437@pcg-ee207437-3:~$ netstat -lanp --protocol=inet (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) Active Internet connections (servers and established) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 LISTEN -
tcp 0 0
tcp 0 0 LISTEN 2067/vino-server udp 0 0 -
udp 0 0 -
udp 0 0

ee207437@pcg-ee207437-3:~$ sudo ufw status Status: inactive

Thanks, Manoj

JoeriHermans commented 6 years ago

Hmm, strange. Could you send me a sample of how you use ADAG? Because it seems that for some reason the parameter server is not allocated.


nagamanojk commented 6 years ago

Hi Joeri, Please find below code details

if name == "main":

num_processes = 3
num_executors = 1
application_name = "Distributed Keras XXX Analysis"
num_workers = 3

spark = SparkSession\

reader = spark
transFile = sys.argv[1]

# Read the dataset.
raw_dataset = \
                 .select("features_normalized", "label_index", "label")

# Finally, we create a trainingset and a testset.
(training_set, test_set) = raw_dataset.randomSplit([0.9, 0.1])

# Print the schema of the dataset.

#Multilayer Perceptron
mlp = Sequential()
mlp.add(Dense(40, input_shape=(30,)))


optimizer_mlp = 'adam'
loss_mlp = 'categorical_crossentropy'

training_set = training_set.repartition(num_workers)
test_set = test_set.repartition(num_workers)
print("Number of training instances: " + str(training_set.count()))
print("Number of testing instances: " + str(test_set.count()))

#In[10]: Evaluation
def evaluate_accuracy(model, test_set, features="features_normalized"):
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col="label_index")
    predictor = ModelPredictor(keras_model=model, features_col=features)
    transformer = LabelIndexTransformer(output_dim=10)
    test_set =, "label_index")
    test_set = predictor.predict(test_set)
    test_set = transformer.transform(test_set)
    score = evaluator.evaluate(test_set)

    return score

#In[11]: ADAG
trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=num_workers,
           batch_size=10, communication_window=5, num_epoch=200,
           features_col="features_normalized", label_col="label")
# Modify the default parallelism factor.
trained_model = trainer.train(training_set)

# View the weights of the trained model.

print("Training time: " + str(trainer.get_training_time()))
print("Train Accuracy: " + str(evaluate_accuracy(trained_model, training_set)))
print("Test Accuracy: " + str(evaluate_accuracy(trained_model, test_set)))



JoeriHermans commented 6 years ago

Hmm, this is really strange. But in the ouput it shows that the model is trained?

Could you give the output of detemine_host_address()? Maybe the port only binds to the local address for some reason. And after the crash do print(trainer.parameter_server).


nagamanojk commented 6 years ago


For determine_host_address() O/P is I tried to print "host" and "port" from /usr/local/lib/python2.7/dist-packages/distkeras/ line 97

When I run in with --master local[*} on PC-ee207437-1 From ee207437-1 host: port:5000 From ee207437-1 host: port:5000 From ee207437-1 host: port:5000

This is expected, as I have num_worker = 3

When I run in with --master spark:// on PC-ee207437-1 From ee207437-1 host: port:5000 (This is master) From ee207437-2 host: port:5000 (This is worker-1) From ee207437-3 host: port:5000 (This is worker-2)

From worker-2 and worker-3 its trying to connect to itself, is this correct ? Shouldn't it connect to master, i.e.,

/etc/hosts has pcg-ee207437-1 pcg-ee207437-2 pcg-ee207437-3

spark/sbin/slaves pcg-ee207437-1 pcg-ee207437-2 pcg-ee207437-3

In SparkUI I see all worker connected to master. I tried running other examples like I works fine, I see reduction in run time.

I think, I'm missing some env-var setting for dist-keras with spark-standalone.

Thanks, Manoj

JoeriHermans commented 6 years ago

Yes, that's the error. We need to force determine_host_address to not pick the local address. If we fix that it will work.

I'm on my phone right now, but I can look at it in about one hour. I'll keep you posted.


JoeriHermans commented 6 years ago

Also, did you define the hostname of local as in /etc/hosts? That would explain why determine host address doesn't function properly.


JoeriHermans commented 6 years ago

Hi Manoj,

I think I have a fix. The only downside is that this code isn't cross-platform. But I don't think a lot of people run Spark on Windows / Mac anyway.

import os
import fcntl
import socket
import struct

def get_interface_ip(ifname):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    return socket.inet_ntoa(fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s',

def get_default_iface():
    route = "/proc/net/route"
    with open(route) as f:
        for line in f.readlines():
                iface, dest, _, flags, _, _, _, _, _, _, _, =  line.strip().split()
                if dest != '00000000' or not int(flags, 16) & 2:
                return iface

def determine_host_address():
    # Retrieve the Network Interface in ASCII encoding.
    iface = get_default_iface().encode("ascii")
    # Obtain the network address from an active NIC.
    address = get_interface_ip(iface)

    return address

Could you verify if that determine_host_address() doesn't return the local address on your machines?


nagamanojk commented 6 years ago

Hi Jeori,

After commenting localhost /etc/hosts in workers. I did not get error. However my code is still running. I shall keep you posted once it's completed. Also I shall try with your code fix that you have suggested.

Thank you for immediate responses. -Manoj

JoeriHermans commented 6 years ago

If it's ok for you I'll close this issue now. Feel free to re-open it.


nagamanojk commented 6 years ago

Hi Joeri, Sorry for the delay, I was stuck up with some other things on application side. I tried to print "host" and "port" from /usr/local/lib/python2.7/dist-packages/distkeras/ line 97

When I run in with --master spark:// on PC-ee207437-1 From ee207437-1 host: port:5000 (This is master) From ee207437-2 host: port:5000 (This is worker-1) From ee207437-3 host: port:5000 (This is worker-2)

The Run completes successfully. But in the end of worker log, this below line is printed. "17/10/17 08:34:24 ERROR Co" This is not an issue, right?

Thanks a lot for immediate responses, Joeri. --Manoj