总流程
Websocket模块
服务端推送web方案
短轮询
web端不停地间隔一段时间像服务端发一个http请求,如果有新消息,就会在某次请求返回
适用于
扫码登陆
客户端使用量不大
缺点
大量无效请求
服务端请求压力大
长轮询
优点:相比短轮询模式
大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销
降低了服务端处理请求的 QPS
缺点:
无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。
服务端压力大:服务端总挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端)
Websocket长连接
长轮询和短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。
实现原理:客户端和服务器之间维持一个TCP/IP长连接,全双工通道。
如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客
netty对websocket协议的实现_netty实现websocket-CSDN博客
为啥我选netty不用tomcat?
netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
接的状态。这些都是netty的优势。
netty实现websocket
心跳包
如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客
户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件newIdleStateHand1er(30,0,θ)可以实现30秒链接没有读请求,就主动
关闭链接。我们的web前端需要保持每10s发送一个心跳包。
请求处理
自己实现的处理器NettyWebSocketServerHandler接受websocket信息。根据消息类型进行路由处理。
目前请求对websocket依赖很低,只做这一件事
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class); WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType()); switch (wsReqTypeEnum) { case LOGIN: this .webSocketService.handleLoginReq(ctx.channel()); log.info("请求二维码 = " + msg.text()); break ; case HEARTBEAT: break ; default : log.info("未知类型" ); } }
websocket前后端交互
我们用websocket的目的,主要是用于后端推送前端,前端能用http的就尽量用http。这样的好处是,http丰富
的拦截器,注解,请求头等功能,可以更好地实现或者是收口我们想要的功能。尽量对websocket的依赖降到最
低。前后端的交互用的是json串,里面通过type标识次此次的事件类型。
前端请求
1.请求登录二维码
2.心跳检测
3用户认证
后端返回
1 2 3 4 { type:1 , datajson:jsondata }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public enum WSRespTypeEnum { LOGIN_URL(1 , "登录二维码返回" , WSLoginUrl.class), LOGIN_SCAN_SUCCESS(2 , "用户扫描成功等待授权" , null ), LOGIN_SUCCESS(3 , "用户登录成功返回用户信息" , WSLoginSuccess.class), MESSAGE(4 , "新消息" , WSMessage.class), ONLINE_OFFLINE_NOTIFY(5 , "上下线通知" , WSOnlineOfflineNotify.class), INVALIDATE_TOKEN(6 , "使前端的token失效,意味着前端需要重新登录" , null ), BLACK(7 , "拉黑用户" , WSBlack.class), MARK(8 , "消息标记" , WSMsgMark.class), RECALL(9 , "消息撤回" , WSMsgRecall.class), APPLY(10 , "好友申请" , WSFriendApply.class), MEMBER_CHANGE(11 , "成员变动" , WSMemberChange.class), ; private final Integer type; private final String desc; private final Class dataClass; private static Map<Integer, WSRespTypeEnum> cache; static { cache = Arrays.stream(WSRespTypeEnum.values()).collect(Collectors.toMap(WSRespTypeEnum::getType, Function.identity())); } public static WSRespTypeEnum of (Integer type) { return cache.get(type); } }
用户模块
用户表设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 package com.abin.mallchat.common.user.domain.entity;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.annotation.TableField;import com.baomidou.mybatisplus.annotation.TableId;import com.baomidou.mybatisplus.annotation.TableName;import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;import lombok.*;import java.io.Serializable;import java.util.Date;@Data @EqualsAndHashCode(callSuper = false) @Builder @AllArgsConstructor @NoArgsConstructor @TableName(value = "user", autoResultMap = true) public class User implements Serializable { private static final long serialVersionUID = 1L ; public static Long UID_SYSTEM = 1L ; @TableId(value = "id", type = IdType.AUTO) private Long id; @TableField("name") private String name; @TableField("avatar") private String avatar; @TableField("sex") private Integer sex; @TableField("open_id") private String openId; @TableField("active_status") private Integer activeStatus; @TableField("last_opt_time") private Date lastOptTime; @TableField(value = "ip_info", typeHandler = JacksonTypeHandler.class) private IpInfo ipInfo; @TableField("item_id") private Long itemId; @TableField("status") private Integer status; @TableField("create_time") private Date createTime; @TableField("update_time") private Date updateTime; public void refreshIp (String ip) { if (ipInfo == null ) { ipInfo = new IpInfo(); } ipInfo.refreshIp(ip); } }
token认证
jwt实现token,只需要解析就行了
双token方案
一个access_token(十分钟)和一个refresh_token(一个月)
双token是一个多方平衡的完美方案,它希望对用户的认证有所掌控,又不希望每次的检验会增加韩时,它不想给
用户过长的授权时间,又不想用户因此频繁登录影响体验,因此变成了每隔一段access_token的过期时间,都
会重新掌控局面,进行重新认证的复杂判断。
中心管理token
WT碰巧有去中心化的特性,但为了能够控制它的上下线,主动下线,登录续期等功能。我们依然可以对
它进行中心化的管理。
这也是抹茶当前采用的方式。依赖redis中心化管理uid-》token的信息。确保一个uid只有一个有效的
token。用户登录后,每一次认证都会解析出uid,并请求redis进行token比对。并且异步判断有效期小于
一天,进行续期,
有人说为啥不用uuid做token呢,既然都是redis中心存储,用uuid还可以少一次解析。
如果用uuid,前端每一次请求除了带上uuid还需要带上eid。
因为单纯用uuid,黑客很有可能不断遍历uuid去撞库,碰巧撞到有关联的在线用户。而如果将uuid和uid
一起比对,哪怕uuid碰巧撞到了登录的用户,还需要确保是相同的uid,这个概率会降低非常之多。
用jwt的话,正好包含了uid,让前端传起来方便,所以就这么选择了。
大家明白了其中的差别,到时候就懂得怎么去对线面试官的。其实我们用jwt,但是却没怎么用到它的特
性,本质上这样的场最用个uuid就差不多了
登录拦截器
登录接口 :负责认证身份,并将用户信息存入 HttpSession 。
拦截器 (preHandle) :在请求进入 Controller 之前拦截。校验 Session 中是否存在用户信息。
若存在:将用户信息读取并存入当前线程的 ThreadLocal 中,放行。
若不存在:拦截请求,返回 401 状态码。
业务层 (Controller/Service) :需要用户信息时,直接从 ThreadLocal 中获取,无需依赖 Session API。
拦截器 (afterCompletion) :请求结束,响应返回前,移除 ThreadLocal 中的用户信息 ,防止内存泄漏。
Spring缓存
@Cacheable 注解标记的方法,其返回值会被自动缓存。当首次调用该方法时,方法会被执行并将结果存入缓存;后续使用相同参数调用时,Spring 会直接从缓存返回结果,不再执行方法内部逻辑。
幂等设计
幂等需要加锁
分布式锁
使用RedissonClient工具类, 这里就是简历里说的redis的原子操作
RedissonClient 是基于 Redis 的 Java 客户端,专为分布式系统设计,提供比原生 Jedis 更丰富的功能集。其核心优势体现在三个方面:
分布式数据结构支持 :提供 Redis 原生数据结构的 Java 封装,如 RMap、RList、RSet 等,支持线程安全的分布式操作。
高级分布式组件 :内置分布式锁(RLock)、发布订阅(RTopic)、远程服务(RRemoteService)等组件,简化分布式系统开发。
性能优化机制 :通过连接池管理、异步操作、批量处理等技术,显著提升高并发场景下的吞吐量。
在电商秒杀系统中,使用 RedissonClient 的分布式锁可确保同一用户只能生成一个订单,避免超卖问题。其 RLock 实现基于 Redis 的 SETNX 命令,支持可重入、公平锁、超时自动释放等特性,比数据库乐观锁更高效。
这里使用的就是分布式锁(RLock)
分布式锁如何确保比事务先执行,避免脏读的问题—加上Order(0)注解
ip地址解析
使用淘宝的ip地址库
IM顶层设计
消息id方案
会话级别的单调递增,保证单个群组里面的消息id是有序且唯一的 分布式id 如雪花算法 和美团的leaf算法
但是分布式id是趋势递增的
消息可靠ACK
在线推送:推送服务增加一个判断,如果是在线消息,才会进行推送,并且记录消息在内存中,定时任务也只
会拉取内存ack队列的消息,进行推送重试。
只是设计,还没有做,消息可靠性保证,抹茶为啥不做消息可靠的保证呢?一方面是复杂。另一方面抹茶是一个web项目,压根就不存储消息,消息到没到问题不是很大,哪怕消息到了,你刷新一下,消息也又没了。
IM 服务器在推送消息时,携带一个标识 SID(安全标识符,类似 TCP 的 sequenceId), 推送出消息后会将当前消息添加到“待 ACK 消息列表”,客户端 B 成功接收完消息后,会 给 IM 服务器回一个业务层的 ACK 包,包中携带有本条接收消息的 SID,IM 服务器接收 后,会从“待 ACK 消息列表”记录中删除此条消息,本次推送才算真正结束。
对于推送的消息,如果在一定时间内没有收到 ACK 包,就会触发服务端的重传
消息重复
如果在一定时间内没有收到 ACK 包,就会触发服务端的重 传。收不到 ACK 的情况有两种,除了推送的消息真正丢失导致用户 B 不回 ACK 外,还可 能是用户 B 回的 ACK 包本身丢了。 对于第二种情况,ACK 包丢失导致的服务端重传,可能会让接收方收到重复推送的消息
接收方根据这 个唯一的 Sequence ID 来进行业务层的去重,这样经过去重后,对于用户 B 来说,看到的 还是接收到一条消息,不影响使用体验。
推拉模式
推模式:有新消息,服务端需要主动推送给前端。需要用到websocket。并且后台会维护一个定时任务,定时推送还未接收到ack的消息。保证消息的实时性。
拉模式:拉模式又分为短轮询和长轮询。前端主动询问后端是否有新消息。以定时的频率访问。我们项目已经用到了websocket,一般就不用拉模式了。拉模式可以用在历史消息列表。新消息,还是要保证消息的即时性。
推拉结合:理论上保证消息的及时性,推模式足够了,为什么还要拉模式?推模式需要考虑推送失败的情况,又需要服务端启动定时任务,确保ack,方案比较复杂,对服务器消耗也大。实际上推送的失败概率没有那么高,如果客户端每隔一定的频率进行消息拉取。相当于客户端是那个定时任务。就能达到最终一致性。
采用推拉结合。推主要是保证及时性。而拉主要是保证最终一致性,也就是消息到达的可靠性。
消息完整性检查
设想一下,假设一台 IM 服务器在推送出消息后,由于硬件原因宕机了,这种情况下,如果 这条消息真的丢了,由于负责的 IM 服务器宕机了无法触发重传,导致接收方 B 收不到这 条消息。 这就存在一个问题,当用户 B 再次重连上线后,可能并不知道之前有一条消息丢失的情 况。对于这种重传失效的情况该如何处理?
比较常见的消息完整性检查的实现机制有“时间戳比对”, 也可以使用全局的自增序列作为版本号来 代替
消息的时序性
要保证消息的时序一致性的一个关键问题是:我 们是否能找到这么一个时序基准,使得我们的消息具备“时序可比较性”
发送方不合适当时序基准
发送方时钟存在较大不稳定因素,用户可以随时调整时钟导致序号回退等问题。
发送方本地序号如果重装应用会导致序号清零,也会导致序号回退的问题。
类似“群聊消息”和“单用户的多点登陆”这种多发送方场景,都存在:同一时钟的某 一时间点,都可能有多条消息发给同一接收对象。比如同一个群里,多个人同时发言; 或者同一个用户登录两台设备,两台设备同时给某一接收方发消息。多设备间由于存在 时钟不同步的问题,并不能保证设备带上来的时间是准确的,可能存在群里的用户 A 先发言,B 后发言,但由于用户 A 的手机时钟比用户 B 的慢了半分钟,如果以这个时 间作为“时序基准”来进行排序,可能反而导致用户 A 的发言被认为是晚于用户 B 的
IM 服务器的本地时钟也不太适合当时序基准
IM 服务都是集群化部署,集群化部署也就是许多服务器同时部 署任务。 虽然多台服务器通过 NTP 时间同步服务,能降低服务集群机器间的时钟差异到毫秒级别, 但仍然还是存在一定的时钟误差。
而且 IM 服务器规模相对比较大,时钟的统一性维护上也比较有挑战,整体时钟很难保持极 低误差,因此一般也不能用 IM 服务器的本地时钟来作为消息的“时序基准”
使用一个全局递增的序号生成器,应该就能避免多服务器时钟不同步的问题了, IM 服务端就能通过这个序号生成器发出的序号,来作为消息排序的“时序基准”
多端同步
这个本项目当然是没有实现的,客户端维护一个游标,需要保证唯一性和有效性
因为是web项目,采用全局递增的id,手机和电脑端打开这个网页都一样
单聊群聊
表设计 单聊群聊 | DrawSQL 会话表 | DrawSQL
抽象出一个房间表,房间id就是会话id
单聊房间的设计:房间key由两个uid拼接,且uid1 < uid2
消息已读未读
使用表记录用户阅读的时间线,因为发送的消息是有时间的,这样就能知道哪些已读,哪些未读
1 2 3 4 5 6 7 select count (0 ) from ( select 1 from msg where room_id = 我房间 and create_time> 我阅读 Limit 100 )
微信最大只展示99条未读数,这样未读数最多扫描100条 消息表 | DrawSQL
多类型消息
其实就是利用oss的对象存储服务就好了,也就是一条url,在发送的时候加上一个消息类型就行了
旁路缓存
我们采用的是典型的 Cache-Aside(旁路缓存) 模式的变种,结合 订阅发布 机制来保证分布式环境下的最终一致性。
读取流程(由近及远)
一级缓存 (Caffeine) :直接从 JVM 内存读取。如果命中,直接返回,耗时微秒级(≈ 1 0 − 6 \approx 10^{-6} ≈ 1 0 − 6 s)。
二级缓存 (Redis) :若 L1 未命中,查询 Redis。如果命中,回填至 Caffeine,然后返回,耗时毫秒级(≈ 1 − 5 \approx 1-5 ≈ 1 − 5 ms)。
持久层 (MySQL) :若 L2 未命中,查询数据库。命中后双写回 Redis 和 Caffeine。
获取不到,就加载数据库,再将数据库写回redis缓存
redis 存储token
消息模块
rocketmq
rocketmq 使用docker-compose部署
配置文件
1 vi data/rocketmq/broker/conf/broker.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 # 所属集群名字 brokerClusterName=Defaultcluster# broker名字 brokerName=broker-a# 0表示master >0表示slave brokerId=0# nameServer地址,分号分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 namesrvAddr=rocketmq-namesrv:9876# 这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全) # 如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。 brokerIP1=47.96.119.233# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4# 是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口, listenPort=10911# 此参数控制是否开启密码,不开启设置false aclEnable=false# 删除文件时间点,默认凌晨4点 deleteWhen=04# 文件保留时间,默认48小时 fileReservedTime=120# commitLog 每个文件的大小默认1G mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88# Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER# 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH
创建acl文件,用于开启用户名密码
1 vi data/rocketmq/broker/conf/plain_acl.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # 全局白名单,如果配置了则不需要走acl校验,慎重配置 globalwhiteRemoteAddresses:# - 47.180.93.* # - 156.254.120. accounts: - accesskey: RocketMQ secretkey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topiCA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms : # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: galileoim # 密码不能小于6位数 secretKey: 12345678 whiteRemoteAddress: # if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号 admin: true
给console加上账号密码
1 vi data/rocketmq/console/data/users.properties
1 2 3 4 5 6 7 # Define Admin # 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户 galileoim=123456,1# Define Users # user1=user1 # user1=user1
创建yml文件
1 vi data/rocketmq/docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 version: '3.5' services: rocketmq-namesrv: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-namesrv restart: always ports: - 9876:9876 volumes: - ./namesrv/logs/nameserver-a:/home/rocketmq/logs - ./namesrv/store:/home/rocketmq/store environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqnamesrv"] networks: rocketmq_net: aliases: - rocketmq-namesrv rocketmq-broker: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-broker restart: always ports: - 10909:10909 - 10911:10911 volumes: - ./broker/logs:/home/rocketmq/logs - ./broker/store:/home/rocketmq/store - ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml - ./broker/conf/broker.conf:/etc/rocketmq/broker.conf environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"] depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-broker rocketmq-console: image: iamverygood/rocketmq-console:4.7.1 container_name: rocketmq-console restart: always ports: - 8180:8080 volumes: - ./console/data:/tmp/rocketmq-console/data environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" # -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678 depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-console networks: rocketmq_net: name: rocketmq_net driver: bridge
打开9876 ,10911 ,8180 这几个端口
到对应目录下启动容器
1 2 cd /data/rocketmq docker-compose up -d
本地消息表
确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性
我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过spring容器的getbean,来找到对应的方法,执行反射调用。
@成员
记录uid就行
文件
使用oss对象存储服务,这里使用的是minio本地部署
其他
线程池
线程池配置和使用
Executor 框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package org.william.galileoim.common.common.config;import org.william.galileoim.common.common.factory.MyThreadFactory;import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.scheduling.annotation.AsyncConfigurer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;@Configuration @EnableAsync public class ThreadPoolConfig implements AsyncConfigurer , SecureInvokeConfigurer { public static final String GALILEOIM_EXECUTOR = "galileoimExecutor" ; public static final String WS_EXECUTOR = "websocketExecutor" ; public static final String AICHAT_EXECUTOR = "aichatExecutor" ; @Override public Executor getAsyncExecutor () { return galileoimExecutor(); } @Override public Executor getSecureInvokeExecutor () { return galileoimExecutor(); } @Bean(GALILEOIM_EXECUTOR) @Primary public ThreadPoolTaskExecutor galileoimExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(200 ); executor.setThreadNamePrefix("galileoim-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(WS_EXECUTOR) public ThreadPoolTaskExecutor websocketExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(16 ); executor.setMaxPoolSize(16 ); executor.setQueueCapacity(1000 ); executor.setThreadNamePrefix("websocket-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(AICHAT_EXECUTOR) public ThreadPoolTaskExecutor chatAiExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(15 ); executor.setThreadNamePrefix("aichat-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); return executor; } }
面试相关
为什么要做这个项目
AI聊天,基于IM开发
项目亮点
高性能通信架构 : 我们彻底摒弃了传统的 HTTP 短轮询,改用 Netty + WebSocket 的长连接方案。这样做的核心目的是减少握手开销,利用 Netty 的非阻塞 I/O 模型,让服务器能够轻松支撑数万甚至更高量级的并发连接,确保消息能够“秒达”
严谨的状态与安全管理 : 在安全层面,我们采用了 JWT 进行无状态鉴权,结合拦截器和 ThreadLocal 。这种设计不仅保证了系统的可扩展性(方便做集群扩展),还确保了用户信息在整个请求链路中的透明传输和线程安全。
极极致的性能优化(多级缓存) : 为了应对高频的在线状态查询和会话读取,我们设计了 Redis + Caffeine 的多级缓存机制。Redis 负责分布式环境下的热点数据共享,而本地缓存 Caffeine 则进一步压榨性能,有效防止了缓存穿透和高并发下对 Redis 的压力。
可靠的消息交付体系 : 在 IM 系统最核心的消息丢失和乱序问题上,我们引入了 RocketMQ 并在业务层设计了 ACK 机制 :
一致性 :利用 RocketMQ 的事务消息和顺序消息特性,解决了群聊消息在多客户端间的时序一致性问题。
幂等性 :通过全局消息 ID 和 Redis 原子操作,我们实现了精准的去重逻辑,即使在网络波动导致超时重传的情况下,也能确保“不丢消息、不重消息”。
过程中遇到过什么问题,如何解决
未读数设计时,“未读数”实现中,会话未读数和总未读数一般都是单独维护的。对于高频使用的“总未读”(角标显示等场景),如果每次都通过聚合所有会话未读来获取,用户的互动会话不 多的话,性能还可以保证;一旦会话数比较多,由于需要多次从存储获取,容易出现某些会话未读由于超时等原因没取到,导致总未读数计算少了
原因:就是每次发和查看消息后,需要先更新会话未读,再更新总未读,两个未读的变更不是原子性的,会出现某一个成功另一个失败的情况,也会出现由于并发更新导致操作被覆盖的情况。所以要解决这些问题,需要保证两个未读更新操作的原子性。我使用的是添加事务功能,因为它不需要额外的维护锁的资源,实现较为简单。
Redis 通过 MULTI、DISCARD 、EXEC 和 WATCH 四个命令来支持事务操作。 比如每次变更未读前先 watch 要修改的 key,然后事务执行变更会话未读和变更总未读的 操作,如果在最终执行事务时被 watch 的两个未读的 key 的值已经被修改过,那么本次事 务会失败,业务层还可以继续重试直到事务变更成功
命令
核心作用
关键说明
MULTI
开启事务
执行后客户端进入事务状态,后续命令不再立即执行,而是进入事务队列,返回QUEUED
EXEC
执行事务
触发队列中所有命令批量顺序执行,返回事务结果数组,执行后自动退出事务状态
DISCARD
放弃事务
清空事务命令队列,立即退出事务状态,所有入队命令均不执行
WATCH key1 key2…
监控键,实现乐观锁
监控指定key,EXEC执行前若key被其他客户端修改,事务直接终止,返回nil
UNWATCH
取消监控
取消所有WATCH监控的key,EXEC/DISCARD执行后会自动执行 ,无需手动调用
未解决的问题
信息安全性,这个还没有保证,在网络传输过程中也没有进行加密。
IM 系统最容易触碰的法律红线就是内容违规。
敏感词过滤(DFA 算法) :在消息进入 RocketMQ 之前,需要经过一个“风控过滤器”。利用 DFA(确定有穷自动机)算法 对文本进行高效检索,命中敏感词的消息予以拦截、替换或人工审核。
端到端加密(E2EE) :这是 IM 安全的“天花板”。
原理 :消息在发送端用接收端的公钥加密,服务端只负责转发,由于没有私钥,服务端也无法解密内容。
现状 :Gelato-AI 目前还是服务器可解密的架构。若要演进,我们需要引入类似 Signal 协议的 Double Ratchet 算法 。
鉴权与会话安全:防止“身份冒用”
Token 刷新机制 :采用 AccessToken (短效) + RefreshToken (长效) 机制。即便 AccessToken 被窃取,黑客也只有几分钟的作案时间。
强制踢下线(Kick-out) :当用户在异地登录或修改密码时,后端需要通过 Redis 维护一个 Blacklist 或修改 Version 字段,通过 Netty 长连接实时下发“强制下线”指令,废除当前所有旧 Token。