Update sse.go

This commit is contained in:
WJQSERVER 2025-09-07 02:24:28 +08:00 committed by GitHub
parent af0a99acda
commit b4e073ae2f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

60
sse.go
View file

@ -1,6 +1,6 @@
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this // This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/. // file, You can obtain one at https://mozilla.org/MPL/2.0/.
// Copyright 2024 WJQSERVER. All rights reserved. // Copyright 2025 WJQSERVER. All rights reserved.
// All rights reserved by WJQSERVER, related rights can be exercised by the infinite-iroha organization. // All rights reserved by WJQSERVER, related rights can be exercised by the infinite-iroha organization.
package touka package touka
@ -25,6 +25,7 @@ type Event struct {
} }
// Render 将事件格式化并写入给定的 writer. // Render 将事件格式化并写入给定的 writer.
// 通过逐行处理数据, 此方法可防止因数据中包含换行符而导致的CRLF注入问题.
// 为了性能, 它使用 bytes.Buffer 并通过 WriteTo 直接写入, 以避免不必要的内存分配. // 为了性能, 它使用 bytes.Buffer 并通过 WriteTo 直接写入, 以避免不必要的内存分配.
func (e *Event) Render(w io.Writer) error { func (e *Event) Render(w io.Writer) error {
var buf bytes.Buffer var buf bytes.Buffer
@ -65,7 +66,6 @@ func (e *Event) Render(w io.Writer) error {
// 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期. // 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期.
// //
// 详细用法: // 详细用法:
//
// r.GET("/sse/callback", func(c *touka.Context) { // r.GET("/sse/callback", func(c *touka.Context) {
// // streamer 回调函数会在一个循环中被调用. // // streamer 回调函数会在一个循环中被调用.
// c.EventStream(func(w io.Writer) bool { // c.EventStream(func(w io.Writer) bool {
@ -86,19 +86,14 @@ func (e *Event) Render(w io.Writer) error {
// fmt.Println("Client disconnected from /sse/callback") // fmt.Println("Client disconnected from /sse/callback")
// }) // })
func (c *Context) EventStream(streamer func(w io.Writer) bool) { func (c *Context) EventStream(streamer func(w io.Writer) bool) {
c.Writer.Header().Set("Content-Type", "text/event-stream") // 为现代网络协议优化头部.
c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("Cache-Control", "no-cache, no-transform")
c.Writer.Header().Set("Transfer-Encoding", "chunked") c.Writer.Header().Del("Connection")
c.Writer.Header().Del("Transfer-Encoding")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.Errorf("streaming unsupported: http.ResponseWriter does not implement http.Flusher")
return
}
c.Writer.WriteHeader(http.StatusOK) c.Writer.WriteHeader(http.StatusOK)
flusher.Flush() c.Writer.Flush() // 直接调用, ResponseWriter 接口保证了 Flush 方法的存在.
for { for {
select { select {
@ -108,17 +103,20 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) {
if !streamer(c.Writer) { if !streamer(c.Writer) {
return return
} }
flusher.Flush() c.Writer.Flush()
} }
} }
} }
// EventStreamChan 返回用于 SSE 事件流的 channel. // EventStreamChan 返回用于 SSE 事件流的 channel.
// 这是为高级并发场景设计的、更灵活的API. // 这是为高级并发场景设计的、更灵活的API.
// 调用者必须负责关闭 event channel 并处理 error channel 以避免 goroutine 泄漏. //
// 重要:
// - 调用者必须 close(eventChan) 来结束事件流.
// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开.
// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done().
// //
// 详细用法: // 详细用法:
//
// r.GET("/sse/channel", func(c *touka.Context) { // r.GET("/sse/channel", func(c *touka.Context) {
// eventChan, errChan := c.EventStreamChan() // eventChan, errChan := c.EventStreamChan()
// //
@ -135,36 +133,30 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) {
// defer close(eventChan) // defer close(eventChan)
// //
// for i := 1; i <= 5; i++ { // for i := 1; i <= 5; i++ {
// event := touka.Event{ // select {
// case <-c.Request.Context().Done():
// return // 客户端已断开, 退出 goroutine.
// default:
// eventChan <- touka.Event{
// Id: fmt.Sprintf("%d", i), // Id: fmt.Sprintf("%d", i),
// Data: "hello from channel", // Data: "hello from channel",
// } // }
// eventChan <- event
// time.Sleep(2 * time.Second) // time.Sleep(2 * time.Second)
// } // }
// }
// }() // }()
// }) // })
func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { func (c *Context) EventStreamChan() (chan<- Event, <-chan error) {
eventChan := make(chan Event) eventChan := make(chan Event)
errChan := make(chan error, 1) errChan := make(chan error, 1)
c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Cache-Control", "no-cache, no-transform")
c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Del("Connection")
c.Writer.Header().Set("Transfer-Encoding", "chunked") c.Writer.Header().Del("Transfer-Encoding")
flusher, ok := c.Writer.(http.Flusher)
if !ok {
err := fmt.Errorf("streaming unsupported: http.ResponseWriter does not implement http.Flusher")
c.Errorf(err.Error())
errChan <- err
close(errChan)
close(eventChan)
return eventChan, errChan
}
c.Writer.WriteHeader(http.StatusOK) c.Writer.WriteHeader(http.StatusOK)
flusher.Flush() c.Writer.Flush()
go func() { go func() {
defer close(errChan) defer close(errChan)
@ -179,7 +171,7 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) {
errChan <- err errChan <- err
return return
} }
flusher.Flush() c.Writer.Flush()
case <-c.Request.Context().Done(): case <-c.Request.Context().Done():
errChan <- c.Request.Context().Err() errChan <- c.Request.Context().Err()
return return