Open TemplarJQ opened 4 years ago
定义具体能力接口,以及提供锁业务处理的通用方法实现:lockHandle,具体的使用说明见代码注释
package com.taobao.gow.utils;
import com.taobao.gow.client.common.Result;
import java.util.function.Supplier;
/**
 * Contract for a distributed (optimistic) lock, plus a ready-made helper,
 * {@link #lockHandle}, that runs a piece of business logic while holding the lock.
 *
 * @author shangchen.wxx
 */
public interface DistributedLock {

    /**
     * Tries to acquire the lock identified by {@code lockKey}.
     *
     * @param lockKey     key that identifies the lock
     * @param lockTimeout lock expiry in seconds; mandatory. Callers should pick a sensible
     *                    value so that a lock is not held for a long time after an abnormal
     *                    restart of the owning system
     * @return the value stored for the lock when it was acquired, or {@code null} when the
     *         lock could not be obtained
     */
    String lockKey(String lockKey, int lockTimeout);

    /**
     * Releases a lock. Must only be called by a caller that actually acquired the lock.
     *
     * @param lockKey   key that identifies the lock
     * @param lockValue value that was returned by {@link #lockKey(String, int)}
     * @return result of the release as reported by the implementation
     */
    boolean unlockKey(String lockKey, String lockValue);

    /**
     * Utility for waiting on a lock held by someone else. Callers are free to ignore it and
     * implement their own waiting strategy; the current implementations simply put the
     * calling thread to sleep.
     *
     * @param lockKey     lock being waited for
     * @param sleepMillis how long the waiting thread sleeps each time, in milliseconds
     * @param sleepTimes  how many times to sleep
     */
    void waitLock(String lockKey, long sleepMillis, int sleepTimes);

