实时通讯技术实现

实时通讯技术实现
前言
在CS架构中,经常会有实时通信的需求。客户端和服务端建立连接,服务端实时推送数据给客户端。本文介绍几种常见的实现方式,希望能给读者们一点点参考。
实时通讯的主要实现技术
长轮询(Long Polling)
WebSocket
服务器发送事件(Server-Sent Events, SSE)
XMPP (Extensible Messaging and Presence Protocol)
MQTT (Message Queuing Telemetry Transport)
长轮询

长轮询(Long Polling): 一种网络通信机制,用于实现客户端和服务器之间的实时数据传输。
Http长轮询机制:

图片

原理图
长轮询工作原理:
client端请求server端,并约定好超时时间;
server端收到请求后,判断数据是否有变化:
有变化:立即返回数据;
没变化:则阻塞http请求,并且将长轮询请求任务放入队列中,然后开启任务调度,调度任务在长连接维持时间到期后,会将长轮询请求移除队列,并返回对应数据。
如果在挂起的这段时间内,数据有变化,服务器会移除队列中的长轮询请求,并响应数据给客户端。
长轮询优缺点:
优点:
兼容性好
实现简单
即时性
缺点:
服务器hold住连接,占用资源
会有延迟,服务器响应后,客户端要重新发起连接(这段时间内有新消息不能即时触达)
Java 示例代码
@Controller
public class LongPollingController {

private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();

@GetMapping("/longpolling")
@ResponseBody
public Object longPolling() {
    DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "time out");
    deferredResults.put("key", deferredResult); // 假设每个客户端有一个唯一的key
    return deferredResult;
}

@GetMapping("/push")
public void push() {
    // 模拟异步数据获取
    deferredResults.get("key").setResult("data update"); // 当数据准备好时,触发长轮询
}

}
DeferredResult 是 Spring MVC 提供的一种用于处理异步请求的机制,它允许在处理请求时延迟产生结果,并且允许在处理请求的不同线程中生成结果。DeferredResult 可以用于异步处理 HTTP 请求,并在处理完成后返回结果给客户端。
WebSocket

WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。
WebSocket原理图:

图片

WebSocket特点:
单一的TCP连接,采用全双工模式通信;
对代理、防火墙和路由器透明;
无头部信息、Cookie和身份验证;
无安全开销;
通过“ping/pong”帧保持链路激活;
服务器可以主动传递消息给客户端,不再需要客户端轮询。
示例代码(netty实现websocket通信)
WebSocket服务端启动类:
@Component
@Slf4j
public class WebSocketServer {

/**
 * webSocket协议名
 */
private static final String WEBSOCKET_PROTOCOL = "WebSocket";

/**
 * 端口号
 */
@Value("${webSocket.netty.port:58080}")
private int port;

/**
 * webSocket路径
 */
@Value("${webSocket.netty.path:/webSocket}")
private String webSocketPath;

@Autowired
private WebSocketHandler webSocketHandler;

private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;

/**
 * 启动
 *
 * @throws InterruptedException
 */
private void start() throws InterruptedException {
    bossGroup = new NioEventLoopGroup();
    workGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    // bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
    bootstrap.group(bossGroup, workGroup);
    // 设置NIO类型的channel
    bootstrap.channel(NioServerSocketChannel.class);
    // 设置监听端口
    bootstrap.localAddress(new InetSocketAddress(port));
    // 连接到达时会创建一个通道
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 流水线管理通道中的处理程序(Handler),用来处理业务
            // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
            ch.pipeline().addLast(new HttpServerCodec());
            ch.pipeline().addLast(new ObjectEncoder());
            // 以块的方式来写的处理器
            ch.pipeline().addLast(new ChunkedWriteHandler());
            //将收到的 HTTP 请求或响应的多个部分合并成一个完整的对象
            ch.pipeline().addLast(new HttpObjectAggregator(8192));
            /*
            说明:
            1、对应webSocket,它的数据是以帧(frame)的形式传递
            2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
            3、核心功能是将http协议升级为ws协议,保持长连接
            */
            ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
            ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
            ch.pipeline().addLast(new HeartBeatHandler());
            // 自定义的handler,处理业务逻辑
            ch.pipeline().addLast(webSocketHandler);

        }
    });
    // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
    ChannelFuture channelFuture = bootstrap.bind().sync();
    log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
    // 对关闭通道进行监听
    channelFuture.channel().closeFuture().sync();
}

/**
 * 释放资源
 *
 * @throws InterruptedException
 */
@PreDestroy
public void destroy() throws InterruptedException {
    if (bossGroup != null) {
        bossGroup.shutdownGracefully().sync();
    }
    if (workGroup != null) {
        workGroup.shutdownGracefully().sync();
    }
}

