From b4e073ae2f179cb93820fb95fde7fabcafe50d2c Mon Sep 17 00:00:00 2001 From: WJQSERVER <114663932+WJQSERVER@users.noreply.github.com> Date: Sun, 7 Sep 2025 02:24:28 +0800 Subject: [PATCH] Update sse.go --- sse.go | 134 +++++++++++++++++++++++++++------------------------------ 1 file changed, 63 insertions(+), 71 deletions(-) diff --git a/sse.go b/sse.go index b856a08..597e930 100644 --- a/sse.go +++ b/sse.go @@ -1,6 +1,6 @@ // This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 WJQSERVER. All rights reserved. +// Copyright 2025 WJQSERVER. All rights reserved. // All rights reserved by WJQSERVER, related rights can be exercised by the infinite-iroha organization. package touka @@ -25,6 +25,7 @@ type Event struct { } // Render 将事件格式化并写入给定的 writer. +// 通过逐行处理数据, 此方法可防止因数据中包含换行符而导致的CRLF注入问题. // 为了性能, 它使用 bytes.Buffer 并通过 WriteTo 直接写入, 以避免不必要的内存分配. func (e *Event) Render(w io.Writer) error { var buf bytes.Buffer @@ -65,40 +66,34 @@ func (e *Event) Render(w io.Writer) error { // 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期. // // 详细用法: +// r.GET("/sse/callback", func(c *touka.Context) { +// // streamer 回调函数会在一个循环中被调用. +// c.EventStream(func(w io.Writer) bool { +// event := touka.Event{ +// Event: "time-tick", +// Data: time.Now().Format(time.RFC1123), +// } // -// r.GET("/sse/callback", func(c *touka.Context) { -// // streamer 回调函数会在一个循环中被调用. -// c.EventStream(func(w io.Writer) bool { -// event := touka.Event{ -// Event: "time-tick", -// Data: time.Now().Format(time.RFC1123), -// } +// if err := event.Render(w); err != nil { +// // 发生写入错误, 停止发送. +// return false // 返回 false 结束事件流. +// } // -// if err := event.Render(w); err != nil { -// // 发生写入错误, 停止发送. -// return false // 返回 false 结束事件流. -// } -// -// time.Sleep(2 * time.Second) -// return true // 返回 true 继续事件流. -// }) -// // 当事件流结束后(例如客户端关闭页面), 这行代码会被执行. -// fmt.Println("Client disconnected from /sse/callback") -// }) +// time.Sleep(2 * time.Second) +// return true // 返回 true 继续事件流. +// }) +// // 当事件流结束后(例如客户端关闭页面), 这行代码会被执行. +// fmt.Println("Client disconnected from /sse/callback") +// }) func (c *Context) EventStream(streamer func(w io.Writer) bool) { - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.Header().Set("Cache-Control", "no-cache") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - - flusher, ok := c.Writer.(http.Flusher) - if !ok { - c.Errorf("streaming unsupported: http.ResponseWriter does not implement http.Flusher") - return - } + // 为现代网络协议优化头部. + c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") + c.Writer.Header().Del("Connection") + c.Writer.Header().Del("Transfer-Encoding") c.Writer.WriteHeader(http.StatusOK) - flusher.Flush() + c.Writer.Flush() // 直接调用, ResponseWriter 接口保证了 Flush 方法的存在. for { select { @@ -108,63 +103,60 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { if !streamer(c.Writer) { return } - flusher.Flush() + c.Writer.Flush() } } } // EventStreamChan 返回用于 SSE 事件流的 channel. // 这是为高级并发场景设计的、更灵活的API. -// 调用者必须负责关闭 event channel 并处理 error channel 以避免 goroutine 泄漏. +// +// 重要: +// - 调用者必须 close(eventChan) 来结束事件流. +// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开. +// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done(). // // 详细用法: +// r.GET("/sse/channel", func(c *touka.Context) { +// eventChan, errChan := c.EventStreamChan() // -// r.GET("/sse/channel", func(c *touka.Context) { -// eventChan, errChan := c.EventStreamChan() +// // 必须在独立的goroutine中处理错误和连接断开. +// go func() { +// if err := <-errChan; err != nil { +// c.Errorf("SSE channel error: %v", err) +// } +// }() // -// // 必须在独立的goroutine中处理错误和连接断开. -// go func() { -// if err := <-errChan; err != nil { -// c.Errorf("SSE channel error: %v", err) -// } -// }() +// // 在另一个goroutine中异步发送事件. +// go func() { +// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. +// defer close(eventChan) // -// // 在另一个goroutine中异步发送事件. -// go func() { -// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. -// defer close(eventChan) -// -// for i := 1; i <= 5; i++ { -// event := touka.Event{ -// Id: fmt.Sprintf("%d", i), -// Data: "hello from channel", -// } -// eventChan <- event -// time.Sleep(2 * time.Second) -// } -// }() -// }) +// for i := 1; i <= 5; i++ { +// select { +// case <-c.Request.Context().Done(): +// return // 客户端已断开, 退出 goroutine. +// default: +// eventChan <- touka.Event{ +// Id: fmt.Sprintf("%d", i), +// Data: "hello from channel", +// } +// time.Sleep(2 * time.Second) +// } +// } +// }() +// }) func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { eventChan := make(chan Event) errChan := make(chan error, 1) - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.Header().Set("Cache-Control", "no-cache") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - - flusher, ok := c.Writer.(http.Flusher) - if !ok { - err := fmt.Errorf("streaming unsupported: http.ResponseWriter does not implement http.Flusher") - c.Errorf(err.Error()) - errChan <- err - close(errChan) - close(eventChan) - return eventChan, errChan - } + c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") + c.Writer.Header().Del("Connection") + c.Writer.Header().Del("Transfer-Encoding") c.Writer.WriteHeader(http.StatusOK) - flusher.Flush() + c.Writer.Flush() go func() { defer close(errChan) @@ -179,7 +171,7 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { errChan <- err return } - flusher.Flush() + c.Writer.Flush() case <-c.Request.Context().Done(): errChan <- c.Request.Context().Err() return