    /**
     * Acquires the lock described by {@code param}, runs {@code handle} and releases the lock.
     *
     * <p>Acquisition is retried without an upper bound: whenever {@link #lockKey} returns
     * {@code null}, {@link #waitLock} is invoked with the sleep settings from {@code param}
     * and the acquisition is attempted again. Once the lock is held, {@code handle} is
     * invoked exactly once and the lock is released afterwards, whether {@code handle}
     * returns normally or throws.
     *
     * @param param  supplies the lock key, the lock timeout and the wait settings
     * @param handle business logic to execute while the lock is held
     * @param <T>    payload type of the returned result
     * @return whatever {@code handle} returns
     */
    default <T> Result<T> lockHandle(LockHandleParam param, Supplier<Result<T>> handle) {
        final String key = param.getKey();

        // Spin until the lock is ours. Nothing is held yet, so a failure here needs no cleanup.
        String heldValue;
        while ((heldValue = lockKey(key, param.getLockTimeout())) == null) {
            waitLock(key, param.getSleepMillis(), param.getSleepTimes());
        }

        // The lock is held from here on; always give it back.
        try {
            return handle.get();
        } finally {
            unlockKey(key, heldValue);
        }
    }
}
package com.taobao.gow.scaleoperation.common;
import com.taobao.gow.common.LDBTairClient;
import com.taobao.gow.utils.DistributedLock;
import com.taobao.tair.DataEntry;
import com.taobao.tair.Result;
import com.taobao.tair.ResultCode;
import com.taobao.tair.TairManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.net.InetAddress;
import java.net.UnknownHostException;
@Slf4j
@Component
public class ScaleOperationDistributedLock implements DistributedLock {
// private final RdbDistributedLock rdbDistributedLock;
private final LdbDistributedLock ldbDistributedLock;
@Autowired
public ScaleOperationDistributedLock(@Qualifier("ldbTairManager") TairManager ldbTairManager) {
// RdbSmartApi rdbSmartApi = RdbSmartFactory.getClientManager(rdbInstance);
// rdbSmartApi.setPassWord(rdbPassword);
// String initMessage = rdbSmartApi.init();
// log.info("init finish loan rdb tair cache: " + initMessage);
// rdbDistributedLock = new RdbDistributedLock(rdbSmartApi.sync());
ldbDistributedLock = new LdbDistributedLock(ldbTairManager, LDBTairClient.NAMESPACE);
}
@Override
public String lockKey(String lockKey, int lockTimeout) {
if (lockTimeout <= 0) {
throw new IllegalArgumentException("lockTimeout must gt 0!");
}
return ldbDistributedLock.lockKey(lockKey, lockTimeout);
}
@Override
public boolean unlockKey(String lockKey, String lockValue) {
return ldbDistributedLock.unlockKey(lockKey, lockValue);
}
@Override
public void waitLock(String lockKey, long sleepMillis, int sleepTimes) {
if (sleepMillis <= 0) {
throw new IllegalArgumentException("sleepMillis must gt 0!");
}
if (sleepTimes <= 0) {
throw new IllegalArgumentException("sleepTimes must gt 0!");
}
ldbDistributedLock.waitLock(lockKey, sleepMillis, sleepTimes);
}
private static abstract class AbstractDistributedLock implements DistributedLock {
private final String hostName;
AbstractDistributedLock() {
String hostName = null;
try {
hostName = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
log.error("InetAddress.getLocalHost() error!");
}
if (StringUtils.isBlank(hostName)) {
hostName = Long.toString((long) (Math.random() * 1000000000L));
}
log.warn("DistributedLock " + getClass().getSimpleName() + " init, hostName = " + hostName);
this.hostName = hostName;
}
String getLockValue() {
return hostName + Thread.currentThread().getName();
}
abstract String getStrValue(String key);
@Override
public void waitLock(String lockKey, long sleepMillis, int sleepTimes) {
try {
int times = 0;
while (times++ < sleepTimes) {
String lockVal = getStrValue(lockKey);
if (lockVal == null) {
//当前锁已经失效了,赶紧去lock吧
return;
}
Thread.sleep(sleepMillis);
}
} catch (Exception e) {
log.error("waitLock exception, lockKey: " + lockKey + ", sleepMillis = " + sleepMillis + ", sleepTimes = " + sleepTimes, e);
}
}
}
/**
* 基于tair1.0 ldb实现的分布式锁
*/
private static class LdbDistributedLock extends AbstractDistributedLock {
private final TairManager ldbTairManager;
private final int namespace;
private LdbDistributedLock(TairManager ldbTairManager, int namespace) {
this.ldbTairManager = ldbTairManager;
this.namespace = namespace;
}
Result<DataEntry> getTairByKey(String lockKey) {
Result<DataEntry> result = ldbTairManager.get(namespace, lockKey);
int retryGet = 0;
//最多重试2次,算上第一次获取,总共3次
while (retryGet++ < 2 && (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc())
|| ResultCode.UNKNOW.equals(result.getRc()))) {
result = ldbTairManager.get(namespace, lockKey);
}
return result;
}
/**
* @param lockKey key
* @param lockValue 锁得值
* @param retryCount 当前重试的次数,第一次调用为0,重试次数做多为2次
* @return 是否拿到锁
*/
private boolean tryLock(String lockKey, String lockValue, int retryCount, int lockTimeout) {
if (retryCount >= 3) {
return false;
}
Result<DataEntry> result = getTairByKey(lockKey);
if (ResultCode.DATANOTEXSITS.equals(result.getRc())) {
//添加锁
ResultCode code = ldbTairManager.put(namespace, lockKey, lockValue, 2, lockTimeout);
if (ResultCode.SUCCESS.equals(code)) {
//获取锁成功
return true;
} else if (ResultCode.CONNERROR.equals(code) || ResultCode.TIMEOUT.equals(code) || ResultCode.UNKNOW.equals(code)) {
//如果不是上面的错误码,就说明拿不到锁,返回吧
return tryLock(lockKey, lockValue, ++retryCount, lockTimeout);
} else {
return false;
}
} else {
//当前线程拿到的锁,OK的,直接返回
return result.getValue() != null && lockValue.equals(result.getValue().getValue());
}
}
@Override
public String lockKey(String lockKey, int lockTimeout) {
String lockValue = getLockValue();
if (tryLock(lockKey, lockValue, 0, lockTimeout)) {
return lockValue;
} else {
return null;
}
}
@Override
public boolean unlockKey(String lockKey, String lockValue) {
String val = getStrValue(lockKey);
if (val != null && val.equals(lockValue)) {
ldbTairManager.invalid(namespace, lockKey);
return true;
} else {
return false;
}
}
@Override
String getStrValue(String key) {
Result<DataEntry> result = getTairByKey(key);
if (result.getValue() != null) {
return (String) result.getValue().getValue();
} else {
return null;
}
}
}
}
基于 RDB 的实现更简单一些,代码如下。核心是借用 RDB 的 NX 特性(仅当指定的 key 不存在时,才为该 key 设置指定的值):
/**
 * Distributed lock implemented on top of Tair 2.0 RDB.
 *
 * <p>Every key and lock value is encoded with {@link #CHARSET} so that all methods address
 * the same Redis key regardless of the platform default charset.
 */
