Skip to content

SSE封装

API:

java
@Api(tags = "SSE推送接口")
@RestController
@RequestMapping("/sse")
@AllArgsConstructor
public class ServerSentEventsController {

    private ServerSentEventsService serverSentEventsService;

    @ApiOperation("鉴权SSE推送接口")
    @GetMapping(path = "/authorization", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamEventsForAuthorization() {

        return serverSentEventsService.createConnect();
    }

}

服务:

java
public interface ServerSentEventsService {
    /**
     * 创建连接
     * @return SseEmitter对象
     */
    SseEmitter createConnect();

    /**
     * 根据客户端ID,获取SseEmitter对象
     * @param clientId 客户端ID
     * @return
     */
    Set<SseEmitter> getSseEmittersByClientId(String clientId);

    /**
     * 给所有的客户端发送消息
     * @param content
     * @param sseEnums
     */
    <T> void sendMessageToAllClient(T content, SseEnums sseEnums);

    /**
     * 给指定的客户端发送消息
     * @param content
     * @param clientId
     * @param sseEnums
     */
    <T> void sendMessageToClient(T content,String clientId, SseEnums sseEnums);


    /**
     * 关闭连接
     * @param clientId
     */
    void closeConnect(String clientId);
}

服务实现:

java
@Slf4j
@Service
public class ServerSentEventsImpl implements ServerSentEventsService {


    /**
     * 容器,保存连接,用于输出返回
     */
    private static final Map<String, Set<SseEmitter>> sseCache = new ConcurrentHashMap<>();

    /**
     * 异步事件总线
     */
    private final ScheduledExecutorService executor;

    private final List<SseInitializerService> initializers;

    public ServerSentEventsImpl(@Qualifier("sseTaskScheduler") ScheduledExecutorService executor,
                                List<SseInitializerService> initializers
                                ) {
        this.executor = executor;

        this.initializers = initializers;
    }

    @Override
    public SseEmitter createConnect() {
        String clientId = SecurityUtils.getUserId().toString();

        // 设置超时时间,0表示不过期。默认30毫秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);

        // 长链接完成后回调接口(即关闭连接时调用)
        sseEmitter.onCompletion(completionCallBack(clientId, sseEmitter));
        // 连接超时回调
        sseEmitter.onTimeout(timeoutCallBack(clientId, sseEmitter));
        // 推送消息异常时调用
        sseEmitter.onError(errorCallBack(clientId, sseEmitter));
        // 获取当前用户的全部连接
        Set<SseEmitter> emitters = sseCache.get(clientId);
        // 没有连接时新建set集合保存连接
        if (emitters == null) {
            emitters = ConcurrentHashMap.newKeySet();
        }
        // 添加当前的连接
        emitters.add(sseEmitter);
        // 保存到用户集合中
        sseCache.put(clientId, emitters);
        log.info("创建新的sse连接,当前用户:{}    累计用户:{}", clientId, sseCache.size());
        try {
            // 发送一条连接成功消息
            initConnect(sseEmitter, clientId);
        } catch (IOException e) {
            log.error("创建长链接异常,客户端ID:{}   异常信息:{}", clientId, e.getMessage());
        }
        return sseEmitter;
    }

    /**
     * 初始化连接,发送连接成功消息
     * @param sseEmitter
     * @param clientId
     * @throws IOException
     */
    private void initConnect(SseEmitter sseEmitter, String clientId) throws IOException {
        // 自动调用所有 SseInitializerService 的 initialize 方法
        for (SseInitializerService initializer : initializers) {
            initializer.initialize(sseEmitter, clientId);
        }
    }

    /**
     * 推送消息异常时,回调方法
     *
     * @param clientId
     * @return
     */
    private Consumer<Throwable> errorCallBack(String clientId, SseEmitter sseEmitter) {
        return throwable -> {
            log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId, throwable);
            // 如果连接异常,移除当前连接
            if (sseCache.containsKey(clientId)) {
                removeSseEmitter(clientId, sseEmitter);
            } else {
                log.warn("客户端ID:{} 不存在,无法移除连接", clientId);
            }
        };
    }

    /**
     * 长连接超时时调用
     *
     * @param clientId   客户端id
     * @param sseEmitter sse事件
     * @return
     */
    private Runnable timeoutCallBack(String clientId, SseEmitter sseEmitter) {
        return () -> {
            log.info("连接超时:用户:{} - 连接:{}", clientId, sseEmitter.hashCode());
            removeSseEmitter(clientId, sseEmitter);
        };
    }

