Gelato

部署相关

rocketmq

rocketmq 使用docker-compose部署

配置文件

1
vi data/rocketmq/broker/conf/broker.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 所属集群名字
brokerClusterName=Defaultcluster

# broker名字
brokerName=broker-a

# 0表示master >0表示slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=rocketmq-namesrv:9876

#这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全)
#如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。
brokerIP1=47.96.119.233

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口,
listenPort=10911

# 此参数控制是否开启密码,不开启设置false
aclEnable=false

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

创建acl文件,用于开启用户名密码

1
vi data/rocketmq/broker/conf/plain_acl.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#全局白名单,如果配置了则不需要走acl校验,慎重配置
globalwhiteRemoteAddresses:

# - 47.180.93.*
# - 156.254.120.
accounts:
- accesskey: RocketMQ
secretkey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topiCA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms :
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB

- accessKey: galileoim
# 密码不能小于6位数
secretKey: 12345678
whiteRemoteAddress:
# if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号
admin: true

给console加上账号密码

1
vi data/rocketmq/console/data/users.properties
1
2
3
4
5
6
7
#Define Admin
# 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户
galileoim=123456,1

#Define Users
#user1=user1
#user1=user1

创建yml文件

1
vi data/rocketmq/docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
version: '3.5'
services:
rocketmq-namesrv:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-namesrv
restart: always
ports:
- 9876:9876
volumes:
- ./namesrv/logs/nameserver-a:/home/rocketmq/logs
- ./namesrv/store:/home/rocketmq/store
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqnamesrv"]
networks:
rocketmq_net:
aliases:
- rocketmq-namesrv

rocketmq-broker:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-broker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker/logs:/home/rocketmq/logs
- ./broker/store:/home/rocketmq/store
- ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml
- ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"]
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-broker

rocketmq-console:
image: iamverygood/rocketmq-console:4.7.1
container_name: rocketmq-console
restart: always
ports:
- 8180:8080
volumes:
- ./console/data:/tmp/rocketmq-console/data
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
# -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-console
networks:
rocketmq_net:
name: rocketmq_net
driver: bridge

打开9876109118180这几个端口

到对应目录下启动容器

1
2
cd /data/rocketmq
docker-compose up -d

线程池

线程池配置和使用

Executor 框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package org.william.galileoim.common.common.config;

import org.william.galileoim.common.common.factory.MyThreadFactory;
import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Description: 线程池配置
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer {
/**
* 项目共用线程池
*/
public static final String GALILEOIM_EXECUTOR = "galileoimExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";


public static final String AICHAT_EXECUTOR = "aichatExecutor";

@Override
public Executor getAsyncExecutor() {
return galileoimExecutor();
}

@Override
public Executor getSecureInvokeExecutor() {
return galileoimExecutor();
}

@Bean(GALILEOIM_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor galileoimExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("galileoim-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(WS_EXECUTOR)
public ThreadPoolTaskExecutor websocketExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(1000);//支持同时推送1000人
executor.setThreadNamePrefix("websocket-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(AICHAT_EXECUTOR)
public ThreadPoolTaskExecutor chatAiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(15);
executor.setThreadNamePrefix("aichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
return executor;
}
}

总流程

Websocket模块

服务端推送web方案

短轮询

web端不停地间隔一段时间像服务端发一个http请求,如果有新消息,就会在某次请求返回

适用于

  1. 扫码登陆
  2. 客户端使用量不大

缺点

  1. 大量无效请求
  2. 服务端请求压力大

长轮询

优点:相比短轮询模式

  1. 大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销
  2. 降低了服务端处理请求的 QPS

缺点:

  1. 无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。
  2. 服务端压力大:服务端总挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端)

Websocket长连接

长轮询和短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。
实现原理:客户端和服务器之间维持一个TCP/IP长连接,全双工通道。

如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客

netty对websocket协议的实现_netty实现websocket-CSDN博客

为啥我选netty不用tomcat?

  1. netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
    Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
    的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
  2. Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
    器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
    件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
    接的状态。这些都是netty的优势。

netty实现websocket

心跳包
如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客
户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件newIdleStateHand1er(30,0,θ)可以实现30秒链接没有读请求,就主动
关闭链接。我们的web前端需要保持每10s发送一个心跳包。

请求处理

自己实现的处理器NettyWebSocketServerHandler接受websocket信息。根据消息类型进行路由处理。
目前请求对websocket依赖很低,只做这一件事

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//反序列化请求报文
WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class);
//解析type
WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType());
//根据type做不同的处理
switch (wsReqTypeEnum) {
case LOGIN:
this.webSocketService.handleLoginReq(ctx.channel());
log.info("请求二维码 = " + msg.text());
break;
case HEARTBEAT:
break;
default:
log.info("未知类型");
}
}

