构建一个即时消息应用(五):实时消息

本文是该系列的第五篇。

创新互联专注于企业成都全网营销推广、网站重做改版、甘南网站定制设计、自适应品牌网站建设、html5商城系统网站开发、集团公司官网建设、成都外贸网站建设、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为甘南等各大城市提供网站开发制作服务。

  • 第一篇: 模式
  • 第二篇: OAuth
  • 第三篇: 对话
  • 第四篇: 消息

对于实时消息,我们将使用 服务器发送事件Server-Sent Events。这是一个打开的连接,我们可以在其中传输数据流。我们会有个端点,用户会在其中订阅发送给他的所有消息。

消息户端

在 HTTP 部分之前,让我们先编写一个映射map ,让所有客户端都监听消息。 像这样全局初始化:

 
 
 
  1. type MessageClient struct {
  2.     Messages chan Message
  3.     UserID   string
  4. }
  5. var messageClients sync.Map

已创建的新消息

还记得在 上一篇文章 中,当我们创建这条消息时,我们留下了一个 “TODO” 注释。在那里,我们将使用这个函数来调度一个 goroutine。

 
 
 
  1. go messageCreated(message)

把这行代码插入到我们留注释的位置。

 
 
 
  1. func messageCreated(message Message) error {
  2.     if err := db.QueryRow(`
  3.         SELECT user_id FROM participants
  4.         WHERE user_id != $1 and conversation_id = $2
  5.     `, message.UserID, message.ConversationID).
  6.     Scan(&message.ReceiverID); err != nil {
  7.         return err
  8.     }
  9.     go broadcastMessage(message)
  10.     return nil
  11. }
  12. func broadcastMessage(message Message) {
  13.     messageClients.Range(func(key, _ interface{}) bool {
  14.         client := key.(*MessageClient)
  15.         if client.UserID == message.ReceiverID {
  16.             client.Messages <- message
  17.         }
  18.         return true
  19.     })
  20. }

该函数查询接收者 ID(其他参与者 ID),并将消息发送给所有客户端。

订阅消息

让我们转到 main() 函数并添加以下路由:

 
 
 
  1. router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))

此端点处理 /api/messages 上的 GET 请求。请求应该是一个 EventSource 连接。它用一个事件流响应,其中的数据是 JSON 格式的。

 
 
 
  1. func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
  2.     if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
  3.         http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)
  4.         return
  5.     }
  6.     f, ok := w.(http.Flusher)
  7.     if !ok {
  8.         respondError(w, errors.New("streaming unsupported"))
  9.         return
  10.     }
  11.     ctx := r.Context()
  12.     authUserID := ctx.Value(keyAuthUserID).(string)
  13.     h := w.Header()
  14.     h.Set("Cache-Control", "no-cache")
  15.     h.Set("Connection", "keep-alive")
  16.     h.Set("Content-Type", "text/event-stream")
  17.     messages := make(chan Message)
  18.     defer close(messages)
  19.     client := &MessageClient{Messages: messages, UserID: authUserID}
  20.     messageClients.Store(client, nil)
  21.     defer messageClients.Delete(client)
  22.     for {
  23.         select {
  24.         case <-ctx.Done():
  25.             return
  26.         case message := <-messages:
  27.             if b, err := json.Marshal(message); err != nil {
  28.                 log.Printf("could not marshall message: %v\n", err)
  29.                 fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
  30.             } else {
  31.                 fmt.Fprintf(w, "data: %s\n\n", b)
  32.             }
  33.             f.Flush()
  34.         }
  35.     }
  36. }

首先,它检查请求头是否正确,并检查服务器是否支持流式传输。我们创建一个消息通道,用它来构建一个客户端,并将其存储在客户端映射中。每当创建新消息时,它都会进入这个通道,因此我们可以通过 for-select 循环从中读取。

服务器发送事件Server-Sent Events使用以下格式发送数据:

 
 
 
  1. data: some data here\n\n

我们以 JSON 格式发送:

 
 
 
  1. data: {"foo":"bar"}\n\n

我们使用 fmt.Fprintf() 以这种格式写入响应写入器writter,并在循环的每次迭代中刷新数据。

这个循环会一直运行,直到使用请求上下文关闭连接为止。我们延迟了通道的关闭和客户端的删除,因此,当循环结束时,通道将被关闭,客户端不会收到更多的消息。

注意,服务器发送事件Server-Sent Events(EventSource)的 JavaScript API 不支持设置自定义请求头,所以我们不能设置 Authorization: Bearer 。这就是为什么 guard() 中间件也会从 URL 查询字符串中读取令牌的原因。


实时消息部分到此结束。我想说的是,这就是后端的全部内容。但是为了编写前端代码,我将再增加一个登录端点:一个仅用于开发的登录。

  • 源代码

分享标题:构建一个即时消息应用(五):实时消息
本文路径:http://www.csdahua.cn/qtweb/news21/246121.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网