Gelato

总流程

image-20241105152704486

Websocket模块

服务端推送web方案

短轮询

web端不停地间隔一段时间像服务端发一个http请求,如果有新消息,就会在某次请求返回

适用于

  1. 扫码登陆
  2. 客户端使用量不大

缺点

  1. 大量无效请求
  2. 服务端请求压力大

长轮询

优点:相比短轮询模式

  1. 大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销
  2. 降低了服务端处理请求的 QPS

缺点:

  1. 无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。
  2. 服务端压力大:服务端总挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端)

Websocket长连接

长轮询和短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。
实现原理:客户端和服务器之间维持一个TCP/IP长连接,全双工通道。

如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客

netty对websocket协议的实现_netty实现websocket-CSDN博客

为啥我选netty不用tomcat?

  1. netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
    Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
    的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
  2. Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
    器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
    件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
    接的状态。这些都是netty的优势。

netty实现websocket

心跳包
如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客
户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件newIdleStateHand1er(30,0,θ)可以实现30秒链接没有读请求,就主动
关闭链接。我们的web前端需要保持每10s发送一个心跳包。

请求处理

自己实现的处理器NettyWebSocketServerHandler接受websocket信息。根据消息类型进行路由处理。
目前请求对websocket依赖很低,只做这一件事

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//反序列化请求报文
WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class);
//解析type
WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType());
//根据type做不同的处理
switch (wsReqTypeEnum) {
case LOGIN:
this.webSocketService.handleLoginReq(ctx.channel());
log.info("请求二维码 = " + msg.text());
break;
case HEARTBEAT:
break;
default:
log.info("未知类型");
}
}

websocket前后端交互

我们用websocket的目的,主要是用于后端推送前端,前端能用http的就尽量用http。这样的好处是,http丰富
的拦截器,注解,请求头等功能,可以更好地实现或者是收口我们想要的功能。尽量对websocket的依赖降到最
低。前后端的交互用的是json串,里面通过type标识次此次的事件类型。

前端请求

1
2
3
4
{
type:1,
data:{}
}

1.请求登录二维码

2.心跳检测

3用户认证

后端返回

1
2
3
4
{
type:1,
datajson:jsondata
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public enum WSRespTypeEnum {
LOGIN_URL(1, "登录二维码返回", WSLoginUrl.class),
LOGIN_SCAN_SUCCESS(2, "用户扫描成功等待授权", null),
LOGIN_SUCCESS(3, "用户登录成功返回用户信息", WSLoginSuccess.class),
MESSAGE(4, "新消息", WSMessage.class),
ONLINE_OFFLINE_NOTIFY(5, "上下线通知", WSOnlineOfflineNotify.class),
INVALIDATE_TOKEN(6, "使前端的token失效,意味着前端需要重新登录", null),
BLACK(7, "拉黑用户", WSBlack.class),
MARK(8, "消息标记", WSMsgMark.class),
RECALL(9, "消息撤回", WSMsgRecall.class),
APPLY(10, "好友申请", WSFriendApply.class),
MEMBER_CHANGE(11, "成员变动", WSMemberChange.class),
;

private final Integer type;
private final String desc;
private final Class dataClass;

private static Map<Integer, WSRespTypeEnum> cache;

static {
cache = Arrays.stream(WSRespTypeEnum.values()).collect(Collectors.toMap(WSRespTypeEnum::getType, Function.identity()));
}

public static WSRespTypeEnum of(Integer type) {
return cache.get(type);
}
}

用户模块

用户表设计

image-20250417160859261

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package com.abin.mallchat.common.user.domain.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import lombok.*;

import java.io.Serializable;
import java.util.Date;

/**
* <p>
* 用户表
* </p>
*
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "user", autoResultMap = true)
public class User implements Serializable {

private static final long serialVersionUID = 1L;

public static Long UID_SYSTEM = 1L;//系统uid

/**
* 用户id
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;

/**
* 用户昵称
*/
@TableField("name")
private String name;

/**
* 用户头像
*/
@TableField("avatar")
private String avatar;

/**
* 性别 1为男性,2为女性
*/
@TableField("sex")
private Integer sex;

/**
* 微信openid用户标识
*/
@TableField("open_id")
private String openId;

/**
* 上下线状态 1在线 2离线
*/
@TableField("active_status")
private Integer activeStatus;

/**
* 最后上下线时间
*/
@TableField("last_opt_time")
private Date lastOptTime;

/**
* 最后上下线时间
*/
@TableField(value = "ip_info", typeHandler = JacksonTypeHandler.class)
private IpInfo ipInfo;

/**
* 佩戴的徽章id
*/
@TableField("item_id")
private Long itemId;

/**
* 用户状态 0正常 1拉黑
*/
@TableField("status")
private Integer status;

/**
* 创建时间
*/
@TableField("create_time")
private Date createTime;

/**
* 修改时间
*/
@TableField("update_time")
private Date updateTime;

public void refreshIp(String ip) {
if (ipInfo == null) {
ipInfo = new IpInfo();
}
ipInfo.refreshIp(ip);
}
}