websocket前后端交互

我们用websocket的目的,主要是用于后端推送前端,前端能用http的就尽量用http。这样的好处是,http丰富
的拦截器,注解,请求头等功能,可以更好地实现或者是收口我们想要的功能。尽量对websocket的依赖降到最
低。前后端的交互用的是json串,里面通过type标识次此次的事件类型。

前端请求

1
2
3
4
{
type:1,
data:{}
}

1.请求登录二维码

2.心跳检测

3用户认证

用户模块

用户表设计

image-20250417160859261

token认证

jwt实现token,只需要解析就行了

双token方案

一个access_token(十分钟)和一个refresh_token(一个月)

双token是一个多方平衡的完美方案,它希望对用户的认证有所掌控,又不希望每次的检验会增加韩时,它不想给
用户过长的授权时间,又不想用户因此频繁登录影响体验,因此变成了每隔一段access_token的过期时间,都
会重新掌控局面,进行重新认证的复杂判断。

中心管理token
WT碰巧有去中心化的特性,但为了能够控制它的上下线,主动下线,登录续期等功能。我们依然可以对
它进行中心化的管理。
这也是抹茶当前采用的方式。依赖redis中心化管理uid-》token的信息。确保一个uid只有一个有效的
token。用户登录后,每一次认证都会解析出uid,并请求redis进行token比对。并且异步判断有效期小于
一天,进行续期,
有人说为啥不用uuid做token呢,既然都是redis中心存储,用uuid还可以少一次解析。
如果用uuid,前端每一次请求除了带上uuid还需要带上eid。
因为单纯用uuid,黑客很有可能不断遍历uuid去撞库,碰巧撞到有关联的在线用户。而如果将uuid和uid
一起比对,哪怕uuid碰巧撞到了登录的用户,还需要确保是相同的uid,这个概率会降低非常之多。
用jwt的话,正好包含了uid,让前端传起来方便,所以就这么选择了。
大家明白了其中的差别,到时候就懂得怎么去对线面试官的。其实我们用jwt,但是却没怎么用到它的特
性,本质上这样的场最用个uuid就差不多了

登录拦截器

登录接口:负责认证身份,并将用户信息存入 HttpSession

拦截器 (preHandle):在请求进入 Controller 之前拦截。校验 Session 中是否存在用户信息。

  • 若存在:将用户信息读取并存入当前线程的 ThreadLocal 中,放行。
  • 若不存在:拦截请求,返回 401 状态码。

业务层 (Controller/Service):需要用户信息时,直接从 ThreadLocal 中获取,无需依赖 Session API。

拦截器 (afterCompletion):请求结束,响应返回前,移除 ThreadLocal 中的用户信息,防止内存泄漏。

Spring缓存

@Cacheable 注解标记的方法,其返回值会被自动缓存。当首次调用该方法时,方法会被执行并将结果存入缓存;后续使用相同参数调用时,Spring 会直接从缓存返回结果,不再执行方法内部逻辑。

幂等设计

幂等需要加锁

image-20260319135913608

分布式锁

