vert-x3 / vertx-hazelcast

Hazelcast Cluster Manager for Vert.x
Apache License 2.0

Lock release should release a single permit #94

Closed: meggarr closed this issue 5 years ago

meggarr commented 6 years ago

Hi, there,

I just noticed that the Hazelcast implementation of Locks is based on ISemaphore, which can release a permit multiple times. Although it is the developer's obligation to use a semaphore correctly, from the Lock's point of view it should be safe to call release more than once, e.g. from multiple exception handlers. However, multiple release() calls should not result in the Lock being acquirable multiple times.

This could be a drawback of using ISemaphore to implement the Lock; I think Ignite's implementation, which uses a capped queue (max size), is more appropriate. The behaviour is easy to reproduce directly against the semaphore, as in the sketch below.
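
For illustration, a minimal standalone sketch against the Hazelcast 3.x ISemaphore API (the semaphore name here is made up):

    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.ISemaphore;

    public class DoubleReleaseDemo {
      public static void main(String[] args) throws InterruptedException {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ISemaphore sem = hz.getSemaphore("__vertx.device-lock"); // hypothetical lock name
        sem.init(1);                    // mutex semantics: a single permit

        sem.acquire();                  // "lock" acquired, permits -> 0
        sem.release();                  // normal release, permits -> 1
        sem.release();                  // accidental second release, permits -> 2
        System.out.println(sem.availablePermits()); // prints 2: two callers can now hold the "lock"

        hz.shutdown();
      }
    }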

Any ideas?

meggarr commented 6 years ago

Alternatively, we could keep a flag in HazelcastLock to track the locked/released state so that only the first release() returns the permit, something like the sketch below. I can submit a PR for this.
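
A minimal sketch of the idea, assuming an idempotent release guarded by an AtomicBoolean (the class name is made up; this is not the actual HazelcastLock source):

    import java.util.concurrent.atomic.AtomicBoolean;

    import com.hazelcast.core.ISemaphore;
    import io.vertx.core.shareddata.Lock;

    // Sketch only: guard release() with a flag so only the first call returns the permit.
    class SemaphoreBackedLock implements Lock {

      private final ISemaphore semaphore;
      private final AtomicBoolean released = new AtomicBoolean();

      SemaphoreBackedLock(ISemaphore semaphore) {
        this.semaphore = semaphore;
      }

      @Override
      public void release() {
        // compareAndSet makes release idempotent: later calls are no-ops.
        if (released.compareAndSet(false, true)) {
          semaphore.release();
        }
      }
    }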

tsegismont commented 6 years ago

Can you provide a snippet demonstrating what's wrong with calling lock#release multiple times? Thanks

meggarr commented 6 years ago

I don't have a snippet. However, I use a lock to represent a device resource, and only one operation may run on the device at a time. In a 3-instance cluster A, B, C, all three instances try to acquire the lock and operate the device; only A gets the lock and the other two wait. After the device operation is done, A calls lock#release, and then for some reason calls lock#release again. Because the Lock is essentially an ISemaphore, the two lock#release calls raise the permit count to two. Then both B and C get the lock (a permit) and operate on the device at the same time, which is not what we want.

tsegismont commented 6 years ago

I asked for a snippet because a lock acquired through the Vert.x API should have initial-permits set to 1. See https://github.com/vert-x3/vertx-hazelcast/blob/59cb7c756101372e000771b59d205174fa0cdbf7/src/main/resources/default-cluster.xml#L168

As per Hazelcast doc:

initial-permits: the thread count to which the concurrent access is limited. For example, if you set it to "3", concurrent access to the object is limited to 3 threads.

How do you retrieve the lock and configure Hazelcast?

meggarr commented 6 years ago

Below is my config, and I think it is alright: initial-permits is 1.

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.10.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <properties>
    <property name="hazelcast.phone.home.enabled">false</property>
    <property name="hazelcast.logging.type">log4j2</property>
    <property name="hazelcast.mancenter.enabled">false</property>
    <property name="hazelcast.memcache.enabled">false</property>
    <property name="hazelcast.rest.enabled">true</property>

    <property name="hazelcast.io.thread.count">3</property>
    <property name="hazelcast.prefer.ipv4.stack">true</property>
    <property name="hazelcast.heartbeat.failuredetector.type">deadline</property>
    <property name="hazelcast.heartbeat.interval.seconds">5</property>
    <property name="hazelcast.max.no.heartbeat.seconds">50</property>
    <property name="hazelcast.wait.seconds.before.join">5</property>

    <property name="hazelcast.partition.table.send.interval">10</property>
    <property name="hazelcast.partition.backup.sync.interval">20</property>
    <property name="hazelcast.merge.first.run.delay.seconds">180</property>
    <property name="hazelcast.merge.next.run.delay.seconds">120</property>

    <property name="hazelcast.shutdownhook.enabled">false</property>
    <property name="hazelcast.discovery.enabled">true</property>
  </properties>

  <instance-name>${k8s.pod_name}</instance-name>

  <group>
    <name>abc</name>
    <password>Abc123</password>
  </group>

  <network>
    <port auto-increment="true" port-count="10000">5701</port>
    <outbound-ports>
      <!--
      Allowed port range when connecting to other nodes.
      0 or * means use system provided port.
      -->
      <ports>0</ports>
    </outbound-ports>
    <join>
      <!-- deactivate normal discovery -->
      <multicast enabled="false"/>
      <tcp-ip enabled="false"/>
      <!-- activate the Kubernetes plugin -->
      <discovery-strategies>
        <discovery-strategy enabled="true"
                            class="com.hazelcast.kubernetes.HazelcastKubernetesDiscoveryStrategy">
          <properties>
            <!-- configure discovery service API lookup -->
            <property name="service-label-name">abc/vertx-cluster</property>
            <property name="service-label-value">true</property>
            <property name="namespace">${k8s.namespace}</property>
            <property name="kubernetes-master">https://kubernetes.kubernetes.rancher.internal:6443</property>
          </properties>
        </discovery-strategy>
      </discovery-strategies>
    </join>
  </network>

  <executor-service name="vertx-hazelcast-pool">
    <pool-size>8</pool-size>
    <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
    <queue-capacity>24</queue-capacity>
  </executor-service>

  <multimap name="__vertx.subs">
    <!--
        Number of backups. If 1 is set as the backup-count for example,
        then all entries of the map will be copied to another JVM for
        fail-safety. 0 means no backup.
    -->
    <backup-count>1</backup-count>
  </multimap>

  <map name="__vertx.haInfo">
    <!--
        We cannot set quorum rules on the HA Info map, as Vert.x listens to HA membership
        changes. While the membership is changing, the quorum size may not be met, so
        updating the HA Info map would fail because of a quorum failure.
    -->
    <!--<quorum-ref>xxx-quorum-rule</quorum-ref>-->
    <!--
        Number of backups. If 1 is set as the backup-count for example,
        then all entries of the map will be copied to another JVM for
        fail-safety. 0 means no backup.
    -->
    <backup-count>1</backup-count>
    <!--
        Maximum number of seconds for each entry to stay in the map. Entries that are
        older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
        will get automatically evicted from the map.
        Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
    -->
    <time-to-live-seconds>0</time-to-live-seconds>
    <!--
        Maximum number of seconds for each entry to stay idle in the map. Entries that are
        idle(not touched) for more than <max-idle-seconds> will get
        automatically evicted from the map. Entry is touched if get, put or containsKey is called.
        Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
    -->
    <max-idle-seconds>0</max-idle-seconds>
    <!--
        Valid values are:
        NONE (no eviction),
        LRU (Least Recently Used),
        LFU (Least Frequently Used).
        NONE is the default.
    -->
    <eviction-policy>NONE</eviction-policy>
    <!--
        Maximum size of the map. When max size is reached,
        map is evicted based on the policy defined.
        Any integer between 0 and Integer.MAX_VALUE. 0 means
        Integer.MAX_VALUE. Default is 0.
    -->
    <max-size policy="PER_NODE">0</max-size>
    <!--
        While recovering from split-brain (network partitioning),
        map entries in the smaller cluster will merge into the bigger cluster
        based on the policy set here. When an entry merges into the
        cluster, an entry with the same key may already exist, possibly
        with a different value. The conflict is resolved by the policy
        set here. The default policy is PutIfAbsentMapMergePolicy.

        There are built-in merge policies such as
        DiscardMergePolicy: the entry from the smaller cluster will be discarded.
        ExpirationTimeMergePolicy: the entry with the higher expiration time wins.
        HigherHitsMergePolicy: the entry with the higher number of hits wins.
        HyperLogLogMergePolicy: specialized merge policy for the CardinalityEstimator, which uses the default merge algorithm from HyperLogLog research, keeping the max register value of the two given instances.
        LatestAccessMergePolicy: the entry with the latest access wins.
        LatestUpdateMergePolicy: the entry with the latest update wins.
        PassThroughMergePolicy: the entry from the smaller cluster wins.
        PutIfAbsentMergePolicy: the entry from the smaller cluster wins if it doesn’t exist in the cluster.
    -->
    <merge-policy batch-size="50">LatestUpdateMergePolicy</merge-policy>
  </map>

  <quorum enabled="true" name="locks-quorum-rule">
    <quorum-size>${cluster.locks_quorum_size}</quorum-size>
    <quorum-type>READ_WRITE</quorum-type>
    <recently-active-quorum heartbeat-tolerance-millis="50000" />
  </quorum>

  <!-- Used internally in Vert.x to implement async locks -->
  <semaphore name="__vertx.*">
    <initial-permits>1</initial-permits>
    <backup-count>1</backup-count>
    <quorum-ref>locks-quorum-rule</quorum-ref>
  </semaphore>

</hazelcast>

To get the lock, I just use SharedData#getLockWithTimeout, e.g.

    long start = System.currentTimeMillis();
    sd.getLockWithTimeout(lockKey, lockTimeout, lr -> {
      long duration = System.currentTimeMillis() - start;
      log.debug("SharedData getLock {} takes {}ms, {}, context: {}", lockKey, duration, lr.succeeded() ? "succeeded" : "failed", contextHash);
      ...
    });

So in my example, instances A, B and C all run the code above; instance A gets the lock (permit) first, and B and C wait. If A then releases the lock twice (raising the permit count to 2), both B and C acquire the lock. A defensive error handler is enough to cause the double release, as in the sketch below.
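
For example, a hypothetical handler like this (operateDevice and publishResult are placeholders, Lock is io.vertx.core.shareddata.Lock) ends up releasing twice when the follow-up call throws:

    sd.getLockWithTimeout(lockKey, lockTimeout, lr -> {
      if (lr.succeeded()) {
        Lock lock = lr.result();
        try {
          operateDevice();   // placeholder for the actual device operation
          lock.release();    // normal release: the semaphore permit goes back to 1
          publishResult();   // placeholder follow-up that may throw
        } catch (Exception e) {
          lock.release();    // defensive release: if the try block already released,
                             // the ISemaphore now has 2 permits and both B and C proceed
        }
      }
    });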

tsegismont commented 6 years ago

Thanks for the details. Would you mind creating a pull request with your changes? Could you also create a PR on Vert.x core to update io.vertx.core.shareddata.AsynchronousLockTest? Then we would be sure it works as expected on all cluster managers. The new test could look roughly like the sketch below.
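
Illustrative only, assuming the test keeps using the public SharedData API; names and assertions are placeholders and the real test should follow the conventions already in AsynchronousLockTest:

    SharedData sd = vertx.sharedData();
    sd.getLockWithTimeout("test.lock", 10_000, first -> {
      Lock lock = first.result();
      lock.release();
      lock.release();                              // must be a no-op after the fix
      sd.getLockWithTimeout("test.lock", 10_000, second -> {
        // the single permit returned above is taken again here...
        sd.getLockWithTimeout("test.lock", 500, third -> {
          // ...so this third attempt must time out; without the fix it would
          // succeed because two permits were released
          assertTrue(third.failed());
        });
      });
    });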

meggarr commented 6 years ago

@tsegismont Yes, I have opened the PR for this one: PR #99.

The PR on Vert.x core is https://github.com/eclipse-vertx/vert.x/pull/2640