From 559aefeb85bf38c1947494f71e1b9f8f8531400c Mon Sep 17 00:00:00 2001 From: wjqserver <114663932+WJQSERVER@users.noreply.github.com> Date: Sun, 29 Mar 2026 16:05:09 +0800 Subject: [PATCH] fix(SSE): capture Writer before goroutine, use select for channel send Address PR review feedback: - Capture w := c.Writer before goroutine start, use w (not c.Writer) inside the goroutine to avoid holding *Context reference - Move channel send into select alongside context cancellation in all examples and tests, preventing goroutine leak when client disconnects while blocked on unbuffered send --- docs/sse.md | 14 +++++++------- sse.go | 16 ++++++++-------- sse_test.go | 3 +-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/docs/sse.md b/docs/sse.md index b0224e8..1b44521 100644 --- a/docs/sse.md +++ b/docs/sse.md @@ -45,6 +45,7 @@ r.GET("/events", func(c *touka.Context) { ```go r.GET("/events-chan", func(c *touka.Context) { eventChan := make(chan touka.Event) + ctx := c.Request.Context() // 在独立的 goroutine 中发送事件. go func() { @@ -52,14 +53,13 @@ r.GET("/events-chan", func(c *touka.Context) { for i := 0; i < 10; i++ { select { - case <-c.Request.Context().Done(): + case <-ctx.Done(): return // 客户端已断开, 退出 goroutine. - default: - eventChan <- touka.Event{ - Data: fmt.Sprintf("消息 #%d", i), - } - time.Sleep(1 * time.Second) + case eventChan <- touka.Event{ + Data: fmt.Sprintf("消息 #%d", i), + }: } + time.Sleep(1 * time.Second) } }() @@ -70,7 +70,7 @@ r.GET("/events-chan", func(c *touka.Context) { ## 最佳实践 -1. **资源回收**: `EventStreamChan` 是阻塞的,handler 在事件流结束前不会返回。请确保生产者 goroutine 在 `select` 中监听 `c.Request.Context().Done()` 以响应客户端断开。 +1. **资源回收**: `EventStreamChan` 是阻塞的,handler 在事件流结束前不会返回。将 `c.Request.Context().Done()` 和 `eventChan <- ...` 作为同一个 `select` 的两个分支,确保发送操作本身能够响应客户端断开。 2. **关闭 Channel**: 生产者完成发送后必须 `close(eventChan)`,否则 handler 会永远阻塞。 3. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 4. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx)配置了足够大的写超时时间。 diff --git a/sse.go b/sse.go index 3ce050a..0284775 100644 --- a/sse.go +++ b/sse.go @@ -132,13 +132,12 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { // select { // case <-c.Request.Context().Done(): // return // 客户端已断开, 退出 goroutine. -// default: -// eventChan <- touka.Event{ -// Id: fmt.Sprintf("%d", i), -// Data: "hello from channel", -// } -// time.Sleep(2 * time.Second) +// case eventChan <- touka.Event{ +// Id: fmt.Sprintf("%d", i), +// Data: "hello from channel", +// }: // } +// time.Sleep(2 * time.Second) // } // }() // @@ -155,7 +154,8 @@ func (c *Context) EventStreamChan(eventChan <-chan Event) { c.Writer.Flush() // 捕获稳定的引用, 不持有 *Context 指针, 以免 Context 被 pool 回收后出现竞态. - fl, _ := c.Writer.(http.Flusher) + w := c.Writer + fl, _ := w.(http.Flusher) reqCtx := c.Request.Context() goroutineExited := make(chan struct{}) @@ -170,7 +170,7 @@ func (c *Context) EventStreamChan(eventChan <-chan Event) { if !ok { return } - if err := event.Render(c.Writer); err != nil { + if err := event.Render(w); err != nil { return } if fl != nil { diff --git a/sse_test.go b/sse_test.go index f50859a..cfb9ae3 100644 --- a/sse_test.go +++ b/sse_test.go @@ -68,8 +68,7 @@ func TestEventStreamChanUnblocksOnClientDisconnect(t *testing.T) { select { case <-ctx.Done(): return - default: - eventChan <- Event{Data: "tick"} + case eventChan <- Event{Data: "tick"}: time.Sleep(10 * time.Millisecond) } }