使用RedissonClient工具类, 这里就是简历里说的redis的原子操作

RedissonClient 是基于 Redis 的 Java 客户端,专为分布式系统设计,提供比原生 Jedis 更丰富的功能集。其核心优势体现在三个方面:

  1. 分布式数据结构支持:提供 Redis 原生数据结构的 Java 封装,如 RMap、RList、RSet 等,支持线程安全的分布式操作。
  2. 高级分布式组件:内置分布式锁(RLock)、发布订阅(RTopic)、远程服务(RRemoteService)等组件,简化分布式系统开发。
  3. 性能优化机制:通过连接池管理、异步操作、批量处理等技术,显著提升高并发场景下的吞吐量。

在电商秒杀系统中,使用 RedissonClient 的分布式锁可确保同一用户只能生成一个订单,避免超卖问题。其 RLock 实现基于 Redis 的 SETNX 命令,支持可重入、公平锁、超时自动释放等特性,比数据库乐观锁更高效。

这里使用的就是分布式锁(RLock)

分布式锁如何确保比事务先执行,避免脏读的问题—加上Order(0)注解

ip地址解析

使用淘宝的ip地址库

IM顶层设计

消息id方案

会话级别的单调递增,保证单个群组里面的消息id是有序且唯一的 分布式id 如雪花算法 和美团的leaf算法

但是分布式id是趋势递增的

消息可靠ACK

在线推送:推送服务增加一个判断,如果是在线消息,才会进行推送,并且记录消息在内存中,定时任务也只
会拉取内存ack队列的消息,进行推送重试。

image-20260321144523714

只是设计,还没有做,消息可靠性保证,抹茶为啥不做消息可靠的保证呢?一方面是复杂。另一方面抹茶是一个web项目,压根就不存储消息,消息到没到问题不是很大,哪怕消息到了,你刷新一下,消息也又没了。

IM 服务器在推送消息时,携带一个标识 SID(安全标识符,类似 TCP 的 sequenceId), 推送出消息后会将当前消息添加到“待 ACK 消息列表”,客户端 B 成功接收完消息后,会 给 IM 服务器回一个业务层的 ACK 包,包中携带有本条接收消息的 SID,IM 服务器接收 后,会从“待 ACK 消息列表”记录中删除此条消息,本次推送才算真正结束。

对于推送的消息,如果在一定时间内没有收到 ACK 包,就会触发服务端的重传

消息重复

如果在一定时间内没有收到 ACK 包,就会触发服务端的重 传。收不到 ACK 的情况有两种,除了推送的消息真正丢失导致用户 B 不回 ACK 外,还可 能是用户 B 回的 ACK 包本身丢了。 对于第二种情况,ACK 包丢失导致的服务端重传,可能会让接收方收到重复推送的消息

  • 发送消息幂等 发送消息的一个随机md5,保证幂等标识 1s内的去重保证

  • 接收消息幂等 消息的唯一性,靠消息id来判断

接收方根据这 个唯一的 Sequence ID 来进行业务层的去重,这样经过去重后,对于用户 B 来说,看到的 还是接收到一条消息,不影响使用体验。

推拉模式

推模式:有新消息,服务端需要主动推送给前端。需要用到websocket。并且后台会维护一个定时任务,定时推送还未接收到ack的消息。保证消息的实时性。

拉模式:拉模式又分为短轮询和长轮询。前端主动询问后端是否有新消息。以定时的频率访问。我们项目已经用到了websocket,一般就不用拉模式了。拉模式可以用在历史消息列表。新消息,还是要保证消息的即时性。

推拉结合:理论上保证消息的及时性,推模式足够了,为什么还要拉模式?推模式需要考虑推送失败的情况,又需要服务端启动定时任务,确保ack,方案比较复杂,对服务器消耗也大。实际上推送的失败概率没有那么高,如果客户端每隔一定的频率进行消息拉取。相当于客户端是那个定时任务。就能达到最终一致性。
采用推拉结合。推主要是保证及时性。而拉主要是保证最终一致性,也就是消息到达的可靠性。

