vert-x3 / vertx-infinispan

Infinispan Cluster Manager for Vert.x
Apache License 2.0

Service Discovery's record not serializable #35

Closed cescoffier closed 7 years ago

cescoffier commented 7 years ago

By default, the service discovery uses a clustered "syncMap". With the Hazelcast (and Zookeeper) cluster managers, everything works. When switching to Infinispan, I get:

Caused by: org.infinispan.commons.marshall.NotSerializableException: io.vertx.servicediscovery.impl.AsyncMap$$Lambda$315/24385931

Any ideas?

tsegismont commented 7 years ago

Can you share the code? By the way, you should avoid sync maps. They shouldn't be exposed; they are used internally to store haInfo.

cescoffier commented 7 years ago

About the sync map: I didn't have a choice. Until the ClusteredWideMap API is improved, I still need this hack.

To reproduce it, just use the following two classes:

package store;

import io.vertx.core.*;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.ServiceDiscovery;
import io.vertx.servicediscovery.types.HttpEndpoint;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class PricerService extends AbstractVerticle {

    public static void main(String[] args) {
        Vertx.clusteredVertx(new VertxOptions()
            .setHAEnabled(true)
            .setClusterHost("127.0.0.1"), ar -> {
            Vertx vertx = ar.result();
            vertx.deployVerticle(PricerService.class.getName(),
                new DeploymentOptions().setHa(true));
        });
    }

    private Map<String, Double> prices = new HashMap<>();
    private Random random = new Random();
    private ServiceDiscovery discovery;
    private Record record;

    @Override
    public void start() throws Exception {
        ServiceDiscovery.create(vertx, discovery -> {
            this.discovery = discovery;
            discovery.publish(HttpEndpoint.createRecord("pricer", "localhost", 8081, "/"),
                ar -> record = ar.result());
        });

        Router router = Router.router(vertx);
        router.get("/prices/:name").handler(rc -> {
            String name = rc.pathParam("name");
            JsonObject productWithPrice = getProductWithPrice(name);
            rc.response().end(productWithPrice.encode());
        });

        vertx.eventBus().<JsonObject>consumer("pricer", msg -> {
            JsonObject body = msg.body();
            String name = body.getString("name");
            JsonObject productWithPrice = getProductWithPrice(name);
            msg.reply(productWithPrice);
        });

        vertx.createHttpServer()
            .requestHandler(router::accept)
            .listen(8081);
    }

    private JsonObject getProductWithPrice(String name) {
        Double price = prices
            .computeIfAbsent(name,
                k -> (double) random.nextInt(50));
        return new JsonObject().put("name", name)
            .put("price", price);
    }

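    @Override
    public void stop(Future<Void> done) throws Exception {
        if (discovery != null && record != null) {
            discovery.unpublish(record.getRegistration(), v -> done.complete());
        } else {
            // Nothing was published, so complete right away instead of leaving undeploy pending.
            done.complete();
        }
    }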
}

package store;

import io.vertx.core.VertxOptions;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.core.http.HttpServerResponse;
import io.vertx.rxjava.ext.web.Router;
import io.vertx.rxjava.ext.web.RoutingContext;
import io.vertx.rxjava.ext.web.client.HttpResponse;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.ext.web.handler.BodyHandler;
import io.vertx.rxjava.ext.web.handler.StaticHandler;
import io.vertx.rxjava.servicediscovery.ServiceDiscovery;
import io.vertx.rxjava.servicediscovery.types.HttpEndpoint;
import rx.Single;

public class AppV4 extends AbstractVerticle {

    private Database database;

    public static void main(String[] args) {
        Vertx.clusteredVertx(new VertxOptions().setClusterHost("127.0.0.1"), ar -> {
            Vertx vertx = ar.result();
            vertx.deployVerticle(AppV4.class.getName());
        });
    }

    private WebClient pricer;

    @Override
    public void start() throws Exception {
        Router router = Router.router(vertx);

        // REST API
        router.route().handler(BodyHandler.create());
        router.get("/assets/*").handler(StaticHandler.create());
        ServiceDiscovery.create(vertx, discovery -> {
            Single<WebClient> pricer = HttpEndpoint
                .rxGetWebClient(discovery, svc -> svc.getName().equals("pricer"));
            pricer.flatMap(cl -> {
                this.pricer = cl;
                return pricer; // Quick edit....
            }).flatMap(db -> {
                return vertx.createHttpServer()
                    .requestHandler(router::accept)
                    .rxListen(8080);
            }).subscribe();
        });
    }
}

tsegismont commented 7 years ago

Thanks for the reproducer.

Can you elaborate on the shortcomings of the cluster-wide map? You're not the first one asking for changes :) I am working on methods for retrieving keys and values. Anything else?

cescoffier commented 7 years ago

Let me explain my use case in the service discovery. The default service discovery backend (the entity storing the records) is:

In both cases, I implement a common abstraction: https://github.com/vert-x3/vertx-service-discovery/blob/master/vertx-service-discovery/src/main/java/io/vertx/servicediscovery/impl/AsyncMap.java

I would need the following methods in the cluster-wide maps:

So I basically need the async version of "Map".

tsegismont commented 7 years ago

@cescoffier I believe the issue is here:

  public void getAll(Handler<AsyncResult<Map<K, V>>> asyncResultHandler) {
    vertx.<Map<K, V>>executeBlocking(
        future -> {
          Map<K, V> map = new LinkedHashMap<>();
          syncMap.entrySet().stream().forEach(entry -> map.put(entry.getKey(), entry.getValue()));
          future.complete(map);
        },
        asyncResultHandler
    );
  }

Infinispan expects the forEach lambda to be Serializable. But that doesn't seem right since forEach is a terminal operation.
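
The point about serializable lambdas can be illustrated outside Vert.x and Infinispan. The sketch below is only an approximation: it uses plain JDK serialization as a stand-in for Infinispan's marshaller (an assumption, the real marshaller is a different implementation), and the class and method names are hypothetical. It shows that an ordinary lambda is rejected, while a lambda cast to an intersection type that includes Serializable is accepted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

public class LambdaSerializationDemo {

    // Returns true if plain JDK serialization accepts the object.
    // This is a stand-in for a cluster marshaller, not Infinispan's own code.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // An ordinary lambda: its synthetic class does not implement Serializable.
    static Consumer<String> plainConsumer() {
        return s -> { };
    }

    // The intersection cast asks the compiler to make the lambda serializable.
    static Consumer<String> serializableConsumer() {
        return (Consumer<String> & Serializable) s -> { };
    }

    public static void main(String[] args) {
        System.out.println("plain lambda serializable: "
            + isJavaSerializable(plainConsumer()));
        System.out.println("cast lambda serializable:  "
            + isJavaSerializable(serializableConsumer()));
    }
}
```

If the map's stream ships the forEach consumer to other nodes, as the exception suggests, the first kind of lambda would fail in the same way as in the stack trace above.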

cescoffier commented 7 years ago

Do you mean that a regular for loop would work?

tsegismont commented 7 years ago

I believe so

tsegismont commented 7 years ago

I'd try a simple Map#forEach

tsegismont commented 7 years ago

Let me check

tsegismont commented 7 years ago

@cescoffier this works

  public void getAll(Handler<AsyncResult<Map<K, V>>> asyncResultHandler) {
    vertx.<Map<K, V>>executeBlocking(
        future -> {
          Map<K, V> map = new LinkedHashMap<>();
          syncMap.forEach(map::put);
          future.complete(map);
        },
        asyncResultHandler
    );
  }

But I'd simply go with:

  public void getAll(Handler<AsyncResult<Map<K, V>>> asyncResultHandler) {
    vertx.<Map<K, V>>executeBlocking(
        future -> {
          future.complete(new LinkedHashMap<>(syncMap));
        },
        asyncResultHandler
    );
  }
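
For readers who want to try the copy-constructor approach in isolation, here is a minimal sketch without Vert.x. A ConcurrentHashMap stands in for the clustered sync map (an assumption, it is not an Infinispan-backed map), and the class name, method name, and sample entries are hypothetical. The copy is taken by iterating the source map locally, so no lambda is passed to the source map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotCopyDemo {

    // Copies every entry of the source map into a new, independent LinkedHashMap.
    static <K, V> Map<K, V> snapshot(Map<K, V> source) {
        return new LinkedHashMap<>(source);
    }

    public static void main(String[] args) {
        // Stand-in for the clustered sync map (assumption).
        Map<String, String> syncMap = new ConcurrentHashMap<>();
        syncMap.put("pricer", "http://localhost:8081/");

        Map<String, String> copy = snapshot(syncMap);

        // Later changes to the source do not affect the copy.
        syncMap.put("inventory", "http://localhost:8082/");

        System.out.println(copy.size());    // prints 1
        System.out.println(syncMap.size()); // prints 2
    }
}
```

The result is a point-in-time snapshot, which matches what getAll hands to its result handler in the snippets above.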

tsegismont commented 7 years ago

@cescoffier closing and creating a PR in service discovery

cescoffier commented 7 years ago

Thanks @tsegismont !