Heroyf @ heroyf.com

Java Code Snippets

Mar 24 · 30min

使用setnx来实现分布式锁

public class RedisDistributedLock {
    private final JedisPool jedisPool;

    public RedisDistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    public boolean tryLock(String lockKey, String requestId, int expireTime) {
        try (Jedis jedis = jedisPool.getResource()) {
            String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
            return "OK".equals(result);
        }
    }

    public boolean unlock(String lockKey, String requestId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            return Long.parseLong(result.toString()) == 1L;
        }
    }
}

tryLock方法接收三个参数,分别是锁的键值lockKey、加锁的请求标识requestId和锁的过期时间expireTime。该方法会尝试使用Redis的set命令加锁,如果加锁成功则返回true,否则返回false。其中NX表示只在锁的键不存在时设置锁,PX表示锁的过期时间为expireTime毫秒。

unlock方法接收两个参数,分别是锁的键值lockKey和加锁的请求标识requestId。该方法会执行一个Lua脚本,判断当前锁的值是否等于请求标识requestId,如果是则删除锁并返回true,否则返回false。该方法使用eval命令执行Lua脚本,传入锁的键值和请求标识两个参数,返回值是执行结果。

jedis setnx实现可重入锁

import redis.clients.jedis.Jedis;

public class ReentrantRedisLock {

    public synchronized boolean tryLock(Jedis jedis,String lockKey) {
        String currentThreadId = String.valueOf(Thread.currentThread().getId());

        // 尝试获取锁
        String lockValue = jedis.get(lockKey);
        if (lockValue == null) {
            // 锁不存在,尝试设置锁
            jedis.set(lockKey, currentThreadId + ":1", "NX", "EX", 30);
            return true;
        }

        // 锁存在,检查是否由当前线程持有
        String[] parts = lockValue.split(":");

        //加锁线程是当前线程,则增加次数,进行重入加锁
        if (parts.length == 2 && parts[0].equals(currentThreadId)) {
            int count = Integer.parseInt(parts[1]) + 1;
            jedis.set(lockKey, currentThreadId + ":" + count, "XX", "EX", 30);
            return true;
        }

        //加锁失败
        return false;
    }

    public synchronized void unlock(Jedis jedis,String lockKey) {
        String currentThreadId = String.valueOf(Thread.currentThread().getId());

        String lockValue = jedis.get(lockKey);
        if (lockValue != null) {
            String[] parts = lockValue.split(":");
            if (parts.length == 2 && parts[0].equals(currentThreadId)) {
                int count = Integer.parseInt(parts[1]);
                //减少重入次数
                if (count > 1) {
                    jedis.set(lockKey, currentThreadId + ":" + (count - 1), "XX", "EX", 30);
                } else {
                    //解锁
                    jedis.del(lockKey);
                }
            }
        }
    }
}

利用lua脚本的原子性实现

-- tryLock.lua
-- 尝试获取锁的Lua脚本
-- KEYS[1] 是锁的key
-- ARGV[1] 是当前线程的标识
-- ARGV[2] 是锁的超时时间
local lockValue = redis.call('get', KEYS[1])
if lockValue == false then
    -- 锁不存在,创建锁并设置超时
    redis.call('setex', KEYS[1], ARGV[2], ARGV[1] .. ':1')
    return true
else
    local parts = {}
    local index = 0
    for match in (lockValue .. ":"):gmatch("(.-)" .. ":") do
        parts[index] = match
        index = index + 1
    end
    if parts[0] == ARGV[1] then
        -- 锁已经被当前线程持有,重入次数加1
        local count = tonumber(parts[1]) + 1
        redis.call('setex', KEYS[1], ARGV[2], ARGV[1] .. ':' .. count)
        return true
    end
end
return false

.. 是Lua中的字符串连接操作符,用于连接两个字符串。

gmatch 是Lua的一个字符串操作函数,用于在给定字符串中全局匹配指定的模式,并返回一个迭代器,每次调用这个迭代器都会返回下一个匹配的字符串。

模式 "(.-):" 是一个模式表达式,功能是匹配任意数量的字符直到遇到第一个 ":"

-- unlock.lua
-- 释放锁的Lua脚本
-- KEYS[1] 是锁的key
-- ARGV[1] 是当前线程的标识
local lockValue = redis.call('get', KEYS[1])
if lockValue ~= false then
    local parts = {}
    local index = 0
    for match in (lockValue .. ":"):gmatch("(.-)" .. ":") do
        parts[index] = match
        index = index + 1
    end
    if parts[0] == ARGV[1] then
        local count = tonumber(parts[1])
        if count > 1 then
            -- 减少重入次数
            count = count - 1
            redis.call('set', KEYS[1], ARGV[1] .. ':' .. count)
        else
            -- 重入次数为0,删除锁
            redis.call('del', KEYS[1])
        end
        return true
    end
