chrysimo / muskel2

muskel2
2 stars 1 forks source link

Esecuzione parallela usando nodi remoti non funziona! #1

Open nicolettiant89 opened 7 years ago

nicolettiant89 commented 7 years ago

Salve, quando tento di eseguire qualunque tipo di algoritmo contenuto nei suoi file di testing, come ad esempio RemotePrimeTest, che vertono sull'esecuzione distribuita dell'algoritmo, ottengo sempre il solito errore, cioè:

mar 04, 2017 12:48:43 PM com.hazelcast.core.LifecycleService INFORMAZIONI: HazelcastClient[hz.client_0_dev][3.5.3] is STARTING mar 04, 2017 12:48:43 PM com.hazelcast.core.LifecycleService INFORMAZIONI: HazelcastClient[hz.client_0_dev][3.5.3] is STARTED mar 04, 2017 12:48:44 PM com.hazelcast.core.LifecycleService INFORMAZIONI: HazelcastClient[hz.client_0_dev][3.5.3] is CLIENT_CONNECTED mar 04, 2017 12:48:44 PM com.hazelcast.client.spi.impl.ClientMembershipListener INFORMAZIONI:

Members [1] { Member [192.168.0.2]:5701 }

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: Problem while reading DataSerializable, namespace: 0, id: 0, class: 'it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable', exception: it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable at it.reactive.muskel.processors.BlockingMuskelProcessorImpl.first(BlockingMuskelProcessorImpl.java:102) at it.reactive.muskel.processors.BlockingMuskelProcessorImpl.first(BlockingMuskelProcessorImpl.java:43) at it.reactive.muskel.examples.PrimeFactorization.main(PrimeFactorization.java:169) Caused by: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: Problem while reading DataSerializable, namespace: 0, id: 0, class: 'it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable', exception: it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:188) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.access$000(ClientInvocationFuture.java:41) at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:234) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Problem while reading DataSerializable, namespace: 0, id: 0, class: 'it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable', exception: it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable at com.hazelcast.nio.serialization.DataSerializer.read(DataSerializer.java:120) at com.hazelcast.nio.serialization.DataSerializer.read(DataSerializer.java:39) at com.hazelcast.nio.serialization.StreamSerializerAdapter.read(StreamSerializerAdapter.java:41) at com.hazelcast.nio.serialization.SerializationServiceImpl.readObject(SerializationServiceImpl.java:325) at com.hazelcast.nio.serialization.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:489) at com.hazelcast.executor.impl.client.PartitionTargetCallableRequest.read(PartitionTargetCallableRequest.java:120) at com.hazelcast.client.impl.client.ClientRequest.readPortable(ClientRequest.java:95) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:84) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:73) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:29) at com.hazelcast.nio.serialization.StreamSerializerAdapter.read(StreamSerializerAdapter.java:41) at com.hazelcast.nio.serialization.SerializationServiceImpl.toObject(SerializationServiceImpl.java:276) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.loadRequest(ClientEngineImpl.java:396) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run(ClientEngineImpl.java:371) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:100) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.processPartitionSpecificRunnable(OperationThread.java:130) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.process(OperationThread.java:120) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.doRun(OperationThread.java:101) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.run(OperationThread.java:76) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) ... 8 more Caused by: java.lang.ClassNotFoundException: it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable at java.net.URLClassLoader$1.run(URLClassLoader.java:217) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:205) at java.lang.ClassLoader.loadClass(ClassLoader.java:321) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) at java.lang.ClassLoader.loadClass(ClassLoader.java:266) at com.hazelcast.nio.ClassLoaderUtil.tryLoadClass(ClassLoaderUtil.java:125) at com.hazelcast.nio.ClassLoaderUtil.loadClass(ClassLoaderUtil.java:114) at com.hazelcast.nio.ClassLoaderUtil.newInstance(ClassLoaderUtil.java:67) at com.hazelcast.nio.serialization.DataSerializer.read(DataSerializer.java:109) at com.hazelcast.nio.serialization.DataSerializer.read(DataSerializer.java:39) at com.hazelcast.nio.serialization.StreamSerializerAdapter.read(StreamSerializerAdapter.java:41) at com.hazelcast.nio.serialization.SerializationServiceImpl.readObject(SerializationServiceImpl.java:325) at com.hazelcast.nio.serialization.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:489) at com.hazelcast.executor.impl.client.PartitionTargetCallableRequest.read(PartitionTargetCallableRequest.java:120) at com.hazelcast.client.impl.client.ClientRequest.readPortable(ClientRequest.java:95) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:84) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:73) at com.hazelcast.nio.serialization.PortableSerializer.read(PortableSerializer.java:29) at com.hazelcast.nio.serialization.StreamSerializerAdapter.read(StreamSerializerAdapter.java:41) at com.hazelcast.nio.serialization.SerializationServiceImpl.toObject(SerializationServiceImpl.java:276) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.loadRequest(ClientEngineImpl.java:396) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run(ClientEngineImpl.java:371) at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:100) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.processPartitionSpecificRunnable(OperationThread.java:130) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.process(OperationThread.java:120) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.doRun(OperationThread.java:101) at com.hazelcast.spi.impl.operationexecutor.classic.OperationThread.run(OperationThread.java:76) Exception in thread "MuskelClassLoaderClient-0b626e59-a5ce-4186-aee9-ef477dab4d1e" com.hazelcast.spi.exception.DistributedObjectDestroyedException: DistributedObject[com.hazelcast.collection.impl.queue.QueueService@64fef26a -> requestQueue_0b626e59-a5ce-4186-aee9-ef477dab4d1e] has been destroyed! at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyLocalDistributedObject(ProxyServiceImpl.java:156) at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyDistributedObject(ProxyServiceImpl.java:141) at com.hazelcast.client.impl.client.ClientDestroyRequest.call(ClientDestroyRequest.java:46) at com.hazelcast.client.impl.client.CallableClientRequest.process(CallableClientRequest.java:29) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.processRequest(ClientEngineImpl.java:468) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run(ClientEngineImpl.java:384) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:127) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:102) at com.hazelcast.client.spi.ClientProxy.invokeInterruptibly(ClientProxy.java:151) at com.hazelcast.client.proxy.ClientQueueProxy.invokeInterruptibly(ClientQueueProxy.java:293) at com.hazelcast.client.proxy.ClientQueueProxy.poll(ClientQueueProxy.java:144) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelQueue.poll(HazelcastMuskelContext.java:388) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService.doProcessSingle(ClassloaderClientService.java:81) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService$ResourceRequestRunnable.run(ClassloaderClientService.java:114) at java.lang.Thread.run(Thread.java:745)

In pratica credo che il classLoader non riesca a trovare, per qualche strana ragione, la classe it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable. Le dice qualcosa questo errore, Grazie.

chrysimo commented 7 years ago

Buongiorno,

ho provato ad eseguire il test con i numeri primi e mi sta funzionando.

Proviamo a metterci in una condizione pulita visto che la classe PrimeFactorization non è presente nel pacchetto rilasciato:

1) Vai nelle directory server/target e lanci un nodo server con il comando java -jar muskel-server-1.0.0.jar. dovrebbero comparire dei messaggi a video tra cui

2017-03-05 11:11:16.675 INFO 16308 --- [ main] it.reactive.muskel.server.Application : Started Application in 6.035 seconds (JVM running for 6.618)

2) Apri con la tua IDE preferita il progetto di esempio Prime e prova a lanciare il Junit Test RemotePrimeTest.remoteMultipleMapCount (alcuni non andranno perchè ci sono configurati gli ip delle macchine del dipartimento). In alternativa, sempre nel solito progetto, copia ed incolla in una nuova classe il contenuto del metodo all'interno del main.

Ti consiglio di modificare il parametro FROM a 1, perchè altrimenti ci diventi vecchio!

Sulla console del server dovresti vedere:

Thread[[default]-3,5,main] - 1 Thread[[default]-3,5,main] - 2 Thread[[default]-3,5,main] - 3 Thread[[default]-0,5,main] - 5 Thread[[default]-3,5,main] - 7 Thread[[default]-5,5,main] - 11 Thread[[default]-0,5,main] - 13 Thread[[default]-0,5,main] - 17 Thread[[default]-5,5,main] - 19 Thread[[default]-3,5,main] - 23 Thread[[default]-2,5,main] - 29 Thread[[default]-3,5,main] - 31 Thread[[default]-2,5,main] - 37 Thread[[default]-6,5,main] - 41 ...

Fammi sapere Christian

nicolettiant89 commented 7 years ago

Salve, grazie mille per la pronta risposta. Facendo come mi ha consigliato funziona sia il suo esempio sia il mio file PrimeFactorization.java tuttavia solo in locale. Le spiego io sono un tesista del prof. Danelutto e la mia tesi riguarda l'implementazione di una serie di algoritmi base utilizzando la sua libreria Muskel2. Gli algoritmi li devo testare sui server dell'università, quindi sulla macchina titanic ho scaricato muskel2, compilato scompattato ecc, ho avviato il server secondo le modalità spiegate tuttavia mi esce la seguente eccezione sul client:

