秒杀优化 秒杀优化-异步秒杀思路 我们来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
查询优惠卷
判断秒杀库存是否足够
查询订单
校验是否是一人一单
扣减库存
创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行,那么如何加速呢?
在这里笔者想给大家分享一下课程内没有的思路,看看有没有小伙伴这么想,
比如,我们可以不可以使用异步编排来做,或者说我开启N多线程,N多个线程,一个线程执行查询优惠卷,一个执行判断扣减库存,一个去创建订单等等,然后再统一做返回,
这种做法和课程中有哪种好呢?
答案是课程中的好,因为如果你采用我刚说的方式,如果访问的人很多,那么线程池中的线程可能一下子就被消耗完了,
而且你使用上述方案,最大的特点在于,你觉得时效性会非常重要 ,
但是你想想是吗?并不是,比如我只要确定他能做这件事,然后我后边慢慢做就可以了,我并不需要他一口气做完这件事,
所以我们应当采用的是课程中,类似消息队列的方式 来完成我们的需求,而不是使用线程池或者是异步编排的方式来完成这个需求
优化方案 :我们将耗时比较短的逻辑判断放入到redis中,
比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功,
再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,
当然这里边有两个难点
第一个难点是我们怎么在redis中去快速校验一人一单,还有库存判断
第二个难点是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
我们现在来看看整体思路:
当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,
如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,
如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,
整个过程需要保证是原子性的,我们可以使用lua来操作
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
秒杀优化-Redis完成秒杀资格判断
需求:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId) return 0
当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order" ); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } return Result.ok(orderId); }
秒杀优化-基于阻塞队列实现秒杀优化 VoucherOrderServiceImpl
修改下单动作,现在我们去下单时,是通过lua表达式去原子执行判断逻辑,
如果判断我出来不为0 ,则要么是库存不足,要么是重复下单,返回错误信息,
如果是0,则把下单的逻辑保存到队列中去,然后异步执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue <>(1024 * 1024 );private ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (InterruptedException e) { log.error("处理订单异常" , e); } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("不允许重复下单" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } private IVoucherOrderService proxy;@Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } VoucherOrder voucherOrder = new VoucherOrder (); Long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Integer count = query().eq("user_id" , userId).eq("voucher_id" , voucherOrder.getVoucherId()).count(); if (count > 0 ) { log.error("用户已经购买过一次" ); return ; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherOrder.getVoucherId()).gt("stock" , 0 ) .update(); if (!success) { log.error("库存不足!" ); return ; } save(voucherOrder); }
小总结:
秒杀业务的优化思路是什么?
先利用Redis完成库存余量、一人一单判断,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
Redis消息队列 Redis消息队列-认识消息队列 什么是消息队列 :字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis 提供了三种不同的方式来实现消息队列:
list 结构:基于List 结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型
使用队列的好处在于 解耦: 所谓解耦,
举一个生活中的例子就是:快递员(生产者)把快递放到快递柜里边(Message Queue)去,
我们(消费者)从快递柜里边去拿东西,这就是一个异步,
如果耦合,那么这个快递员相当于直接把快递交给你,这事固然好,
但是万一你不在家,那么快递员就会一直等你,这就浪费了快递员的时间,所以这种思想在我们日常开发中,是非常有必要的。
这种场景在我们秒杀中就变成了:
我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,
然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。
这里我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。
Redis消息队列-基于List实现消息队列 基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。
而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,
因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果 。
基于List的消息队列有哪些优缺点? 优点:
利用Redis存储,不受限于JVM内存上限
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
Redis消息队列-基于PubSub的消息队列 PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。
基于PubSub的消息队列有哪些优缺点? 优点:
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
Redis消息队列-基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:XADD
例如:
读取消息的方式之一:XREAD
例如,使用XREAD读取第一个消息:
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
Redis消息队列-基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流:队列中的消息会分流
给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
消息标示:消费者会维护一个标示
,记录最后一个
被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息
。确保每一个消息都会被消费。
消息确认:消费者获取消息后,消息处于pending状态
,并存入一个pending-list
。当处理完成后需要通过XACK来确认消息
,标记消息为已处理,才会从pending-list移除
。
创建消费者组:XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
其它常见命令:
删除指定的消费者组
1 XGROUP DESTORY key groupName
给指定的消费者组添加消费者
1 XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
1 XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息 :
1 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路:
STREAM类型消息队列的XREADGROUP命令特点:
消息可回溯
可以多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
最后我们来个小对比
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
需求:
创建一个Stream类型的消息队列,名为stream.orders
修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单\
修改lua表达式,新增3.6
1 2 3 4 redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId) return 0
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } String queueName = "stream.orders" ;private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if (list == null || list.isEmpty()) { continue ; } MapRecord<String, Object, Object> record = list.get(0 ); log.warn("mapRecord = " + record); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { log.error("处理订单异常" , e); handlePendingList(); } } } } private void handlePendingList () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create(queueName, ReadOffset.from("0" )) ); if (list == null || list.isEmpty()) { break ; } MapRecord<String, Object, Object> record = list.get(0 ); log.warn("mapRecord = " + record); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { log.error("处理pending-list异常" , e); try { Thread.sleep(20 ); } catch (InterruptedException ex) { throw new RuntimeException (ex); } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("不允许重复下单" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } private IVoucherOrderService proxy;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 @Service @Slf4j public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } String queueName = "stream.orders" ; private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create(queueName, ReadOffset.lastConsumed()) ); if (list == null || list.isEmpty()) { continue ; } MapRecord<String, Object, Object> record = list.get(0 ); log.warn("mapRecord = " + record); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { log.error("处理订单异常" , e); handlePendingList(); } } } } private void handlePendingList () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create(queueName, ReadOffset.from("0" )) ); if (list == null || list.isEmpty()) { break ; } MapRecord<String, Object, Object> record = list.get(0 ); log.warn("mapRecord = " + record); Map<Object, Object> values = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder (), true ); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(queueName, "g1" , record.getId()); } catch (Exception e) { log.error("处理pending-list异常" , e); try { Thread.sleep(20 ); } catch (InterruptedException ex) { throw new RuntimeException (ex); } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("不允许重复下单" ); return ; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } private IVoucherOrderService proxy; @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order" ); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } proxy = (IVoucherOrderService) AopContext.currentProxy(); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Integer count = query().eq("user_id" , userId).eq("voucher_id" , voucherOrder.getVoucherId()).count(); if (count > 0 ) { log.error("用户已经购买过一次" ); return ; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherOrder.getVoucherId()).gt("stock" , 0 ) .update(); if (!success) { log.error("库存不足!" ); return ; } save(voucherOrder); } }
达人探店 达人探店-发布探店笔记 发布探店笔记
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
tb_blog_comments:其他用户对探店笔记的评价
具体发布流程
上传接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j @RestController @RequestMapping("upload") public class UploadController { @PostMapping("blog") public Result uploadImage (@RequestParam("file") MultipartFile image) { try { String originalFilename = image.getOriginalFilename(); String fileName = createNewFileName(originalFilename); image.transferTo(new File (SystemConstants.IMAGE_UPLOAD_DIR, fileName)); log.debug("文件上传成功,{}" , fileName); return Result.ok(fileName); } catch (IOException e) { throw new RuntimeException ("文件上传失败" , e); } } }
注意:同学们在操作时,需要修改SystemConstants.IMAGE_UPLOAD_DIR 自己图片所在的地址,在实际开发中图片一般会放在nginx上或者是云存储上。
BlogController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RestController @RequestMapping("/blog") public class BlogController { @Resource private IBlogService blogService; @PostMapping public Result saveBlog (@RequestBody Blog blog) { UserDTO user = UserHolder.getUser(); blog.setUpdateTime(user.getId()); blogService.saveBlog(blog); return Result.ok(blog.getId()); } }
达人探店-查看探店笔记 实现查看发布探店笔记的接口
实现代码:
BlogServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 @Override public Result queryBlogById (Long id) { Blog blog = getById(id); if (blog == null ) { return Result.fail("笔记不存在!" ); } queryBlogUser(blog); return Result.ok(blog); }
达人探店-点赞功能 初始代码
1 2 3 4 5 6 @GetMapping("/likes/{id}") public Result queryBlogLikes (@PathVariable("id") Long id) { blogService.update().setSql("liked = liked +1 " ).eq("id" ,id).update(); return Result.ok(); }
问题分析:这种方式会导致一个用户无限点赞,明显是不合理的
造成这个问题的原因是,我们现在的逻辑,发起请求只是给数据库+1,所以才会出现这个问题
完善点赞功能
需求:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤:
给Blog类中添加一个isLike字段
,标示是否被当前用户点赞
修改点赞功能,利用Redis的set集合
判断是否点赞过,未点赞过则点赞数+1
,已点赞过则点赞数-1
修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
修改分页查询Blog业务,判断当前登录用户是否点赞
过,赋值给isLike字段
为什么采用set集合:
因为我们的数据是不能重复的,当用户操作过之后,无论他怎么操作,都是
具体步骤:
在Blog 添加一个字段
1 2 @TableField(exist = false) private Boolean isLike;
修改代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override public Result likeBlog (Long id) { Long userId = UserHolder.getUser().getId(); String key = "blog:liked:" + id; Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString()); if (BooleanUtil.isFalse(isMember)) { boolean isSuccess = update().setSql("liked = liked + 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForSet().add(key, userId.toString()); } } else { boolean isSuccess = update().setSql("liked = liked - 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForSet().remove(key, userId.toString()); } } return Result.ok(); }
修改完毕之后,页面上还不能立即显示点赞完毕的后果,我们还需要修改查询Blog业务,判断Blog是否被当前用户点赞过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override public Result queryBlogById (Long id) { Blog blog = getById(id); if (blog == null ) { return Result.fail("笔记不存在" ); } queryBlogByUser(blog); isBlogLiked(blog); return Result.ok(blog); } private void isBlogLiked (Blog blog) { Long userId = UserHolder.getUser().getId(); String key = "blog:liked:" + blog.getId(); Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString()); blog.setIsLike(BooleanUtil.isTrue(isMember)); } private void queryBlogByUser (Blog blog) { Long userId = blog.getUserId(); User user = userService.getById(userId); blog.setName(user.getNickName()); blog.setIcon(user.getIcon()); } @Override public Result queryHotBlog (Integer current) { Page<Blog> page = query() .orderByDesc("liked" ) .page(new Page <>(current, SystemConstants.MAX_PAGE_SIZE)); List<Blog> records = page.getRecords(); records.forEach(blog -> { this .queryBlogByUser(blog); this .isBlogLiked(blog); }); return Result.ok(records); }
达人探店-点赞排行榜
在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
之前的点赞是放到set集合,但是set集合是不能排序的,所以这个时候,咱们可以采用一个可以排序的set集合,就是咱们的sortedSet
我们接下来来对比一下这些集合的区别是什么
修改代码
点赞逻辑代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private void isBlogLiked (Blog blog) { UserDTO user = UserHolder.getUser(); if (user == null ) { return ; } Long userId = user.getId(); String key = BLOG_LIKED_KEY + blog.getId(); Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString()); blog.setIsLike(score != null ); } @Override public Result likeBlog (Long id) { Long userId = UserHolder.getUser().getId(); String key = BLOG_LIKED_KEY + id; Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString()); if (score == null ) { boolean isSuccess = update().setSql("liked = liked + 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis()); } } else { boolean isSuccess = update().setSql("liked = liked - 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForZSet().remove(key, userId.toString()); } } return Result.ok(); }
点赞列表查询列表
BlogController
1 2 3 4 @GetMapping("/likes/{id}") public Result queryBlogLikes (@PathVariable("id") Long id) { return blogService.queryBlogLikes(id); }
BlogService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public Result queryBlogLikes (Long id) { String key = BLOG_LIKED_KEY + id; Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0 , 4 ); if (top5 == null || top5.isEmpty()) { return Result.ok(Collections.emptyList()); } List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList()); String idStr = StrUtil.join("," , ids); List<UserDTO> userDTOS = userService.query() .in("id" , ids).last("ORDER BY FIELD(id," + idStr + ")" ).list() .stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); return Result.ok(userDTOS); }
好友关注 好友关注-关注和取消关注 针对用户的操作:可以对用户进行关注和取消关注功能。
实现思路:
需求:基于该表数据结构,实现两个接口:
关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示:
1 2 3 4 5 6 7 CREATE TABLE `tb_follow` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键', `user_id` bigint unsigned NOT NULL COMMENT '用户id', `follow_user_id` bigint unsigned NOT NULL COMMENT '关联的用户id', `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT;
注意: 这里需要把主键修改为自增长,简化开发。
FollowController
1 2 3 4 5 6 7 8 9 10 @PutMapping("/{id}/{isFollow}") public Result follow (@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) { return followService.follow(followUserId, isFollow); } @GetMapping("/or/not/{id}") public Result isFollow (@PathVariable("id") Long followUserId) { return followService.isFollow(followUserId); }
FollowService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public Result follow (Long followUserId, Boolean isFollow) { Long userId = UserHolder.getUser().getId(); if (isFollow) { Follow follow = new Follow (); follow.setUserId(userId); follow.setFollowUserId(followUserId); save(follow); } else { remove(new QueryWrapper <Follow>().eq("user_id" , userId) .eq("follow_user_id" , followUserId)); } return Result.ok(); } @Override public Result isFollow (Long followUserId) { Long userId = UserHolder.getUser().getId(); Integer count = query().eq("user_id" , userId).eq("follow_user_id" , followUserId).count(); return Result.ok(count > 0 ); }
好友关注-共同关注 想要去看共同关注的好友,需要首先进入到这个页面,这个页面会发起两个请求
去查询用户的详情
去查询用户的笔记
以上两个功能和共同关注没有什么关系,大家可以自行将笔记中的代码拷贝到idea中就可以实现这两个功能了,我们的重点在于共同关注功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @GetMapping("/{id}") public Result queryUserById (@PathVariable("id") Long userId) { User user = userService.getById(userId); if (user == null ) { return Result.ok(); } UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); return Result.ok(userDTO); } @GetMapping("/of/user") public Result queryBlogByUserId ( @RequestParam(value = "current", defaultValue = "1") Integer current, @RequestParam("id") Long id) { Page<Blog> page = blogService.query() .eq("user_id" , id).page(new Page <>(current, SystemConstants.MAX_PAGE_SIZE)); List<Blog> records = page.getRecords(); return Result.ok(records); }
接下来我们来看看共同关注如何实现:
需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同关注呢。
当然是使用我们之前学习过的set集合咯,
在set集合中,有交集并集补集的api,我们可以把两人的关注的人分别放入到一个set集合中,然后再通过api去查看这两个set集合中的交集数据。
我们先来改造当前的关注列表
改造原因是因为我们需要在用户关注了某位用户后,需要将数据放入到set集合中,方便后续进行共同关注,同时当取消关注时,也需要从set集合中进行删除
FollowServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public Result follow (Long followUserId, Boolean isFollow) { Long userId = UserHolder.getUser().getId(); String key = "follows:" + userId; if (isFollow) { Follow follow = new Follow (); follow.setUserId(userId); follow.setFollowUserId(followUserId); boolean isSuccess = save(follow); if (isSuccess) { stringRedisTemplate.opsForSet().add(key, followUserId.toString()); } } else { boolean isSuccess = remove(new QueryWrapper <Follow>() .eq("user_id" , userId).eq("follow_user_id" , followUserId)); if (isSuccess) { stringRedisTemplate.opsForSet().remove(key, followUserId.toString()); } } return Result.ok(); }
具体的关注代码:
FollowServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public Result followCommons (Long id) { Long userId = UserHolder.getUser().getId(); String key = "follows:" + userId; String key2 = "follows:" + id; Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2); if (intersect == null || intersect.isEmpty()) { return Result.ok(Collections.emptyList()); } List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList()); List<UserDTO> users = userService.listByIds(ids) .stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); return Result.ok(users); }
好友关注-Feed流实现方案 当我们关注了用户后,这个用户发了动态,那么我们应该把这些数据推送给用户,
这个需求,其实我们又把他叫做Feed流,关注推送也叫做Feed流,直译为投喂。
为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
对于传统的模式的内容解锁:我们是需要用户去通过搜索引擎或者是其他的方式去解锁想要看的内容
对于新型的Feed流的的效果:不需要我们用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。
Feed流的实现有两种模式:
Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:
我们本次针对好友的操作,采用的就是Timeline
的方式,只需要拿到我们关注用户的信息,然后按照时间排序即可,因此采用Timeline的模式。该模式的实现方案有三种:
拉模式 :也叫做读扩散
该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,
假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序
优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
推模式 :也叫做写扩散。
推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,
推拉结合模式 :也叫做读写混合,兼具推和拉两种模式的优点。
推拉模式是一个折中的方案,站在发件人这一段,
如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,
如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,
现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。
三种实现方案对比 :
拉模式
推模式
推拉结合
写比例
低
高
中
读比例
高
低
中
用户读取延迟
高
低
低
实现难度
复杂
简单
很复杂
使用场景
很少使用
用户量少、没有大V
过千万的用户量,有大V
好友关注-推送到粉丝收件箱
需求:
修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
查询收件箱数据时,可以实现分页查询
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
传统了分页在feed流是不适用的,因为我们的数据会随时发生变化
假设在t1 时刻,我们去读取第一页,此时page = 1 ,size = 5
,那么我们拿到的就是10~6
这几条记录,
假设现在t2
时候又发布了一条记录,此时t3
时刻,我们来读取第二页,读取第二页传入的参数是page=2
,size=5
,
那么此时读取到的第二页实际上是从6
开始,然后是6~2
,那么我们就读取到了重复的数据,
所以feed流的分页,不能采用原始方案来做。
Feed流的滚动分页
我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据
举个例子 :我们从t1
时刻开始,拿第一页数据,拿到了10~6
,然后记录下当前最后一次拿取的记录,就是6,
t2
时刻发布了新的记录,此时这个11
放到最顶上,但是不会影响我们之前记录的6
,
此时t3
时刻来拿第二页,第二页这个时候拿数据,还是从6
后一点的5
去拿,就拿到了5-1
的记录。
我们这个地方可以采用sortedSet
来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了
核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public Result saveBlog (Blog blog) { UserDTO user = UserHolder.getUser(); blog.setUserId(user.getId()); boolean isSuccess = save(blog); if (!isSuccess){ return Result.fail("新增笔记失败!" ); } List<Follow> follows = followService.query().eq("follow_user_id" , user.getId()).list(); for (Follow follow : follows) { Long userId = follow.getUserId(); String key = FEED_KEY + userId; stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis()); } return Result.ok(blog.getId()); }
好友关注-实现分页查询收邮箱
需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
具体操作如下:
每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件
我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带 lastId
:上一次查询的最小时间戳
和偏移量
这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。
定义出来具体的返回值实体类
1 2 3 4 5 6 @Data public class ScrollResult { private List<?> list; private Long minTime; private Integer offset; }
BlogController
注意:RequestParam 表示接受url地址栏传参的注解,当方法上参数的名称和url地址栏不相同时,可以通过RequestParam 来进行指定
1 2 3 4 5 @GetMapping("/of/follow") public Result queryBlogOfFollow ( @RequestParam("lastId") Long max, @RequestParam(value = "offset", defaultValue = "0") Integer offset) { return blogService.queryBlogOfFollow(max, offset); }
BlogServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Override public Result queryBlogOfFollow (Long max, Integer offset) { Long userId = UserHolder.getUser().getId(); String key = FEED_KEY + userId; Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet() .reverseRangeByScoreWithScores(key, 0 , max, offset, 2 ); if (typedTuples == null || typedTuples.isEmpty()) { return Result.ok(); } List<Long> ids = new ArrayList <>(typedTuples.size()); long minTime = 0 ; int os = 1 ; for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { ids.add(Long.valueOf(tuple.getValue())); long time = tuple.getScore().longValue(); if (time == minTime){ os++; }else { minTime = time; os = 1 ; } } os = minTime == max ? os : os + offset; String idStr = StrUtil.join("," , ids); List<Blog> blogs = query().in("id" , ids).last("ORDER BY FIELD(id," + idStr + ")" ).list(); for (Blog blog : blogs) { queryBlogUser(blog); isBlogLiked(blog); } ScrollResult r = new ScrollResult (); r.setList(blogs); r.setOffset(os); r.setMinTime(minTime); return Result.ok(r); }
附近商户 附近商户-GEO数据结构的基本用法 GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
GEOADD
:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST
:计算指定的两个点之间的距离并返回
GEOHASH
:将指定member的坐标转为hash字符串形式并返回
GEOPOS
:返回指定member的坐标
GEORADIUS
:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
GEOSEARCH
:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
GEOSEARCHSTORE
:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
附近商户-导入店铺数据到GEO 具体场景说明:
当我们点击美食之后,会出现一系列的商家,商家中可以按照多种排序方式,我们此时关注的是距离,
这个地方就需要使用到我们的GEO,向后台传入当前app收集的地址(我们此处是写死的) ,
以当前坐标作为圆心,同时绑定相同的店家类型type,以及分页信息,把这几个条件传入后台,后台查询出对应的数据再返回。
我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,
GEO在redis中就一个menber
和一个经纬度
,我们把x和y轴传入到redis做的经纬度位置去,
但我们不能把所有的数据都放入到menber中去,毕竟作为redis是一个内存级数据库,如果存海量数据,redis还是力不从心,所以我们在这个地方存储他的id
即可。
但是这个时候还有一个问题,就是在redis中并没有存储type,所以我们无法根据type来对数据进行筛选,
所以我们可以按照商户类型做分组,类型相同的商户作为同一组,以typeId为key
存入同一个GEO集合中即可
代码
Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
可以根据typeId对 list 集合进行分组形成一个map集合
HmDianPingApplicationTests
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Test void loadShopData () { List<Shop> list = shopService.list(); Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId)); for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) { Long typeId = entry.getKey(); String key = SHOP_GEO_KEY + typeId; List<Shop> value = entry.getValue(); List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList <>(value.size()); for (Shop shop : value) { locations.add(new RedisGeoCommands .GeoLocation<>( shop.getId().toString(), new Point (shop.getX(), shop.getY()) )); } stringRedisTemplate.opsForGeo().add(key, locations); } }
附近商户-实现附近商户功能 SpringDataRedis的2.3.9版本并不支持Redis 6.2提供的GEOSEARCH命令,因此我们需要提示其版本,修改自己的POM
第一步:导入pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > <exclusions > <exclusion > <artifactId > spring-data-redis</artifactId > <groupId > org.springframework.data</groupId > </exclusion > <exclusion > <artifactId > lettuce-core</artifactId > <groupId > io.lettuce</groupId > </exclusion > </exclusions > </dependency > <dependency > <groupId > org.springframework.data</groupId > <artifactId > spring-data-redis</artifactId > <version > 2.6.2</version > </dependency > <dependency > <groupId > io.lettuce</groupId > <artifactId > lettuce-core</artifactId > <version > 6.1.6.RELEASE</version > </dependency >
第二步:
ShopController
1 2 3 4 5 6 7 8 9 @GetMapping("/of/type") public Result queryShopByType ( @RequestParam("typeId") Integer typeId, @RequestParam(value = "current", defaultValue = "1") Integer current, @RequestParam(value = "x", required = false) Double x, @RequestParam(value = "y", required = false) Double y ) { return shopService.queryShopByType(typeId, current, x, y); }
ShopServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Override public Result queryShopByType (Integer typeId, Integer current, Double x, Double y) { if (x == null || y == null ) { Page<Shop> page = query() .eq("type_id" , typeId) .page(new Page <>(current, SystemConstants.DEFAULT_PAGE_SIZE)); return Result.ok(page.getRecords()); } int from = (current - 1 ) * SystemConstants.DEFAULT_PAGE_SIZE; int end = current * SystemConstants.DEFAULT_PAGE_SIZE; String key = SHOP_GEO_KEY + typeId; GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() .search( key, GeoReference.fromCoordinate(x, y), new Distance (5000 ), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) ); if (results == null ) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent(); if (list.size() <= from) { return Result.ok(Collections.emptyList()); } List<Long> ids = new ArrayList <>(list.size()); Map<String, Distance> distanceMap = new HashMap <>(list.size()); list.stream().skip(from).forEach(result -> { String shopIdStr = result.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); Distance distance = result.getDistance(); distanceMap.put(shopIdStr, distance); }); String idStr = StrUtil.join("," , ids); List<Shop> shops = query().in("id" , ids).last("ORDER BY FIELD(id," + idStr + ")" ).list(); for (Shop shop : shops) { shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()); } return Result.ok(shops); }
用户签到 用户签到-BitMap功能演示 我们针对签到功能完全可以通过mysql来完成,比如说以下这张表
1 2 3 4 5 6 7 8 9 CREATE TABLE `tb_sign` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `user_id` bigint unsigned NOT NULL COMMENT '用户id', `year` year NOT NULL COMMENT '签到的年', `month` tinyint NOT NULL COMMENT '签到的月', `date` date NOT NULL COMMENT '签到的日期', `is_backup` tinyint unsigned DEFAULT NULL COMMENT '是否补签', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT;
用户一次签到,就是一条记录,
我们如何能够简化一点呢?
其实可以考虑小时候一个挺常见的方案,就是小时候,咱们准备一张小小的卡片,你只要签到就打上一个勾,我最后判断你是否签到,其实只需要到小卡片上看一看就知道了
我们可以采用类似这样的方案来实现我们的签到需求。
Redis中是利用string类型数据结构实现BitMap
,因此最大上限是512M,转换为bit则是 2^32个bit位。
BitMap的操作命令有:
SETBIT
:向指定位置(offset)存入一个0或1
GETBIT
:获取指定位置(offset)的bit值
BITCOUNT
:统计BitMap中值为1的bit位的数量
BITFIELD
:操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD_RO
:获取BitMap中bit数组,并以十进制形式返回
BITOP
:将多个BitMap的结果做位运算(与 、或、异或)
BITPOS
:查找bit数组中指定范围内第一个0或1出现的位置
用户签到-实现签到功能
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
思路 :
我们通过接口文档发现,此接口并没有传递任何的参数,没有参数怎么确实是哪一天签到呢?这个很容易,可以通过后台代码直接获取即可,然后到对应的地址上去修改bitMap。
代码
UserController
1 2 3 4 @PostMapping("/sign") public Result sign () { return userService.sign(); }
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public Result sign () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1 , true ); return Result.ok(); }
用户签到-签到统计 问题1: 什么叫做连续签到天数? 从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
Java逻辑代码:
获得当前这个月的最后一次签到数据,
定义一个计数器,然后不停的向前
统计,直到获得第一个非0
的数字即可,
每得到一个非0
的数字计数器+1
,直到遍历完所有的数据,就可以获得当前月的签到总天数了
问题2: 如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可。
问题3:如何从后向前遍历每个bit位?
注意
:bitMap返回的数据是10进制,
那假如说返回一个数字8,那么我哪儿知道到底哪些是0,哪些是1呢?
我们只需要让得到的10进制数字和1做与运算就可以了,因为1只有遇见1 才是1,其他数字都是0 ,
我们把签到结果和1进行与
操作,每与
一次,就把签到结果向右移动一位
(>>
),依次内推,我们就能完成逐个遍历的效果了。
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
有用户和有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据这套算法,就能统计出来他连续签到的次数了
代码
UserController
1 2 3 4 @GetMapping("/sign/count") public Result signCount () { return userService.signCount(); }
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override public Result signCount () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); List<Long> result = stringRedisTemplate.opsForValue().bitField( key, BitFieldSubCommands.create() .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0 ) ); if (result == null || result.isEmpty()) { return Result.ok(0 ); } Long num = result.get(0 ); if (num == null || num == 0 ) { return Result.ok(0 ); } int count = 0 ; while (true ) { if ((num & 1 ) == 0 ) { break ; }else { count++; } num >>>= 1 ; } return Result.ok(count); }
额外加餐-关于使用bitmap来解决缓存穿透的方案 回顾缓存穿透 :
发起了一个数据库不存在的,redis里边也不存在的数据,通常你可以把他看成一个攻击
解决方案:
第一种解决方案:遇到的问题是如果用户访问的是id不存在的数据,则此时就无法生效
第二种解决方案:遇到的问题是:如果是不同的id那就可以防止下次过来直击数据
所以我们如何解决呢?
我们可以将数据库的数据,所对应的id
写入到一个list集合
中,
当用户过来访问的时候,我们直接去判断list中是否包含
当前的要查询的数据,
如果说用户要查询的id数据并不在
list集合中,则直接返回
,
如果list中包含
对应查询的id数据,则说明不是一次缓存穿透数据,则直接放行
。
现在的问题是这个主键其实并没有那么短,而是很长的一个 主键
我们可以把list数据抽象成一个非常大的bitmap,
UV统计 UV统计-HyperLogLog 首先我们搞懂两个概念:
通常来说PV会比UV大很多,
所以衡量同一个网站的访问量,我们需要综合考虑很多因素,
所以我们只是单纯的把这两个值作为一个参考值
UV统计在服务端做会比较麻烦,
因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。
但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?
Hyperloglog(HLL) 是从Loglog算法派生的概率算法,
Redis 中的HLL是基于string
结构实现的,
单个HLL的内存永远小于16kb ,内存占用低 的令人发指!作为代价,
其测量结果是概率性的,有小于0.81%的误差 。不过对于UV统计来说,这完全可以忽略。
UV统计-测试百万数据的统计
测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Test void testHyperLogLog () { String[] values = new String [1000 ]; int j = 0 ; for (int i = 0 ; i < 1000000 ; i++) { j = i % 1000 ; values[j] = "user_" + i; if (j == 999 ) { stringRedisTemplate.opsForHyperLogLog().add("hl2" , values); } } Long hl2 = stringRedisTemplate.opsForHyperLogLog().size("hl2" ); System.out.println("hl2 == " + hl2); }
经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小
3452168
3468792
= 16.23kb