end
return false
// 尝试获取锁
String tryLockScript = "..."; // Lua脚本字符串
Object result = jedis.eval(tryLockScript, Collections.singletonList(lockKey), Arrays.asList(currentThreadId, "30"));

// 释放锁
String unlockScript = "..."; // Lua脚本字符串
jedis.eval(unlockScript, Collections.singletonList(lockKey), Collections.singletonList(currentThreadId));

使用redlock实现分布式锁

Config config1 = new Config();
config1.useSingleServer()
       .setAddress("redis://127.0.0.1:6379");

Config config2 = new Config();
config2.useSingleServer()
       .setAddress("redis://127.0.0.1:6380");

Config config3 = new Config();
config3.useSingleServer()
       .setAddress("redis://127.0.0.1:6381");

RedissonClient redissonClient1 = Redisson.create(config1);
RedissonClient redissonClient2 = Redisson.create(config2);
RedissonClient redissonClient3 = Redisson.create(config3);

RLock lock1 = redissonClient1.getLock("lockKey");
RLock lock2 = redissonClient2.getLock("lockKey");
RLock lock3 = redissonClient3.getLock("lockKey");

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

boolean lockResult = redLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);

// 业务逻辑

if (lockResult) {
    try {
        // 业务逻辑
    } finally {
        redLock.unlock();
    }
} else {
    // 获取锁失败的处理逻辑
}

redissonClient1.shutdown();
redissonClient2.shutdown();
redissonClient3.shutdown();

使用redisson实现分布式锁

定义redission客户端

@Configuration
public class RedissonConfig {

    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

调用加锁(可重入锁)

@Service
public class LockTestService{
    @Autowired
    RedissonClient redisson;

    public void testLock(){
        RLock lock = redisson.getLock("myLock");
        try {
            lock.lock();
            // 设置锁的超时时间为30秒
            // lock.lock(30, TimeUnit.SECONDS);
            // 执行需要加锁的代码
        } finally {
            lock.unlock();
        }
    }
}

公平锁

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lock();

联锁

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

可重入读写锁(允许同时有多个读锁和一个写锁对同一个资源进行加锁。)

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

使用redisson实现延迟队列

定义客户端

@Configuration
public class RedissonConfig {

    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }
}

实现延迟队列

import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

@Component
public class RedissonOrderDelayQueue {

    @Autowired
    RedissonClient redisson;

    public void addTaskToDelayQueue(String orderId) {

        RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
        RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);

        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "添加任务到延时队列里面");
        delayedQueue.offer(orderId, 3, TimeUnit.SECONDS);
        delayedQueue.offer(orderId, 6, TimeUnit.SECONDS);
        delayedQueue.offer(orderId, 9, TimeUnit.SECONDS);
    }

   public String getOrderFromDelayQueue() {
        RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("orderQueue");
        RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingDeque);
        # 阻塞操作
        String orderId = blockingDeque.take();
        return orderId;
    }
}

实现滑动窗口限流

命令描述
ZCARD key获取有序集合的成员数
ZREMRANGEBYSCORE key min max移除有序集合中给定的分数区间的所有成员

主要步骤如下:

  1. 定义滑动窗口的时间范围,例如,窗口大小为60秒。
  2. 每次收到一个请求时,我们就定义出一个zset然后存储到redis中。
  3. 然后再通过ZREMRANGEBYSCORE命令来删除分值小于窗口起始时间戳(当前时间戳-60s)的数据。
  4. 最后,再使用ZCARD命令来获取有序集合中的成员数量,即在窗口内的请求量。
import redis.clients.jedis.Jedis;

public class SlidingWindowRateLimiter {
    private Jedis jedis;
    private String key;
    private int limit;

    public boolean allowRequest(String key) {
        //当前时间戳
        long currentTime = System.currentTimeMillis();
        //窗口开始时间是当前时间减60s
        long windowStart = currentTime - 60 * 1000;
        //删除窗口开始时间之前的所有数据
        jedis.zremrangeByScore(key, "-inf", String.valueOf(windowStart));
        //计算总请求数
        long currentRequests = jedis.zcard(key);
        //窗口足够则把当前请求加入
        if (currentRequests < limit) {
            jedis.zadd(key, currentTime, String.valueOf(currentTime));
            return true;
        }

        return false;
    }
}

通过lua脚本支持高并发

import redis.clients.jedis.Jedis;

