apache / incubator-hugegraph

A graph database that supports more than 100 billion data items, with high performance and scalability (includes OLTP engine, REST API, and backends)
https://hugegraph.apache.org
Apache License 2.0

[Bug] StoreEventListener in CachedSystemTransactionV2 and CachedGraphTransaction will be registered and monitored repeatedly #2618

Open haohao0103 opened 3 months ago

haohao0103 commented 3 months ago

Bug Type

logic (logic design issue)

Before submit

Environment

Expected & Actual behavior

The StoreEventListener in CachedSystemTransactionV2 and CachedGraphTransaction is registered and monitored repeatedly. Both transaction types are held in ThreadLocal fields in TinkerPopTransaction, so each thread holds its own instance. When an instance is initialized, it registers a storeEventListener with the storeEventHub held by AbstractBackendStoreProvider. Because every listener implements the same functionality, storeEventHub ends up holding a large number of redundant storeEventListeners.
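
To make the report concrete, here is a minimal, self-contained sketch of the pattern described above. SketchEventHub and SketchCachedTx are hypothetical stand-ins, not HugeGraph classes; the sketch only assumes that each thread lazily creates its own transaction and that the transaction's constructor registers a listener with one shared hub.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class RedundantListenerDemo {

    // Hypothetical stand-in for the shared store event hub
    static final class SketchEventHub {
        private final List<Consumer<String>> listeners =
                new CopyOnWriteArrayList<>();

        void listen(Consumer<String> listener) {
            this.listeners.add(listener);
        }

        int listenerCount() {
            return this.listeners.size();
        }
    }

    // Hypothetical stand-in for a cached transaction: registers on creation
    static final class SketchCachedTx {
        SketchCachedTx(SketchEventHub hub) {
            hub.listen(event -> this.clearCache());
        }

        void clearCache() {
            // A real transaction would drop its cached vertices/edges here
        }
    }

    // Each thread creates its own transaction, so each thread adds a listener
    static int registerFromThreads(int threadCount) {
        SketchEventHub hub = new SketchEventHub();
        ThreadLocal<SketchCachedTx> txPerThread =
                ThreadLocal.withInitial(() -> new SketchCachedTx(hub));

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> txPerThread.get());
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return hub.listenerCount();
    }

    public static void main(String[] args) {
        System.out.println(registerFromThreads(8));
    }
}
```

With 8 threads, the shared hub ends up holding 8 listeners that all do the same work, which is the redundancy reported here.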

Vertex/Edge example

No response

Schema [VertexLabel, EdgeLabel, IndexLabel]

No response

dosubot[bot] commented 3 months ago

To resolve the issue of StoreEventListener being registered and monitored repeatedly in CachedSystemTransactionV2 and CachedGraphTransaction, you need to ensure that the listeners are not registered multiple times. This can be achieved by checking if the listeners are already registered before adding them. Here is how you can modify the listenChanges method to include this check:

private void listenChanges() {
    // Listen store event: "store.init", "store.clear", ...
    Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
                                              Events.STORE_CLEAR,
                                              Events.STORE_TRUNCATE);
    this.storeEventListener = event -> {
        if (storeEvents.contains(event.name())) {
            LOG.debug("Graph {} clear graph cache on event '{}'",
                      this.graph(), event.name());
            this.clearCache(null, true);
            return true;
        }
        return false;
    };

    // Check if the storeEventListener is already registered
    if (!this.store().provider().containsListener(this.storeEventListener)) {
        this.store().provider().listen(this.storeEventListener);
    }

    // Listen cache event: "cache"(invalid cache item)
    this.cacheEventListener = event -> {
        LOG.debug("Graph {} received graph cache event: {}",
                  this.graph(), event);
        Object[] args = event.args();
        E.checkArgument(args.length > 0 && args[0] instanceof String,
                        "Expect event action argument");
        if (Cache.ACTION_INVALID.equals(args[0])) {
            event.checkArgs(String.class, HugeType.class, Object.class);
            HugeType type = (HugeType) args[1];
            if (type.isVertex()) {
                // Invalidate vertex cache
                Object arg2 = args[2];
                if (arg2 instanceof Id) {
                    Id id = (Id) arg2;
                    this.verticesCache.invalidate(id);
                } else if (arg2 != null && arg2.getClass().isArray()) {
                    int size = Array.getLength(arg2);
                    for (int i = 0; i < size; i++) {
                        Object id = Array.get(arg2, i);
                        E.checkArgument(id instanceof Id,
                                        "Expect instance of Id in array, " +
                                        "but got '%s'", id.getClass());
                        this.verticesCache.invalidate((Id) id);
                    }
                } else {
                    E.checkArgument(false,
                                    "Expect Id or Id[], but got: %s",
                                    arg2);
                }
            } else if (type.isEdge()) {
                /*
                 * Invalidate edge cache via clear instead of invalidate
                 * because of the cacheKey is QueryId not EdgeId
                 */
                // this.edgesCache.invalidate(id);
                this.edgesCache.clear();
            }
            return true;
        } else if (Cache.ACTION_CLEAR.equals(args[0])) {
            event.checkArgs(String.class, HugeType.class);
            HugeType type = (HugeType) args[1];
            this.clearCache(type, false);
            return true;
        }
        return false;
    };

    EventHub graphEventHub = this.params().graphEventHub();
    // Check if the cacheEventListener is already registered
    if (!graphEventHub.containsListener(Events.CACHE)) {
        graphEventHub.listen(Events.CACHE, this.cacheEventListener);
    }
}

Additionally, you should implement an unlistenChanges method to ensure that the listeners are properly unregistered when they are no longer needed, similar to the approach used in CachedSchemaTransaction:

private void unlistenChanges() {
    // Unlisten store event
    this.store().provider().unlisten(this.storeEventListener);

    // Unlisten cache event
    EventHub graphEventHub = this.params().graphEventHub();
    graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}

By adding these checks and ensuring proper unregistration, you ensure that the listeners are only registered once, preventing the issue of them being registered and monitored repeatedly [1][2].

To continue talking to Dosu, mention @dosu.

haohao0103 commented 3 months ago

@dosu The check below cannot prevent duplicate registration of storeEventListener:

// Check if the storeEventListener is already registered
if (!this.store().provider().containsListener(this.storeEventListener)) {
    this.store().provider().listen(this.storeEventListener);
}
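
A possible explanation, as a sketch under assumptions: every transaction instance builds its own lambda, so a containment check that compares listener objects never finds a match. The Hub class and its containsListener method below are hypothetical (the snippet above assumes such a method exists on the provider). Lambdas do not override equals, so List.contains falls back to reference identity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FreshLambdaDemo {

    // Hypothetical hub whose containsListener compares listener objects
    static final class Hub {
        private final List<Predicate<String>> listeners = new ArrayList<>();

        boolean containsListener(Predicate<String> listener) {
            return this.listeners.contains(listener);
        }

        void listen(Predicate<String> listener) {
            this.listeners.add(listener);
        }

        int size() {
            return this.listeners.size();
        }
    }

    // Hypothetical transaction that guards registration with the check
    static final class Tx {
        private final Predicate<String> storeEventListener;

        Tx(Hub hub) {
            // A new listener object is created for every Tx instance
            this.storeEventListener = event -> this.onEvent(event);
            if (!hub.containsListener(this.storeEventListener)) {
                hub.listen(this.storeEventListener);
            }
        }

        private boolean onEvent(String event) {
            return "store.clear".equals(event);
        }
    }

    // Creates `count` transactions against one hub and reports its size
    static int registerTransactions(int count) {
        Hub hub = new Hub();
        for (int i = 0; i < count; i++) {
            new Tx(hub);
        }
        return hub.size();
    }

    public static void main(String[] args) {
        System.out.println(registerTransactions(5));
    }
}
```

Five transactions yield five registered listeners: the guard is evaluated each time, but it never sees the freshly created listener in the hub, so it always registers.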

dosubot[bot] commented 3 months ago

To ensure that storeEventListener is not registered multiple times in CachedSystemTransactionV2 and CachedGraphTransaction, you can use a ThreadLocal variable to keep track of whether the listener has already been registered for the current thread. Here is an example of how you can modify the listenChanges method to achieve this:

public final class CachedGraphTransaction extends GraphTransaction {

    private static final ThreadLocal<Boolean> listenerRegistered = ThreadLocal.withInitial(() -> false);

    private void listenChanges() {
        if (listenerRegistered.get()) {
            return;
        }

        // Listen store event: "store.init", "store.clear", ...
        Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
                                                  Events.STORE_CLEAR,
                                                  Events.STORE_TRUNCATE);
        this.storeEventListener = event -> {
            if (storeEvents.contains(event.name())) {
                LOG.debug("Graph {} clear graph cache on event '{}'",
                          this.graph(), event.name());
                this.clearCache(null, true);
                return true;
            }
            return false;
        };
        this.store().provider().listen(this.storeEventListener);

        // Listen cache event: "cache"(invalid cache item)
        this.cacheEventListener = event -> {
            LOG.debug("Graph {} received graph cache event: {}",
                      this.graph(), event);
            Object[] args = event.args();
            E.checkArgument(args.length > 0 && args[0] instanceof String,
                            "Expect event action argument");
            if (Cache.ACTION_INVALID.equals(args[0])) {
                event.checkArgs(String.class, HugeType.class, Object.class);
                HugeType type = (HugeType) args[1];
                if (type.isVertex()) {
                    // Invalidate vertex cache
                    Object arg2 = args[2];
                    if (arg2 instanceof Id) {
                        Id id = (Id) arg2;
                        this.verticesCache.invalidate(id);
                    } else if (arg2 != null && arg2.getClass().isArray()) {
                        int size = Array.getLength(arg2);
                        for (int i = 0; i < size; i++) {
                            Object id = Array.get(arg2, i);
                            E.checkArgument(id instanceof Id,
                                            "Expect instance of Id in array, " +
                                            "but got '%s'", id.getClass());
                            this.verticesCache.invalidate((Id) id);
                        }
                    } else {
                        E.checkArgument(false,
                                        "Expect Id or Id[], but got: %s",
                                        arg2);
                    }
                } else if (type.isEdge()) {
                    /*
                     * Invalidate edge cache via clear instead of invalidate
                     * because of the cacheKey is QueryId not EdgeId
                     */
                    // this.edgesCache.invalidate(id);
                    this.edgesCache.clear();
                }
                return true;
            } else if (Cache.ACTION_CLEAR.equals(args[0])) {
                event.checkArgs(String.class, HugeType.class);
                HugeType type = (HugeType) args[1];
                this.clearCache(type, false);
                return true;
            }
            return false;
        };
        EventHub graphEventHub = this.params().graphEventHub();
        if (!graphEventHub.containsListener(Events.CACHE)) {
            graphEventHub.listen(Events.CACHE, this.cacheEventListener);
        }

        listenerRegistered.set(true);
    }

    private void unlistenChanges() {
        // Unlisten store event
        this.store().provider().unlisten(this.storeEventListener);

        // Unlisten cache event
        EventHub graphEventHub = this.params().graphEventHub();
        graphEventHub.unlisten(Events.CACHE, this.cacheEventListener);

        listenerRegistered.set(false);
    }

    // Other methods...
}

This code uses a ThreadLocal variable listenerRegistered to keep track of whether the listeners have been registered for the current thread. The listenChanges method checks this variable before registering the listeners, and the unlistenChanges method resets it when the listeners are unregistered [1].
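
One caveat worth checking before adopting this: a ThreadLocal flag is tracked per thread, and the transactions are already per thread, so the guard may only prevent repeated registration within a single thread. The sketch below uses a plain list as a hypothetical stand-in for the event hub and counts registrations across threads; the method and class names are illustrative, not HugeGraph code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThreadLocalGuardDemo {

    // Same shape as the listenerRegistered flag: one value per thread
    private static final ThreadLocal<Boolean> REGISTERED =
            ThreadLocal.withInitial(() -> false);

    // Hypothetical stand-in for listenChanges(); the hub is just a list
    static void listenChanges(List<Runnable> hub) {
        if (REGISTERED.get()) {
            return;
        }
        hub.add(() -> { });
        REGISTERED.set(true);
    }

    // Every thread calls listenChanges several times against one shared hub
    static int registerFromThreads(int threadCount, int callsPerThread) {
        List<Runnable> hub = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int c = 0; c < callsPerThread; c++) {
                    listenChanges(hub);
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return hub.size();
    }

    public static void main(String[] args) {
        System.out.println(registerFromThreads(4, 3));
    }
}
```

With 4 threads each calling listenChanges 3 times, the hub holds 4 listeners: one per thread rather than one overall. If the goal is a single listener per hub, the guard would presumably need to be shared across threads instead.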
