总流程
Websocket模块
短轮询
长轮询
Websocket长连接
如何使用Tomcat实现WebSocket即时通讯服务服务端_tomcat websocket-CSDN博客
netty对websocket协议的实现_netty实现websocket-CSDN博客
为啥我选netty不用tomcat?
1.netty是nio基于事件驱动 的多路框架,使用单线程或少量线程处理大量的并发连接。相比之下,
Tomcat 是基于多线程的架构,每个连接都会分配一个线程,适用于处理相对较少的并发连接。最近
的 Tomcat 版本(如 Tomcat 8、9)引入了 NIO(New I/0)模型。所以这个点并不是重点。
2.Netty 提供了丰富的功能和组件,可以灵活地构建自定义的网络应用。它具有强大的编解码器和处理
器,可以轻松处理复杂的协议和数据格式。Netty的扩展性也非常好,可以根据需要添加自定义的组
件。比如我们可以用netty的pipeline方便的进行前置后置的处理,可以用netty的心跳处理器来检查连
接的状态。这些都是netty的优势。
IM顶层设计
消息可靠ACK
消息重复
发送消息幂等
发送消息的一个随机md5,保证幂等标识 1s内的去重保证,
接收消息幂等
消息的唯一性,靠消息id来判断
离线消息推送
推拉模式
推拉结合
客户端维护一个游标,需要保证唯一性和有效性,全局递增的id
单聊群聊
抽象出一个房间表,房间id就是会话id
表设计单聊群聊 | DrawSQL
会话表 | DrawSQL
消息已读未读
微信最大只展示99条未读数
1 2 3 4 5 6 7 select count (0 ) from ( select 1 from msg where room_id = 我房间 and create_time> 我阅读 Limit 100 )
这样未读数最多扫描100条
消息表 | DrawSQL
redis 存储token
消息模块
rocketMQ
rocketmq
rocketmq 使用docker-compose部署
配置文件
1 vi data/rocketmq/broker/conf/broker.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 # 所属集群名字 brokerClusterName=Defaultcluster# broker名字 brokerName=broker-a# 0表示master >0表示slave brokerId=0# nameServer地址,分号分割 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 namesrvAddr=rocketmq-namesrv:9876# 这个很有讲究 如果是正式环境 这里一定要填写内网地址(安全) # 如果是用于测试或者本地这里建议要填外网地址,因为你的本地代码是无法连接到阿里云内网,只能连接外网。 brokerIP1=47.96.119.233# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4# 是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true# 是否允许broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口, listenPort=10911# 此参数控制是否开启密码,不开启设置false aclEnable=false# 删除文件时间点,默认凌晨4点 deleteWhen=04# 文件保留时间,默认48小时 fileReservedTime=120# commitLog 每个文件的大小默认1G mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88# Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER# 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH
创建acl文件,用于开启用户名密码
1 vi data/rocketmq/broker/conf/plain_acl.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # 全局白名单,如果配置了则不需要走acl校验,慎重配置 globalwhiteRemoteAddresses:# - 47.180.93.* # - 156.254.120. accounts: - accesskey: RocketMQ secretkey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topiCA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms : # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: galileoim # 密码不能小于6位数 secretKey: 12345678 whiteRemoteAddress: # if it is admin,it could access all resources 上面的用于教学,我们用超级管理员账号 admin: true
给console加上账号密码
1 vi data/rocketmq/console/data/users.properties
1 2 3 4 5 6 7 # Define Admin # 用户名和密码规则:[用户名=密码,权限],这里的权限为1表示管理员,0表示普通用户 galileoim=123456,1# Define Users # user1=user1 # user1=user1
创建yml文件
1 vi data/rocketmq/docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 version: '3.5' services: rocketmq-namesrv: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-namesrv restart: always ports: - 9876:9876 volumes: - ./namesrv/logs/nameserver-a:/home/rocketmq/logs - ./namesrv/store:/home/rocketmq/store environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqnamesrv"] networks: rocketmq_net: aliases: - rocketmq-namesrv rocketmq-broker: image: foxiswho/rocketmq:4.8.0 container_name: rocketmq-broker restart: always ports: - 10909:10909 - 10911:10911 volumes: - ./broker/logs:/home/rocketmq/logs - ./broker/store:/home/rocketmq/store - ./broker/conf/plain_acl.yml:/home/rocketmq/rocketmq-4.8.0/conf/plain_acl.yml - ./broker/conf/broker.conf:/etc/rocketmq/broker.conf environment: JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms128m -Xmx128m -Xmn64m" command: ["sh","mqbroker","-c","/etc/rocketmq/broker.conf"] depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-broker rocketmq-console: image: iamverygood/rocketmq-console:4.7.1 container_name: rocketmq-console restart: always ports: - 8180:8080 volumes: - ./console/data:/tmp/rocketmq-console/data environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" # -Drocketmq.config.loginRequired=true -Drocketmq.config.aclEnabled=true -Drocketmq.config.accessKey=galileoim -Drocketmq.config.secretKey=12345678 depends_on: - rocketmq-namesrv networks: rocketmq_net: aliases: - rocketmq-console networks: rocketmq_net: name: rocketmq_net driver: bridge
打开9876 ,10911 ,8180 这几个端口
本地消息表
确保本地操作和第三方保证一致 确保数据库操作和MQ一致,请求超时导致不一致性 使用超时态,使用定时重试 保证分布式一致性
我们讨论了分布式一致性的背最。导致不一致的场景(网络)。以及如何去保证一致性,讨论到持久化,重试,幂等。以及
保证一致性的几个方案,其中引出了本地消息表这种常见的方案。通过高度抽象,把对第三方的操作,!
转为对本地
方法执行的保证。通过操作记录和本地其他事务一同入库,持久化,通过spring的定时任务保证重试,通过
spring容器的getbean,来找到对应的方法,执行反射调用。
本文不但带你了解了常用的本息消息表企业级框架搭建方案。同时还了解了分布式不一致的具体背景。这些点,都
是在面试的时候,面试官会考验你对分布式的基本素养,需要用心去理解。
其他
线程池
线程池配置和使用
Executor 框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package org.william.galileoim.common.common.config;import org.william.galileoim.common.common.factory.MyThreadFactory;import org.william.galileoim.transaction.annotation.SecureInvokeConfigurer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.scheduling.annotation.AsyncConfigurer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;import java.util.concurrent.ThreadPoolExecutor;@Configuration @EnableAsync public class ThreadPoolConfig implements AsyncConfigurer , SecureInvokeConfigurer { public static final String GALILEOIM_EXECUTOR = "galileoimExecutor" ; public static final String WS_EXECUTOR = "websocketExecutor" ; public static final String AICHAT_EXECUTOR = "aichatExecutor" ; @Override public Executor getAsyncExecutor () { return galileoimExecutor(); } @Override public Executor getSecureInvokeExecutor () { return galileoimExecutor(); } @Bean(GALILEOIM_EXECUTOR) @Primary public ThreadPoolTaskExecutor galileoimExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(200 ); executor.setThreadNamePrefix("galileoim-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(WS_EXECUTOR) public ThreadPoolTaskExecutor websocketExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(16 ); executor.setMaxPoolSize(16 ); executor.setQueueCapacity(1000 ); executor.setThreadNamePrefix("websocket-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); executor.initialize(); return executor; } @Bean(AICHAT_EXECUTOR) public ThreadPoolTaskExecutor chatAiExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(15 ); executor.setThreadNamePrefix("aichat-executor-" ); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); executor.setThreadFactory(new MyThreadFactory(executor)); return executor; } }
拦截
使用拦截器拦截请求 本地threadlocal存储jwt的token