消息完整性检查

设想一下,假设一台 IM 服务器在推送出消息后,由于硬件原因宕机了,这种情况下,如果 这条消息真的丢了,由于负责的 IM 服务器宕机了无法触发重传,导致接收方 B 收不到这 条消息。 这就存在一个问题,当用户 B 再次重连上线后,可能并不知道之前有一条消息丢失的情 况。对于这种重传失效的情况该如何处理?

比较常见的消息完整性检查的实现机制有“时间戳比对”, 也可以使用全局的自增序列作为版本号来 代替

消息的时序性

要保证消息的时序一致性的一个关键问题是:我 们是否能找到这么一个时序基准,使得我们的消息具备“时序可比较性”

发送方不合适当时序基准

  1. 发送方时钟存在较大不稳定因素,用户可以随时调整时钟导致序号回退等问题。

  2. 发送方本地序号如果重装应用会导致序号清零,也会导致序号回退的问题。

  3. 类似“群聊消息”和“单用户的多点登陆”这种多发送方场景,都存在:同一时钟的某 一时间点,都可能有多条消息发给同一接收对象。比如同一个群里,多个人同时发言; 或者同一个用户登录两台设备,两台设备同时给某一接收方发消息。多设备间由于存在 时钟不同步的问题,并不能保证设备带上来的时间是准确的,可能存在群里的用户 A 先发言,B 后发言,但由于用户 A 的手机时钟比用户 B 的慢了半分钟,如果以这个时 间作为“时序基准”来进行排序,可能反而导致用户 A 的发言被认为是晚于用户 B 的

IM 服务器的本地时钟也不太适合当时序基准

  1. IM 服务都是集群化部署,集群化部署也就是许多服务器同时部 署任务。 虽然多台服务器通过 NTP 时间同步服务,能降低服务集群机器间的时钟差异到毫秒级别, 但仍然还是存在一定的时钟误差。
  2. 而且 IM 服务器规模相对比较大,时钟的统一性维护上也比较有挑战,整体时钟很难保持极 低误差,因此一般也不能用 IM 服务器的本地时钟来作为消息的“时序基准”

使用一个全局递增的序号生成器,应该就能避免多服务器时钟不同步的问题了, IM 服务端就能通过这个序号生成器发出的序号,来作为消息排序的“时序基准”

多端同步

这个本项目当然是没有实现的,客户端维护一个游标,需要保证唯一性和有效性

因为是web项目,采用全局递增的id,手机和电脑端打开这个网页都一样

单聊群聊

表设计 单聊群聊 | DrawSQL 会话表 | DrawSQL

抽象出一个房间表,房间id就是会话id

单聊房间的设计:房间key由两个uid拼接,且uid1 < uid2

消息已读未读

使用表记录用户阅读的时间线,因为发送的消息是有时间的,这样就能知道哪些已读,哪些未读

1
2
3
4
5
6
7
select count(0) from
(
select 1
from msg
where room_id = 我房间 and create_time>我阅读
Limit 100
)

微信最大只展示99条未读数,这样未读数最多扫描100条 消息表 | DrawSQL

多类型消息

其实就是利用oss的对象存储服务就好了,也就是一条url,在发送的时候加上一个消息类型就行了

旁路缓存

我们采用的是典型的 Cache-Aside(旁路缓存) 模式的变种,结合 订阅发布 机制来保证分布式环境下的最终一致性。

读取流程(由近及远)

  1. 一级缓存 (Caffeine):直接从 JVM 内存读取。如果命中,直接返回,耗时微秒级(106\approx 10^{-6}s)。
  2. 二级缓存 (Redis):若 L1 未命中,查询 Redis。如果命中,回填至 Caffeine,然后返回,耗时毫秒级(15\approx 1-5ms)。
  3. 持久层 (MySQL):若 L2 未命中,查询数据库。命中后双写回 Redis 和 Caffeine。