Exception in thread "main" java.util.concurrent.CompletionException: java.util.concurrent.ExecutionException: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: java.lang.RuntimeException: Could not serialize lambda Serialization trace: transformer (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelExecutorService$1.onFailure(HazelcastMuskelContext.java:290) at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:236) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) Caused by: java.util.concurrent.ExecutionException: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: java.lang.RuntimeException: Could not serialize lambda Serialization trace: transformer (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.hazelcast.executor.impl.DistributedExecutorService$CallableProcessor.run(DistributedExecutorService.java:203) at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:209) ... 5 more Caused by: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: java.lang.RuntimeException: Could not serialize lambda Serialization trace: transformer (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790) at it.reactive.muskel.internal.utils.SerializerUtils.deserialize(SerializerUtils.java:51) at it.reactive.muskel.internal.executor.remote.hazelcast.AbstractHazelcastClassLoaderExecutor.doOperation(AbstractHazelcastClassLoaderExecutor.java:68) at it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable.call(HazelcastClassLoaderCallable.java:31) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at com.hazelcast.executor.impl.DistributedExecutorService$CallableProcessor.run(DistributedExecutorService.java:201) at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:209) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.access$000(ClientInvocationFuture.java:41) at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:234) ... 5 more Caused by: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: Could not serialize lambda at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:73) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790) at it.reactive.muskel.internal.utils.SerializerUtils.deserialize(SerializerUtils.java:51) at it.reactive.muskel.internal.executor.remote.hazelcast.AbstractHazelcastClassLoaderExecutor.doOperation(AbstractHazelcastClassLoaderExecutor.java:68) at it.reactive.muskel.internal.executor.remote.hazelcast.HazelcastClassLoaderCallable.call(HazelcastClassLoaderCallable.java:31) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at com.hazelcast.executor.impl.DistributedExecutorService$CallableProcessor.run(DistributedExecutorService.java:201) at com.hazelcast.util.executor.CachedExecutorServiceDelegate$Worker.run(CachedExecutorServiceDelegate.java:209) ... 5 more Caused by: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: Unable to find class: it.reactive.muskel.examples.PrimeFactorization Serialization trace: capturingClass (java.lang.invoke.SerializedLambda) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:156) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.read(DefaultSerializers.java:326) at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.read(DefaultSerializers.java:314) at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:759) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:686) at com.esotericsoftware.kryo.serializers.ClosureSerializer.read(ClosureSerializer.java:70) ... 15 more Caused by: it.reactive.muskel.internal.executor.remote.hazelcast.utils.ExceptionSerializerUtils$RemoteInvocationExeption: it.reactive.muskel.examples.PrimeFactorization at it.reactive.muskel.server.hazelcast.classloader.HazelcastClassLoader.defineClass(HazelcastClassLoader.java:206) at it.reactive.muskel.server.hazelcast.classloader.HazelcastClassLoader.findClass(HazelcastClassLoader.java:197) at it.reactive.muskel.server.hazelcast.classloader.HazelcastClassLoader.loadClass(HazelcastClassLoader.java:185) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154) ... 24 more

Da quello che sembra non riesce a trovare la classe PrimeFactorization (strano poichè secondo il suo doc, i nodi remoti dovrebbero richiamarsi automaticamente dal client le classi che gli occorrono) oppure non riesce a serializzarla, quindi ho provato anche a fargli implementare la classe Serializable, ma niente. Grazie anticipatamente per le risposte. Antonio

chrysimo commented 7 years ago

Ciao,

mi potresti inviare la classe che hai scritto?

nicolettiant89 commented 7 years ago

eccola, il file è inserito nel progetto muskel2 al livello del file RemotePrimeTest.java:

package it.reactive.muskel.examples;

import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; import java.util.List;

import it.reactive.muskel.MuskelExecutor; import it.reactive.muskel.MuskelProcessor; import it.reactive.muskel.context.MuskelContext; import it.reactive.muskel.functions.SerializablePublisher; import it.reactive.muskel.functions.SerializableSubscriber;