public class SlidingWindowRateLimiter {
    private Jedis jedis;
    private String key;
    private int limit;

    public SlidingWindowRateLimiter(Jedis jedis, String key, int limit) {
        this.jedis = jedis;
        this.key = key;
        this.limit = limit;
    }

    public boolean allowRequest(String key) {
        // 当前时间戳
        long currentTime = System.currentTimeMillis();

        // 使用Lua脚本来确保原子性操作
        String luaScript = "local window_start = ARGV[1] - 60000\n" +
                           "redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', window_start)\n" +
                           "local current_requests = redis.call('ZCARD', KEYS[1])\n" +
                           "if current_requests < tonumber(ARGV[2]) then\n" +
                           "    redis.call('ZADD', KEYS[1], ARGV[1], ARGV[1])\n" +
                           "    return 1\n" +
                           "else\n" +
                           "    return 0\n" +
                           "end";

        Object result = jedis.eval(luaScript, 1, key, String.valueOf(currentTime), String.valueOf(limit));

        return (Long) result == 1;
    }
}

jedis实现pipeline

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class RedisPipelineExample {
    public static void main(String[] args) {
        // 连接到 Redis 服务器
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // 创建 Pipeline
            Pipeline pipeline = jedis.pipelined();

            // 向 Pipeline 添加命令
            pipeline.set("foo", "bar");
            pipeline.get("foo");
            pipeline.incr("counter");

            // 执行 Pipeline 中的所有命令,并获取响应
            List<Object> responses = pipeline.syncAndReturnAll();

            // 输出响应
            for (Object response : responses) {
                System.out.println(response);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

jedis实现乐观锁

基于CAS乐观锁,通过WATCH命令来实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class RedisOptimisticLock {
    public static void main(String[] args) {
        // 连接到 Redis
        Jedis jedis = new Jedis("localhost");
        try {
            // 监视键
            String key = "myKey";
            jedis.watch(key);

            // 模拟从数据库读取最新值
            String value = jedis.get(key);
            int intValue = Integer.parseInt(value);

            // 开始事务
            Transaction t = jedis.multi();

            // 在事务中执行操作
            t.set(key, String.valueOf(intValue + 1));

            // 尝试执行事务
            if (t.exec() == null) {
                System.out.println("事务执行失败,数据已被其他客户端修改");
            } else {
                System.out.println("事务执行成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            jedis.close();
        }
    }
}

扫描所有的key

基于KEYS命令实现

import redis.clients.jedis.Jedis;

public class RedisKeysExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        Set<String> keys = jedis.keys("*"); // 使用KEYS命令获取所有键
        for(String key : keys) {
            System.out.println(key);
        }
        jedis.close();
    }
}

基于SCANS命令实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

public class RedisScanExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");
        String cursor = ScanParams.SCAN_POINTER_START;
        ScanParams scanParams = new ScanParams().count(10);
        do {
            ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            scanResult.getResult().forEach(System.out::println);
        } while (!cursor.equals("0"));
        jedis.close();
    }
}

单例模式

懒汉

在需要时才会去创建对象

public class Singleton {
    private static Singleton instance;
    private Singleton (){}
    public static synchronized Singleton getInstance() {
    if (instance == null) {
        instance = new Singleton();
    }
    return instance;
    }
}

饿汉

在类刚一初始化的时候就立即把单例对象创建出来。

public class Singleton {
    private static Singleton instance = new Singleton();
    private Singleton (){}
    public static Singleton getInstance() {
        return instance;
    }
}
public class Singleton {
    private Singleton instance = null;
    static {
        instance = new Singleton();
    }
    private Singleton (){}
    public static Singleton getInstance() {
        return this.instance;
    }
}

静态内部类

public class Singleton {
    private static class SingletonHolder {
        private static final Singleton INSTANCE = new Singleton();
    }
    private Singleton (){}
    public static final Singleton getInstance() {
        return SingletonHolder.INSTANCE;
    }
}

枚举

public enum Singleton {
    INSTANCE;
    public void whateverMethod() {
    }
}

双重校验锁

public class Singleton {
    private volatile static Singleton singleton;
    private Singleton (){}
    public static Singleton getSingleton() {
    if (singleton == null) {
        synchronized (Singleton.class) {
        if (singleton == null) {
            singleton = new Singleton();
        }
        }
    }
    return singleton;
    }
}

CAS实现线程安全

public class Singleton {
    private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();

    private Singleton() {}

    public static Singleton getInstance() {
        for (;;) {
            Singleton singleton = INSTANCE.get();
            if (null != singleton) {
                return singleton;
            }

            singleton = new Singleton();
            if (INSTANCE.compareAndSet(null, singleton)) {
                return singleton;
            }
        }
    }
}

破坏单例模式

反射破坏单例模式

Singleton singleton1 = Singleton.getSingleton();

//通过反射获取到构造函数
Constructor<Singleton> constructor = Singleton.class.getDeclaredConstructor();
//将构造函数设置为可访问类型
constructor.setAccessible(true);
//调用构造函数的newInstance创建一个对象
Singleton singleton2 = constructor.newInstance();
//判断反射创建的对象和之前的对象是不是同一个对象
System.out.println(s1 == s2);

setAccessible(true),使得反射对象在使用时应该取消 Java 语言访问检查,使得私有的构造函数能够被访问。

反序列化破坏单例

package com.tencent

import java.io.*;
public class SerializableDemo1 {
    //为了便于理解,忽略关闭流操作及删除文件操作。真正编码时千万不要忘记
    //Exception直接抛出
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        //Write Obj to file
        ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("tempFile"));
        oos.writeObject(Singleton.getSingleton());
        //Read Obj from file
        File file = new File("tempFile");
        ObjectInputStream ois =  new ObjectInputStream(new FileInputStream(file));
        Singleton newInstance = (Singleton) ois.readObject();
        //判断是否是同一个对象
        System.out.println(newInstance == Singleton.getSingleton());
    }
}