获取不到,就加载数据库,再将数据库写回redis缓存

redis 存储token

image-20241114185112346

消息模块

本地消息表

本地消息表(Local Message Table) 是一种非常经典的设计模式,它主要用来解决分布式事务中的最终一致性问题。

它的核心思想非常巧妙:利用关系型数据库(如 MySQL)单机事务的 ACID 特性,把“发消息”这个网络操作,变成一个“写数据库”的本地磁盘操作。

具体流程如下:

  1. 同库建表:在业务数据库(比如订单库)里,新建一张表,叫 local_message(本地消息表)。
  2. 绑定本地事务:在执行业务代码时,开启一个本地数据库事务。在这个事务里,既执行业务表的数据更新(比如生成订单),又往 local_message 表里插入一条状态为 发送中 (Sending) 的消息记录。
    • 因为它们在同一个数据库实例中,MySQL 可以利用 Undo Log / Redo Log 保证它们绝对同生共死。
  3. 异步投递:事务提交成功后,启动一个后台线程(或定时任务),去扫描 local_message 表里状态为 发送中 的记录。
  4. 调用 MQ:后台线程把这些消息投递到真正的 RocketMQ 中。
  5. 更新状态:如果 MQ 返回发送成功(ACK),就把本地消息表里的记录状态改为 已发送 (Sent) 或者直接删除。
  6. 失败补偿:如果投递 MQ 失败(网络波动),定时任务在下一次轮询时,依然会发现这条消息是 发送中,于是会不断重试,直到成功为止。(这也是为什么下游微服务必须做幂等校验的原因,因为重试可能导致重复投递)。

确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性

我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过spring容器的getbean,来找到对应的方法,执行反射调用。

@成员

记录uid就行

文件

使用oss对象存储服务,这里使用的是minio本地部署

面试相关

image-20260319135014274

为什么要做这个项目

AI聊天,基于IM开发

项目结构

image-20241105152704486

系统采用微服务化的架构思想,各组件职责边界清晰:

  • 接入与通信层: Netty + WebSocket(承载海量长连接,全双工实时通信)
  • 业务开发框架: SpringBoot + MyBatis(高效处理离线逻辑与持久化)
  • 分布式消息队列: RocketMQ(核心枢纽,负责消息的异步解耦、削峰填谷与顺序保障)
  • 多级缓存架构: Redis + Caffeine(L1/L2 双级缓存,极速响应路由信息与在线状态)
  • 安全与存储: JWT(无状态鉴权) + ThreadLocal(链路追踪) + MySQL(持久化) + Minio(分布式对象存储)

项目亮点

  1. 彻底摒弃了传统的 HTTP 短轮询,改用 Netty + WebSocket 的长连接方案。减少握手开销,利用 Netty 的非阻塞 I/O 模型,让服务器能够轻松支撑数万甚至更高量级的并发连接,确保消息能够“秒达”

  2. 在安全层面,我们采用了 JWT 进行无状态鉴权,结合拦截器和 ThreadLocal。确保了用户信息在整个请求链路中的透明传输和线程安全。

  3. 为了应对高频的在线状态查询和会话读取,我们设计了 Redis + Caffeine 的多级缓存机制。Redis 负责分布式环境下的热点数据共享,而本地缓存 Caffeine 则进一步压榨性能,有效防止了缓存穿透和高并发下对 Redis 的压力。

  4. 可靠的消息交付体系: 在 IM 系统最核心的消息丢失和乱序问题上,我们引入了 RocketMQ 并在业务层设计了 ACK 机制

    • 一致性:利用 RocketMQ 的事务消息和顺序消息特性,解决了群聊消息在多客户端间的时序一致性问题。引入本地消息表设计思想,将核心业务落库与 RocketMQ 异步投递绑定为单机事务,低成本解决跨微服务通信的网络断层问题,确保数据的最终一致性
    • 幂等性:通过全局消息 ID 和 Redis 原子操作,我们实现了精准的去重逻辑,即使在网络波动导致超时重传的情况下,也能确保“不丢消息、不重消息”。

