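Spring BlockingQueue: a variation on the producer-consumer problem
Background:
I am using Spring + WebSocket + STOMP to display content on a page. The back end generates content and puts it into a BlockingQueue. When the browser opens the page, the content is displayed. When the browser closes the page, nothing is displayed, but the back end keeps generating content and putting it into the queue.
Configuration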
@Configuration
@EnableWebSocketMessageBroker
@EnableAsync
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stock-ticks").withSockJS();
    }
}

@Configuration
public class MyQueueConfiguration {

    @Bean(name = "saveProductQueue")
    public BlockingQueue<String> saveProductQueue() {
        return new LinkedBlockingQueue<>();
    }
}
Controller
@Controller
public class StockTicksController {

    @Autowired
    private ProducerService producerService;

    @GetMapping("/")
    public String index() {
        producerService.sendTicks();
        return "index";
    }
}
ProducerService
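@Service
public class ProducerService {

    @Resource
    private BlockingQueue<String> saveProductQueue;

    // index() calls sendTicks() on every page load; this flag keeps it to one producer loop.
    private final AtomicBoolean running = new AtomicBoolean(false);

    @Async
    @SneakyThrows
    public void sendTicks() {
        if (!running.compareAndSet(false, true)) {
            return; // a producer loop is already running
        }
        while (true) {
            saveProductQueue.add(getStockTicks());
            System.out.println(saveProductQueue);
            TimeUnit.SECONDS.sleep(5);
        }
    }

    private String getStockTicks() {
        String time = new SimpleDateFormat("HH:mm:ss").format(new Date());
        return "Hello, World: " + time;
    }
}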
Every 5 seconds the producer writes "Hello, World" plus the current time to the queue and prints the queue.
ConsumerService
@Service
public class ConsumerService {

    private final TaskScheduler taskScheduler;
    private final SimpMessagingTemplate simpMessagingTemplate;
    private ScheduledFuture<?> yourTaskState;

    @Resource
    private BlockingQueue<String> saveProductQueue;

    @Autowired
    public ConsumerService(TaskScheduler taskScheduler, SimpMessagingTemplate simpMessagingTemplate) {
        this.taskScheduler = taskScheduler;
        this.simpMessagingTemplate = simpMessagingTemplate;
    }

    class ScheduledTaskExecutor implements Runnable {
        @Override
        public void run() {
            // poll() returns null once the queue is empty, so the emptiness check and
            // the removal happen in one step and a null payload is never sent.
            String tick;
            while ((tick = saveProductQueue.poll()) != null) {
                simpMessagingTemplate.convertAndSend("/topic/ticks", tick);
            }
        }
    }

    public void start() {
        // Drain the queue once per second.
        long fixedRate = 1000L;
        yourTaskState = taskScheduler.scheduleAtFixedRate(new ScheduledTaskExecutor(), fixedRate);
    }

    public void stop() {
        // Guard against a disconnect arriving before any task was scheduled.
        if (yourTaskState != null) {
            yourTaskState.cancel(true);
        }
    }
}
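The consumer takes the messages out of the queue and sends them.
Because sending should start when the page is opened and stop when the page is closed, I used event listeners: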
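If the queue has built up a backlog while no page was open, a consumer like the one above sends everything in a single burst. As a rough sketch only, one way to cap the work done per scheduler tick is BlockingQueue.drainTo with a maximum batch size. The class name BatchDrainSketch, the helper nextBatch and the batch size of 3 are assumptions made for illustration and do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchDrainSketch {

    // Removes up to maxBatch elements from the queue, in FIFO order, without blocking.
    static List<String> nextBatch(BlockingQueue<String> queue, int maxBatch) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add("tick " + i);
        }
        // Hypothetical batch size; the remaining ticks stay queued for the next run.
        List<String> batch = nextBatch(queue, 3);
        System.out.println("batch=" + batch + ", still queued=" + queue.size());
    }
}
```

Inside ConsumerService, each element of the returned batch could then be passed to convertAndSend, leaving the rest of the backlog for the next tick.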
@Component
public class ConnectListenerEvent implements ApplicationListener<SessionConnectedEvent> {

    @Autowired
    private ConsumerService consumerService;

    @Override
    public void onApplicationEvent(SessionConnectedEvent sessionConnectedEvent) {
        consumerService.start();
    }
}
The consumer service starts when a client connects.
@Component
public class DisconnectListenerEvent implements ApplicationListener<SessionDisconnectEvent> {

    @Autowired
    private ConsumerService consumerService;

    @Override
    public void onApplicationEvent(SessionDisconnectEvent sessionDisconnectEvent) {
        consumerService.stop();
    }
}
The consumer service stops when the client disconnects.
HTML page
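One thing to watch with these two listeners: start() and stop() run once per session, so two open tabs schedule two tasks, and the first tab to close cancels only the most recently scheduled one. A minimal sketch of one possible way around this is to count sessions and keep a single task running while at least one is connected. To stay self-contained it uses a plain ScheduledExecutorService rather than Spring's TaskScheduler. SessionCountedConsumer, onConnect, onDisconnect and isRunning are hypothetical names, not part of the original code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SessionCountedConsumer {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable task;
    private ScheduledFuture<?> future; // guarded by "this"
    private int sessions;              // guarded by "this"

    public SessionCountedConsumer(Runnable task) {
        this.task = task;
    }

    // Schedules the task only when the first session connects.
    public synchronized void onConnect() {
        if (sessions++ == 0) {
            future = scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
        }
    }

    // Cancels the task only when the last session disconnects.
    public synchronized void onDisconnect() {
        if (sessions > 0 && --sessions == 0 && future != null) {
            future.cancel(false);
            future = null;
        }
    }

    public synchronized boolean isRunning() {
        return future != null;
    }

    // Stops the worker thread so the JVM can exit.
    public void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        SessionCountedConsumer consumer = new SessionCountedConsumer(() -> System.out.println("drain queue"));
        consumer.onConnect();    // first tab: task is scheduled
        consumer.onConnect();    // second tab: nothing new is scheduled
        consumer.onDisconnect(); // one tab still open: task keeps running
        System.out.println("running with one tab open: " + consumer.isRunning());
        consumer.onDisconnect(); // last tab closed: task is cancelled
        System.out.println("running with no tabs open: " + consumer.isRunning());
        consumer.shutdown();
    }
}
```

In the Spring version, the two listeners would call onConnect() and onDisconnect() in place of start() and stop().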
<script type="text/javascript">
    var stompClient = null;

    function connect() {
        var socket = new SockJS('/stock-ticks');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/ticks', function(ticks) {
                showProcessMessage(ticks);
            });
        });
    }

    function showProcessMessage(processMessage) {
        let response = document.getElementById('response');
        let p = document.createElement("p");
        p.style.wordBreak = "break-word";
        p.appendChild(document.createTextNode(processMessage.body));
        response.appendChild(p);
    }

    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        console.log("Disconnected");
    }
</script>
The problem I am running into now is

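Update:
The problem actually lies in the call simpMessagingTemplate.convertAndSend("/topic/ticks", saveProductQueue.poll()). When sending data in bulk, a delay has to be added between sends. It may be that STOMP does not cope well when a lot of data is sent at once within a short time, so the messages end up stuck together (the "sticky packet" problem). The BlockingQueue did still hold the earlier data; it just was not displayed on the front-end page.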
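The delay mentioned in the update could look roughly like the sketch below. It is an illustration under assumptions, not the code actually used: the Consumer<String> named sender stands in for simpMessagingTemplate.convertAndSend("/topic/ticks", ...), and ThrottledSendSketch, sendWithDelay and the 50 ms pause are hypothetical choices. It only shows the pacing itself and does not establish what the underlying cause was.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ThrottledSendSketch {

    // Sends every queued message, pausing delayMillis after each one.
    // Returns the number of messages handed to the sender.
    static int sendWithDelay(BlockingQueue<String> queue, Consumer<String> sender, long delayMillis) {
        int sent = 0;
        String message;
        while ((message = queue.poll()) != null) {
            sender.accept(message);
            sent++;
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop; unsent messages stay in the queue.
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello, World: 10:00:00");
        queue.add("Hello, World: 10:00:05");
        queue.add("Hello, World: 10:00:10");

        List<String> delivered = new ArrayList<>();
        // Hypothetical 50 ms pause between messages.
        int sent = sendWithDelay(queue, delivered::add, 50);
        System.out.println("sent " + sent + " messages in order: " + delivered);
    }
}
```

Because the loop sleeps on the scheduler thread, a long backlog combined with a large delay can make one run outlast the 1-second fixed rate, so the pause should be kept small.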