Spring BlockingQueue: another kind of producer-consumer problem

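Background:

I use Spring + WebSocket + STOMP to display content on a page. The backend generates content and puts it into a BlockingQueue. While the browser has the page open, the content is displayed; once the browser closes the page, nothing is displayed, but the backend keeps generating content and adding it to the queue.

Configuration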

@Configuration
@EnableWebSocketMessageBroker
@EnableAsync
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stock-ticks").withSockJS();
    }
}

@Configuration
public class MyQueueConfiguration {
    @Bean(name = "saveProductQueue")
    public BlockingQueue<String> saveProductQueue() {
        return new LinkedBlockingQueue<>();
    }
}

Controller

@Controller
public class StockTicksController {
    @Autowired
    private ProducerService producerService;

    @GetMapping("/")
    public String index() {
        producerService.sendTicks();
        return "index";
    }

}

ProducerService

@Service
public class ProducerService {

    @Resource
    private BlockingQueue<String> saveProductQueue;
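    // Requires java.util.concurrent.atomic.AtomicBoolean.
    private final AtomicBoolean started = new AtomicBoolean(false);

    @Async
    @SneakyThrows
    public void sendTicks() {
        // The controller calls this on every page load; only the first call starts the loop.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        while (true) {
            saveProductQueue.add(getStockTicks());
            System.out.println(saveProductQueue);
            TimeUnit.SECONDS.sleep(5);
        }
    }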
    private String getStockTicks() {
        String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
        return "Hello, World: " + time;
    }
}

The producer writes "Hello, World" plus the current time to the queue every 5 seconds and prints the queue.

ConsumerService

@Service
public class ConsumerService {
    private final TaskScheduler taskScheduler;
    private ScheduledFuture<?> yourTaskState;
    private final SimpMessagingTemplate simpMessagingTemplate;
    @Resource
    private BlockingQueue<String> saveProductQueue;
    @Autowired
    public ConsumerService(TaskScheduler taskScheduler,  SimpMessagingTemplate simpMessagingTemplate) {
        this.taskScheduler = taskScheduler;
        this.simpMessagingTemplate = simpMessagingTemplate;
    }
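    class ScheduledTaskExecutor implements Runnable {
        @Override
        public void run() {
            // poll() returns null once the queue is empty, so test its result
            // instead of calling isEmpty() first and risking a null payload.
            String tick;
            while ((tick = saveProductQueue.poll()) != null) {
                simpMessagingTemplate.convertAndSend("/topic/ticks", tick);
            }
        }
    }
    public synchronized void start() {
        // Do not schedule a second task if one is already running.
        if (yourTaskState != null && !yourTaskState.isCancelled()) {
            return;
        }
        long fixedRate = 1000L;
        yourTaskState = taskScheduler.scheduleAtFixedRate(new ScheduledTaskExecutor(), fixedRate);
    }
    public synchronized void stop() {
        // Guard against a disconnect arriving before any connect.
        if (yourTaskState != null) {
            yourTaskState.cancel(true);
        }
    }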
}

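The consumer takes the messages out of the queue and sends them.

Because messages should start being sent when the page is opened and stop when it is closed, I use event listeners: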
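The drain step described above can be tried outside Spring. The following is a minimal sketch, not the service itself: the class name QueueDrainSketch and the drain helper are made up for illustration, and a plain Consumer<String> stands in for the call to simpMessagingTemplate.convertAndSend. It relies only on the documented behavior that BlockingQueue.poll() returns null when the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper class, not part of the original post.
final class QueueDrainSketch {

    // Removes every element currently in the queue and hands each one to the sender.
    // Returns how many elements were forwarded.
    static int drain(BlockingQueue<String> queue, Consumer<String> sender) {
        int sent = 0;
        String message;
        // poll() returns null on an empty queue, so one call both checks and removes.
        while ((message = queue.poll()) != null) {
            sender.accept(message);
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello, World: 10:00:00");
        queue.add("Hello, World: 10:00:05");

        List<String> delivered = new ArrayList<>();
        // In the real service the sender would presumably be
        // msg -> simpMessagingTemplate.convertAndSend("/topic/ticks", msg).
        int count = drain(queue, delivered::add);
        System.out.println(count + " messages delivered: " + delivered);
    }
}
```

Using the return value of poll() as the loop condition avoids a separate isEmpty() check, which matters if more than one consumer thread can be draining the same queue.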

@Component
public class ConnectListenerEvent implements ApplicationListener<SessionConnectedEvent> {

    @Autowired
    private ConsumerService consumerService;

    @Override
    public void onApplicationEvent(SessionConnectedEvent sessionConnectedEvent) {
        consumerService.start();
    }
}

The consumer service is started when a session connects.

@Component
public class DisconnectListenerEvent implements ApplicationListener<SessionDisconnectEvent> {
    @Autowired
    private ConsumerService consumerService;

    @Override
    public void onApplicationEvent(SessionDisconnectEvent sessionDisconnectEvent) {
        consumerService.stop();
    }
}

The consumer service is stopped when the session disconnects.

HTML page
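The two listeners start and stop one shared consumer, which works as long as only one page is open at a time. If several tabs can be connected at once, one possible approach is to track the open session ids and cancel the task only when the last one is gone. The sketch below is an assumption about how that could look, not the code from this post: SessionAwareConsumerSketch and its methods are hypothetical, and a JDK ScheduledExecutorService stands in for Spring's TaskScheduler. In Spring, the id is available from SessionDisconnectEvent.getSessionId(); for the connected event it would have to be read from the message headers. Tracking ids in a set also tolerates a disconnect event being delivered twice for the same session.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class: starts the drain task on the first session, stops it after the last.
final class SessionAwareConsumerSketch {
    private final ScheduledExecutorService scheduler;
    private final Runnable drainTask;
    private final Set<String> openSessions = new HashSet<>();
    private ScheduledFuture<?> running; // null while no task is scheduled

    SessionAwareConsumerSketch(ScheduledExecutorService scheduler, Runnable drainTask) {
        this.scheduler = scheduler;
        this.drainTask = drainTask;
    }

    synchronized void onConnect(String sessionId) {
        openSessions.add(sessionId);
        if (running == null) {
            // Same one-second period as the fixedRate in the post.
            running = scheduler.scheduleAtFixedRate(drainTask, 0L, 1L, TimeUnit.SECONDS);
        }
    }

    synchronized void onDisconnect(String sessionId) {
        // Removing an unknown or already removed id is a no-op, so duplicates are harmless.
        openSessions.remove(sessionId);
        if (openSessions.isEmpty() && running != null) {
            running.cancel(true);
            running = null;
        }
    }

    synchronized boolean isRunning() {
        return running != null;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        SessionAwareConsumerSketch consumer =
                new SessionAwareConsumerSketch(scheduler, () -> System.out.println("draining queue"));
        consumer.onConnect("tab-1");
        consumer.onConnect("tab-2");
        consumer.onDisconnect("tab-1");
        System.out.println("running after first disconnect: " + consumer.isRunning());
        consumer.onDisconnect("tab-2");
        System.out.println("running after last disconnect: " + consumer.isRunning());
        scheduler.shutdownNow();
    }
}
```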

  <script type="text/javascript">
    var stompClient = null;

    function connect() {
      var socket = new SockJS('/stock-ticks');
      stompClient = Stomp.over(socket);
      stompClient.connect({}, function(frame) {
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/ticks', function(ticks) {
          showProcessMessage(ticks);
        });
      });
    }

    function showProcessMessage(processMessage) {
      let response = document.getElementById('response');
      let p = document.createElement("p");
      p.style.wordBreak = "break-word";
      p.appendChild(document.createTextNode(processMessage.body));
      response.appendChild(p);
    }

    function disconnect() {
      if(stompClient != null) {
        stompClient.disconnect();
      }
      console.log("Disconnected");
    }

  </script>

The problem I am running into now:


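Update:

The real problem turned out to be in simpMessagingTemplate.convertAndSend("/topic/ticks", saveProductQueue.poll());. When sending a batch of data, a delay has to be added between sends. It seems that when many messages are sent at once within a short time, the STOMP layer does not handle them well and the frames get stuck together (the "sticky packet" problem). The BlockingQueue did still contain the earlier data; it just was not displayed on the front-end page.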
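A minimal sketch of the workaround described in the update, under stated assumptions: the class PacedDrainSketch, the drainWithDelay helper, and the 50 ms pause are all hypothetical (the post does not say how long the delay was), and a Consumer<String> again stands in for simpMessagingTemplate.convertAndSend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper class, not part of the original post.
final class PacedDrainSketch {

    // Forwards queued messages one at a time, pausing between consecutive sends.
    // Returns the number of messages forwarded before the queue ran empty
    // or the thread was interrupted.
    static int drainWithDelay(BlockingQueue<String> queue,
                              Consumer<String> sender,
                              long delayMillis) {
        int sent = 0;
        String message;
        while ((message = queue.poll()) != null) {
            sender.accept(message);
            sent++;
            if (queue.isEmpty()) {
                break; // nothing left, so there is no need to wait
            }
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Cancelling the scheduled task with cancel(true) would interrupt this
                // sleep; stop draining and leave the remaining messages queued.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello, World: 10:00:00");
        queue.add("Hello, World: 10:00:05");
        queue.add("Hello, World: 10:00:10");

        List<String> delivered = new ArrayList<>();
        // 50 ms is an arbitrary illustrative value.
        int count = drainWithDelay(queue, delivered::add, 50L);
        System.out.println(count + " messages delivered: " + delivered);
    }
}
```

With a backlog of n messages, one drain takes roughly (n - 1) times the delay, so a large backlog can outlast the one-second scheduling period used by the consumer.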