IM顶层设计

消息可靠ACK

消息重复

发送消息幂等

​ 发送消息的一个随机md5,保证幂等标识 1s内的去重保证,

接收消息幂等

​ 消息的唯一性,靠消息id来判断

离线消息推送

推拉模式

推拉结合

客户端维护一个游标,需要保证唯一性和有效性,全局递增的id

单聊群聊

抽象出一个房间表,房间id就是会话id

表设计单聊群聊 | DrawSQL

会话表 | DrawSQL

消息已读未读

微信最大只展示99条未读数

1
2
3
4
5
6
7
select count(0) from
(
select 1
from msg
where room_id = 我房间 and create_time>我阅读
Limit 100
)

这样未读数最多扫描100条

消息表 | DrawSQL

redis 存储token

image-20241114185112346

消息模块

rocketMQ

rocketmq

rocketmq 使用docker-compose部署

配置文件

1
vi data/rocketmq/broker/conf/broker.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 所属集群名字
brokerClusterName=Defaultcluster

# broker名字
brokerName=broker-a

# 0表示master >0表示slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=rocketmq-namesrv:9876

#这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全)
#如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。
brokerIP1=47.96.119.233

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口,
listenPort=10911

# 此参数控制是否开启密码,不开启设置false
aclEnable=false

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

创建acl文件,用于开启用户名密码

1
vi data/rocketmq/broker/conf/plain_acl.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#全局白名单,如果配置了则不需要走acl校验,慎重配置
globalwhiteRemoteAddresses:

# - 47.180.93.*
# - 156.254.120.
accounts:
- accesskey: RocketMQ
secretkey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topiCA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms :
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB

- accessKey: galileoim
# 密码不能小于6位数
secretKey: 12345678
whiteRemoteAddress:
# if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号
admin: true

给console加上账号密码

1
vi data/rocketmq/console/data/users.properties
1
2
3
4
5
6
7
#Define Admin
# 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户
galileoim=123456,1

#Define Users
#user1=user1
#user1=user1

创建yml文件

1
vi data/rocketmq/docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
version: '3.5'
services:
rocketmq-namesrv:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-namesrv
restart: always
ports:
- 9876:9876
volumes:
- ./namesrv/logs/nameserver-a:/home/rocketmq/logs
- ./namesrv/store:/home/rocketmq/store
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqnamesrv"]
networks:
rocketmq_net:
aliases:
- rocketmq-namesrv

rocketmq-broker:
image: foxiswho/rocketmq:4.8.0
container_name: rocketmq-broker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./broker/logs:/home/rocketmq/logs
- ./broker/store:/home/rocketmq/store
- ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml
- ./broker/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m"
command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"]
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-broker

rocketmq-console:
image: iamverygood/rocketmq-console:4.7.1
container_name: rocketmq-console
restart: always
ports:
- 8180:8080
volumes:
- ./console/data:/tmp/rocketmq-console/data
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
# -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678
depends_on:
- rocketmq-namesrv
networks:
rocketmq_net:
aliases:
- rocketmq-console
networks:
rocketmq_net:
name: rocketmq_net
driver: bridge

打开9876109118180这几个端口

到对应目录下启动容器

1
2
cd /data/rocketmq
docker-compose up -d

本地消息表

确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性

我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及
保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地
方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过
spring容器的getbean,来找到对应的方法,执行反射调用。
本文不但带你了解了常用的本息消息表企业级框架搭建方案。同时还了解了分布式不一致的具体背景。这些点,都
是在面试的时候,面试官会考验你对分布式的基本素养,需要用心去理解。

其他

线程池

线程池配置和使用

Executor 框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package org.william.galileoim.common.common.config;

import org.william.galileoim.common.common.factory.MyThreadFactory;
import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Description: 线程池配置
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer {
/**
* 项目共用线程池
*/
public static final String GALILEOIM_EXECUTOR = "galileoimExecutor";
/**
* websocket通信线程池
*/
public static final String WS_EXECUTOR = "websocketExecutor";


public static final String AICHAT_EXECUTOR = "aichatExecutor";

@Override
public Executor getAsyncExecutor() {
return galileoimExecutor();
}

@Override
public Executor getSecureInvokeExecutor() {
return galileoimExecutor();
}

@Bean(GALILEOIM_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor galileoimExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("galileoim-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//满了调用线程执行,认为重要任务
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(WS_EXECUTOR)
public ThreadPoolTaskExecutor websocketExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(1000);//支持同时推送1000人
executor.setThreadNamePrefix("websocket-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
executor.initialize();
return executor;
}

@Bean(AICHAT_EXECUTOR)
public ThreadPoolTaskExecutor chatAiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(15);
executor.setThreadNamePrefix("aichat-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送
executor.setThreadFactory(new MyThreadFactory(executor));
return executor;
}
}

拦截

使用拦截器拦截请求 本地threadlocal存储jwt的token


Gelato
http://example.com/2024/10/29/Gelato/
作者
Mercury
发布于
2024年10月29日
许可协议