Redis实践(8)Redisson 简单实践

Redis
placeholder image
admin 发布于:2023-05-28 21:55:58
阅读:loading

Redisson真是的非常的高大上(深奥),除了对Redis有一些了解之外,对分布式也要有深刻理解,而且想要使用起来毫不费力还必须对Java有深厚的功底,特别是对JUC要有一定的掌握,本次也是兴趣所及花了一些业余时间来略挖一二,主要也是随着个人掌握技能的程度和兴趣所及,进行了简单的几个示例,详细如下。

1.集群节点

/**
 * 测试redis的基本信息
 *
 * @author chendd
 * @date 2023/5/21 22:30
 */
public class RedisContextBasicTest extends BaseTest {

    @Resource
    private Redisson redisson;

    @Test
    public void contextRedissonConfig() {
        Config config = this.redisson.getConfig();
        List<String> nodeAddresses = config.useClusterServers().getNodeAddresses();
        nodeAddresses.stream().forEach(System.out::println);
        System.out.println(JSONObject.toJSONString(config , true));
    }
}

输出结果:

image.png

2.闭锁(CountDownLatch)

闭锁对应的JDK中的CountDownLatch,可用于多线程时的线程递减,但JDK中提供的类库均只在一个应用进程中有效,本次编写一个实例,共计50个元素,模拟多个进程来共享消费这50个原始,这种实现将必须借助于分布式框架(关系型数据库也行),参考如下代码

package cn.chendd.redis;

import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RCountDownLatch;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * 闭锁(CountDownLatch)可以多开客户端验证
 * @author chendd
 * @date 2023/5/23 17:50
 */
public class RedisCountDownLatchTest extends BaseTest {

    @Resource
    private Redisson redisson;

    @Test
    public void simpleApi() throws InterruptedException {
        RCountDownLatch countDownLatch = this.redisson.getCountDownLatch("anyCountDownLatch");
        System.out.println("释放存在:" + countDownLatch.isExists());
        int value = 50;
        boolean set = countDownLatch.trySetCount(value);
        System.out.println("尝试设置值:" + set);
        System.out.println("数量减少1");
        countDownLatch.countDown();
        System.out.println("当前总数:" + countDownLatch.getCount());

        //开循环 1 秒钟消费一个
        for (int i = 0; i < value; i++) {
            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
            long count = countDownLatch.getCount();
            System.out.println("消费 1 个,还剩:" + count);
            if (count <= 0) {
                break;
            }
        }
        System.out.println("等待全部消费完毕,等待中...");
        countDownLatch.await();
        System.out.println("操作执行完毕");
    }

}

输出结果,下图分别开启了三个Junit的测试进程(启动时日志有进程ID),多个进程数据共享,另有原图较大有需要观看自行下载分布式闭锁完整图.gif》,下图为稍小一些的运行结果图:

分布式闭锁示例.gif

3.分布式锁(Lock)

尝试获取锁,若获取到锁则线程休眠15秒,模拟干活花费时间,在此时间范围内独占锁,又开启了其它两个进程来尝试获取锁,在线程未释放锁时其它进程的获取锁将无法获取到(在JDK中多个线程锁互斥,分布式锁是进程内),参考如下代码:

@Resource
private Redisson redisson;

@Test
public void tryLock() throws Exception {
    int pid = jvmPid();
    System.out.println(String.format("进程 %d 开始执行,时间:%s" , pid , getDateTime()));
    RLock lock = redisson.getLock("tryLock");
    boolean isLock = lock.tryLock();
    System.out.println(String.format("进程 %d 是否获取到锁:%b,时间:%s" , pid , isLock , getDateTime()));
    if (isLock) {
        try {
            System.out.println(String.format("进程 %d 开始幸苦干活 15 秒,时间:%s" , pid , getDateTime()));
            TimeUnit.SECONDS.sleep(15L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.out.println(String.format("进程 %d 活干完了准备释放,时间:%s" , pid , getDateTime()));
        }
    } else {
        System.out.println(String.format("进程 %d 未获取到锁,时间:%s" , pid , getDateTime()));
    }
}

输出结果,可以看到三个进程中只有进程ID为47656的获取到锁执行干活,参考如下图所示:

分布式锁.gif

大家都知道,如果负责储存这个分布式锁的Redis节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

关于锁的时间还有几个小例子懒得再写了,有趣的是模拟死锁的实现,除非获取到锁的机器宕机进程退出,否则将被视为锁是正常的锁。

4.信号量(Semaphore)

Java中Semaphore信号量是一个比较有用的东西,跟锁的区别在于它可以限制并发执行的数量,比如用于接口限流,限制一个接口并发为100,超过100后到达101时将被限制访问,若100以内有完活退出的线程,又可增加被访问的线程,本次示例模拟一共处理100个请求使用15个线程处理,限制最大同时处理10个并发(信号量设置为10),则同时只会有10个线程处于干活状态,本次开启3个进程,每个进程中处理相应的请求,能处理就干活,处理不了就歇息,参考如下代码所示:

/**
 * 信号量(Semaphore)简单测试,semaphore 持久化存储
 *
 * @author chendd
 * @date 2023/5/23 21:56
 */
public class RedisSemaphoreTest extends BaseTest {

    @Resource
    private Redisson redisson;

    @Test
    public void simpleApi() throws InterruptedException {
        //构造信号量
        RSemaphore semaphore = redisson.getSemaphore("semaphore");
        if (semaphore.isExists() && semaphore.availablePermits() == 0) {
            System.err.println("活已经全部分配完了...,干不了了,可以删除 [semaphore] 再试");
            ///semaphore.delete();
            return;
        }
        //尝试设置初始范围为 10,仅当不存在时设置
        boolean set = semaphore.trySetPermits(10);
        System.out.println("trySetPermits = " + set);
        ExecutorService executorService = Executors.newFixedThreadPool(15);
        AtomicInteger value = new AtomicInteger(0);
        //模拟100次请求,使用线程池进行消费
        for (int i = 1 ; i <= 100 ; i++) {
            executorService.submit(() -> {
                boolean consumer = semaphore.tryAcquire();
                if (consumer) {
                    try {
                        System.out.println(String.format("线程 %s 获取到令牌,干活中...%d" , Thread.currentThread().getName() , value.addAndGet(1)));
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                        System.out.println(String.format("线程 %s 获取到令牌,干完活...%d" , Thread.currentThread().getName() , value.addAndGet(1)));
                    }
                } else {
                    System.err.println(String.format("线程 %s 未获取到令牌,歇息中...%d" , Thread.currentThread().getName() , value.addAndGet(1)));
                }
            });
        }
        //主线程休眠 1 分钟
        TimeUnit.SECONDS.sleep(10L);
    }

}

运行示例参考如下图所示:

信号量.gif

另有分布式调度任务服务、分布式对象、分布式集合等等等等,非常多的技术点,就先搁置吧,有缘再深挖。


 点赞


 发表评论

当前回复:作者

 评论列表


留言区