public class PrimeFactorization implements Serializable{ /**

Exception in thread "MuskelClassLoaderClient-852d581a-9750-41f7-a8c9-cfdd1fe8195b" com.hazelcast.spi.exception.DistributedObjectDestroyedException: DistributedObject[com.hazelcast.collection.impl.queue.QueueService@3ffdda14 -> requestQueue_852d581a-9750-41f7-a8c9-cfdd1fe8195b] has been destroyed! at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyLocalDistributedObject(ProxyServiceImpl.java:156) at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyDistributedObject(ProxyServiceImpl.java:141) at com.hazelcast.client.impl.client.ClientDestroyRequest.call(ClientDestroyRequest.java:46) at com.hazelcast.client.impl.client.CallableClientRequest.process(CallableClientRequest.java:29) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.processRequest(ClientEngineImpl.java:468) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run(ClientEngineImpl.java:384) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:127) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:102) at com.hazelcast.client.spi.ClientProxy.invokeInterruptibly(ClientProxy.java:151) at com.hazelcast.client.proxy.ClientQueueProxy.invokeInterruptibly(ClientQueueProxy.java:293) at com.hazelcast.client.proxy.ClientQueueProxy.poll(ClientQueueProxy.java:144) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelQueue.poll(HazelcastMuskelContext.java:388) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService.doProcessSingle(ClassloaderClientService.java:81) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService$ResourceRequestRunnable.run(ClassloaderClientService.java:114) at java.lang.Thread.run(Thread.java:745)

Grazie, Antonio

chrysimo commented 7 years ago

Ciao Antonio,

riscontri tale errore perchè il try(MuskelProcessor cpu... chiude il context che cerchi di riutilizzare successivamente quando richiami nuovamente parallelFactorization1.

Inoltre non mi quadra molto come utilizzi il Publisher (e potrebbe darti errori quando se i worker dovessero essere troppo lenti): la onNext e la OnComplete della classe SerializablePublisher andrebbero invocate dopo aver effettuata la subscription (ti devi registrare usando s.onSubscribe...) all'interno del metodo request(k) della classe Subscription. Vedi il sorgente della classe OnSubscribeRange e vedrai che non è così banale realizzarlo...

Per non sbagliarmi, se fossi in te userei:

MuskelProcessor.range(start, count)

oppure se vuoi long

MuskelProcessor.fromStream(LongStream.range(1, 1000).mapToObj(k -> new BigInteger(k)))

e con la filter filtri i valori che non vuoi

Oppure se la lista non è enorme

List<BigInteger> elements = new ArrayList<>();
    for(BigInteger i=new BigInteger("2");i.compareTo(sqrt(n))<=0;i=i.add(new BigInteger("1"))){ 
        elements.add(i);

MuskelProcessor.fromIterable(elements)

Christian

nicolettiant89 commented 7 years ago

Grazie mille. Un'altra domanda, con il comando

...map(i-> f(i), MuskelExecutor.local())...

vado a parallelizzare l'esecuzione di f(i) fra i core del mio calcolatore locale, mentre

...map(i->f(i), MuskelExecutor.remote())...

mi permette di parallelizzare il calcolo di f(i) fra i vari nodi/calcolatori remoti, giusto? ma una cosa non mi è chiara: immaginiamo di avere tre nodi remoti A,B,C ognuno con 8 core; i vari elementi "i" verranno eseguiti in parallelo da i tre nodi remoti ignorando che sono multicore quindi divido il tempo di esecuzione fratto 3 oppure vado a sfruttare anche i core sottostanti (quello che vorrei fare) e quindi divido il tempo di esecuzione di 3*8? Non so se hai capito quello che intendo. Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 6 marzo 2017 17:31, Christian Simonelli notifications@github.com ha scritto:

Ciao Antonio,

riscontri tale errore perchè il try(MuskelProcessor cpu... chiude il context che cerchi di riutilizzare successivamente quando richiami nuovamente parallelFactorization1.

Inoltre non mi quadra molto come utilizzi il Publisher (e potrebbe darti errori quando se i worker dovessero essere troppo lenti): la onNext e la OnComplete della classe SerializablePublisher andrebbero invocate dopo aver effettuata la subscription (ti devi registrare usando s.onSubscribe...) all'interno del metodo request(k) della classe Subscription. Vedi il sorgente della classe OnSubscribeRange e vedrai che non è così banale realizzarlo...

Per non sbagliarmi, se fossi in te userei:

MuskelProcessor.range(start, count)

oppure se vuoi long

MuskelProcessor.fromStream(LongStream.range(1, 1000).mapToObj(k -> new BigInteger(k)))

e con la filter filtri i valori che non vuoi

Oppure se la lista non è enorme

List elements = new ArrayList<>(); for(BigInteger i=new BigInteger("2");i.compareTo(sqrt(n))<=0;i=i.add(new BigInteger("1"))){ elements.add(i);

MuskelProcessor.fromIterable(elements)

Christian

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-284450607, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NXH2LBgErP06JBhbq6o46V_4CtjTks5rjDTEgaJpZM4MTEOT .

chrysimo commented 7 years ago

Ciao Antonio,

quando lanci i server usa l'opzione -clientPoolSize o --clientPoolSize (default 16) che indica il pool size del singolo nodo

In questo modo a tempo di esecuzione avrai la somma dei pool size dei nodi server, cioè quello che desideri!

Provo a spiegarti come funziona il giro.

Il client ed il server utilizzano la libreria Hazelcast che permette di definire i pool size di un numero indefinito di thread pool. Ogni thread pool viene definito in configurazione e ha un nome. Se vedi il file HazelcastConfiguration che è la classe che istanzia il nodo server di Hazelcast, vedi scritto

config.getExecutorConfig("default").setPoolSize(clientPoolSize);

Supponi di voler definire un nuovo pool specifico chiamato pippo in più oltre a quello di default

    config.getExecutorConfig("pippo").setPoolSize(clientPoolSize);

Lato client per usare quello specifico e non di default dovrai usare 

MuskelExecutor.remote("pippo")

Ti ricordo inoltre che potresti inventarti configurazioni che usano ThreadPool locali a quelli remoti. Supponi questo caso d'uso

Stream di compressione video (me lo sto inventando) dove i nodi (o gruppi) fanno cose specifiche

Uno divide il video immagini (A)

Uno fa la compressione (B)

Uno ricrea le immagini compresse (C)

Quindi lo stream passa da A - B- C

a codice viene una cosa del genere:

MuskelProcessor.fromPublisher(pub).map(A, MuskelExecutor.local()).map(B, MuskelExecutor.local()),map(C, MuskelExecutor.local()),

Con la executeOn sposti lo stream sul nodo desiderato e se dopo invochi map(i-> elaborazione, MuskelExecutor.local()) andrai ad eseguirla sul Thread Pool locale del nodoA e non del client

MuskelProcessor.fromPublisher(pub).executeOn("nodoA").map(A, MuskelExecutor.local())

Christian

nicolettiant89 commented 7 years ago

ok, quando avevo capito bene, però di fronte a questo algoritmo che implementa un po l'ultima cosa che hai detto:

public static List parallelFactorization1(Integer n){ List values=MuskelProcessor.range(1, 40067) .withContext(MuskelContext.builder().client() .addAddress("localhost:5701").name("muskel").password("password").build()) .executeOn(MuskelExecutor.remote("pippo")) //.doOnNext((x)-> System.out.println(x)) .map((i)-> n%i==0 ? i:null, MuskelExecutor.local()) //.doOnNext((x)-> System.out.println(x)) .filter((x)-> x!=null) //.doOnNext((x)-> System.out.println(x)) .doOnNext(i -> System.out.println("On Server: " + i)) .toLocal() .toList() .toBlocking().first();

//context.close(); //values.addAll(parallelFactorization1(temp)); return values; } sul server mi esce il seguente errore: java.lang.IllegalArgumentException: Context cannot be null at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.schedule(OperatorMap.java:182) ~[muskel-core-1.0.0.jar!/:] at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.request(OperatorMap.java:194) ~[muskel-core-1.0.0.jar!/:] at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:137) ~[muskel-core-1.0.0.jar!/:] at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:112) ~[muskel-core-1.0.0.jar!/:] at it.reactive.muskel.internal.subscriber.subscription.utils.SubscriptionTopicUtils.lambda$createSubscriptionCallBack$0(SubscriptionTopicUtils.java:39) ~[muskel-core-1.0.0. jar!/:] at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext.lambda$addMessageListener$0(HazelcastMuskelContext.java:137) ~[muskel-core-1.0.0.jar!/:] at com.hazelcast.topic.impl.TopicService.dispatchEvent(TopicService.java:138) ~[hazelcast-3.5.3.jar!/:3.5.3] at com.hazelcast.spi.impl.eventservice.impl.LocalEventDispatcher.run(LocalEventDispatcher.java:63) ~[hazelcast-3.5.3.jar!/:3.5.3] at com.hazelcast.util.executor.StripedExecutor$Worker.process(StripedExecutor.java:190) [hazelcast-3.5.3.jar!/:3.5.3] at com.hazelcast.util.executor.StripedExecutor$Worker.run(StripedExecutor.java:174) [hazelcast-3.5.3.jar!/:3.5.3]

d'altro canto se alla map tolgo "MuskelProcessor.local()" l'algoritmo funziona ma non termina mai sul client anche se uso context.close() oppure includo tutto nel try...

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 7 marzo 2017 12:58, Christian Simonelli notifications@github.com ha scritto:

Ciao Antonio,

quando lanci i server usa l'opzione -clientPoolSize o --clientPoolSize (default 16) che indica il pool size del singolo nodo

In questo modo a tempo di esecuzione avrai la somma dei pool size dei nodi server, cioè quello che desideri!

Provo a spiegarti come funziona il giro.

Il client ed il server utilizzano la libreria Hazelcast che permette di definire i pool size di un numero indefinito di thread pool. Ogni thread pool viene definito in configurazione e ha un nome. Se vedi il file HazelcastConfiguration che è la classe che istanzia il nodo server di Hazelcast, vedi scritto

config.getExecutorConfig("default").setPoolSize(clientPoolSize);

Supponi di voler definire un nuovo pool specifico chiamato pippo in più oltre a quello di default

config.getExecutorConfig("pippo").setPoolSize(clientPoolSize);

Lato client per usare quello specifico e non di default dovrai usare

MuskelExecutor.remote("pippo")

Ti ricordo inoltre che potresti inventarti configurazioni che usano ThreadPool locali a quelli remoti. Supponi questo caso d'uso

Stream di compressione video (me lo sto inventando) dove i nodi (o gruppi) fanno cose specifiche

Uno divide il video immagini (A)

Uno fa la compressione (B)

Uno ricrea le immagini compresse (C)

Quindi lo stream passa da A - B- C

a codice viene una cosa del genere:

MuskelProcessor.fromPublisher(pub).map(A, MuskelExecutor.local()).map(B, MuskelExecutor.local()),map(C, MuskelExecutor.local()),

Con la executeOn sposti lo stream sul nodo desiderato e se dopo invochi map(i-> elaborazione, MuskelExecutor.local()) andrai ad eseguirla sul Thread Pool locale del nodoA e non del client

MuskelProcessor.fromPublisher(pub).executeOn("nodoA").map(A, MuskelExecutor.local())

Christian

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-284702238, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NXPi9R23t1_mdG5Gg3yu5rQ5-wfxks5rjUZsgaJpZM4MTEOT .

chrysimo commented 7 years ago

Ho committato diverse modifiche che avevo in canna. Potresti fare update e vedere se hai lo stesso problema?

nicolettiant89 commented 7 years ago

Funziona perfettamente. Grazie mille

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-07 15:12 GMT+01:00 Christian Simonelli notifications@github.com:

Ho committato diverse modifiche che avevo in canna. Potresti fare update e vedere se hai lo stesso problema?

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-284731981, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NTanL_bburlGdS456oqXCS16HXaVks5rjWXqgaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

Salve, ho un altro problema con lo stesso algoritmo:

public static List parallelLocalFactorization(BigInteger n){ if(n.compareTo(BigInteger.ZERO)<=0) throw new IllegalArgumentException("Negative value not allowed!"); else if (n.compareTo(new BigInteger("2"))<0) return new ArrayList<>(); else if (n.isProbablePrime(100)) { ArrayList res= new ArrayList<>(); res.add(n); return res; } SerializablePublisher publisher= new SerializablePublisher() { @Override public void subscribe(SerializableSubscriber<? super BigInteger> s) { s.onSubscribe(new Subscription() { private final SerializableSubscriber<? super BigInteger> subscriber=s; public BigInteger val=new BigInteger("2"); @Override public void request(long value) { int count=0; while(count<value||value==Long.MAX_VALUE){ if(val.compareTo(sqrt(n))<=0){ subscriber.onNext(val); val=val.add(new BigInteger("1")); } else{ subscriber.onComplete(); return; } count++; } } @Override public void cancel() { // TODO Auto-generated method stub } }); } }; List values=MuskelProcessor.fromPublisher(publisher) .withContext(MuskelContext.builder().client() .addAddress("pianosa.di.unipi.it:5701").name("muskel").password("password") //.addAddress("pianosau.di.unipi.it ").name("muskel").password("password").defaultPoolSize(no_thread) .build()) .groupBySortedToList(x->ThreadLocalRandom.current().nextInt(1, no_thread + 1)) //.groupBySortedToList(x->x.mod(BigInteger.valueOf(10))) .flatMap(list-> MuskelProcessor.fromIterable(list.toBlocking().first()) .map((i)-> { return i.isProbablePrime(300)&&n.mod(i).compareTo(BigInteger.ZERO)==0 ? i : null; }) .filter((x)-> x!=null) ,MuskelExecutor.remote()) .toList().toBlocking().first(); //System.out.println(values); BigInteger temp=n; for(BigInteger p:values){ temp=temp.divide(p); } values.addAll(parallelLocalFactorization(temp)); return values; }

seguendo il tuo consiglio ho cercato di comprendere meglio il funzionamento del Publisher, mi occorre utilizzarlo poichè lo stream molto spesso è troppo grande per essere contenuto all'interno di una List. Come vedi l'ho modificato pensando di averlo capito dato che l'algoritmo gira perfettamente in locale e scala anche, purtroppo però in remoto mi da il seguente errore:

Exception in thread "main" java.util.concurrent.CompletionException: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 Serialization trace: element (it.reactive.muskel.internal.publisher.ScalarPublisher) publisher (it.reactive.muskel.processors.GroupedMuskelProcessorImpl) source (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelExecutorService$1.onFailure(HazelcastMuskelContext.java:290) at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:236) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) Caused by: java.util.concurrent.ExecutionException: com.hazelcast.nio.serialization.HazelcastSerializationException: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 Serialization trace: element (it.reactive.muskel.internal.publisher.ScalarPublisher) publisher (it.reactive.muskel.processors.GroupedMuskelProcessorImpl) source (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:188) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.access$000(ClientInvocationFuture.java:41) at com.hazelcast.client.spi.impl.ClientInvocationFuture$1.run(ClientInvocationFuture.java:234) ... 5 more Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 Serialization trace: element (it.reactive.muskel.internal.publisher.ScalarPublisher) publisher (it.reactive.muskel.processors.GroupedMuskelProcessorImpl) source (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:380) at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:307) at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315) at com.hazelcast.executor.impl.client.PartitionTargetCallableRequest.write(PartitionTargetCallableRequest.java:111) at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86) at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62) at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53) at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29) at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37) at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227) at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207) at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104) at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnPartitionOwner(ClientSmartInvocationServiceImpl.java:50) at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:161) at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:149) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.invokeOnPartitionOwner(ClientExecutorServiceProxy.java:563) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.submitToRandomInternal(ClientExecutorServiceProxy.java:431) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.submit(ClientExecutorServiceProxy.java:297) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelExecutorService.submitToKeyOwner(HazelcastMuskelContext.java:296) at it.reactive.muskel.internal.executor.impl.MultipleMuskelExecutorService.submitToKeyOwner(MultipleMuskelExecutorService.java:35) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.onNext(OperatorMap.java:210) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.doOnNext(AbstractBasicSubscriber.java:93) at it.reactive.muskel.internal.operators.OperatorGroupByToList$OperatorGroupBySubScriber.doOnNext(OperatorGroupByToList.java:47) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.lambda$0(AbstractOperatorGroupBySubScriber.java:66) at java.util.TreeMap.forEach(TreeMap.java:1005) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.onComplete(AbstractOperatorGroupBySubScriber.java:66) at it.reactive.muskel.examples.PrimeFactorization$1$1.request(PrimeFactorization.java:103) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:137) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:112) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.request(AbstractOperatorGroupBySubScriber.java:45) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:137) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:112) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.dequeueAndRequestMore(OperatorMap.java:154) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.lambda$1(OperatorMap.java:123) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.doOperation(ContextForwardRunnable.java:24) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.doOperation(ContextForwardRunnable.java:1) at it.reactive.muskel.internal.executor.local.AbstractContextForward.lambda$0(AbstractContextForward.java:41) at it.reactive.muskel.context.utils.ManagedContextUtils.executeWithContext(ManagedContextUtils.java:34) at it.reactive.muskel.context.utils.ManagedContextUtils.executeWithContext(ManagedContextUtils.java:27) at it.reactive.muskel.internal.executor.local.AbstractContextForward.doOperation(AbstractContextForward.java:36) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.run(ContextForwardRunnable.java:18) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) ... 8 more Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 Serialization trace: element (it.reactive.muskel.internal.publisher.ScalarPublisher) publisher (it.reactive.muskel.processors.GroupedMuskelProcessorImpl) source (it.reactive.muskel.internal.operators.OperatorMap$TransformerSupplier) at com.esotericsoftware.kryo.io.Output.require(Output.java:163) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246) at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232) at com.esotericsoftware.kryo.serializers.DefaultSerializers$BigIntegerSerializer.write(DefaultSerializers.java:222) at com.esotericsoftware.kryo.serializers.DefaultSerializers$BigIntegerSerializer.write(DefaultSerializers.java:201) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628) at it.reactive.muskel.internal.utils.SerializerUtils.serializeToByteArray(SerializerUtils.java:38) at it.reactive.muskel.internal.executor.remote.hazelcast.AbstractHazelcastClassLoaderExecutor.writeData(AbstractHazelcastClassLoaderExecutor.java:88) at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:140) at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:39) at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37) at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:305) at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:315) at com.hazelcast.executor.impl.client.PartitionTargetCallableRequest.write(PartitionTargetCallableRequest.java:111) at com.hazelcast.client.impl.client.ClientRequest.writePortable(ClientRequest.java:86) at com.hazelcast.nio.serialization.PortableSerializer.writeInternal(PortableSerializer.java:62) at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:53) at com.hazelcast.nio.serialization.PortableSerializer.write(PortableSerializer.java:29) at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:37) at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:227) at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:207) at com.hazelcast.client.spi.impl.ClientInvocationServiceSupport.send(ClientInvocationServiceSupport.java:104) at com.hazelcast.client.spi.impl.ClientSmartInvocationServiceImpl.invokeOnPartitionOwner(ClientSmartInvocationServiceImpl.java:50) at com.hazelcast.client.spi.impl.ClientInvocation.invokeOnSelection(ClientInvocation.java:161) at com.hazelcast.client.spi.impl.ClientInvocation.invoke(ClientInvocation.java:149) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.invokeOnPartitionOwner(ClientExecutorServiceProxy.java:563) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.submitToRandomInternal(ClientExecutorServiceProxy.java:431) at com.hazelcast.client.proxy.ClientExecutorServiceProxy.submit(ClientExecutorServiceProxy.java:297) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelExecutorService.submitToKeyOwner(HazelcastMuskelContext.java:296) at it.reactive.muskel.internal.executor.impl.MultipleMuskelExecutorService.submitToKeyOwner(MultipleMuskelExecutorService.java:35) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.onNext(OperatorMap.java:210) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.doOnNext(AbstractBasicSubscriber.java:93) at it.reactive.muskel.internal.operators.OperatorGroupByToList$OperatorGroupBySubScriber.doOnNext(OperatorGroupByToList.java:47) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.lambda$0(AbstractOperatorGroupBySubScriber.java:66) at java.util.TreeMap.forEach(TreeMap.java:1005) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.onComplete(AbstractOperatorGroupBySubScriber.java:66) at it.reactive.muskel.examples.PrimeFactorization$1$1.request(PrimeFactorization.java:103) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:137) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:112) at it.reactive.muskel.internal.subscriber.AbstractOperatorGroupBySubScriber.request(AbstractOperatorGroupBySubScriber.java:45) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:137) at it.reactive.muskel.internal.subscriber.AbstractBasicSubscriber.request(AbstractBasicSubscriber.java:112) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.dequeueAndRequestMore(OperatorMap.java:154) at it.reactive.muskel.internal.operators.OperatorMap$OperatorMapSubscribeExecutor.lambda$1(OperatorMap.java:123) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.doOperation(ContextForwardRunnable.java:24) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.doOperation(ContextForwardRunnable.java:1) at it.reactive.muskel.internal.executor.local.AbstractContextForward.lambda$0(AbstractContextForward.java:41) at it.reactive.muskel.context.utils.ManagedContextUtils.executeWithContext(ManagedContextUtils.java:34) at it.reactive.muskel.context.utils.ManagedContextUtils.executeWithContext(ManagedContextUtils.java:27) at it.reactive.muskel.internal.executor.local.AbstractContextForward.doOperation(AbstractContextForward.java:36) at it.reactive.muskel.internal.executor.local.ContextForwardRunnable.run(ContextForwardRunnable.java:18) at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

sai percaso da cosa potrebbe essere dovuto? Grazie mille, Antonio

2017-03-08 11:22 GMT+01:00 Antonio Nicoletti nicolettiant89@gmail.com:

Funziona perfettamente. Grazie mille

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_7180426926966897893_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-07 15:12 GMT+01:00 Christian Simonelli notifications@github.com:

Ho committato diverse modifiche che avevo in canna. Potresti fare update e vedere se hai lo stesso problema?

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-284731981, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NTanL_bburlGdS456oqXCS16HXaVks5rjWXqgaJpZM4MTEOT .

chrysimo commented 7 years ago

Ciao,

mi potresti allegare il sorgente java così faccio prima a verificarlo?

Grazie

chrysimo commented 7 years ago

Purtroppo l'elaborazione remota richiede la serializzazione dei dati dello stream che può avere le sue insidie... Nello specifico mii sembra strano si presenti un problema di Buffer overflow sulla libreria Kryo. Credo che il problema sia altrove cmq prova a modificare la riga 37 della classe it.reactive.muskel.internal.utils.SerializerUtils

Da UnsafeOutput output = new UnsafeOutput(4096); A

UnsafeOutput output = new UnsafeOutput(4096, -1);

Nel caso funzioni aggiornerò la classe su github

nicolettiant89 commented 7 years ago

Allora con il cambiamento suggeritomi funziona, anche se ho fatto anche qualche modifica al codice, però al solito la computazione remota, una volta fatto il suo lavoro non termina e anche se forzo la chiusura con context.close() non ne vuole sapere anzi mi da un simpatico errore:

Exception in thread "MuskelClassLoaderClient-6c308832-480e-4a78-a31b-33ee225e0a6b" com.hazelcast.spi.exception.DistributedObjectDestroyedException: DistributedObject[com.hazelcast.collection.impl.queue.QueueService@1469a9ab -> requestQueue_6c308832-480e-4a78-a31b-33ee225e0a6b] has been destroyed! at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyLocalDistributedObject(ProxyServiceImpl.java:156) at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.destroyDistributedObject(ProxyServiceImpl.java:141) at com.hazelcast.client.impl.client.ClientDestroyRequest.call(ClientDestroyRequest.java:46) at com.hazelcast.client.impl.client.CallableClientRequest.process(CallableClientRequest.java:29) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.processRequest(ClientEngineImpl.java:468) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run(ClientEngineImpl.java:384) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException(ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:127) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(ClientInvocationFuture.java:102) at com.hazelcast.client.spi.ClientProxy.invokeInterruptibly(ClientProxy.java:151) at com.hazelcast.client.proxy.ClientQueueProxy.invokeInterruptibly(ClientQueueProxy.java:293) at com.hazelcast.client.proxy.ClientQueueProxy.poll(ClientQueueProxy.java:144) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$HazelcastMuskelQueue.poll(HazelcastMuskelContext.java:388) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService.doProcessSingle(ClassloaderClientService.java:81) at it.reactive.muskel.context.hazelcast.classloader.client.ClassloaderClientService$ResourceRequestRunnable.run(ClassloaderClientService.java:114) at java.lang.Thread.run(Thread.java:745)

quindi, penso a causa di qualche processo che rimane aperto sul server due invocazioni della classe java allegata, con i medesimi argomenti (ad esempio questi "43429 1 1 True 1 remote")la prima volta funziona normalmente la seconda si blocca, ti allego il sorgente. Grazie. Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 13 marzo 2017 16:14, Christian Simonelli <notifications@github.com

ha scritto:

Purtroppo l'elaborazione remota richiede la serializzazione dei dati dello stream che può avere le sue insidie... Nello specifico mii sembra strano si presenti un problema di Buffer overflow sulla libreria Kryo. Credo che il problema sia altrove cmq prova a modificare la riga 37 della classe it.reactive.muskel.internal.utils.SerializerUtils

Da UnsafeOutput output = new UnsafeOutput(4096); A

UnsafeOutput output = new UnsafeOutput(4096, -1);

Nel caso funzioni aggiornerò la classe su github

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286137717, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NRpB7ReF3Fuauem56Myf46dF_ioUks5rlV05gaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

il fatto che l'algoritmo non termina in remoto cmq è colpa del main se eseguo la stessa cosa con @Test funziona...

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 14 marzo 2017 16:16, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

Allora con il cambiamento suggeritomi funziona, anche se ho fatto anche qualche modifica al codice, però al solito la computazione remota, una volta fatto il suo lavoro non termina e anche se forzo la chiusura con context.close() non ne vuole sapere anzi mi da un simpatico errore:

Exception in thread "MuskelClassLoaderClient-6c308832-480e-4a78-a31b-33ee225e0a6b" com.hazelcast.spi.exception.DistributedObjectDestroyedException: DistributedObject[com.hazelcast.collection.impl. queue.QueueService@1469a9ab -> requestQueue_6c308832-480e-4a78-a31b-33ee225e0a6b] has been destroyed! at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl. destroyLocalDistributedObject(ProxyServiceImpl.java:156) at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl. destroyDistributedObject(ProxyServiceImpl.java:141) at com.hazelcast.client.impl.client.ClientDestroyRequest. call(ClientDestroyRequest.java:46) at com.hazelcast.client.impl.client.CallableClientRequest. process(CallableClientRequest.java:29) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor. processRequest(ClientEngineImpl.java:468) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProcessor.run( ClientEngineImpl.java:384) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRun( HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run( HazelcastManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveException( ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveResponse( ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get( ClientInvocationFuture.java:127) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get( ClientInvocationFuture.java:102) at com.hazelcast.client.spi.ClientProxy.invokeInterruptibly( ClientProxy.java:151) at com.hazelcast.client.proxy.ClientQueueProxy.invokeInterruptibly( ClientQueueProxy.java:293) at com.hazelcast.client.proxy.ClientQueueProxy.poll( ClientQueueProxy.java:144) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$ HazelcastMuskelQueue.poll(HazelcastMuskelContext.java:388) at it.reactive.muskel.context.hazelcast.classloader.client. ClassloaderClientService.doProcessSingle(ClassloaderClientService.java:81) at it.reactive.muskel.context.hazelcast.classloader.client. ClassloaderClientService$ResourceRequestRunnable.run( ClassloaderClientService.java:114) at java.lang.Thread.run(Thread.java:745)

quindi, penso a causa di qualche processo che rimane aperto sul server due invocazioni della classe java allegata, con i medesimi argomenti (ad esempio questi "43429 1 1 True 1 remote")la prima volta funziona normalmente la seconda si blocca, ti allego il sorgente. Grazie. Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_-7562063121154404538_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 13 marzo 2017 16:14, Christian Simonelli < notifications@github.com> ha scritto:

Purtroppo l'elaborazione remota richiede la serializzazione dei dati dello stream che può avere le sue insidie... Nello specifico mii sembra strano si presenti un problema di Buffer overflow sulla libreria Kryo. Credo che il problema sia altrove cmq prova a modificare la riga 37 della classe it.reactive.muskel.internal.utils.SerializerUtils

Da UnsafeOutput output = new UnsafeOutput(4096); A

UnsafeOutput output = new UnsafeOutput(4096, -1);

Nel caso funzioni aggiornerò la classe su github

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286137717, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NRpB7ReF3Fuauem56Myf46dF_ioUks5rlV05gaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

tuttavia ne ignoro il motivo...

Il giorno 14 marzo 2017 18:09, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

il fatto che l'algoritmo non termina in remoto cmq è colpa del main se eseguo la stessa cosa con @Test funziona...

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_797620605152132099_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 14 marzo 2017 16:16, Antonio Nicoletti <nicolettiant89@gmail.com

ha scritto:

Allora con il cambiamento suggeritomi funziona, anche se ho fatto anche qualche modifica al codice, però al solito la computazione remota, una volta fatto il suo lavoro non termina e anche se forzo la chiusura con context.close() non ne vuole sapere anzi mi da un simpatico errore:

Exception in thread "MuskelClassLoaderClient-6c308 832-480e-4a78-a31b-33ee225e0a6b" com.hazelcast.spi.exception.Di stributedObjectDestroyedException: DistributedObject[com.hazelcas t.collection.impl.queue.QueueService@1469a9ab -> requestQueue_6c308832-480e-4a78-a31b-33ee225e0a6b] has been destroyed! at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.de stroyLocalDistributedObject(ProxyServiceImpl.java:156) at com.hazelcast.spi.impl.proxyservice.impl.ProxyServiceImpl.de stroyDistributedObject(ProxyServiceImpl.java:141) at com.hazelcast.client.impl.client.ClientDestroyRequest.call( ClientDestroyRequest.java:46) at com.hazelcast.client.impl.client.CallableClientRequest.proce ss(CallableClientRequest.java:29) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProce ssor.processRequest(ClientEngineImpl.java:468) at com.hazelcast.client.impl.ClientEngineImpl$ClientPacketProce ssor.run(ClientEngineImpl.java:384) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool Executor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo lExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at com.hazelcast.util.executor.HazelcastManagedThread.executeRu n(HazelcastManagedThread.java:76) at com.hazelcast.util.executor.HazelcastManagedThread.run(Hazel castManagedThread.java:92) at ------ End remote and begin local stack-trace ------.(Unknown Source) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolve Exception(ClientInvocationFuture.java:175) at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolve Response(ClientInvocationFuture.java:160) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(Cli entInvocationFuture.java:127) at com.hazelcast.client.spi.impl.ClientInvocationFuture.get(Cli entInvocationFuture.java:102) at com.hazelcast.client.spi.ClientProxy.invokeInterruptibly(Cli entProxy.java:151) at com.hazelcast.client.proxy.ClientQueueProxy.invokeInterrupti bly(ClientQueueProxy.java:293) at com.hazelcast.client.proxy.ClientQueueProxy.poll(ClientQueue Proxy.java:144) at it.reactive.muskel.context.hazelcast.HazelcastMuskelContext$ HazelcastMuskelQueue.poll(HazelcastMuskelContext.java:388) at it.reactive.muskel.context.hazelcast.classloader.client.Clas sloaderClientService.doProcessSingle(ClassloaderClientService.java:81) at it.reactive.muskel.context.hazelcast.classloader.client.Clas sloaderClientService$ResourceRequestRunnable.run(Classloader ClientService.java:114) at java.lang.Thread.run(Thread.java:745)

quindi, penso a causa di qualche processo che rimane aperto sul server due invocazioni della classe java allegata, con i medesimi argomenti (ad esempio questi "43429 1 1 True 1 remote")la prima volta funziona normalmente la seconda si blocca, ti allego il sorgente. Grazie. Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_797620605152132099m-7562063121154404538_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 13 marzo 2017 16:14, Christian Simonelli < notifications@github.com> ha scritto:

Purtroppo l'elaborazione remota richiede la serializzazione dei dati dello stream che può avere le sue insidie... Nello specifico mii sembra strano si presenti un problema di Buffer overflow sulla libreria Kryo. Credo che il problema sia altrove cmq prova a modificare la riga 37 della classe it.reactive.muskel.internal.utils.SerializerUtils

Da UnsafeOutput output = new UnsafeOutput(4096); A

UnsafeOutput output = new UnsafeOutput(4096, -1);

Nel caso funzioni aggiornerò la classe su github

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286137717, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NRpB7ReF3Fuauem56Myf46dF_ioUks5rlV05gaJpZM4MTEOT .

chrysimo commented 7 years ago

Prova a mandarmi il main originale, nei messaggi prima non mi sembra di vederlo (o non lo trovo io)

nicolettiant89 commented 7 years ago

Si il main è alla fine del file che ti ho allegato in precedenza, però non è un problema di implementazione mia perchè ho provato a creare un public static void main nel file "RemotePrimeTest" per eseguire il metodo "remoteMultipleMapCount()"; quest'ultimo viene eseguito e da risultati ma la computazione non termina... forse è colpa del main thread anche se dovrebbe morire alla fine del main... Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-14 22:32 GMT+01:00 Christian Simonelli notifications@github.com:

Prova a mandarmi il main originale, nei messaggi prima non mi sembra di vederlo (o non lo trovo io)

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286567596, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NUEy0OyXgv1uHfME9qd5Pd4iFj6Rks5rlwdzgaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

ok, aggiungendo System.exit(0) alla fine del main si chiude :)

Il giorno 15 marzo 2017 09:24, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

Si il main è alla fine del file che ti ho allegato in precedenza, però non è un problema di implementazione mia perchè ho provato a creare un public static void main nel file "RemotePrimeTest" per eseguire il metodo "remoteMultipleMapCount()"; quest'ultimo viene eseguito e da risultati ma la computazione non termina... forse è colpa del main thread anche se dovrebbe morire alla fine del main... Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_-8333559558729963028_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-14 22:32 GMT+01:00 Christian Simonelli notifications@github.com:

Prova a mandarmi il main originale, nei messaggi prima non mi sembra di vederlo (o non lo trovo io)

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286567596, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NUEy0OyXgv1uHfME9qd5Pd4iFj6Rks5rlwdzgaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

domanda... dato questo algoritmo molto semplice pescato dai tuoi esempi,

@Test public void remoteMultipleMapCount() { long start=System.currentTimeMillis(); long numbers = range(FROM, 100) .withContext(MuskelContext.builder().client() .addAddress("pianosa.di.unipi.it").name("muskel").password("password") .addAddress("localhost:5701").name("muskel").password("password") .build()) .map(i -> range(2, i - 1).takeFirst(k -> i % k == 0) .defaultIfEmpty(i).map(k -> k.equals(i) ? k : null) .toBlocking().first(), MuskelExecutor.remote("local")) .filter(i -> i != null) .doOnNext( t -> System.out.println(Thread.currentThread() + " - "

// assertEquals(TOTAL, numbers); System.out.println(numbers); System.out.println(System.currentTimeMillis()-start+" ms"); }

e due server: localhost e pianosa avviati rispettivamente con il comando

java -jar .\muskel-server-1.0.0.jar --clientPoolSize=8 --groups=local

come faccio nella map a usare contemporaneamente i threadpool di entrambi i nodi affichè di fatto lavori con 16 worker? in altre parole come faccio a sfruttare entrambi i server contemporaneamente nel consumo parallelo dei task che la map genera? Chiedo questo perchè da diversi test ho notato che solo uno dei server viene preso in considerazione mentre con l'altro non viene instaurata alcuna connessione nonostante li abbia messi anche nello stesso gruppo. Grazie. antonio

Il giorno 15 marzo 2017 09:42, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

ok, aggiungendo System.exit(0) alla fine del main si chiude :)

Il giorno 15 marzo 2017 09:24, Antonio Nicoletti <nicolettiant89@gmail.com

ha scritto:

Si il main è alla fine del file che ti ho allegato in precedenza, però non è un problema di implementazione mia perchè ho provato a creare un public static void main nel file "RemotePrimeTest" per eseguire il metodo "remoteMultipleMapCount()"; quest'ultimo viene eseguito e da risultati ma la computazione non termina... forse è colpa del main thread anche se dovrebbe morire alla fine del main... Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_1152701695746478856m-8333559558729963028_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-14 22:32 GMT+01:00 Christian Simonelli notifications@github.com :

Prova a mandarmi il main originale, nei messaggi prima non mi sembra di vederlo (o non lo trovo io)

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286567596, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NUEy0OyXgv1uHfME9qd5Pd4iFj6Rks5rlwdzgaJpZM4MTEOT .

chrysimo commented 7 years ago

Controlla se si vedono, nei log del test dovresti trovare due nodi

Members [2] { Member [xxx.xx.xx.xxx]:5701 Member [Pianosa....]:5701 }

nicolettiant89 commented 7 years ago

no è quello il problema vedo un solo membro... in pratica non fa il discovery

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-15 17:49 GMT+01:00 Christian Simonelli notifications@github.com:

Controlla se si vedono, nei log del test dovresti trovare due nodi

Members [2] { Member [xxx.xx.xx.xxx]:5701 Member [Pianosa....]:5701 }

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286805292, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NeY-Dfn42-xMpOzgliAGsletxl--ks5rmBabgaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

sto cercando di capire come funziona il multicast discovery, modificando anche i parametri discoveryMulticastGroup e discoveryMulticastPort, ma niente, non riesco a creare un cluster composto da più membri è come se la fase di discovery la saltasse... per cercare di creare un cluster fra titanic e pianosa ho anche eseguito il seguente comando su titanic:

java -jar muskel-server-1.0.0.jar --groups=pianosa --discoveryTcpMembers=131.114.3.250 oppure java -jar muskel-server-1.0.0.jar --groups=pianosa --discoveryTcpMembers= pianosa.di.unipi.it

tuttavia anche in questo modo non riesco a stipulare una connessione fra i due. Da quello che ho capito quando invoco il metodo contenuto nella precedente mail (remoteMultipleMapCount()), anche se indico nella definizione di MuskelContext un solo server, ad esempio pianosa, l'altro dovrebbe essere trovato automaticamente, nella fase di discovery preliminare, attraverso dei messaggi multicast che pianosa manda all'indirizzo default 224.2.2.3, sulla porta 54327... quindi localhost (o qualunque altro server tipo titanic) dovrebbe rispondere generando il cluster... ho capito bene il meccanismo? Antonio

Il giorno 15 marzo 2017 18:03, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

no è quello il problema vedo un solo membro... in pratica non fa il discovery

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_5587735348211468775_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

2017-03-15 17:49 GMT+01:00 Christian Simonelli notifications@github.com:

Controlla se si vedono, nei log del test dovresti trovare due nodi

Members [2] { Member [xxx.xx.xx.xxx]:5701 Member [Pianosa....]:5701 }

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286805292, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NeY-Dfn42-xMpOzgliAGsletxl--ks5rmBabgaJpZM4MTEOT .

chrysimo commented 7 years ago

Ciao Antonio,

Scusa ma sono rientrato ora a casa e ti sto rispondendo con il cellulare...

Riprendo l'esempio di ieri dove avevi un test (client) con indicati in modo esplicito due nodi server, uno locale ed uno su una macchina esterna. I due nodi server tra loro non si vedono via multicast in modo automatico perché appartengono a due sottoreti diverse. Quindi giustamente la configurazione client che hai fatto indica direttamente l'indirizzo tcp dove sono attivi. Il motivo per cui non si vedono credo sia dovuto al fatto che sulla macchina del dipartimento ci sono due intefacce di rete e quella alla quale ti stai collegando non è quella di default. Questo significa che la porta server non viene aperta verso l'esterno. Per farlo recupera dal tuo PC l'IP di pianosa, supponi 131.4.5.7 e lancia il nodo server di pianosa con questo parametro

java -jar muskel-server-1.0.0.jar --networkInterfaces=131...* ....

Fammi sapere Christian

nicolettiant89 commented 7 years ago

Ciao,

allora funziona avviando i server, prima PIANOSA e poi quello LOCALE nel seguente modo:

1) java -jar muskel-server-1.0.0.jar --networkInterfaces=131.114.. --discoveryTcpMembers=qualunque indirizzo [PIANOSA] 2) java -jar muskel-server-1.0.0.jar --groups=local --discoveryTcpMembers= 131.114.3.250:5701 (indirizzo tcp su cui si è attivato il server PIANOSA)

Nell'avvio del primo server bisogna mettere necessariamente il parametro "discoveryTcpMembers" poichè altrimenti, per come è fatto il file "HaselcastConfiguration.java", il server si avvierebbe nella modalità multicast ottenendo poi, quando eseguo il secondo comando sul server locale, il seguente errore:

[131.114.3.250]:5701 [muskel] [3.5.3] Invalid join request from: Address[192.168.15.108]:5701, reason:Incompatible joiners! expected: multicast, found: tcp-ip

cosa che impedisce la connessione. Comunque a parte questo piccolo cavillo si connette perfettamente :). Avviare i server in ordine inverso è più complesso poichè il pc locale non ha un interfaccia pubblica, suppongo, che lo espone direttamente sul web ma deve passare per il router poi l'ISP e cosi via. Cosa hanno i server dell'uni un NAT che li espone direttamente sul web...? Ho letto che i task da eseguire vengono consegnati ai nodi remoti in ROUND ROBIN e questo penalizza i tempi di esecuzione quando i nodi remoti (in caso di task bilanciati) lavorano con velocità di clock cpu differenti... perciò chiedo è molto complessa l'implementazione dell'emissione ON DEMAND in remoto da realizzare? Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 16 marzo 2017 01:04, Christian Simonelli <notifications@github.com

ha scritto:

Ciao Antonio,

Scusa ma sono rientrato ora a casa e ti sto rispondendo con il cellulare...

Riprendo l'esempio di ieri dove avevi un test (client) con indicati in modo esplicito due nodi server, uno locale ed uno su una macchina esterna. I due nodi server tra loro non si vedono via multicast in modo automatico perché appartengono a due sottoreti diverse. Quindi giustamente la configurazione client che hai fatto indica direttamente l'indirizzo tcp dove sono attivi. Il motivo per cui non si vedono credo sia dovuto al fatto che sulla macchina del dipartimento ci sono due intefacce di rete e quella alla quale ti stai collegando non è quella di default. Questo significa che la porta server non viene aperta verso l'esterno. Per farlo recupera dal tuo PC l'IP di pianosa, supponi 131.4.5.7 e lancia il nodo server di pianosa con questo parametro

java -jar muskel-server-1.0.0.jar --networkInterfaces=131...* ....

Fammi sapere Christian

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-286918226, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NVf4XP_4nY8Gug2Jd68HwajP8O8Kks5rmHyEgaJpZM4MTEOT .

chrysimo commented 7 years ago

Secondo me per le tue prove è sufficiente che avvi i server così:

java -jar muskel-server-1.0.0.jar --networkInterfaces=131...*

e sul client metti tanti addAddress quanti sono i server

.withContext(MuskelContext.builder().client() .addAddress("pianosa.di.unipi.it").name("muskel").password("password") .addAddress("secondo.di.unipi.it:5701").name("muskel").password("password") .build())

Per quanto riguarda i task consegnati in maniera Round Robin è il funzionamento interno degli executor di hazelcast che funziona in questo modo ma che può essere sovrascritto. La possibilità di modificare il comportamento degli executor di hazelcast era descritto nel capitolo di conclusioni della tesi: "Attualmente il load balancing dei nodi viene effettuato in modalità round robin, in base al carico del client che esegue l’applicazione. Potrebbe essere utile realizzare un load balancer che distribuisca il carico in base all’effettivo utilizzo del singolo nodo server"

Così su due piedi non saprei quantificare l'entità della modifica...

nicolettiant89 commented 7 years ago

ok, grazie mille, Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 16 marzo 2017 12:44, Christian Simonelli <notifications@github.com

ha scritto:

Secondo me per le tue prove è sufficiente che avvi i server così:

java -jar muskel-server-1.0.0.jar --networkInterfaces=131...*

e sul client metti tanti addAddress quanti sono i server

.withContext(MuskelContext.builder().client() .addAddress("pianosa.di.unipi.it").name("muskel").password("password") .addAddress("secondo.di.unipi.it:5701").name("muskel"). password("password") .build())

Per quanto riguarda i task consegnati in maniera Round Robin è il funzionamento interno degli executor di hazelcast che funziona in questo modo ma che può essere sovrascritto. La possibilità di modificare il comportamento degli executor di hazelcast era descritto nel capitolo di conclusioni della tesi: "Attualmente il load balancing dei nodi viene effettuato in modalità round robin, in base al carico del client che esegue l’applicazione. Potrebbe essere utile realizzare un load balancer che distribuisca il carico in base all’effettivo utilizzo del singolo nodo server"

Così su due piedi non saprei quantificare l'entità della modifica...

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-287033326, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NQZINoXTUUjQyCe6CHBXprKh1fsmks5rmSCxgaJpZM4MTEOT .

nicolettiant89 commented 7 years ago

Salve, sempre in fatto di nodi remoti (sto usando sempre i server pianosa e pianosau come nodi remoti) e dato il seguente pezzo di codice:

Triple<Integer,Integer,Double> toJoin=MuskelProcessor.fromIterable(clusters.values()) .withContext(context) .groupBySortedToList((x)->x.getId()%no_Thread) //.doOnNext(x->System.out.println(x.toBlocking().first())) .flatMap(input->MuskelProcessor.fromIterable(input.toBlocking().first()) .map(x-> { double[] dist=new double[docDim-x.getId()-1]; Pair<Integer,Double> localMin=new Pair<>(-1, Double.POSITIVE_INFINITY); int incremental=0; for(Cluster<Point> clusterJ:_clusters.values()){ if(clusterJ.getId()>x.getId()){ double distance=_distancePoints.apply((Point)x.getPoints().get(0), clusterJ.getPoints().get(0)); dist[incremental]=distance; incremental++; if(distance<localMin.getValue()){ localMin.setKey(clusterJ.getId()); localMin.setValue(distance); } } } distances.put(x.getId(), dist); return new Triple<Integer, Integer, Double>(x.getId(), localMin.getKey(), localMin.getValue()); //return true; }) .reduce(new Triple<Integer,Integer,Double>(-1,-1,Double.POSITIVE_INFINITY), (u,v)->v.getThird()<u.getThird()? v:u) ,MuskelExecutor.remote()) .reduce(new Triple<Integer,Integer,Double>(-1,-1,Double.POSITIVE_INFINITY), (u,v)->v.getThird()<u.getThird()? v:u) //.toList() .toBlocking() .first();

dove -> clusters è una ConcurrentHashMap<Integer,Cluster> -> distances è una ConcurrentHashMap<Integer,double[]> -> Cluster è un semplice insieme che raccoglie un insieme di Point ergo un double[], siamo nel problema della clusterizzazione gerarchica (hierarchical clusterization) -> quello che cerco di fare in questo pezzo di codice è di calcolare la distanza esistente fra ogni coppia di cluster all'inizio del processo quando ognuno contiene un solo punto e inserire, per ognuno di essi, una istanza in distances con l'id del cluster in oggetto e il vettore double[] con le distanze rispetto a ogni altro cluster dotato di id superiore -> Triple<Integer,Integer,Double> toJoin andra a contenere gli id dei cluster attualmente più vicini che possono essere fusi in uno solo, con la relativa distanza (poi l'algoritmo continua ma siccome fino qua già non funziona mostro solo questo pezzo)

Quando lavoro in locale nessun problema, in remoto purtroppo se eseguo il codice cosi come è, anche su un data set di 10 punti si blocca e non mi calcola alcunchè, oppure una volta mi ha dato il suguente errore

[WARNING] java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 4096 Serialization trace: j (it.reactive.muskel.examples.Clustering2$Triple)

per fare in modo che l'algoritmo termini devo eliminare il primo reduce quello che di fatto viene operato sul nodo remoto e non sul client da cui parte la computazione (se tolgo questo reduce non capisco perchè funziona). Tuttavia in questo caso si verifica un altro problema la Map distances viene modificata a livello remoto ma ciò non ha alcun effetto a livello locale in pratica distances rimane vuoto anche se ogni nodo remoto dovrebbe aggiungerci un qualche istanza del tipo <id cluster, distanze>,; ora distances è una variabile globale e pensavo che venisse passata per riferimento invece si evince che ne viene fatta una copia, ora come recupero la copia che viene data automaticamente a ogni nodo remoto per salvare il risultato di ognuno nella variabile globale? Ho provato anche a restituire l'insieme delle entryMap, convergerle in una lista e poi fare tutto in locale ma in quel caso mi viene fuori l'eccezione che ho scritto sopra "Buffer overflow..."

Grazie mille, Antonio

nicolettiant89 commented 7 years ago

Salve, sempre in fatto di nodi remoti (sto usando sempre i server pianosa e pianosau come nodi remoti) e dato il seguente pezzo di codice:

Triple<Integer,Integer,Double> toJoin=MuskelProcessor.fromIterable(clusters.values())

.withContext(context)

.groupBySortedToList((x)->x.getId()%no_Thread)

//.doOnNext(x->System.out.println(x.toBlocking().first()))

.flatMap(input->MuskelProcessor.fromIterable(input.toBlocking().first())

.map(x-> {

double[] dist=new double[docDim-x.getId()-1];

Pair<Integer,Double> localMin=new Pair<>(-1, Double.POSITIVE_INFINITY);

int incremental=0;

for(Cluster clusterJ:_clusters.values()){

if(clusterJ.getId()>x.getId()){

double distance=_distancePoints.apply((Point)x.getPoints().get(0), clusterJ.getPoints().get(0));

dist[incremental]=distance;

incremental++;

if(distance<localMin.getValue()){

localMin.setKey(clusterJ.getId());

localMin.setValue(distance);

}

}

}

distances.put(x.getId(), dist);

return new Triple<Integer, Integer, Double>(x.getId(), localMin.getKey(), localMin.getValue());

//return true;

})

.reduce(new Triple<Integer,Integer,Double>(-1,-1,Double.POSITIVE_INFINITY),

(u,v)->v.getThird()<u.getThird()? v:u)

,MuskelExecutor.remote())

.reduce(new Triple<Integer,Integer,Double>(-1,-1,Double.POSITIVE_INFINITY),

(u,v)->v.getThird()<u.getThird()? v:u)

//.toList()

.toBlocking()

.first();

//toMerge <idI,<idJ,d>> restituisce l'id del cluster con distanza minima d rispetto ad un altro cluster idJ

dove -> clusters è una ConcurrentHashMap<Integer,Cluster> -> distances è una ConcurrentHashMap<Integer,double[]> -> Cluster è un semplice insieme che raccoglie un insieme di Point ergo un double[], siamo nel problema della clusterizzazione gerarchica (hierarchical clusterization) -> quello che cerco di fare in questo pezzo di codice è di calcolare la distanza esistente fra ogni coppia di cluster all'inizio del processo quando ognuno contiene un solo punto e inserire, per ognuno di essi, una istanza in distances con l'id del cluster in oggetto e il vettore double[] con le distanze rispetto a ogni altro cluster dotato di id superiore -> Triple<Integer,Integer,Double> toJoin andra a contenere gli id dei cluster attualmente più vicini che possono essere fusi in uno solo, con la relativa distanza (poi l'algoritmo continua ma siccome fino qua già non funziona mostro solo questo pezzo)

Quando lavoro in locale nessun problema, in remoto purtroppo se eseguo il codice cosi come è, anche su un data set di 10 punti si blocca e non mi calcola alcunchè, oppure una volta mi ha dato il suguente errore

[WARNING] java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 4096 Serialization trace: j (it.reactive.muskel.examples.Clustering2$Triple)

per fare in modo che l'algoritmo termini devo eliminare il primo reduce quello che di fatto viene operato sul nodo remoto e non sul client da cui parte la computazione (se tolgo questo reduce non capisco perchè funziona). Tuttavia in questo caso si verifica un altro problema la Map distances viene modificata a livello remoto ma ciò non ha alcun effetto a livello locale in pratica distances rimane vuoto anche se ogni nodo remoto dovrebbe aggiungerci un qualche istanza del tipo <id cluster, distanze>,; ora distances è una variabile globale e pensavo che venisse passata per riferimento invece si evince che ne viene fatta una copia, ora come recupero la copia che viene data automaticamente a ogni nodo remoto per salvare il risultato di ognuno nella variabile globale? Ho provato anche a restituire l'insieme delle entryMap, convergerle in una lista e poi fare tutto in locale ma in quel caso mi viene fuori l'eccezione che ho scritto sopra "Buffer overflow..."

Grazie mille, Antonio

Il giorno 16 marzo 2017 13:16, Antonio Nicoletti nicolettiant89@gmail.com ha scritto:

ok, grazie mille, Antonio

https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail Mail priva di virus. www.avast.com https://www.avast.com/sig-email?utm_medium=email&utm_source=link&utm_campaign=sig-email&utm_content=webmail <#m_-5122100891759834055_DAB4FAD8-2DD7-40BB-A1B8-4E2AA1F9FDF2>

Il giorno 16 marzo 2017 12:44, Christian Simonelli < notifications@github.com> ha scritto:

Secondo me per le tue prove è sufficiente che avvi i server così:

java -jar muskel-server-1.0.0.jar --networkInterfaces=131...*

e sul client metti tanti addAddress quanti sono i server

.withContext(MuskelContext.builder().client() .addAddress("pianosa.di.unipi.it").name("muskel").password("password") .addAddress("secondo.di.unipi.it:5701").name("muskel").passw ord("password") .build())

Per quanto riguarda i task consegnati in maniera Round Robin è il funzionamento interno degli executor di hazelcast che funziona in questo modo ma che può essere sovrascritto. La possibilità di modificare il comportamento degli executor di hazelcast era descritto nel capitolo di conclusioni della tesi: "Attualmente il load balancing dei nodi viene effettuato in modalità round robin, in base al carico del client che esegue l’applicazione. Potrebbe essere utile realizzare un load balancer che distribuisca il carico in base all’effettivo utilizzo del singolo nodo server"

Così su due piedi non saprei quantificare l'entità della modifica...

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-287033326, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NQZINoXTUUjQyCe6CHBXprKh1fsmks5rmSCxgaJpZM4MTEOT .

chrysimo commented 7 years ago

Ciao Antonio, non sono riuscito a provare il codice perchè sono in ferie (senza computer) fino a Mercoledì. In ogni caso per permettermi di velocizzare i test mi farebbe comodo avere un progettino (bastano i sorgenti a modo) che io possa lanciare e non mi debba preoccupare di ricostruire il tutto.

Ad ogno modo riguardo la map distances è corretto che, nella versione remota, non la trovi modificata a livello locale.

Se fossimo faccia a faccia tu giustamente mi risponderesti ma localmente funziona!

Il fatto è che hai realizzato un metodo funzionale che ha interazioni con l'esterno e questo non è teoricamente corretto nella programmazione funzionale (no side effect). Considera che, anche in ambiente locale, diminuisci il parallelismo accedendo ad una variabile condivisa.

Detto questo anche io in certi casi ho utilizzato (e faccio tutt'ora) in modo non "corretto" la programmazione funzionale perchè magari mi sarebbe costato troppo in termini algoritmici reimplementarlo oppure non sono stato in grado di risolvere il problema in altro modo.

In quei casi in cui devi realizzare metodi che hanno side effect potresti doverti aver bisogno di strutture dati diverse tra versione locale e remota. Visto che Muskel2 utilizza le librerie di Hazelcast, potresti usare quelle nella versione remota che hanno un'implementazione di mappa che puoi usare in questo modo:

Hazelcast .getHazelcastInstanceByName("muskelServer").getMap("miaMappa")

Attenzione: per usarla dentro al metodo funzionale devi dichiararla localmente

.map(x-> {

Map distances = Hazelcast .getHazelcastInstanceByName("muskelServer").getMap("miaMappa");

    ...

distances.put(...) }

Ciao Christian

nicolettiant89 commented 7 years ago

Ciao Christian, ho aspettato che tornassi a lavoro per rispondere cosi da non disturbarti ulterioremente. Innanzitutto ti allego il file contenente l'intero codice stand-alone sull'algoritmo di clustering con un semplice dataset da eseguire con il comando linux eseguito su titanic:

mvn exec:java -Dexec.mainClass="it.reactive.muskel.examples.Clustering2" -Dexec.args="17 2 2 True remote /home/nicoletti/test.txt euclidian Single_link /home/nicoletti/test.txt"

per mezzo del quale si cerca di sfruttare il cluster generato da pianosa e pianosau per la computazione remota dell'algoritmo applyParallelRemoteClusterization() il quale questa volta implementa o dovrebbe una procedura funzionale che non usa variabili globali; all'atto dell'esecuzione ottengo tale eccezione:

[WARNING] java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 4096 Serialization trace: j (it.reactive.muskel.examples.Clustering2$Triple)

che cmq blocca l'esecuzione, centra sicuramente la serializzazione della classe Triple, contenuta nel fila java allegato, il che è strano perchè è una classe banalissima per di più dichiarata Serializable, riesci a capire dove è il problema?

Per vedere come funziona in locale puoi provare ad eseguire:

mvn exec:java -Dexec.mainClass="it.reactive.muskel.examples.Clustering2" -Dexec.args="17 2 2 True local /home/nicoletti/test.txt euclidian Single_link"

il quale va ad eseguire il metodo applyParallelClusterization().

Grazie mille, Antonio.

Il giorno 5 maggio 2017 17:31, Christian Simonelli <notifications@github.com

ha scritto:

Ciao Antonio, non sono riuscito a provare il codice perchè sono in ferie (senza computer) fino a Mercoledì. In ogni caso per permettermi di velocizzare i test mi farebbe comodo avere un progettino (bastano i sorgenti a modo) che io possa lanciare e non mi debba preoccupare di ricostruire il tutto.

Ad ogno modo riguardo la map distances è corretto che, nella versione remota, non la trovi modificata a livello locale.

Se fossimo faccia a faccia tu giustamente mi risponderesti ma localmente funziona!

Il fatto è che hai realizzato un metodo funzionale che ha interazioni con l'esterno e questo non è teoricamente corretto nella programmazione funzionale (no side effect). Considera che, anche in ambiente locale, diminuisci il parallelismo accedendo ad una variabile condivisa.

Detto questo anche io in certi casi ho utilizzato (e faccio tutt'ora) in modo non "corretto" la programmazione funzionale perchè magari mi sarebbe costato troppo in termini algoritmici reimplementarlo oppure non sono stato in grado di risolvere il problema in altro modo.

In quei casi in cui devi realizzare metodi che hanno side effect potresti doverti aver bisogno di strutture dati diverse tra versione locale e remota. Visto che Muskel2 utilizza le librerie di Hazelcast, potresti usare quelle nella versione remota che hanno un'implementazione di mappa che puoi usare in questo modo:

Hazelcast .getHazelcastInstanceByName("muskelServer").getMap("miaMappa")

Attenzione: per usarla dentro al metodo funzionale devi dichiararla localmente

.map(x-> {

Map distances = Hazelcast .getHazelcastInstanceByName("muskelServer").getMap("miaMappa");

...

distances.put(...) }

Ciao Christian

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-299512520, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NbCFFgxjbJMJ8uQlLiIhuHOoX3qIks5r207ugaJpZM4MTEOT .

4 7 6 5 5 8 8 5 3 6 4 6 4 8 8 3 26 1 0 2 0 0 8 10 3 1 6 12 8 1 11 0 8 25 1 2 2 3 1 10 6 2 6 12 4 8 1 6 1 7 10 2 3 3 2 2 7 7 5 4 9 5 7 2 7 3 10 18 1 0 1 0 0 3 12 4 2 11 9 6 0 8 2 7 6 7 10 9 8 9 9 7 4 6 10 5 6 2 8 5 10 2 1 1 2 3 1 10 6 3 5 11 4 9 1 7 1 7 10 3 5 5 7 6 8 11 3 1 8 8 6 6 10 4 6 16 2 10 3 8 2 9 6 0 7 13 5 8 0 8 1 8 9 7 10 9 8 9 9 7 4 6 10 5 6 2 8 5 10 2 1 1 2 3 1 10 6 3 5 11 4 9 1 7 1 7 10 3 5 5 7 6 8 11 3 1 8 8 6 6 10 4 6 16 2 10 3 8 2 9 6 0 7 13 5 8 0 8 1 8 9

nicolettiant89 commented 7 years ago

Fatto

2017-05-10 11:27 GMT+01:00 Christian Simonelli notifications@github.com:

Ciao Antonio,

grazie del pensiero! Non riesco a vedere l'allegato.

me lo gireresti per mail? chry.simo at gmail dot com (Poi cancello il commento che non voglio finire nello spam!)

Ciao Christian

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/chrysimo/muskel2/issues/1#issuecomment-300441571, or mute the thread https://github.com/notifications/unsubscribe-auth/AQU-NSH8ontf_aFQuYN5OzSJNQkMLh27ks5r4ZETgaJpZM4MTEOT .