SSE封装
API:
java
@Api(tags = "SSE推送接口")
@RestController
@RequestMapping("/sse")
@AllArgsConstructor
public class ServerSentEventsController {
private ServerSentEventsService serverSentEventsService;
@ApiOperation("鉴权SSE推送接口")
@GetMapping(path = "/authorization", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamEventsForAuthorization() {
return serverSentEventsService.createConnect();
}
}
服务:
java
public interface ServerSentEventsService {
/**
* 创建连接
* @return SseEmitter对象
*/
SseEmitter createConnect();
/**
* 根据客户端ID,获取SseEmitter对象
* @param clientId 客户端ID
* @return
*/
Set<SseEmitter> getSseEmittersByClientId(String clientId);
/**
* 给所有的客户端发送消息
* @param content
* @param sseEnums
*/
<T> void sendMessageToAllClient(T content, SseEnums sseEnums);
/**
* 给指定的客户端发送消息
* @param content
* @param clientId
* @param sseEnums
*/
<T> void sendMessageToClient(T content,String clientId, SseEnums sseEnums);
/**
* 关闭连接
* @param clientId
*/
void closeConnect(String clientId);
}
服务实现:
java
@Slf4j
@Service
public class ServerSentEventsImpl implements ServerSentEventsService {
/**
* 容器,保存连接,用于输出返回
*/
private static final Map<String, Set<SseEmitter>> sseCache = new ConcurrentHashMap<>();
/**
* 异步事件总线
*/
private final ScheduledExecutorService executor;
private final List<SseInitializerService> initializers;
public ServerSentEventsImpl(@Qualifier("sseTaskScheduler") ScheduledExecutorService executor,
List<SseInitializerService> initializers
) {
this.executor = executor;
this.initializers = initializers;
}
@Override
public SseEmitter createConnect() {
String clientId = SecurityUtils.getUserId().toString();
// 设置超时时间,0表示不过期。默认30毫秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 长链接完成后回调接口(即关闭连接时调用)
sseEmitter.onCompletion(completionCallBack(clientId, sseEmitter));
// 连接超时回调
sseEmitter.onTimeout(timeoutCallBack(clientId, sseEmitter));
// 推送消息异常时调用
sseEmitter.onError(errorCallBack(clientId, sseEmitter));
// 获取当前用户的全部连接
Set<SseEmitter> emitters = sseCache.get(clientId);
// 没有连接时新建set集合保存连接
if (emitters == null) {
emitters = ConcurrentHashMap.newKeySet();
}
// 添加当前的连接
emitters.add(sseEmitter);
// 保存到用户集合中
sseCache.put(clientId, emitters);
log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, sseCache.size());
try {
// 发送一条连接成功消息
initConnect(sseEmitter, clientId);
} catch (IOException e) {
log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());
}
return sseEmitter;
}
/**
* 初始化连接,发送连接成功消息
* @param sseEmitter
* @param clientId
* @throws IOException
*/
private void initConnect(SseEmitter sseEmitter, String clientId) throws IOException {
// 自动调用所有 SseInitializerService 的 initialize 方法
for (SseInitializerService initializer : initializers) {
initializer.initialize(sseEmitter, clientId);
}
}
/**
* 推送消息异常时,回调方法
*
* @param clientId
* @return
*/
private Consumer<Throwable> errorCallBack(String clientId, SseEmitter sseEmitter) {
return throwable -> {
log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId, throwable);
// 如果连接异常,移除当前连接
if (sseCache.containsKey(clientId)) {
removeSseEmitter(clientId, sseEmitter);
} else {
log.warn("客户端ID:{} 不存在,无法移除连接", clientId);
}
};
}
/**
* 长连接超时时调用
*
* @param clientId 客户端id
* @param sseEmitter sse事件
* @return
*/
private Runnable timeoutCallBack(String clientId, SseEmitter sseEmitter) {
return () -> {
log.info("连接超时:用户:{} - 连接:{}", clientId, sseEmitter.hashCode());
removeSseEmitter(clientId, sseEmitter);
};
}
/**
* 长连接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端id
* @param sseEmitter sse事件
* @return
*/
private Runnable completionCallBack(String clientId, SseEmitter sseEmitter) {
return () -> {
log.info("结束连接:用户:{} - 连接:{}", clientId, sseEmitter.hashCode());
removeSseEmitter(clientId, sseEmitter);
};
}
/**
* 移除客户端连接 此方法会将客户端下全部连接移除
*
* @param clientId 客户端ID
*/
private void removeClient(String clientId) {
sseCache.remove(clientId);
log.info("移除用户:{}", clientId);
}
/**
* 移除客户端下指定的SseEmitter连接
*
* @param clientId 客户端ID
* @param sseEmitter 指定的SseEmitter连接
*/
private void removeSseEmitter(String clientId, SseEmitter sseEmitter) {
sseCache.get(clientId).remove(sseEmitter);
if (sseCache.get(clientId).isEmpty()) {
sseCache.remove(clientId);
log.info("用户连接数为空,移除用户:{}", clientId);
}
}
/**
* 获取指定客户端的SseEmitter集合
*
* @param clientId 客户端ID
* @return
*/
@Override
public Set<SseEmitter> getSseEmittersByClientId(String clientId) {
return sseCache.get(clientId);
}
/**
* 向所有客户端发送消息
*
* @param content 消息内容
* @param sseEnums 事件枚举
*/
@Override
public <T> void sendMessageToAllClient(T content, SseEnums sseEnums) {
if (MapUtil.isEmpty(sseCache)) {
return;
}
// 判断发送的消息是否为空
for (Map.Entry<String, Set<SseEmitter>> entry : sseCache.entrySet()) {
MessageVo<T> messageVo = new MessageVo<T>();
messageVo.setClientId(entry.getKey());
messageVo.setData(content);
sendMessageToClientByClientId(entry.getKey(), messageVo, entry.getValue(), sseEnums);
}
}
/**
* 向指定客户端发送消息
*
* @param clientId 客户端ID
* @param messageVo 消息内容
* @param sseEmitters 客户端的SseEmitter集合
* @param sseEnums 事件枚举
*/
private <T> void sendMessageToClientByClientId(
String clientId, MessageVo<T> messageVo, Set<SseEmitter> sseEmitters, SseEnums sseEnums) {
if (null == sseEmitters || sseEmitters.isEmpty()) {
log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, messageVo.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter
.event()
.id(String.valueOf(HttpStatus.OK))
.name(sseEnums.getEventType())
.data(messageVo, MediaType.APPLICATION_JSON);
sseEmitters.forEach(
sseEmitter -> {
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
log.error("推送消息失败:客户端{} - {}:{},尝试进行重推", clientId, sseEmitter.hashCode(), messageVo);
sendMessageWithRetry(sseEmitter, clientId, sendData);
}
});
}
/**
* 重试发送消息
*
* @param emitter SseEmitter 实例
* @param clientId 客户端ID
*/
private void sendMessageWithRetry(SseEmitter emitter, String clientId, SseEmitter.SseEventBuilder sendData) {
AtomicInteger retryCount = new AtomicInteger(0);
Runnable task = new Runnable() {
@Override
public void run() {
if (retryCount.incrementAndGet() > 5) {
// 重试次数超过5次,停止重试
return;
}
;
try {
emitter.send(sendData);
log.info("第 {} 次重推成功: {}", retryCount.get(), clientId);
} catch (IllegalStateException e) {
log.error("推送消息失败:客户端 {} 已经断开连接,无法发送消息: {}", clientId, e.getMessage());
removeClient(clientId);
} catch (Exception e) {
log.error(e.getMessage());
log.warn("第 {} 次重推失败: {}", retryCount.get(), clientId);
executor.schedule(this, 5, TimeUnit.SECONDS); // 现在可以安全使用 task
}
}
};
task.run(); // 第一次尝试
}
/**
* 向指定客户端发送消息
*
* @param content 消息内容
* @param clientId 客户端ID
* @param sseEnums 事件枚举
*/
@Override
public <T> void sendMessageToClient(T content, String clientId, SseEnums sseEnums) {
MessageVo<T> messageVo = new MessageVo<T>(clientId, content);
log.info("发送消息到客户端:{},消息内容:{}", sseCache);
Set<SseEmitter> emitters = sseCache.get(clientId);
sendMessageToClientByClientId(clientId, messageVo, emitters, sseEnums);
}
/**
* 关闭指定客户端的连接
*
* @param clientId 客户端ID
*/
@Override
public void closeConnect(String clientId) {
Set<SseEmitter> emitters = sseCache.get(clientId);
if (null != emitters && !emitters.isEmpty()) {
emitters.forEach(sseEmitter -> {
try {
sseEmitter.complete();
} catch (Exception e) {
log.error("关闭连接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage());
}
});
removeClient(clientId);
}
}
}
SSE连接初始化:
java
public interface SseInitializerService {
void initialize(SseEmitter emitter, String clientId) throws IOException;
}
比如连接成功时发送一条消息:
java
@Order(1)
@Component
public class DefaultSseInitializerService implements SseInitializerService {
@Override
public void initialize(SseEmitter emitter, String clientId) throws IOException {
MessageVo<String> messageVo = new MessageVo<String>(clientId, "connected successfully !");
// 默认发送一条连接成功的消息
emitter.send(
SseEmitter.event()
.id(String.valueOf(HttpStatus.CREATED))
.name(SseEnums.SUCCESS_CONNECT.getEventType())
.data(messageVo, MediaType.APPLICATION_JSON));
}
}
发送工具:
java
public class SseSendUtils {
/**
* 发送事件到指定客户端
*
* @param content 消息内容
* @param clientId 客户端id
* @param sseEnums 事件枚举
*/
public static <T> void sendByClientId(T content, String clientId, SseEnums sseEnums) {
// 获取sse服务
ServerSentEventsService sseServer = SpringUtils.getBean(ServerSentEventsService.class);
// 发送
sseServer.sendMessageToClient(content, clientId, sseEnums);
}
/**
* 发送事件到所有客户端
*
* @param content 消息内容
* @param sseEnums 事件枚举
*/
public static <T> void sendAll(T content, SseEnums sseEnums) {
// 获取sse服务
ServerSentEventsService sseServer = SpringUtils.getBean(ServerSentEventsService.class);
// 发送
sseServer.sendMessageToAllClient(content, sseEnums);
}
/**
* 发送消息到指定客户端
*
* @param content 消息
* @param clientId 客户端id
*/
public static void sendMessageByClientId(String content, String clientId) {
sendByClientId(content, clientId, SseEnums.MESSAGE);
}
/**
* 发送消息到所有客户端
*
* @param content 消息
*/
public static void sendMessageToAll(String content) {
sendAll(content, SseEnums.MESSAGE);
}
}
消息事件类型枚举
java
/**
* @Description : sse自定义事件枚举
* 重要:事件的类型绑定前端事件类型如需修改记得修改前端代码
* @Author : LiuJun
* @Date: 2025/6/6 8:54
*/
@Getter
@AllArgsConstructor
public enum SseEnums {
/**
* 通知
*/
NOTICE(0, "notice"),
/**
* 消息
*/
MESSAGE(1, "message"),
/**
* 连接成功
*/
SUCCESS_CONNECT(2, "successConnect"),
/**
* 定位信息
*/
GPS_LOCATION(3, "gpsLocation"),
/**
* gps报警
*/
GPS_WARNING(4, "gpsWarning");
private final Integer eventCode;
private final String eventType;
}
SSE消息对象
java
/**
* @Description : SSE消息对象
* @Author : LiuJun
* @Date: 2025/6/5 18:10
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageVo<T> {
/**
* 客户端ID
*/
private String clientId;
/**
* 传输的内容
*/
private T data;
}