    /**
     * 长连接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId   客户端id
     * @param sseEmitter sse事件
     * @return
     */
    private Runnable completionCallBack(String clientId, SseEmitter sseEmitter) {
        return () -> {
            log.info("结束连接:用户:{} - 连接:{}", clientId, sseEmitter.hashCode());
            removeSseEmitter(clientId, sseEmitter);
        };
    }

    /**
     * 移除客户端连接 此方法会将客户端下全部连接移除
     *
     * @param clientId 客户端ID
     */
    private void removeClient(String clientId) {
        sseCache.remove(clientId);
        log.info("移除用户:{}", clientId);
    }

    /**
     * 移除客户端下指定的SseEmitter连接
     *
     * @param clientId   客户端ID
     * @param sseEmitter 指定的SseEmitter连接
     */
    private void removeSseEmitter(String clientId, SseEmitter sseEmitter) {
        sseCache.get(clientId).remove(sseEmitter);
        if (sseCache.get(clientId).isEmpty()) {
            sseCache.remove(clientId);
            log.info("用户连接数为空,移除用户:{}", clientId);
        }

    }

    /**
     * 获取指定客户端的SseEmitter集合
     *
     * @param clientId 客户端ID
     * @return
     */
    @Override
    public Set<SseEmitter> getSseEmittersByClientId(String clientId) {
        return sseCache.get(clientId);
    }

    /**
     * 向所有客户端发送消息
     *
     * @param content  消息内容
     * @param sseEnums 事件枚举
     */
    @Override
    public <T> void sendMessageToAllClient(T content, SseEnums sseEnums) {
        if (MapUtil.isEmpty(sseCache)) {
            return;
        }
        // 判断发送的消息是否为空
        for (Map.Entry<String, Set<SseEmitter>> entry : sseCache.entrySet()) {
            MessageVo<T> messageVo = new MessageVo<T>();
            messageVo.setClientId(entry.getKey());
            messageVo.setData(content);
            sendMessageToClientByClientId(entry.getKey(), messageVo, entry.getValue(), sseEnums);
        }
    }

    /**
     * 向指定客户端发送消息
     *
     * @param clientId    客户端ID
     * @param messageVo   消息内容
     * @param sseEmitters 客户端的SseEmitter集合
     * @param sseEnums    事件枚举
     */
    private <T> void sendMessageToClientByClientId(
            String clientId, MessageVo<T> messageVo, Set<SseEmitter> sseEmitters, SseEnums sseEnums) {
        if (null == sseEmitters || sseEmitters.isEmpty()) {
            log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",
                    clientId, messageVo.toString());
            return;
        }
        SseEmitter.SseEventBuilder sendData = SseEmitter
                .event()
                .id(String.valueOf(HttpStatus.OK))
                .name(sseEnums.getEventType())
                .data(messageVo, MediaType.APPLICATION_JSON);

        sseEmitters.forEach(
                sseEmitter -> {
                    try {
                        sseEmitter.send(sendData);
                    } catch (IOException e) {
                        // 推送消息失败,记录错误日志,进行重推
                        log.error("推送消息失败:客户端{} - {}:{},尝试进行重推", clientId, sseEmitter.hashCode(), messageVo);
                        sendMessageWithRetry(sseEmitter, clientId, sendData);
                    }
                });

    }

    /**
     * 重试发送消息
     *
     * @param emitter  SseEmitter 实例
     * @param clientId 客户端ID
     */
    private void sendMessageWithRetry(SseEmitter emitter, String clientId, SseEmitter.SseEventBuilder sendData) {
        AtomicInteger retryCount = new AtomicInteger(0);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                if (retryCount.incrementAndGet() > 5) {
                    // 重试次数超过5次,停止重试
                    return;
                }
                ;
                try {
                    emitter.send(sendData);
                    log.info("第 {} 次重推成功: {}", retryCount.get(), clientId);
                } catch (IllegalStateException e) {
                    log.error("推送消息失败:客户端 {} 已经断开连接,无法发送消息: {}", clientId, e.getMessage());
                    removeClient(clientId);
                } catch (Exception e) {
                    log.error(e.getMessage());
                    log.warn("第 {} 次重推失败: {}", retryCount.get(), clientId);
                    executor.schedule(this, 5, TimeUnit.SECONDS); // 现在可以安全使用 task
                }
            }
        };

        task.run(); // 第一次尝试
    }

    /**
     * 向指定客户端发送消息
     *
     * @param content  消息内容
     * @param clientId 客户端ID
     * @param sseEnums 事件枚举
     */
    @Override
    public <T> void sendMessageToClient(T content, String clientId, SseEnums sseEnums) {
        MessageVo<T> messageVo = new MessageVo<T>(clientId, content);
        log.info("发送消息到客户端:{},消息内容:{}", sseCache);
        Set<SseEmitter> emitters = sseCache.get(clientId);
        sendMessageToClientByClientId(clientId, messageVo, emitters, sseEnums);
    }

    /**
     * 关闭指定客户端的连接
     *
     * @param clientId 客户端ID
     */
    @Override
    public void closeConnect(String clientId) {
        Set<SseEmitter> emitters = sseCache.get(clientId);
        if (null != emitters && !emitters.isEmpty()) {
            emitters.forEach(sseEmitter -> {
                try {
                    sseEmitter.complete();
                } catch (Exception e) {
                    log.error("关闭连接异常,客户端ID:{}   异常信息:{}", clientId, e.getMessage());
                }
            });
            removeClient(clientId);
        }
    }
}

