使用setnx来实现分布式锁 #
public class RedisDistributedLock {
private final JedisPool jedisPool;
public RedisDistributedLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public boolean tryLock(String lockKey, String requestId, int expireTime) {
try (Jedis jedis = jedisPool.getResource()) {
String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
return "OK".equals(result);
}
}
public boolean unlock(String lockKey, String requestId) {
try (Jedis jedis = jedisPool.getResource()) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return Long.parseLong(result.toString()) == 1L;
}
}
}
tryLock方法接收三个参数,分别是锁的键值lockKey、加锁的请求标识requestId和锁的过期时间expireTime。该方法会尝试使用Redis的set命令加锁,如果加锁成功则返回true,否则返回false。其中NX表示只在锁的键不存在时设置锁,PX表示锁的过期时间为expireTime毫秒。
unlock方法接收两个参数,分别是锁的键值lockKey和加锁的请求标识requestId。该方法会执行一个Lua脚本,判断当前锁的值是否等于请求标识requestId,如果是则删除锁并返回true,否则返回false。该方法使用eval命令执行Lua脚本,传入锁的键值和请求标识两个参数,返回值是执行结果。
jedis setnx实现可重入锁 #
import redis.clients.jedis.Jedis;
public class ReentrantRedisLock {
public synchronized boolean tryLock(Jedis jedis,String lockKey) {
String currentThreadId = String.valueOf(Thread.currentThread().getId());
// 尝试获取锁
String lockValue = jedis.get(lockKey);
if (lockValue == null) {
// 锁不存在,尝试设置锁
jedis.set(lockKey, currentThreadId + ":1", "NX", "EX", 30);
return true;
}
// 锁存在,检查是否由当前线程持有
String[] parts = lockValue.split(":");
//加锁线程是当前线程,则增加次数,进行重入加锁
if (parts.length == 2 && parts[0].equals(currentThreadId)) {
int count = Integer.parseInt(parts[1]) + 1;
jedis.set(lockKey, currentThreadId + ":" + count, "XX", "EX", 30);
return true;
}
//加锁失败
return false;
}
public synchronized void unlock(Jedis jedis,String lockKey) {
String currentThreadId = String.valueOf(Thread.currentThread().getId());
String lockValue = jedis.get(lockKey);
if (lockValue != null) {
String[] parts = lockValue.split(":");
if (parts.length == 2 && parts[0].equals(currentThreadId)) {
int count = Integer.parseInt(parts[1]);
//减少重入次数
if (count > 1) {
jedis.set(lockKey, currentThreadId + ":" + (count - 1), "XX", "EX", 30);
} else {
//解锁
jedis.del(lockKey);
}
}
}
}
}
利用lua脚本的原子性实现
-- tryLock.lua
-- 尝试获取锁的Lua脚本
-- KEYS[1] 是锁的key
-- ARGV[1] 是当前线程的标识
-- ARGV[2] 是锁的超时时间
local lockValue = redis.call('get', KEYS[1])
if lockValue == false then
-- 锁不存在,创建锁并设置超时
redis.call('setex', KEYS[1], ARGV[2], ARGV[1] .. ':1')
return true
else
local parts = {}
local index = 0
for match in (lockValue .. ":"):gmatch("(.-)" .. ":") do
parts[index] = match
index = index + 1
end
if parts[0] == ARGV[1] then
-- 锁已经被当前线程持有,重入次数加1
local count = tonumber(parts[1]) + 1
redis.call('setex', KEYS[1], ARGV[2], ARGV[1] .. ':' .. count)
return true
end
end
return false
..
是Lua中的字符串连接操作符,用于连接两个字符串。gmatch 是Lua的一个字符串操作函数,用于在给定字符串中全局匹配指定的模式,并返回一个迭代器,每次调用这个迭代器都会返回下一个匹配的字符串。
模式 "(.-):" 是一个模式表达式,功能是匹配任意数量的字符直到遇到第一个 ":"
-- unlock.lua
-- 释放锁的Lua脚本
-- KEYS[1] 是锁的key
-- ARGV[1] 是当前线程的标识
local lockValue = redis.call('get', KEYS[1])
if lockValue ~= false then
local parts = {}
local index = 0
for match in (lockValue .. ":"):gmatch("(.-)" .. ":") do
parts[index] = match
index = index + 1
end
if parts[0] == ARGV[1] then
local count = tonumber(parts[1])
if count > 1 then
-- 减少重入次数
count = count - 1
redis.call('set', KEYS[1], ARGV[1] .. ':' .. count)
else
-- 重入次数为0,删除锁
redis.call('del', KEYS[1])
end
return true
end
end
return false
// 尝试获取锁
String tryLockScript = "..."; // Lua脚本字符串
Object result = jedis.eval(tryLockScript, Collections.singletonList(lockKey), Arrays.asList(currentThreadId, "30"));
// 释放锁
String unlockScript = "..."; // Lua脚本字符串
jedis.eval(unlockScript, Collections.singletonList(lockKey), Collections.singletonList(currentThreadId));
使用redlock实现分布式锁 #
Config config1 = new Config();
config1.useSingleServer()
.setAddress("redis://127.0.0.1:6379");
Config config2 = new Config();
config2.useSingleServer()
.setAddress("redis://127.0.0.1:6380");
Config config3 = new Config();
config3.useSingleServer()
.setAddress("redis://127.0.0.1:6381");
RedissonClient redissonClient1 = Redisson.create(config1);
RedissonClient redissonClient2 = Redisson.create(config2);
RedissonClient redissonClient3 = Redisson.create(config3);
RLock lock1 = redissonClient1.getLock("lockKey");
RLock lock2 = redissonClient2.getLock("lockKey");
RLock lock3 = redissonClient3.getLock("lockKey");
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean lockResult = redLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
// 业务逻辑
if (lockResult) {
try {
// 业务逻辑
} finally {
redLock.unlock();
}
} else {
// 获取锁失败的处理逻辑
}
redissonClient1.shutdown();
redissonClient2.shutdown();
redissonClient3.shutdown();
使用redisson实现分布式锁 #
定义redission客户端
@Configuration
public class RedissonConfig {
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
调用加锁(可重入锁)
@Service
public class LockTestService{
@Autowired
RedissonClient redisson;
public void testLock(){
RLock lock = redisson.getLock("myLock");
try {
lock.lock();
// 设置锁的超时时间为30秒
// lock.lock(30, TimeUnit.SECONDS);
// 执行需要加锁的代码
} finally {
lock.unlock();
}
}
}
公平锁
RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lock();
联锁
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
可重入读写锁(允许同时有多个读锁和一个写锁对同一个资源进行加锁。)
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
使用redisson实现延迟队列 #
定义客户端
@Configuration
public class RedissonConfig {
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
实现延迟队列
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
@Component
public class RedissonOrderDelayQueue {
@Autowired
RedissonClient redisson;
public void addTaskToDelayQueue(String orderId) {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "添加任务到延时队列里面");
delayedQueue.offer(orderId, 3, TimeUnit.SECONDS);
delayedQueue.offer(orderId, 6, TimeUnit.SECONDS);
delayedQueue.offer(orderId, 9, TimeUnit.SECONDS);
}
public String getOrderFromDelayQueue() {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);
# 阻塞操作
String orderId = blockingDeque.take();
return orderId;
}
}
实现滑动窗口限流 #
命令 | 描述 |
---|---|
ZCARD key | 获取有序集合的成员数 |
ZREMRANGEBYSCORE key min max | 移除有序集合中给定的分数区间的所有成员 |
主要步骤如下:
- 定义滑动窗口的时间范围,例如,窗口大小为60秒。
- 每次收到一个请求时,我们就定义出一个zset然后存储到redis中。
- 然后再通过ZREMRANGEBYSCORE命令来删除分值小于窗口起始时间戳(当前时间戳-60s)的数据。
- 最后,再使用ZCARD命令来获取有序集合中的成员数量,即在窗口内的请求量。
import redis.clients.jedis.Jedis;
public class SlidingWindowRateLimiter {
private Jedis jedis;
private String key;
private int limit;
public boolean allowRequest(String key) {
//当前时间戳
long currentTime = System.currentTimeMillis();
//窗口开始时间是当前时间减60s
long windowStart = currentTime - 60 * 1000;
//删除窗口开始时间之前的所有数据
jedis.zremrangeByScore(key, "-inf", String.valueOf(windowStart));
//计算总请求数
long currentRequests = jedis.zcard(key);
//窗口足够则把当前请求加入
if (currentRequests < limit) {
jedis.zadd(key, currentTime, String.valueOf(currentTime));
return true;
}
return false;
}
}
通过lua脚本支持高并发
import redis.clients.jedis.Jedis;
public class SlidingWindowRateLimiter {
private Jedis jedis;
private String key;
private int limit;
public SlidingWindowRateLimiter(Jedis jedis, String key, int limit) {
this.jedis = jedis;
this.key = key;
this.limit = limit;
}
public boolean allowRequest(String key) {
// 当前时间戳
long currentTime = System.currentTimeMillis();
// 使用Lua脚本来确保原子性操作
String luaScript = "local window_start = ARGV[1] - 60000\n" +
"redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', window_start)\n" +
"local current_requests = redis.call('ZCARD', KEYS[1])\n" +
"if current_requests < tonumber(ARGV[2]) then\n" +
" redis.call('ZADD', KEYS[1], ARGV[1], ARGV[1])\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
Object result = jedis.eval(luaScript, 1, key, String.valueOf(currentTime), String.valueOf(limit));
return (Long) result == 1;
}
}
jedis实现pipeline #
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelineExample {
public static void main(String[] args) {
// 连接到 Redis 服务器
try (Jedis jedis = new Jedis("localhost", 6379)) {
// 创建 Pipeline
Pipeline pipeline = jedis.pipelined();
// 向 Pipeline 添加命令
pipeline.set("foo", "bar");
pipeline.get("foo");
pipeline.incr("counter");
// 执行 Pipeline 中的所有命令,并获取响应
List<Object> responses = pipeline.syncAndReturnAll();
// 输出响应
for (Object response : responses) {
System.out.println(response);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
jedis实现乐观锁 #
基于CAS乐观锁,通过WATCH
命令来实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
public class RedisOptimisticLock {
public static void main(String[] args) {
// 连接到 Redis
Jedis jedis = new Jedis("localhost");
try {
// 监视键
String key = "myKey";
jedis.watch(key);
// 模拟从数据库读取最新值
String value = jedis.get(key);
int intValue = Integer.parseInt(value);
// 开始事务
Transaction t = jedis.multi();
// 在事务中执行操作
t.set(key, String.valueOf(intValue + 1));
// 尝试执行事务
if (t.exec() == null) {
System.out.println("事务执行失败,数据已被其他客户端修改");
} else {
System.out.println("事务执行成功");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.close();
}
}
}
扫描所有的key #
基于KEYS
命令实现
import redis.clients.jedis.Jedis;
public class RedisKeysExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
Set<String> keys = jedis.keys("*"); // 使用KEYS命令获取所有键
for(String key : keys) {
System.out.println(key);
}
jedis.close();
}
}
基于SCANS
命令实现
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
public class RedisScanExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost");
String cursor = ScanParams.SCAN_POINTER_START;
ScanParams scanParams = new ScanParams().count(10);
do {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
scanResult.getResult().forEach(System.out::println);
} while (!cursor.equals("0"));
jedis.close();
}
}
单例模式 #
懒汉 #
在需要时才会去创建对象
public class Singleton {
private static Singleton instance;
private Singleton (){}
public static synchronized Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
饿汉 #
在类刚一初始化的时候就立即把单例对象创建出来。
public class Singleton {
private static Singleton instance = new Singleton();
private Singleton (){}
public static Singleton getInstance() {
return instance;
}
}
public class Singleton {
private Singleton instance = null;
static {
instance = new Singleton();
}
private Singleton (){}
public static Singleton getInstance() {
return this.instance;
}
}
静态内部类 #
public class Singleton {
private static class SingletonHolder {
private static final Singleton INSTANCE = new Singleton();
}
private Singleton (){}
public static final Singleton getInstance() {
return SingletonHolder.INSTANCE;
}
}
枚举 #
public enum Singleton {
INSTANCE;
public void whateverMethod() {
}
}
双重校验锁 #
public class Singleton {
private volatile static Singleton singleton;
private Singleton (){}
public static Singleton getSingleton() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
}
CAS实现线程安全 #
public class Singleton {
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();
private Singleton() {}
public static Singleton getInstance() {
for (;;) {
Singleton singleton = INSTANCE.get();
if (null != singleton) {
return singleton;
}
singleton = new Singleton();
if (INSTANCE.compareAndSet(null, singleton)) {
return singleton;
}
}
}
}
破坏单例模式 #
反射破坏单例模式 #
Singleton singleton1 = Singleton.getSingleton();
//通过反射获取到构造函数
Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor();
//将构造函数设置为可访问类型
constructor.setAccessible(true);
//调用构造函数的newInstance创建一个对象
Singleton singleton2 = constructor.newInstance();
//判断反射创建的对象和之前的对象是不是同一个对象
System.out.println(s1 == s2);
setAccessible(true),使得反射对象在使用时应该取消 Java 语言访问检查,使得私有的构造函数能够被访问。
反序列化破坏单例 #
package com.tencent
import java.io.*;
public class SerializableDemo1 {
//为了便于理解,忽略关闭流操作及删除文件操作。真正编码时千万不要忘记
//Exception直接抛出
public static void main(String[] args) throws IOException, ClassNotFoundException {
//Write Obj to file
ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("tempFile"));
oos.writeObject(Singleton.getSingleton());
//Read Obj from file
File file = new File("tempFile");
ObjectInputStream ois = new ObjectInputStream(new FileInputStream(file));
Singleton newInstance = (Singleton) ois.readObject();
//判断是否是同一个对象
System.out.println(newInstance == Singleton.getSingleton());
}
}
通过对Singleton的序列化与反序列化得到的对象是一个新的对象,这就破坏了Singleton的单例性。
序列化会通过Unsafe直接分配内存的方式来创建一个新的对象。
避免被破坏单例 #
避免被反射破坏 #
反射是调用默认的构造函数创建出来的,只需要我们改造下构造函数,使其在反射调用的时候识别出来对象是不是被创建过就行了
private Singleton() {
if (singleton != null){
throw new RuntimeException("单例对象只能创建一次... ");
}
避免反序列化破坏单例 #
解决反序列化的破坏单例,只需要我们自定义反序列化的策略就行了,就是说我们不要让他走默认逻辑一直调用到Unsafe创建对象,而是我们干预他的这个过程,干预方式就是在Singleton类中定义readResolve
,这样就可以解决该问题
public class Singleton implements Serializable{
private volatile static Singleton singleton;
private Singleton (){}
public static Singleton getSingleton() {
if (singleton == null) {
synchronized (Singleton.class) {
if (singleton == null) {
singleton = new Singleton();
}
}
}
return singleton;
}
private Object readResolve() {
return singleton;
}
}
spring拿到代理对象 #
TestService proxy = (TestService)AopContext.currentProxy();
同时mainClass上需要开启注解
@EnableAspectJAutoProxy(exposeProxy=True)
并发相关工具类 #
public class ConcurrentUtil {
private static final Logger logger = LoggerFactory.getLogger(ConcurrentUtil.class);
public static <T> void awaitComplete(List<Future<T>> futureList) {
for (Future<T> future : futureList) {
try {
future.get();
} catch (Exception e) {
future.cancel(true);
throw new OmsServerException(9999, "future get failed: " + e.getMessage());
}
}
}
public static <T> T awaitCompleteAndGet(Future<T> future) {
return awaitCompleteAndGet(future, 0, null);
}
public static <T> T awaitCompleteAndGet(Future<T> future, int timeout, TimeUnit unit) {
List<T> resultList = awaitCompleteAndGet(Arrays.asList(future), timeout, unit, true);
if (CollectionUtils.isEmpty(resultList)) {
return null;
}
return resultList.get(0);
}
public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList) {
return awaitCompleteAndGet(futureList, true);
}
public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList, boolean throwException) {
return awaitCompleteAndGet(futureList, 0L, null, throwException);
}
public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList, long timeout, TimeUnit timeUnit,
boolean throwException) {
List<T> retList = new ArrayList<>();
for (Future<T> future : futureList) {
try {
T ret = null;
if (timeout <= 0 || null == timeUnit) {
ret = future.get();
} else {
ret = future.get(timeout, timeUnit);
}
if (null != ret) {
retList.add(ret);
} else {
logger.warn("future get result is null, maybe thread over head, auto ignore.");
}
} catch (Exception e) {
logger.error("await complete failed, err msg is: [{}]", e.getMessage());
future.cancel(true);
if (throwException) {
throw new OmsServerException(9999, "await complete failed, err msg is: " + e.getMessage());
}
}
}
return retList;
}
}