幂等性的设计

如果在一定时间内没有收到 ACK 包,就会触发服务端的重 传。收不到 ACK 的情况有两种,除了推送的消息真正丢失导致用户 B 不回 ACK 外,还可 能是用户 B 回的 ACK 包本身丢了。 对于第二种情况,ACK 包丢失导致的服务端重传,可能会让接收方收到重复推送的消息

  • 发送消息幂等 发送消息的一个随机md5,保证幂等标识 1s内的去重保证

  • 接收消息幂等 消息的唯一性,靠消息id来判断

接收方根据这 个唯一的 Sequence ID 来进行业务层的去重,这样经过去重后,对于用户 B 来说,看到的 还是接收到一条消息,不影响使用体验。

redis存储了哪些数据

实时状态

同步序列号:记录每个用户或群组的最新消息序号,客户端拉取(Pull)离线消息时作为版本比对依据

高频业务缓存: JSON 格式的基本信息(如头像、昵称、禁言状态), 群成员列表

结合系统架构图,Gelato-AI 的核心链路可以划分为以下四个关键阶段:

  1. 接入与鉴权网关(Access & Auth)
  • 长连接保持:客户端(App/Web)通过 WebSocket 与 IM 服务建立长连接。IM 服务是无状态的 Netty 集群,专门负责维持 TCP 连接、心跳检测和原始报文的收发。
  • 权限校验:长连接建立之初及发送敏感操作时,请求会路由至 Auth 服务,基于 JWT 进行合法性校验,拦截非法请求,保障通道安全。
  1. 消息上行与异步落库(Upstream & Persistence)
  • 消息接收:用户 A 发送一条消息,IM 服务接收后,首先对消息进行持久化(分配全局唯一 Message ID),确保消息落地不丢失。
  • 队列解耦:持久化后,IM 服务不直接将消息推给接收方,而是将消息投递到**消息队列(RocketMQ)**中。这一步实现了发送方与接收方的彻底解耦,从容应对突发的流量洪峰。
  1. 消息处理与分发模型(Processing & Distribution)—— 架构核心

后台的**消息处理(消费者)**服务监听 MQ,根据不同的聊天场景,采用不同的数据分发模型:

  • 单聊 & 普通群聊(写扩散 Write-Diffusion): 消费者将消息分别拷贝并写入到每个接收者的个人**信箱(Inbox)**中(如 A信箱、B信箱)。这种方式读取极快,隔离性好,适合参与人数有限的场景。
  • 热点群聊(读扩散 Read-Diffusion): 针对万人大群,如果采用写扩散会导致数据库瞬间产生上万次写操作(写风暴)。因此,系统将其写入一个公共的热点信箱。群成员按需从公共信箱中读取,极大减轻了存储层的写入压力。
  1. 消息下行与推拉结合(Downstream & Push-Pull)
  • 路由寻址(Router):消息准备就绪后,系统需要通知接收方。此时通过 Router 服务(结合 Redis 缓存的 Session 路由表),精准定位用户 B 目前连接在哪个 IM 服务节点上。
  • 推拉结合机制
    • 推(Push):Router 通知目标 IM 服务节点,向用户 B 的客户端推送一条**“新消息到达通知”**(而非完整消息体),或直接推送最新消息。
    • 拉(Pull):客户端收到通知或重新上线时,根据本地的 SyncSequence(同步序列号),主动向服务端的聚合层发起“拉取”请求,获取信箱中最新的连续消息。这完美解决了离线消息同步和多端数据一致性的问题。

选择 RocketMQ 而非 Kafka

选择 RocketMQ 而非 Kafka,核心原因可以用一句话概括:Kafka 生于“大数据批处理”,而 RocketMQ 生于“低延迟的复杂业务”。

