摘要:使用 Spring Boot、Redis Pub/Sub 和 Redis Streams 水平扩展 WebSocket 服务器我的 WebSocket 服务器系,实现高并发WebSocket服务:Spring 实现高并发的文件存储方式是

运用 Spring Boot、Redis Pub/Sub 和 Redis Streams 水平扩展 WebSocket 服务器</p>n<p>我的 WebSocket 服务器系列</p>n<p>01:在微服务架构中构建 WebSocket 服务器</p>n<p>02:运用公开-订阅玩法横给扩展 WebSocket 服务器的设计注意事项</p>n<p>03:运用 Spring Boot、Redis Pub/Sub 和 Redis Streams 实现可扩展的 WebSocket 服务器</p>n<p>快速回顾</p>n<p>运用公开-订阅玩法在微服务架构中扩展 WebSocket 服务器的完整设计</p>n<p>在上一篇文章中,大家确定了水平扩展 WebSocket 服务器和后端微服务时会出现的两个难题:</p>n<p>难题 #1:由于负载均衡器导致的消息丢失</p>n<p>难题 #2:由于多个订阅者导致的重复消息处理</p>n<p>化解方法是将带有消费者群体概念的公开-订阅消息传递玩法应用于架构设计。</p>n<p>开始</p>n<p>接下来运用 Spring Boot、Stomp、Redis Pub/Sub 和 Redis Streams 构建可扩展的 WebSocket 服务器。</p>n<p>第 1 步:构建 WebSocket 服务器</p>n<p>按照我之前文章的第 1 步和第 2 步,运用 Spring Boot 和 STOMP 消息传递协议初始化 WebSocket 服务器。</p>n<p>第二步:启动 Redis 服务器</p>n<p>如需快速配置,请运用 docker 在本地运行 Redis 服务器。</p>n<p>docker run --name redis -p 6379:6379 -d redis</p>n<p>第 3 步:配置和 Redis 服务器的连接</p>n<p>在WebSocket服务器的文件中添加如下配置,application.yml连接Redis服务器。</p>n<p># application.yml</p>n<p>spring.redis:</p>n<p>主机: localhost</p>n<p>端口: 6379</p>n<p>第 4 步:为单给实时通信实施 Pub/Sub(广播频道)</p>n<p>运用 API 和 Pub/Sub(广播)的单给实时通信设计</p>n<p>流程 4.1:创建 BroadcastEvent 类</p>n<p>BroadcastEvent一个自定义对象,用于将消息从 WebSocket 服务器的壹个实例广播到 WebSocket 服务器的全部实例。</p>n<p>data class BroadcastEvent( @JsonProperty("topic") val topic: String, @JsonProperty("message") val message: String): Serializable</p>n<p>流程 4.2:配置 Redis Pub/Sub — ReactiveRedisTemplate</p>n<p>ReactiveRedisTemplate一个帮助类,用于简化 Redis 数据访问代码。在大家的配置中,大家公开/订阅该值BroadcastEvent并Jackson2JsonRedisSerializer用于执行该值的自动序列化/反序列化。</p>n<p>@Configurationclass RedisConfig { @Bean fun reactiveRedisTemplate(factory: LettuceConnectionFactory): ReactiveRedisTemplate<String, BroadcastEvent> { val serializer = Jackson2JsonRedisSerializer(BroadcastEvent::class.java) val builder = RedisSerializationContext.newSerializationContext<String, BroadcastEvent>(StringRedisSerializer()) val context = builder.value(serializer).build() return ReactiveRedisTemplate(factory, context) }}</p>n<p>流程 4.3:配置 Redis Pub/Sub — 广播服务</p>n<p>RedisBroadcastService包含公开和订阅自定义频道 ( BROADCAST-CHANNEL) 的逻辑。这是用于将消息从 WebSocket 服务器的壹个实例广播到 WebSocket 服务器的全部实例的通道。</p>n<p>@Serviceclass RedisBroadcastService( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, BroadcastEvent>, private val websocketTemplate: SimpMessagingTemplate) { fun publish(event: BroadcastEvent) { reactiveRedisTemplate.convertAndSend("BROADCAST-CHANNEL", event).subscribe() } @PostConstruct fun subscribe() { reactiveRedisTemplate.listenTo(ChannelTopic.of("BROADCAST-CHANNEL")) .map(ReactiveSubscription.Message<String, BroadcastEvent>::getMessage) .subscribe { message -> websocketTemplate.convertAndSend(message.topic, message.message) } }}</p>n<p>注意:@PostConstruct一个 Spring 注释,它允许大家将自定义操作附加到 bean 创建,而且这些方式只运行一次。在大家的例子中,大家订阅了BROADCAST-CHANNELon bean 创建。</p>n<p>流程 4.4:创建 API 端点</p>n<p>下面的代码创建壹个带有 POST 请求端点的 REST 控制器,该端点接收请求正文NewMessageRequest。topic是客户端(前端)订阅的 STOMP 目的地,是message字符串格式的实际消息。</p>n<p>@RestController@RequestMapping("/api/notification")class NotificationController(private val redisBroadcastService: RedisBroadcastService) { @PostMapping fun newMessage(@RequestBody request: NewMessageRequest) { val event = BroadcastEvent(request.topic, request.message) redisBroadcastService.publish(event) }}</p>n<p>API 请求将被广播到上面流程 4.3 中配置的全部 WebSocket 服务器实例。</p>n<p>流程 4.5:通过 API 测试单给实时通信</p>n<p>启动 WebSocket 服务器,运用WebSocket 调试工具ws://localhost:8080/stomp通过 STOMP 协议连接到 WebSocket 服务器。连接后,配置 WebSocket 调试器工具以订阅主题。</p>n<p style=\"text-align:center;\"></p>n<p>接下来,运用下面内容 curl 命令给 WebSocket 服务器发送 HTTP POST 请求:</p>n<p>curl -X POST -d '{"topic": "/topic/frontend", "message": "测试 API 端点" }' -H 'Content-Type: application/json' localhost:8080/api/notification</p>n<p>WebSocket 调试器工具应具有如下所示的输出:</p>n<p>WebSocket 调试器工具的输出屏幕截图</p>n<p>这表明大家已经成功地为 WebSocket 服务器配置了 Redis Pub/Sub,以实现后端微服务和 Web 应用程序(前端)之间的可扩展单给实时通信。</p>n<p>第 5 步:实施 Pub/Sub 和消费者组的双给实时通信</p>n<p>运用 Pub/Sub 和消费者组的双给实时通信设计</p>n<p>在第 5 步中,大家将运用 Redis Streams 作为大家的 Pub/Sub 体系,用于后端微服务和 Web 应用程序(前端)之间的双给实时通信。大家没有运用 Redis Pub/Sub,由于它不支持消费者组的概念。</p>n<p>流程 5.1:创建 StreamDataEvent 类</p>n<p>StreamDataEvent是订阅者和公开者之间数据交换的自定义对象。message是字符串格式的实际消息,是topicWebSocket 服务器了解要将消息发送到何者 STOMP 目标的必填字段。</p>n<p>data class StreamDataEvent ( @JsonProperty("message") val message: String, @JsonProperty("topic") val topic: String? = null,)</p>n<p>流程 5.2:WebSocket 服务器——实现 Redis 流消费者</p>n<p>注意:不需要广播消息,由于全部 WebSocket 服务器实例都会从 Redis Streams 接收消息。</p>n<p>@Serviceclass RedisStreamConsumer( private val websocketTemplate: SimpMessagingTemplate): StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>) { logger.info("[NEW] --> received message: ${record.value} from stream: ${record.stream}") record.value.topic?.let { destination -> websocketTemplate.convertAndSend(destination, record.value.message) } }}</p>n<p>流程 5.3:WebSocket 服务器——实现 Redis 流配置</p>n<p>下面内容代码包含订阅 Redis 流的配置,其中消息将由RedisStreamConsumer大家在流程 5.2 中配置的处理。</p>n<p>在这里,大家将 WebSocket 服务器配置为侦听由 key 标识的流TEST_EVENT_TO_WEBSOCKET_SERVER。无论兄弟们可以根据无论兄弟们的用例创建更多订阅。</p>n<p>@Configurationclass RedisStreamConfig(private val streamListener: StreamListener<String, ObjectRecord<String, StreamDataEvent>>) { private fun initListenerContainer(redisConnectionFactory: RedisConnectionFactory): StreamMessageListenerContainer<String, ObjectRecord<String, StreamDataEvent>> { val options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(StreamDataEvent::class.java) .build() return StreamMessageListenerContainer.create(redisConnectionFactory, options) } @Bean("TestEventSubscription") fun subscription(redisConnectionFactory: RedisConnectionFactory): Subscription { val listenerContainer = initListenerContainer(redisConnectionFactory) val subscription = listenerContainer.receive(StreamOffset.latest("TEST_EVENT_TO_WEBSOCKET_SERVER"), streamListener) listenerContainer.start() return subscription }}</p>n<p>流程 5.4:WebSocket 服务器——实现 Redis 流生产者</p>n<p>流程 5.5 WebSocket 服务器——实现 WebSocket 配置</p>n<p>注意:不需要将消息广播到全部 WebSocket 实例,由于公开到 Redis Streams 已经确保全部后端微服务都接收到消息。有关详细信息,请参阅流程 5 中的图表。</p>n<p>流程 5.6:后端微服务——实现 Redis 流消费者</p>n<p>同样,在示例后端微服务中,实现 Redis 流消费者。</p>n<p>@Serviceclass RedisStreamConsumer: StreamListener<String, ObjectRecord<String, StreamDataEvent>> { companion object { private val logger = LoggerFactory.getLogger(RedisStreamConsumer::class.java) } override fun onMessage(record: ObjectRecord<String, StreamDataEvent>?) { logger.info("[NEW] --> received message: ${record?.value} from stream: ${record?.stream}") }}</p>n<p>流程 5.6:后端微服务——实现 Redis 流配置</p>n<p>这里的配置和 WebSocket 服务器的配置类似。唯一的不同差异是大家添加了消费者组 ( CONSUMER_GROUP),它确保只有壹个后端微服务实例会运用来自 Redis 流的数据。</p>n<p>注意:也可以运用代码来实现这一点,但我将运用 Redis CLI 命令来保持简单。</p>n<p>流程 5.7:后端微服务——实现 Redis 流生产者</p>n<p>生产者配置类似于 WebSocket 服务器配置。</p>n<p>请注意,微服务有壹个定期公开到 Redis 流的规划作业,而且/topic/to-frontend作为大家示例的一部分,该消息经过精心设计以发送到目标主题的 Web 应用程序(前端)。</p>n<p>@Serviceclass RedisStreamProducer( private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>, @Value("\\${spring.application.name}") private val applicationName: String,) { companion object { private val atomicInteger = AtomicInteger(0) private val logger = LoggerFactory.getLogger(RedisStreamProducer::class.java) } fun publishEvent(streamTopic: String, data: StreamDataEvent) { val record = StreamRecords.newRecord().ofObject(data).withStreamKey(streamTopic) reactiveRedisTemplate.opsForStream<String, String>().add(record).subscribe() } @Scheduled(initialDelay = 10000, fixedRate = 5000) fun publishTestMessageToBackend() { val data = StreamDataEvent( topic = "/topic/to-frontend", message = "New Message from $applicationName -- ID = ${atomicInteger.incrementAndGet()}" ) logger.info("Publishing Message: $data to Stream: TEST_EVENT_TO_WEBSOCKET_SERVER") publishEvent("TEST_EVENT_TO_WEBSOCKET_SERVER", data) }}</p>n<p>流程 5.8:通过 Pub/Sub 测试双给实时通信</p>n<p>大家已经配置了 WebSocket 服务器和示例后端微服务。让大家运用大家在两者中进行的规划数据公开配置来测试来自 Redis 流的数据的公开和订阅RedisStreamProducer。</p>n<p>启动 WebSocket 服务器的两个实例和示例后端微服务的两个实例。无论兄弟们应该注意到输出日志和下面的类似。</p>n<p>Figure 6 后端微服务的输出日志(实例 A)</p>n<p>Figure 7 后端微服务的输出日志(实例 B)</p>n<p>Figure 8 WebSocket 服务器的输出日志(实例 A)</p>n<p>Figure 9 WebSocket 的输出日志(实例 B)</p>n<p>如果无论兄弟们要运用WebSocket 调试器工具连接到 WebSocket 服务器并订阅主题/topic/to-frontend,无论兄弟们应该会看到下面内容日志:</p>n<p>Figure 10 WebSocket 调试器工具的输出日志(前端)</p>n<p>这表明大家已经成功地为 WebSocket 服务器配置了 Redis Streams,以实现后端微服务和 Web 应用程序(前端)之间的可扩展双给实时通信。