通过对Singleton的序列化与反序列化得到的对象是一个新的对象,这就破坏了Singleton的单例性。

序列化会通过Unsafe直接分配内存的方式来创建一个新的对象。

避免被破坏单例

避免被反射破坏

反射是调用默认的构造函数创建出来的,只需要我们改造下构造函数,使其在反射调用的时候识别出来对象是不是被创建过就行了

private Singleton() {
    if (singleton != null){
    throw new RuntimeException("单例对象只能创建一次... ");
}

避免反序列化破坏单例

解决反序列化的破坏单例,只需要我们自定义反序列化的策略就行了,就是说我们不要让他走默认逻辑一直调用到Unsafe创建对象,而是我们干预他的这个过程,干预方式就是在Singleton类中定义readResolve,这样就可以解决该问题

public class Singleton implements Serializable{
    private volatile static Singleton singleton;
    private Singleton (){}
    public static Singleton getSingleton() {
        if (singleton == null) {
            synchronized (Singleton.class) {
                if (singleton == null) {
                    singleton = new Singleton();
                }
            }
        }
        return singleton;
    }

    private Object readResolve() {
        return singleton;
    }
}

spring拿到代理对象

 TestService proxy = (TestService)AopContext.currentProxy();
 同时mainClass上需要开启注解
 @EnableAspectJAutoProxy(exposeProxy=True)

并发相关工具类

public class ConcurrentUtil {

    private static final Logger logger = LoggerFactory.getLogger(ConcurrentUtil.class);

    public static <T> void awaitComplete(List<Future<T>> futureList) {
        for (Future<T> future : futureList) {
            try {
                future.get();
            } catch (Exception e) {
                future.cancel(true);
                throw new OmsServerException(9999, "future get failed: " + e.getMessage());
            }
        }
    }

    public static <T> T awaitCompleteAndGet(Future<T> future) {
        return awaitCompleteAndGet(future, 0, null);
    }

    public static <T> T awaitCompleteAndGet(Future<T> future, int timeout, TimeUnit unit) {
        List<T> resultList = awaitCompleteAndGet(Arrays.asList(future), timeout, unit, true);
        if (CollectionUtils.isEmpty(resultList)) {
            return null;
        }
        return resultList.get(0);
    }

    public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList) {
        return awaitCompleteAndGet(futureList, true);
    }

    public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList, boolean throwException) {
        return awaitCompleteAndGet(futureList, 0L, null, throwException);
    }

    public static <T> List<T> awaitCompleteAndGet(List<Future<T>> futureList, long timeout, TimeUnit timeUnit,
            boolean throwException) {
        List<T> retList = new ArrayList<>();
        for (Future<T> future : futureList) {
            try {
                T ret = null;
                if (timeout <= 0 || null == timeUnit) {
                    ret = future.get();
                } else {
                    ret = future.get(timeout, timeUnit);
                }
                if (null != ret) {
                    retList.add(ret);
                } else {
                    logger.warn("future get result is null, maybe thread over head, auto ignore.");
                }
            } catch (Exception e) {
                logger.error("await complete failed, err msg is: [{}]", e.getMessage());
                future.cancel(true);
                if (throwException) {
                    throw new OmsServerException(9999, "await complete failed, err msg is: " + e.getMessage());
                }
            }
        }
        return retList;
    }
}