apache / incubator-hugegraph

A graph database that supports 100+ billion data records, with high performance and scalability (includes OLTP engine, REST API and backends)
https://hugegraph.apache.org
Apache License 2.0

[Bug] The Cassandra backend may leak memory after running for a while #2244

Open · dongpodu opened 1 year ago

dongpodu commented 1 year ago

Bug Type

other exception / error

Before submit

Environment

Expected & Actual behavior

Two new APIs were added on top of 0.11.2. The first:

@POST
@Timed(name = "batch-create")
@DecompressInterceptor.Decompress
@Path("batch")
@StatusFilter.Status(StatusFilter.Status.CREATED)
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
@RolesAllowed({"admin", "$owner=$graph $action=vertex_write"})
public String create(@Context HugeConfig config,
                     @Context GraphManager manager,
                     @PathParam("graph") String graph,
                     @QueryParam("skip_error") @DefaultValue("true") boolean skipError,
                     String body) {
    JsonLine[] values = JsonUtil.fromJson(body, JsonLine[].class);
    HugeGraph g = graph(manager, graph);
    // Deduplicate vertices to avoid importing the same vertex twice
    Map<Id, JsonVertex> vertices = new HashMap<>();
    for (JsonLine line : values) {
        line.in.id = getVertexId(g, line.in);
        vertices.put((Id) line.in.id, line.in);
        line.out.id = getVertexId(g, line.out);
        vertices.put((Id) line.out.id, line.out);
    }
    List<String> errors = Lists.newLinkedList();
    // Created vertices, keyed by id
    Map<Id, Vertex> setIds = new HashMap<>();
    this.commit(config, g, vertices.size(), () -> {
        for (JsonVertex vertex : vertices.values()) {
            try {
                setIds.put((Id) vertex.id, g.addVertex(vertex.properties()));
            } catch (Exception e) {
                LOG.error("create vertex error, vertex:{}", JsonUtil.toJson(vertex), e);
                String message = "vertex:" + ((Id) vertex.id).asString() +
                                 " create error:" + e.getMessage();
                errors.add(message);
                if (!skipError) {
                    throw new RuntimeException(message);
                }
            }
        }
        return null;
    });
    // Create the edges after the vertices
    List<Id> edges = this.commit(config, g, values.length, () -> {
        List<Id> ids = new ArrayList<>(values.length);
        for (JsonLine line : values) {
            try {
                Vertex srcVertex = setIds.get(line.in.id);
                if (null == srcVertex) {
                    throw new RuntimeException(line.in.id + " not found");
                }
                Vertex tgtVertex = setIds.get(line.out.id);
                if (null == tgtVertex) {
                    throw new RuntimeException(line.out.id + " not found");
                }
                Edge edge = srcVertex.addEdge(line.label, tgtVertex, line.properties());
                ids.add((Id) edge.id());
            } catch (Exception e) {
                LOG.error("create edge error, edge:{}", JsonUtil.toJson(line), e);
                String message = "edge create error:" + e.getMessage();
                errors.add(message);
                if (!skipError) {
                    throw new RuntimeException(message);
                }
            }
        }
        return ids;
    });
    Map<String, Object> result = Maps.newHashMap();
    result.put("errors", errors);
    return JsonSerializer.instance().writeMap(result);
}

The second:

@POST
@Timed
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
public String post(@Context GraphManager manager,
                   @PathParam("graph") String graph, Request request) throws Exception {
    E.checkArgumentNotNull(request, "The request body can't be null");
    E.checkArgument(request.step != null, "The steps of request can't be null");
    E.checkArgument(CollectionUtils.isNotEmpty(request.sources),
                    "The sources of request can't be null");
    long startTime = System.currentTimeMillis();
    HugeGraph g = null;
    try {
        g = graph(manager, graph);
        EdgeStep step = new EdgeStep(g, request.step.direction, request.step.labels,
                                     request.step.properties, request.step.vertexProperties,
                                     request.step.degree, request.step.skipDegree);

        Tuple3<Set<HugeNewTraverser.Node>, Map<Id, Integer>, Set<Edge>> tuple3;
        try (KneighborNewTraverser traverser = new KneighborNewTraverser(g)) {
            Set<Id> sourceIds = request.sources.stream()
                                               .map(HugeVertex::getIdValue)
                                               .filter(Objects::nonNull)
                                               .collect(Collectors.toSet());
            tuple3 = traverser.customizedKneighborV3(sourceIds, step,
                                                     request.maxDepth,
                                                     request.limit);
        } finally {
            long endTime = System.currentTimeMillis();
            log.info("kneighbor_v3 query cost: {}", endTime - startTime);
        }

        // Vertices
        Map<String, Map> vertexMap = new ConcurrentHashMap<>();
        HugeGraph finalG = g;
        long s = System.currentTimeMillis();
        tuple3.getFirst().parallelStream().forEach(node -> {
            try {
                HugeVertex vertex = (HugeVertex) finalG.vertex(node.id());
                Set<String> keys = request.vertexProperty.get(vertex.label());
                if (CollectionUtils.isEmpty(keys)) {
                    // No property filter: return all properties
                    Map map = vertex.getProperties().values().stream()
                                    .collect(Collectors.toMap(HugeProperty::key,
                                                              HugeProperty::value));
                    vertexMap.put(vertex.id().toString(),
                                  Maps.of("id", vertex.id(),
                                          "label", vertex.label(),
                                          "type", vertex.type().name().toLowerCase(),
                                          "depth", tuple3.getSecond().get(vertex.id()),
                                          "properties", map));
                } else {
                    // Keep only the requested keys ("properties.xxx" selects one property)
                    Map map = new HashMap();
                    keys.stream().filter(e -> e.indexOf("properties") > -1)
                        .map(e -> e.substring(11))
                        .forEach(k -> map.put(k, vertex.value(k)));
                    Map vmap = new HashMap();
                    if (keys.contains("id")) {
                        vmap.put("id", vertex.id());
                    }
                    if (keys.contains("label")) {
                        vmap.put("label", vertex.label());
                    }
                    if (keys.contains("type")) {
                        vmap.put("type", vertex.type().name().toLowerCase());
                    }
                    if (keys.contains("depth")) {
                        vmap.put("depth", tuple3.getSecond().get(vertex.id()));
                    }
                    vmap.put("properties", map);
                    vertexMap.put(vertex.id().toString(), vmap);
                }
            } catch (Exception e) {
                log.error("failed to build vertex, node:{}", node, e);
            }
        });
        long s1 = System.currentTimeMillis();
        log.info("s-s1 cost:{}ms", s1 - s);
        // Edges
        Map<String, Map> edgeMap = new ConcurrentHashMap<>();
        tuple3.getThird().parallelStream().forEach(e -> {
            HugeEdge eg = (HugeEdge) e;
            Set<String> keys = request.edgeProperty.get(eg.label());
            if (CollectionUtils.isEmpty(keys)) {
                Map map = eg.getProperties().values().stream()
                            .collect(Collectors.toMap(HugeProperty::key,
                                                      HugeProperty::value));
                edgeMap.put(e.id().toString(),
                            Maps.of("id", e.id(),
                                    "label", e.label(),
                                    "type", "edge",
                                    "outV", eg.outVertex().id(),
                                    "outVLabel", ((HugeVertex) eg.outVertex()).type().name().toLowerCase(),
                                    "inV", eg.inVertex().id(),
                                    "inVLabel", ((HugeVertex) eg.inVertex()).type().name().toLowerCase(),
                                    "properties", map));
            } else {
                Map map = new HashMap();
                keys.stream().filter(k -> k.indexOf("properties") > -1)
                    .map(k -> k.substring(11))
                    .forEach(k -> map.put(k, eg.value(k)));
                Map vmap = new HashMap();
                if (keys.contains("id")) {
                    vmap.put("id", eg.id());
                }
                if (keys.contains("label")) {
                    vmap.put("label", eg.label());
                }
                if (keys.contains("type")) {
                    vmap.put("type", "edge");
                }
                if (keys.contains("outV")) {
                    vmap.put("outV", eg.outVertex().id());
                }
                if (keys.contains("outVLabel")) {
                    vmap.put("outVLabel", ((HugeVertex) eg.outVertex()).type().name().toLowerCase());
                }
                if (keys.contains("inV")) {
                    vmap.put("inV", eg.inVertex().id());
                }
                if (keys.contains("inVLabel")) {
                    vmap.put("inVLabel", ((HugeVertex) eg.inVertex()).type().name().toLowerCase());
                }
                vmap.put("properties", map);
                edgeMap.put(e.id().toString(), vmap);
            }
        });
        long s2 = System.currentTimeMillis();
        log.info("kneighbor_v3 s-s1:{}, s1-s2:{}ms", s1 - s, s2 - s1);
        return JsonSerializer.instance().writeMap(
               Maps.of("vertexs", vertexMap.values(), "edges", edgeMap.values()));
    } finally {
        long endTime = System.currentTimeMillis();
        log.info("kneighbor_v3 query cost: {}", endTime - startTime);
        if (g != null) {
            g.tx().commit();
        }
    }
}

The client only calls these two APIs. After running for about two days, the server starts reporting com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.40.45] Pool is busy (no available connection and timed out after 5000 MILLISECONDS). Checking with jmap shows that the number of sessions keeps growing. How should I troubleshoot this?

Vertex/Edge example

No response

Schema [VertexLabel, EdgeLabel, IndexLabel]

No response

JackyYangPassion commented 1 year ago

Take a look at #1626, then use jmap to check the live-object histogram on the server and paste the result here:

jmap -histo:live pid | head -n 10

dongpodu commented 1 year ago

The results are as follows: (screenshot of jmap output attached)

dongpodu commented 1 year ago
(screenshot attached)

JackyYangPassion commented 1 year ago

Does the #instances count keep rising?

dongpodu commented 1 year ago

Yes.

dongpodu commented 1 year ago
(screenshot attached)

JackyYangPassion commented 1 year ago

Which Cassandra version are you on? I'll try to reproduce it locally.

dongpodu commented 1 year ago

cassandra-3.11.10

dongpodu commented 1 year ago

Here is the stack trace:

failed to build vertex, node:urn:datacenter:tab:Hive1.b_ods.ods_db2892_t_novel_a_d com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.157.40.45:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.40.45] Pool is busy (no available connection and timed out after 5000 MILLISECONDS)), /10.157.41.25:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.41.25] Pool is busy (no available connection and timed out after 5000 MILLISECONDS)))
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:83)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:334)
    at com.baidu.hugegraph.backend.store.cassandra.CassandraSessionPool$Session.open(CassandraSessionPool.java:229)
    at com.baidu.hugegraph.backend.store.cassandra.CassandraSessionPool$Session.tryOpen(CassandraSessionPool.java:222)
    at com.baidu.hugegraph.backend.store.cassandra.CassandraSessionPool$Session.opened(CassandraSessionPool.java:236)
    at com.baidu.hugegraph.backend.store.cassandra.CassandraStore.opened(CassandraStore.java:175)
    at com.baidu.hugegraph.backend.store.AbstractBackendStore.checkOpened(AbstractBackendStore.java:57)
    at com.baidu.hugegraph.backend.store.cassandra.CassandraStore.query(CassandraStore.java:250)
    at com.baidu.hugegraph.backend.tx.AbstractTransaction.query(AbstractTransaction.java:166)
    at com.baidu.hugegraph.backend.tx.GraphTransaction.query(GraphTransaction.java:487)
    at com.baidu.hugegraph.backend.tx.GraphTransaction.queryVerticesFromBackend(GraphTransaction.java:736)
    at com.baidu.hugegraph.backend.cache.CachedGraphTransaction.queryVerticesByIds(CachedGraphTransaction.java:215)
    at com.baidu.hugegraph.backend.cache.CachedGraphTransaction.queryVerticesFromBackend(CachedGraphTransaction.java:184)
    at com.baidu.hugegraph.backend.tx.GraphTransaction.queryVerticesByIds(GraphTransaction.java:685)
    at com.baidu.hugegraph.backend.tx.GraphTransaction.queryVertex(GraphTransaction.java:639)
    at com.baidu.hugegraph.StandardHugeGraph.vertex(StandardHugeGraph.java:544)
    at com.baidu.hugegraph.api.traversers.KneighborAPIV3.lambda$post$3(KneighborAPIV3.java:75)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) [1 skipped]
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
 [5 skipped]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.157.40.45:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.40.45] Pool is busy (no available connection and timed out after 5000 MILLISECONDS)), /10.157.41.25:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/10.157.41.25] Pool is busy (no available connection and timed out after 5000 MILLISECONDS)))
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:265)
    at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:62)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:357)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:426)
    at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1349)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:398)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1024)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:866)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:711)
    at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:54)
    at com.datastax.driver.core.HostConnectionPool$PendingBorrow$1.run(HostConnectionPool.java:746)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:311)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
 [1 skipped]
JackyYangPassion commented 1 year ago

(screenshot attached)

Reproduced it locally: whenever a query misses the cache, Map<Long, BackendSession> sessions keeps growing. I'll analyze it further tomorrow and track down the root cause.

dongpodu commented 1 year ago

Have you managed to locate the root cause yet?

JackyYangPassion commented 1 year ago

Still analyzing. If convenient, could you share your gremlin-server.yaml and rest-server.properties config files? @dongpodu

I ran continuous CRUD on my machine over the weekend and found that BackendSession session = this.threadLocalSession.get(); is bound to the calling thread. With the default configuration, the session count grows to a certain number and then stabilizes.
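
For context, the thread binding behaves roughly like this minimal sketch (illustrative only, not the actual CassandraSessionPool source; Object stands in for the real session type):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadBoundSessionPool {
    // One session per thread, keyed by thread id; an entry is only removed
    // if the owning thread explicitly closes its session.
    private final Map<Long, Object> sessions = new ConcurrentHashMap<>();
    private final ThreadLocal<Object> threadLocalSession = new ThreadLocal<>();

    Object getOrNewSession() {
        Object session = this.threadLocalSession.get();
        if (session == null) {
            session = new Object(); // stands in for opening a real backend session
            this.threadLocalSession.set(session);
            this.sessions.put(Thread.currentThread().getId(), session);
        }
        return session;
    }

    int sessionCount() {
        return this.sessions.size();
    }
}

With a fixed set of worker threads, sessionCount() stabilizes at one entry per thread, which matches the behavior I observed under the default configuration.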

dongpodu commented 1 year ago

On my side the sessions keep growing without bound. The configs are below.

rest-server.properties:

restserver.url=http://0.0.0.0:58080
gremlinserver.url=http://127.0.0.1:58081
graphs=[keeper:conf/keeper.properties,keeper_metadata:conf/keeper_metadata.properties]
auth.authenticator=com.baidu.hugegraph.auth.ConfigAuthenticator
auth.admin_token=xxxxxxxxxx
auth.user_tokens=xxxxxxx

server.id=server-40-25-m
server.role=master

batch.max_write_ratio=100
restserver.request_timeout=30
restserver.connection_idle_timeout=30
gremlinserver.timeout=300

gremlin-server.yaml:

host: 127.0.0.1
port: 58081

scriptEvaluationTimeout: 300000

channelizer: org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer
graphs: {
  keeper: conf/keeper.properties,
  keeper_metadata: conf/keeper_metadata.properties
}
scriptEngines: {
  gremlin-groovy: {
    plugins: {
      com.baidu.hugegraph.plugin.HugeGraphGremlinPlugin: {},
      org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
      org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {
        classImports: [
          java.lang.Math,
          com.baidu.hugegraph.backend.id.IdGenerator,
          com.baidu.hugegraph.type.define.Directions,
          com.baidu.hugegraph.type.define.NodeRole,
          com.baidu.hugegraph.traversal.algorithm.CustomizePathsTraverser,
          com.baidu.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser,
          com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser,
          com.baidu.hugegraph.traversal.algorithm.HugeTraverser,
          com.baidu.hugegraph.traversal.algorithm.NeighborRankTraverser,
          com.baidu.hugegraph.traversal.algorithm.PathsTraverser,
          com.baidu.hugegraph.traversal.algorithm.PersonalRankTraverser,
          com.baidu.hugegraph.traversal.algorithm.ShortestPathTraverser,
          com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser,
          com.baidu.hugegraph.traversal.optimize.Text,
          com.baidu.hugegraph.traversal.optimize.TraversalUtil,
          com.baidu.hugegraph.util.DateUtil
        ],
        methodImports: [java.lang.Math#*]
      },
      org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {
        files: [scripts/empty-sample.groovy]
      }
    }
  }
}
serializers:

dongpodu commented 1 year ago

Any progress on this issue?

JackyYangPassion commented 1 year ago

Still looking into it; I'll post updates here as soon as there's progress.

dongpodu commented 1 year ago

One more data point: the problem only appeared after I put reads and writes on the same node. Previously reads and writes were separated, and there was no such problem.

dongpodu commented 1 year ago

Still no root cause?

JackyYangPassion commented 1 year ago

Is your multi-node deployment running in Raft mode?

dongpodu commented 1 year ago

I dumped the heap and found two schemas, keeper and keeper_metadata. Each has three CassandraSessionPools, named keeper/s, keeper/g and keeper/m (keeper_metadata is analogous), and the sessionCount of each pool is over 2k. Also, the refs of each CassandraSessionPool$Session exceeds 1. Isn't this object supposed to be exclusive to one thread? How can refs exceed 1? And since the field is a plain int, shouldn't a thread-safe AtomicInteger be used?
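
To illustrate the concern: if the counter can ever be touched by more than one thread, a plain "refs++" can lose updates. A rough sketch of the AtomicInteger alternative (illustrative only, not the actual pool code; the field name refs is taken from the observation above):

import java.util.concurrent.atomic.AtomicInteger;

class RefCountedSession {
    // incrementAndGet()/decrementAndGet() are atomic, so concurrent
    // attach/detach calls cannot lose updates the way "refs++" can.
    private final AtomicInteger refs = new AtomicInteger(0);

    int attach() {
        return this.refs.incrementAndGet();
    }

    int detach() {
        return this.refs.decrementAndGet();
    }
}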

dongpodu commented 1 year ago

Root cause found: CassandraSessionPool creates one session per thread, and if a thread already has a session bound to it, no new session is created. The problem is that I was using the sessions inside parallelStream, which runs on the common ForkJoinPool. Threads in that pool are not kept alive forever: idle workers are reclaimed after a few minutes, and new workers are created when new tasks arrive. So a new session is created along with each new ForkJoinPool worker, but when a worker dies its session is never reclaimed, because the pool's Map<Long, BackendSession> sessions field records every session ever created and the session never gets a chance to be closed.
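
The mechanism can be demonstrated outside HugeGraph: recycled common-pool workers get new thread ids, so any map keyed by thread id keeps gaining entries. A standalone sketch (the common pool's exact keep-alive varies by JDK version, so the sleep duration below is an assumption):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ForkJoinRecycleDemo {

    private static final Set<Long> SEEN_THREAD_IDS = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) throws InterruptedException {
        // Batch 1: record which common-pool worker threads ran the tasks
        IntStream.range(0, 1000).parallel()
                 .forEach(i -> SEEN_THREAD_IDS.add(Thread.currentThread().getId()));
        System.out.println("distinct thread ids after batch 1: " + SEEN_THREAD_IDS.size());

        // Idle long enough for the common pool to reclaim its workers
        Thread.sleep(120_000);

        // Batch 2: replacement workers have new thread ids, so the set grows,
        // just like CassandraSessionPool's Map<Long, BackendSession> does
        IntStream.range(0, 1000).parallel()
                 .forEach(i -> SEEN_THREAD_IDS.add(Thread.currentThread().getId()));
        System.out.println("distinct thread ids after batch 2: " + SEEN_THREAD_IDS.size());
    }
}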

dongpodu commented 1 year ago

How can I proactively close these sessions?

dongpodu commented 1 year ago

It's resolved now, using a custom thread pool:

private static ExecutorService pool =
        new ThreadPoolExecutor(100, 128, 0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<>(),
                               new NamedThreadFactory("custom-thread", true));

CompletableFuture.runAsync(() -> { /* the work to run */ }, pool);

Because the queue is unbounded the pool never grows past its core size, and core threads do not time out by default, so the worker threads are never destroyed.
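
For completeness, a self-contained version of this workaround (Executors.defaultThreadFactory() stands in for our NamedThreadFactory, which is not part of the JDK):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class StableWorkerPool {

    // With an unbounded queue the pool never grows past corePoolSize, and
    // core threads never time out by default, so the set of worker threads
    // (and therefore the set of thread-bound sessions) stays fixed.
    private static final ExecutorService POOL = new ThreadPoolExecutor(
            100, 128, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            Executors.defaultThreadFactory());

    public static CompletableFuture<Void> submit(Runnable task) {
        return CompletableFuture.runAsync(task, POOL);
    }
}

The per-element work that previously ran via parallelStream is routed through this pool instead, e.g. StableWorkerPool.submit(() -> { /* per-vertex work */ }).join();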

imbajin commented 1 year ago

> It's resolved now, using a custom thread pool:
>
> private static ExecutorService pool = new ThreadPoolExecutor(100, 128, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("custom-thread", true));
>
> CompletableFuture.runAsync(() -> { /* the work to run */ }, pool);
>
> Because the queue is unbounded the pool never grows past its core size, and core threads do not time out by default, so the worker threads are never destroyed.

Thanks for following through on this issue. The community is currently focused on merging the next major version, so Cassandra is indeed a bit short on attention. If possible, would you mind submitting a PR to fix and improve this?

Also, the adaptation for the Cassandra 4 client has already been submitted; feel free to try it out and give feedback.