SSE连接初始化:

java
public interface SseInitializerService {
    void initialize(SseEmitter emitter, String clientId) throws IOException;
}

比如连接成功时发送一条消息:

java
@Order(1)
@Component
public class DefaultSseInitializerService implements SseInitializerService {

    @Override
    public void initialize(SseEmitter emitter, String clientId) throws IOException {
        MessageVo<String> messageVo = new MessageVo<String>(clientId, "connected successfully !");
        // 默认发送一条连接成功的消息
        emitter.send(
                SseEmitter.event()
                        .id(String.valueOf(HttpStatus.CREATED))
                        .name(SseEnums.SUCCESS_CONNECT.getEventType())
                        .data(messageVo, MediaType.APPLICATION_JSON));
    }
}

发送工具:

java
public class SseSendUtils {


    /**
     * 发送事件到指定客户端
     *
     * @param content  消息内容
     * @param clientId 客户端id
     * @param sseEnums 事件枚举
     */
    public static <T> void sendByClientId(T content, String clientId, SseEnums sseEnums) {
        // 获取sse服务
        ServerSentEventsService sseServer = SpringUtils.getBean(ServerSentEventsService.class);
        // 发送
        sseServer.sendMessageToClient(content, clientId, sseEnums);
    }


    /**
     * 发送事件到所有客户端
     *
     * @param content  消息内容
     * @param sseEnums 事件枚举
     */
    public static <T> void sendAll(T content, SseEnums sseEnums) {
        // 获取sse服务
        ServerSentEventsService sseServer = SpringUtils.getBean(ServerSentEventsService.class);
        // 发送
        sseServer.sendMessageToAllClient(content, sseEnums);
    }

    /**
     * 发送消息到指定客户端
     *
     * @param content  消息
     * @param clientId 客户端id
     */
    public static void sendMessageByClientId(String content, String clientId) {
        sendByClientId(content, clientId, SseEnums.MESSAGE);
    }

    /**
     * 发送消息到所有客户端
     *
     * @param content 消息
     */
    public static void sendMessageToAll(String content) {
        sendAll(content, SseEnums.MESSAGE);
    }


}

消息事件类型枚举

java
/**
 * @Description : sse自定义事件枚举
 * 重要:事件的类型绑定前端事件类型如需修改记得修改前端代码
 * @Author : LiuJun
 * @Date: 2025/6/6  8:54
 */
@Getter
@AllArgsConstructor
public enum SseEnums {
    /**
     * 通知
     */
    NOTICE(0, "notice"),

    /**
     * 消息
     */
    MESSAGE(1, "message"),

    /**
     * 连接成功
     */
    SUCCESS_CONNECT(2, "successConnect"),

    /**
     * 定位信息
     */
    GPS_LOCATION(3, "gpsLocation"),
    /**
     * gps报警
     */
    GPS_WARNING(4, "gpsWarning");

    private final Integer eventCode;
    private final String eventType;
}

SSE消息对象

java
/**
 * @Description : SSE消息对象
 * @Author : LiuJun
 * @Date: 2025/6/5  18:10
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageVo<T> {
    /**
     * 客户端ID
     */
    private String clientId;

    /**
     * 传输的内容
     */
    private T data;
}
最近更新