总流程
Websocket模块
服务端推送web方案
短轮询
web端不停地间隔一段时间像服务端发一个http请求,如果有新消息,就会在某次请求返回
适用于
扫码登陆
客户端使用量不大
缺点
大量无效请求
服务端请求压力大
长轮询
优点:相比短轮询模式
大幅降低短轮询模式中客户端高频无用的轮询导致的网络开销和功耗开销
降低了服务端处理请求的 QPS
缺点:
无效请求:长轮询在超时时间内没有获取到消息时,会结束返回,因此仍然没有完全解决客户端“无效”请求的问题。
服务端压力大:服务端总挂(hang)住请求,只是降低了入口请求的 QPS,并没有减少对后端资源轮询的压力。假如有 1000 个请求在等待消息,可能意味着有 1000 个线程在不断轮询消息存储资源。(轮询转移到了后端)
Websocket长连接
长轮询和短轮询都算作是服务端没法主动向客户端推送的一种曲线救国的方式,那最好的方案,就是能不能解决这个问题,因此诞生了websocket。
实现原理:客户端和服务器之间维持一个TCP/IP长连接,全双工通道。
如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客
netty对websocket协议的实现_netty实现websocket-CSDN博客
为啥我选netty不用tomcat?
netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
接的状态。这些都是netty的优势。
netty实现websocket
心跳包
如果用户突然关闭网页,是不会有断开通知给服务端的。那么服务端永远感知不到用户下线。因此需要客
户端维持一个心跳,当指定时间没有心跳,服务端主动断开,进行用户下线操作。
直接接入netty的现有组件newIdleStateHand1er(30,0,θ)
可以实现30秒链接没有读请求,就主动
关闭链接。我们的web前端需要保持每10s发送一个心跳包。
请求处理
自己实现的处理器NettyWebSocketServerHandler
接受websocket
信息。根据消息类型进行路由处理。
目前请求对websocket依赖很低,只做这一件事
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { WSBaseReq wsBaseReq = JSONUtil.toBean(msg.text(), WSBaseReq.class); WSReqTypeEnum wsReqTypeEnum = WSReqTypeEnum.of(wsBaseReq.getType()); switch (wsReqTypeEnum) { case LOGIN: this .webSocketService.handleLoginReq(ctx.channel()); log.info("请求二维码 = " + msg.text()); break ; case HEARTBEAT: break ; default : log.info("未知类型" ); } }
websocket前后端交互
我们用websocket的目的,主要是用于后端推送前端,前端能用http的就尽量用http。这样的好处是,http丰富
的拦截器,注解,请求头等功能,可以更好地实现或者是收口我们想要的功能。尽量对websocket的依赖降到最
低。前后端的交互用的是json串,里面通过type标识次此次的事件类型。
前端请求
1.请求登录二维码
2.心跳检测
3用户认证
后端返回
1 2 3 4 { type:1 , datajson:jsondata }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public enum WSRespTypeEnum { LOGIN_URL(1 , "登录二维码返回" , WSLoginUrl.class), LOGIN_SCAN_SUCCESS(2 , "用户扫描成功等待授权" , null ), LOGIN_SUCCESS(3 , "用户登录成功返回用户信息" , WSLoginSuccess.class), MESSAGE(4 , "新消息" , WSMessage.class), ONLINE_OFFLINE_NOTIFY(5 , "上下线通知" , WSOnlineOfflineNotify.class), INVALIDATE_TOKEN(6 , "使前端的token失效,意味着前端需要重新登录" , null ), BLACK(7 , "拉黑用户" , WSBlack.class), MARK(8 , "消息标记" , WSMsgMark.class), RECALL(9 , "消息撤回" , WSMsgRecall.class), APPLY(10 , "好友申请" , WSFriendApply.class), MEMBER_CHANGE(11 , "成员变动" , WSMemberChange.class), ; private final Integer type; private final String desc; private final Class dataClass; private static Map<Integer, WSRespTypeEnum> cache; static { cache = Arrays.stream(WSRespTypeEnum.values()).collect(Collectors.toMap(WSRespTypeEnum::getType, Function.identity())); } public static WSRespTypeEnum of (Integer type) { return cache.get(type); } }
用户模块
用户表设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 package com.abin.mallchat.common.user.domain.entity;import com.baomidou.mybatisplus.annotation.IdType;import com.baomidou.mybatisplus.annotation.TableField;import com.baomidou.mybatisplus.annotation.TableId;import com.baomidou.mybatisplus.annotation.TableName;import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;import lombok.*;import java.io.Serializable;import java.util.Date;@Data @EqualsAndHashCode(callSuper = false) @Builder @AllArgsConstructor @NoArgsConstructor @TableName(value = "user", autoResultMap = true) public class User implements Serializable { private static final long serialVersionUID = 1L ; public static Long UID_SYSTEM = 1L ; @TableId(value = "id", type = IdType.AUTO) private Long id; @TableField("name") private String name; @TableField("avatar") private String avatar; @TableField("sex") private Integer sex; @TableField("open_id") private String openId; @TableField("active_status") private Integer activeStatus; @TableField("last_opt_time") private Date lastOptTime; @TableField(value = "ip_info", typeHandler = JacksonTypeHandler.class) private IpInfo ipInfo; @TableField("item_id") private Long itemId; @TableField("status") private Integer status; @TableField("create_time") private Date createTime; @TableField("update_time") private Date updateTime; public void refreshIp (String ip) { if (ipInfo == null ) { ipInfo = new IpInfo(); } ipInfo.refreshIp(ip); } }
IM顶层设计
消息可靠ACK
消息重复
发送消息幂等
发送消息的一个随机md5,保证幂等标识 1s内的去重保证,
接收消息幂等
消息的唯一性,靠消息id来判断
离线消息推送
推拉模式
推拉结合
客户端维护一个游标,需要保证唯一性和有效性,全局递增的id
单聊群聊
抽象出一个房间表,房间id就是会话id
表设计单聊群聊 | DrawSQL
会话表 | DrawSQL
消息已读未读
微信最大只展示99条未读数
1 2 3 4 5 6 7 select count (0 ) from ( select 1 from msg where room_id = 我房间 and create_time> 我阅读 Limit 100 )
这样未读数最多扫描100条
消息表 | DrawSQL
redis 存储token
消息模块
rocketMQ
rocketmq
rocketmq 使用docker-compose部署
配置文件
1 vi data/rocketmq/broker/conf/broker.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 # 所属集群名字 brokerClusterName=Defaultcluster# broker名字 brokerName=broker-a# 0表示master >0表示slave brokerId=0# nameServer地址,分号分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 namesrvAddr=rocketmq-namesrv:9876# 这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全) # 如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。 brokerIP1=47.96.119.233# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4# 是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口, listenPort=10911# 此参数控制是否开启密码,不开启设置false aclEnable=false# 删除文件时间点,默认凌晨4点 deleteWhen=04# 文件保留时间,默认48小时 fileReservedTime=120# commitLog 每个文件的大小默认1G mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88# Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER# 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH
创建acl文件,用于开启用户名密码
1 vi data/rocketmq/broker/conf/plain_acl.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # 全局白名单,如果配置了则不需要走acl校验,慎重配置 globalwhiteRemoteAddresses:# - 47.180.93.* # - 156.254.120. accounts: - accesskey: RocketMQ secretkey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topiCA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms : # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: galileoim # 密码不能小于6位数 secretKey: 12345678 whiteRemoteAddress: # if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号 admin: true
给console加上账号密码
1 vi data/rocketmq/console/data/users.properties
1 2 3 4 5 6 7 # Define Admin # 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户 galileoim=123456,1# Define Users # user1=user1 # user1=user1
创建yml文件
1 vi data/rocketmq/docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 version: '3.5' services: rocketmq-namesrv: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-namesrv restart: always ports: - 9876:9876 volumes: - ./namesrv/logs/nameserver-a:/home/rocketmq/logs - ./namesrv/store:/home/rocketmq/store environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqnamesrv"] networks: rocketmq_net: aliases: - rocketmq-namesrv rocketmq-broker: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-broker restart: always ports: - 10909:10909 - 10911:10911 volumes: - ./broker/logs:/home/rocketmq/logs - ./broker/store:/home/rocketmq/store - ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml - ./broker/conf/broker.conf:/etc/rocketmq/broker.conf environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"] depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-broker rocketmq-console: image: iamverygood/rocketmq-console:4.7.1 container_name: rocketmq-console restart: always ports: - 8180:8080 volumes: - ./console/data:/tmp/rocketmq-console/data environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" # -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678 depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-console networks: rocketmq_net: name: rocketmq_net driver: bridge
打开9876 ,10911 ,8180 这几个端口
到对应目录下启动容器
1 2 cd /data/rocketmq docker-compose up -d
本地消息表
确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性
我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及
保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地
方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过
spring容器的getbean,来找到对应的方法,执行反射调用。
本文不但带你了解了常用的本息消息表企业级框架搭建方案。同时还了解了分布式不一致的具体背景。这些点,都
是在面试的时候,面试官会考验你对分布式的基本素养,需要用心去理解。
其他
线程池
线程池配置和使用
Executor 框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package org.william.galileoim.common.common.config;import org.william.galileoim.common.common.factory.MyThreadFactory;import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.scheduling.annotation.AsyncConfigurer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;@Configuration @EnableAsync public class ThreadPoolConfig implements AsyncConfigurer , SecureInvokeConfigurer { public static final String GALILEOIM_EXECUTOR = "galileoimExecutor" ; public static final String WS_EXECUTOR = "websocketExecutor" ; public static final String AICHAT_EXECUTOR = "aichatExecutor" ; @Override public Executor getAsyncExecutor () { return galileoimExecutor(); } @Override public Executor getSecureInvokeExecutor () { return galileoimExecutor(); } @Bean(GALILEOIM_EXECUTOR) @Primary public ThreadPoolTaskExecutor galileoimExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(200 ); executor.setThreadNamePrefix("galileoim-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(WS_EXECUTOR) public ThreadPoolTaskExecutor websocketExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(16 ); executor.setMaxPoolSize(16 ); executor.setQueueCapacity(1000 ); executor.setThreadNamePrefix("websocket-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(AICHAT_EXECUTOR) public ThreadPoolTaskExecutor chatAiExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(15 ); executor.setThreadNamePrefix("aichat-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); return executor; } }
拦截
使用拦截器拦截请求 本地threadlocal存储jwt的token