简要来说,主要基于以下三个痛点:

  • 1. 极低延迟 vs. 批量吞吐 IM 系统的底线是“消息秒达”。Kafka 为了极致吞吐量,默认会把消息“攒批(Batch)”发送,这会牺牲端到端延迟。而 RocketMQ 侧重单条消息的极速流转,同步刷盘下延迟依然稳定在毫秒级。
  • 2. 海量队列支撑(规避 I/O 瓶颈) IM 系统为了隔离群聊和单聊,需要创建成千上万个 Topic/Queue。Kafka 的每个 Partition 对应独立的物理文件,队列数量一多就会导致磁盘随机读写,性能急剧恶化。RocketMQ 采用“全部消息写入单一 CommitLog + 逻辑索引 ConsumeQueue”的设计,轻松支撑十万级队列。
  • 3. 原生的高级业务特性 IM 业务逻辑复杂,而 RocketMQ 提供了开箱即用的杀手级功能:
    • 延迟消息:完美契合我们系统中的“ACK 超时重传”机制。
    • 消息轨迹(Trace):方便客服和开发快速排查“消息去哪了”。
    • 严格顺序消息:底层加锁机制彻底保证了群聊消息的时序,绝对不乱序。

总结: 如果是做日志收集或流计算,我选 Kafka;但做 Gelato-AI 这种要求消息绝对可靠、低延迟送达且业务状态复杂的系统,RocketMQ 是目前最稳妥的架构底座。

过程中遇到过什么问题,如何解决

未读数设计时,“未读数”实现中,会话未读数和总未读数一般都是单独维护的。对于高频使用的“总未读”(角标显示等场景),如果每次都通过聚合所有会话未读来获取,用户的互动会话不 多的话,性能还可以保证;一旦会话数比较多,由于需要多次从存储获取,容易出现某些会话未读由于超时等原因没取到,导致总未读数计算少了

原因:就是每次发和查看消息后,需要先更新会话未读,再更新总未读,两个未读的变更不是原子性的,会出现某一个成功另一个失败的情况,也会出现由于并发更新导致操作被覆盖的情况。所以要解决这些问题,需要保证两个未读更新操作的原子性。我使用的是添加事务功能,因为它不需要额外的维护锁的资源,实现较为简单。

Redis 通过 MULTI、DISCARD 、EXEC 和 WATCH 四个命令来支持事务操作。 比如每次变更未读前先 watch 要修改的 key,然后事务执行变更会话未读和变更总未读的 操作,如果在最终执行事务时被 watch 的两个未读的 key 的值已经被修改过,那么本次事 务会失败,业务层还可以继续重试直到事务变更成功

命令 核心作用 关键说明
MULTI 开启事务 执行后客户端进入事务状态,后续命令不再立即执行,而是进入事务队列,返回QUEUED
EXEC 执行事务 触发队列中所有命令批量顺序执行,返回事务结果数组,执行后自动退出事务状态
DISCARD 放弃事务 清空事务命令队列,立即退出事务状态,所有入队命令均不执行
WATCH key1 key2… 监控键,实现乐观锁 监控指定key,EXEC执行前若key被其他客户端修改,事务直接终止,返回nil
UNWATCH 取消监控 取消所有WATCH监控的key,EXEC/DISCARD执行后会自动执行,无需手动调用

未解决的问题

信息安全性,这个还没有保证,在网络传输过程中也没有进行加密。

IM 系统最容易触碰的法律红线就是内容违规。

  • 敏感词过滤(DFA 算法):在消息进入 RocketMQ 之前,需要经过一个“风控过滤器”。利用 DFA(确定有穷自动机)算法 对文本进行高效检索,命中敏感词的消息予以拦截、替换或人工审核。
  • 端到端加密(E2EE):这是 IM 安全的“天花板”。
    • 原理:消息在发送端用接收端的公钥加密,服务端只负责转发,由于没有私钥,服务端也无法解密内容。
    • 现状:Gelato-AI 目前还是服务器可解密的架构。若要演进,我们需要引入类似 Signal 协议的 Double Ratchet 算法

