GlileoIM

总流程

image-20241105152704486

Websocket模块

短轮询

长轮询

Websocket长连接

如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客

netty对websocket协议的实现_netty实现websocket-CSDN博客

为啥我选netty不用tomcat?
1.netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
2.Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
接的状态。这些都是netty的优势。

IM顶层设计

消息可靠ACK

消息重复

发送消息幂等

​ 发送消息的一个随机md5,保证幂等标识 1s内的去重保证,

接收消息幂等

​ 消息的唯一性,靠消息id来判断

离线消息推送

推拉模式

推拉结合

客户端维护一个游标,需要保证唯一性和有效性,全局递增的id

单聊群聊

抽象出一个房间表,房间id就是会话id

表设计单聊群聊 | DrawSQL

会话表 | DrawSQL

消息已读未读

微信最大只展示99条未读数

1
2
3
4
5
6
7
select count(0) from
(
select 1
from msg
where room_id = 我房间 and create_time>我阅读
Limit 100
)

这样未读数最多扫描100条

消息表 | DrawSQL

redis 存储token

image-20241114185112346

消息模块

rocketMQ

rocketmq

rocketmq 使用docker-compose部署

配置文件

1
vi data/rocketmq/broker/conf/broker.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 所属集群名字
brokerClusterName=Defaultcluster

# broker名字
brokerName=broker-a

# 0表示master >0表示slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=rocketmq-namesrv:9876

#这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全)
#如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。
brokerIP1=47.96.119.233

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口,
listenPort=10911

# 此参数控制是否开启密码,不开启设置false
aclEnable=false

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

创建acl文件,用于开启用户名密码

1
vi data/rocketmq/broker/conf/plain_acl.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#全局白名单,如果配置了则不需要走acl校验,慎重配置
globalwhiteRemoteAddresses:

# - 47.180.93.*
# - 156.254.120.
accounts:
- accesskey: RocketMQ
secretkey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topiCA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms :
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB

- accessKey: galileoim
# 密码不能小于6位数
secretKey: 12345678
whiteRemoteAddress:
# if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号
admin: true

给console加上账号密码

1
vi data/rocketmq/console/data/users.properties
1
2
3
4
5
6
7
#Define Admin
# 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户
galileoim=123456,1

#Define Users
#user1=user1
#user1=user1

创建yml文件

1
vi data/rocketmq/docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
version: '3.5'
services:
rocketmq-namesrv:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-namesrv
restart: always
ports:
- 9876:9876
volumes:
- ./namesrv/logs/nameserver-a:/home/rocketmq/logs
- ./namesrv/store:/home/rocketmq/store
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqnamesrv"]
networks:
rocketmq_net:
aliases:
- rocketmq-namesrv

rocketmq-broker:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-broker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker/logs:/home/rocketmq/logs
- ./broker/store:/home/rocketmq/store
- ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml
- ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"]
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-broker

rocketmq-console:
image: iamverygood/rocketmq-console:4.7.1
container_name: rocketmq-console
restart: always
ports:
- 8180:8080
volumes:
- ./console/data:/tmp/rocketmq-console/data
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
# -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-console
networks:
rocketmq_net:
name: rocketmq_net
driver: bridge

打开9876109118180这几个端口

1
docker-compose up -d

本地消息表

确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性

我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及
保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地
方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过
spring容器的getbean,来找到对应的方法,执行反射调用。
本文不但带你了解了常用的本息消息表企业级框架搭建方案。同时还了解了分布式不一致的具体背景。这些点,都
是在面试的时候,面试官会考验你对分布式的基本素养,需要用心去理解。

其他

线程池

线程池配置和使用

Executor 框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package org.william.galileoim.common.common.config;

import org.william.galileoim.common.common.factory.MyThreadFactory;
import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Description: 线程池配置
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer {
/**
* 项目共用线程池
*/
public static final String GALILEOIM_EXECUTOR = "galileoimExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";


public static final String AICHAT_EXECUTOR = "aichatExecutor";

@Override
public Executor getAsyncExecutor() {
return galileoimExecutor();
}

@Override
public Executor getSecureInvokeExecutor() {
return galileoimExecutor();
}

@Bean(GALILEOIM_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor galileoimExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("galileoim-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(WS_EXECUTOR)
public ThreadPoolTaskExecutor websocketExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(1000);//支持同时推送1000人
executor.setThreadNamePrefix("websocket-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(AICHAT_EXECUTOR)
public ThreadPoolTaskExecutor chatAiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(15);
executor.setThreadNamePrefix("aichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
return executor;
}
}

拦截

使用拦截器拦截请求 本地threadlocal存储jwt的token


GlileoIM
http://example.com/2024/10/29/GlileoIM/
作者
Mercury
发布于
2024年10月29日
许可协议