跳转至

在Aqueduct中使用Websockets

一个标准的HTTP请求将产生一个来自Web服务器的HTTP响应。为了让服务器向客户机发送数据,客户机必须已经发送了一个数据请求。一个websocket是一种特殊类型的HTTP请求,它保持开放,服务器和客户端都可以随时向对方发送数据。

例如,一个聊天应用可能使用websocket向聊天室中的每个人发送消息。在这种情况下,聊天客户端应用程序会打开一个websocket连接到服务器应用程序。当用户键入一条消息时,他们的聊天客户端会在其websocket上发送该消息。有效负载可能是看起来像这样的JSON数据:

{
  "action": "send_message",
  "room": "general",
  "text": "Hi everyone"
}

服务器会接收到这些数据,然后转过身来,将修改后的版本发送到它所拥有的每一个websocket连接。这些数据可能看起来像这样:

{
  "action": "receive_message",
  "room": "general",
  "from": "Bob",
  "text": "Hi everyone"
}

每个连接的用户都会接收到这个数据,并在屏幕上画出 Bob: Hi everyone

请注意,关于websocket的信息并没有说明你必须使用JSON数据,你可以使用任何你喜欢的数据格式。

将HTTP请求升级到WebSocket

在Aqueduct中,websocket由Dart的标准库WebSocket类型处理。下面是一个例子。在Aqueduct中,websocket由Dart的标准库WebSocket类型处理。下面是一个例子:

router
  .route("/connect")
  .linkFunction((request) async {
    var socket = await WebSocketTransformer.upgrade(request.raw);
    socket.listen(listener);

    return null;
  });

重要的是,升级为websocket的请求通过从控制器中返回null从通道中删除。(有关更多详细信息,请参见本指南中的Aqueduct和dart:io部分)。

客户端应用程序可以连接到URL ws://localhost:888888/connect。Dart应用程序可以这样连接:

var socket = await WebSocket.connect("ws://localhost:8888/connect");
socket.listen(...);

双向通信

在上面的简单例子中,服务器只监听来自客户端的数据。为了将数据发送到客户端,必须保留对 WebSocket的引用,以便将数据添加到其中。Aqueduct应用程序如何管理其Websocket连接在很大程度上取决于应用程序的行为、应用程序运行的隔离器数量以及整个系统的基础设施。

一个简单的应用程序可能会在一个 Map中跟踪Websocket连接,其中的密钥是通过请求授权获得的用户标识符:

router
  .route("/connect")
  .link(() => new Authorizer(authServer));
  .linkFunction((request) async {
    var userID = request.authorization.ownerID;
    var socket = await WebSocketTransformer.upgrade(request.raw);
    socket.listen((event) => handleEvent(event, fromUserID: userID));

    connections[userID] = socket;

    return null;
  });

如果我们继续以 "聊天应用程序 "为例,handleEvent的代码可能是这样的。

void handleEvent(dynamic event, {int fromUserID}) {
  var incoming = json.decode(UTF8.decode(event));
  var outgoing = utf8.encode(json.encode({
    "text": incoming["text"],
    ...
  }));

  connections.keys
    .where((userID) => userID != fromUserID)
    .forEach((userID) {
      var connection = connections[userID];
      connection.add(outgoing);
    });
}

注意,这个简单的实现没有考虑到来自同一用户的多个连接或多隔离的应用程序。

多隔离和多实例应用程序的注意事项

默认情况下,一个Aqueduct应用程序在多个隔离体上运行。由于每个隔离体都有自己的堆,因此在一个隔离体上创建的websocket不能被另一个隔离体直接访问。在上面的例子中,每个隔离体都有自己的连接map, 因此,消息只发送到聊天消息来自同一个隔离体上打开的连接。

一个简单的解决方案是只在一个隔离器上运行应用程序,确保所有的websocket都在一个隔离器上,并且可以互相访问:

aqueduct serve -n 1

对于许多应用程序,这是一个很好的解决方案。对于其他人,可能并非如此。

回想一下,Aqueduct的多隔离体系结构的好处之一是,在单个实例上测试的代码将可扩展到负载均衡器后面的多个实例。如果Aqueduct应用程序在单个多隔离实例上正确运行,则它将在多个实例上正确运行。这种(某种程度上)强制执行的结构阻止我们在单个隔离体上天真地跟踪websocket连接,这在我们扩展到多实例系统时会引起问题。

如果你发现自己的应用非常受欢迎,需要多个服务器来有效地服务于请求,那么你就会对如何架构一个合适的解决方案有一个很好的想法(或者你会有足够的钱去雇人去做)。在许多情况下,REST API和websocket服务器无论如何都是独立的实例--它们有不同的生命周期和部署行为。在单个隔离体上运行websocket服务器可能是有意义的,因为你很可能是IO绑定而不是CPU绑定。

如果你仍然希望使用带有Websocket的多隔离服务器,ApplicationMessageHub将派上用场。当在应用程序中向连接的websocket广播消息时,你首先要将数据发送到每个连接到发出消息的隔离器的websocket。然后,消息被添加到ApplicationMessageHub中:

void onChatMessage(String message) {
  connectedSockets.forEach((socket) {
    socket.add(message);
  });

  ApplicationChannel.messageHub.add({"event": "websocket_broadcast", "message": message});
}

添加到messageHub中的任何东西都将被传递给每一个其他消息集线器的监听器,也就是说,每一个其他隔离器都会收到这个数据。然后,其他隔离器会将消息发送到每个连接的websocket:

class ChatChannel extends ApplicationChannel {
  @override
  Future prepare() async {
    messageHub.listen((event) {
      if (event is Map && event["event"] == "websocket_broadcast") {
        connectedSockets.forEach((socket) {
          socket.add(event["message"]);
        });
      }
    });
  }
}