鉴权与会话安全:防止“身份冒用”

Token 刷新机制:采用 AccessToken (短效) + RefreshToken (长效) 机制。即便 AccessToken 被窃取,黑客也只有几分钟的作案时间。

强制踢下线(Kick-out):当用户在异地登录或修改密码时,后端需要通过 Redis 维护一个 Blacklist 或修改 Version 字段,通过 Netty 长连接实时下发“强制下线”指令,废除当前所有旧 Token。

其他问题

  1. 联合索引

失效:数据量少,数据全部一样

  1. LRU算法是怎么实现的

HashMap + 双向链表

  1. 一致性哈希

一致性哈希(Consistent Hashing) 是一种特殊的哈希算法,主要用于解决分布式缓存或负载均衡在**扩容(增加服务器)缩容(减少服务器)**时,普通哈希取模算法导致的大规模数据迁移问题

哈希环 映射服务器

会有数据倾斜的问题:虚拟节点

  1. 垃圾回收的三色标记

白色 = 还没看 / 垃圾

灰色 = 正在看 / 还没看完

黑色 = 看完了 / 安全

  1. jvm的调优命令

jps (JVM Process Status Tool)

用来查看当前运行的 Java 进程 ID (PID) 及主类名。

  • jps -l:输出主类的全限定名或 Jar 包全路径。
  • jps -v:输出传递给 JVM 的参数(如 -Xmx, -Xms 等)

jstat (JVM Statistics Monitoring Tool)

用于监视虚拟机各种运行状态信息,是分析 GC 情况的首选工具

jmap (Memory Map for Java) 用于生成堆转储快照(Heap Dump)或查看内存细节

jstack (Java Stack Trace) 用于生成虚拟机当前时刻的线程快照,主要用于排查死锁、线程死循环、CPU 飙升

  1. SQL优化

EXPLAIN SQL语句

type:连接类型。性能从好到差依次为:system > const > eq_ref > ref > range > index > ALL(全表扫描,必须优化)。

key:实际使用的索引。如果为 NULL,说明没走索引。

rows:预计扫描的行数。越小越好。

Extra:额外信息。看到 Using filesort(手工排序)或 Using temporary(临时表)通常意味着性能瓶颈

  1. 进程线程协程
特性 进程 (Process) 线程 (Thread) 协程 (Coroutine)
本质 资源分配的最小单位 执行和调度的最小单位 用户态的轻量级线程
拥有资源 拥有独立的内存、文件描述符等 共享所属进程的资源 共享所属线程的资源(极省内存)
管理者 操作系统内核 操作系统内核 程序/程序员(编译器/运行时)
切换成本 极高(涉及硬件上下文切换) 中等(涉及内核态切换) 极低(只在内存里改个寄存器)
并发性质 真并行(多核CPU下) 并行/并发 逻辑上的并发(单线程内交替)
稳定性 一个崩了不影响别的进程 一个崩了整个进程全完 一个崩了整个线程/进程全完
  1. mysql的日志

Redo Log:确保掉电不丢数据,异步刷盘 D持久性

Undo Log:事务回滚和 MVCC(多版本并发控制)A原⼦性

bin Log: 主从复制和数据恢复

  1. 超卖问题

第一层:数据库层 悲观锁(Pessimistic Lock)

第二层:缓存层 将“查询库存、校验权限、扣减库存”写在一个 Lua 脚本里,Redis 保证整个 Lua 脚本的执行是原子性的,中间不会被其他请求打断。这是目前大厂解决秒杀超卖的主流做法

第三层:架构层(削峰填谷)MQ和令牌桶 用户下单请求进入系统后,校验完 Redis 库存,直接发一条消息到 MQ,然后立刻告诉用户“抢购成功,正在出单”


Gelato
http://example.com/2024/10/29/Gelato/
作者
Mercury
发布于
2024年10月29日
许可协议