@PostConstruct
public void init() {
    //需要开启一个新的线程来执行netty server 服务器
    new Thread(() -> {
        try {
            start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

}
业务逻辑处理器:
@Component
public class WebSocketHandler extends SimpleChannelInboundHandler {

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    // 接收到 WebSocket 文本消息
    System.out.println("Received message: " + msg.text());
    // 响应 WebSocket 文本消息
    ctx.writeAndFlush(new TextWebSocketFrame("Received your message: " + msg.text()));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // 异常处理
    cause.printStackTrace();
    ctx.close();
}

}
SSE

SSE(Server-Sent Events,服务器发送事件)是一种用于实现服务器向客户端单向推送数据的技术。它允许服务器端在任何时候发送数据到客户端,而客户端不需要发起请求。SSE 基于 HTTP 协议,使用简单的文本格式进行通信,通常被用于实时更新网页内容、实时通知等场景。
SSE 的工作原理如下:
客户端向服务器发送一个 HTTP 请求,请求的头部包含 Accept: text/event-stream 表示接受 SSE 格式的响应。
服务器接收到请求后,保持连接打开,并在连接上周期性地发送消息给客户端。每个消息都以 data: 开头,并以两个换行符 nn 结束。
客户端接收到消息后,将其通过事件监听器处理。
SSE原理图:

代码示例
@Controller
public class SSEController {

@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse() {
    return Flux.interval(Duration.ofSeconds(10))
            .map(sequence -> ServerSentEvent.<String>builder()
                    .id(String.valueOf(sequence))
                    .event("message")
                    .data("Hello SSE - " + LocalTime.now())
                    .build());
}

}
Postman 响应示例:

长轮询、WebSocket、SSE 对比

  1. 服务器资源消耗:
    长轮询:服务器需要为每个客户端请求保持一个开放的连接,直到有数据发送。这导致服务器资源(如内存和连接槽)的消耗,特别是在高并发场景下。
    WebSocket:建立后,WebSocket 提供了一个持久的、全双工的连接通道。虽然它也占用服务器资源,但由于其连接是持久的,所以不需要频繁地创建和销毁连接,相对于长轮询,这可以减少资源消耗和延迟。
    SSE:SSE 也保持开放的连接,但只支持单向通信(服务器到客户端)。与长轮询相比,SSE 通过减少连接的建立和销毁次数来优化资源使用,但对于每个客户端,它仍然占用一个连接。
  2. 网络延迟和效率:
    长轮询:每次请求可能在服务器有数据可发送之前保持打开状态,这可能导致网络延迟。
    WebSocket:一旦建立,消息可以几乎无延迟地在客户端和服务器之间传输,提高了效率和实时性。
    SSE:由于连接持续开放,SSE 可以实现低延迟的服务器到客户端消息传输,但不支持客户端到服务器的实时通信。
  3. 实现复杂性:
    长轮询:相对简单,不需要特殊的协议支持,但服务器端需要逻辑来管理多个持续的请求。
    WebSocket:需要在客户端和服务器端实现WebSocket协议,比长轮询实现复杂,连接的建立、错误的处理和断开连接时的重连等机制都需要考虑。
    SSE:客户端实现相对简单,主要的复杂性在于服务器端,需要支持HTTP/1.1的持久连接。
    XMPP

XMPP (Extensible Messaging and Presence Protocol) 是一个支持消息传递和状态显示的开放即时通讯协议。它实现了客户端与服务器之间的双向通信,并可以通过扩展以适应多样的即时通讯服务需求。基于 XML (Extensible Markup Language) 和 TCP/IP 协议构建,XMPP 特点包括灵活性、可扩展性和分布式架构。
XMPP设计的网络结构中定义了3类通信实体:
客户端
服务器
网关
XMPP中基本的通信基于传统的CS模式,即客户端通过TCP/IP连接到服务器,然后通过传输XML流进行通信。
XMPP的系统原理图:

网图
MQTT

MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息协议,专为低带宽和不可靠网络环境设计,广泛应用于物联网(IoT)、移动应用等场景。基于发布/订阅模型,它允许设备发布消息到主题,同时允许其他设备订阅这些主题以接收消息。MQTT运行于TCP/IP协议之上,提供了一种简单有效的方式来进行设备间的通信。
MQTT原理图:

MQTT 和 XMPP 都需要额外的服务器来进行消息通信,这些服务器通常被称为 MQTT 代理(broker)和 XMPP 服务器。
总结

MQTT 和 XMPP 需要中间服务器(分别是 MQTT 代理和 XMPP 服务器)来处理消息的路由、传递和存储,增加了部署的复杂性,并需要确保中间件服务的高可用性。
相比之下,SSE(Server-Sent Events)和 WebSocket 提供了更直接的通信方式,允许服务器和客户端之间建立持久的连接。这两种技术直接基于现有的 HTTP/HTTPS 协议,可以利用现有的 Web 服务器架构进行部署,从而减少了额外的中间件需求。
在复杂性方面,MQTT 和 XMPP 要求开发者具备对相应协议的深入了解,相较于 SSE 和 WebSocket 的简单 API 来说,实现起来会相对复杂,但这些协议也提供了更丰富的灵活性。
总的来说,选择合适的技术实现取决于业务需求和现有架构,各种技术都有其适用的场景。

声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/414226.html

联系我们
联系我们
分享本页
返回顶部