private static class RdbDistributedLock extends AbstractDistributedLock {

    /** Charset used for every String to byte[] conversion in this class. */
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    private final RedisSyncApi redisSyncApi;

    private RdbDistributedLock(RedisSyncApi redisSyncApi) {
        this.redisSyncApi = redisSyncApi;
    }

    /**
     * @return the value stored under {@code key} decoded with {@link #CHARSET}, or
     *         {@code null} when nothing is stored or the read failed (the failure is logged)
     */
    @Override
    String getStrValue(String key) {
        try {
            byte[] bytes = redisSyncApi.get(key.getBytes(CHARSET));
            if (bytes == null) {
                return null;
            }
            return new String(bytes, CHARSET);
        } catch (Exception e) {
            log.error("rdb getStrValue exception, key: " + key, e);
            return null;
        }
    }

    /**
     * Reads {@code key} and deserializes the stored bytes as a Java object.
     *
     * <p>NOTE(review): this uses Java native deserialization; it must only be used on data
     * written by trusted code (e.g. {@link #set(String, Object, int)}).
     *
     * @return the deserialized object, or {@code null} when nothing is stored or on any error
     */
    Object get(String key) {
        try {
            byte[] bytes = redisSyncApi.get(key.getBytes(CHARSET));
            if (bytes == null || bytes.length == 0) {
                return null;
            }
            try (ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return objIn.readObject();
            }
        } catch (Exception e) {
            log.error("redisSyncApi get exception, key: " + key, e);
        }
        return null;
    }

    /**
     * Serializes {@code obj} with Java serialization and stores it under {@code key}.
     *
     * @param timeout value passed to {@code SetParams.ex}
     * @return the reply of the underlying set call, or {@code "FAILED"} on any error
     */
    String set(String key, Object obj, int timeout) {
        try {
            ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
            // Closing the object stream flushes it, so byteOut holds the full payload afterwards.
            try (ObjectOutputStream objOut = new ObjectOutputStream(byteOut)) {
                objOut.writeObject(obj);
            }
            return redisSyncApi.set(key.getBytes(CHARSET), byteOut.toByteArray(), SetParams.setParams().ex(timeout));
        } catch (Exception e) {
            log.error("redisSyncApi set exception, key: " + key + ", obj: " + JSON.toJSONString(obj), e);
            return "FAILED";
        }
    }

    /**
     * Deletes {@code key}.
     *
     * @return the reply of the underlying del call, or {@code -1} on any error
     */
    long del(String key) {
        try {
            return redisSyncApi.del(key.getBytes(CHARSET));
        } catch (Exception e) {
            log.error("redisSyncApi del exception key: " + key, e);
            return -1L;
        }
    }

    /**
     * Takes the lock with a single set call carrying the {@code NX} and {@code EX} options.
     *
     * @return the lock value when the reply is {@code OK}, otherwise {@code null}
     *         (including when the call throws; the exception is logged)
     */
    @Override
    public String lockKey(String lockKey, int lockTimeout) {
        String lockValue = getLockValue();
        try {
            String result = redisSyncApi.set(lockKey.getBytes(CHARSET), lockValue.getBytes(CHARSET), "NX".getBytes(CHARSET), "EX".getBytes(CHARSET), lockTimeout);
            if ("OK".equalsIgnoreCase(result)) {
                // Lock acquired.
                return lockValue;
            }
        } catch (Exception e) {
            log.error("lockKey set exception, lockKey: " + lockKey + ", lockValue = " + lockValue, e);
        }
        return null;
    }

    /**
     * Releases the lock through the client's {@code cad} call, passing the key and the
     * expected lock value.
     *
     * @return {@code true} when {@code cad} returns 1, otherwise {@code false}
     *         (including when the call throws; the exception is logged)
     */
    @Override
    public boolean unlockKey(String lockKey, String lockValue) {
        try {
            long ret = redisSyncApi.cad(lockKey.getBytes(CHARSET), lockValue.getBytes(CHARSET));
            if (1 == ret) {
                return true;
            }
        } catch (Exception e) {
            log.error("unlockKey cad exception, lockKey: " + lockKey + ", lockValue = " + lockValue, e);
        }
        return false;
    }
}
分布式锁实现方案
目前常用的实现方案主要有