From 2f94763c65bdc747fb5e4ced419dbc1d62e14a76 Mon Sep 17 00:00:00 2001 From: wjqserver <114663932+WJQSERVER@users.noreply.github.com> Date: Sun, 29 Mar 2026 15:42:01 +0800 Subject: [PATCH] fix(SSE)!: redesign EventStreamChan to prevent context pool recycling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: EventStreamChan signature changed from (chan<- Event, <-chan error) to (eventChan <-chan Event) The caller now creates and passes the channel instead of receiving it. The errChan return value is removed. The old non-blocking design allowed the handler to return before the SSE stream ended, causing ServeHTTP to return the Context to the pool while the internal goroutine was still writing to the pooled writer — a data race across requests. The new blocking design keeps the handler inside EventStreamChan until the event channel is closed or the client disconnects, ensuring the Context remains bound throughout the stream. - Caller creates channel, producer goroutine sends events - EventStreamChan blocks handler until stream ends - Internal goroutine captures stable references (Flusher, context.Context) instead of holding *Context pointer - Nil guard on Flusher type assertion - Add sse_test.go covering blocking, disconnect, and event format - Update docs/sse.md for new API --- docs/sse.md | 27 +++++----- sse.go | 58 ++++++++++++--------- sse_test.go | 143 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 188 insertions(+), 40 deletions(-) create mode 100644 sse_test.go diff --git a/docs/sse.md b/docs/sse.md index bafc553..b0224e8 100644 --- a/docs/sse.md +++ b/docs/sse.md @@ -40,27 +40,20 @@ r.GET("/events", func(c *touka.Context) { ## 模式二:通道模式 (EventStreamChan) -如果您需要更高级的并发控制(例如从多个异步源接收数据),可以使用通道模式。 +如果您需要更高级的并发控制(例如从多个异步源接收数据),可以使用通道模式。与回调模式类似,此方法是**阻塞的**:handler 会在此方法中停留,直到事件 channel 被关闭或客户端断开连接。 ```go r.GET("/events-chan", func(c *touka.Context) { - eventChan, errChan := c.EventStreamChan() + eventChan := make(chan touka.Event) - // 监听错误/断开连接 + // 在独立的 goroutine 中发送事件. go func() { - if err := <-errChan; err != nil { - log.Printf("SSE 错误: %v", err) - } - }() - - // 发送数据 - go func() { - defer close(eventChan) // 务必在结束时关闭 + defer close(eventChan) // 务必在结束时关闭以结束事件流. for i := 0; i < 10; i++ { select { case <-c.Request.Context().Done(): - return + return // 客户端已断开, 退出 goroutine. default: eventChan <- touka.Event{ Data: fmt.Sprintf("消息 #%d", i), @@ -69,14 +62,18 @@ r.GET("/events-chan", func(c *touka.Context) { } } }() + + // EventStreamChan 会阻塞直到流结束. + c.EventStreamChan(eventChan) }) ``` ## 最佳实践 -1. **资源回收**: 确保在 `EventStreamChan` 模式下正确监听 `c.Request.Context().Done()` 以避免 Goroutine 泄漏。 -2. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 -3. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx)配置了足够大的写超时时间。 +1. **资源回收**: `EventStreamChan` 是阻塞的,handler 在事件流结束前不会返回。请确保生产者 goroutine 在 `select` 中监听 `c.Request.Context().Done()` 以响应客户端断开。 +2. **关闭 Channel**: 生产者完成发送后必须 `close(eventChan)`,否则 handler 会永远阻塞。 +3. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 +4. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx)配置了足够大的写超时时间。 ## 优雅关闭与资源清理 diff --git a/sse.go b/sse.go index ab6c226..3ce050a 100644 --- a/sse.go +++ b/sse.go @@ -111,27 +111,22 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { // EventStreamChan 返回用于 SSE 事件流的 channel. // 这是为高级并发场景设计的、更灵活的API. // -// 重要: -// - 调用者必须 close(eventChan) 来结束事件流. -// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开. -// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done(). +// 与 EventStream 回调模式类似, 此方法是阻塞的: handler 会在此方法中停留, +// 直到事件 channel 被关闭 (close eventChan) 或客户端断开连接. +// 这保证了 Context 不会在 SSE 流期间被 pool 回收. +// +// eventChan 必须在调用此方法之前创建, 以便调用者可以在独立的 goroutine 中发送事件. +// 调用者必须在完成后 close(eventChan) 来结束流. +// 生产者 goroutine 必须在 select 中监听 c.Request.Context().Done(), 否则在客户端断开时会产生 goroutine 泄漏. // // 详细用法: // // r.GET("/sse/channel", func(c *touka.Context) { -// eventChan, errChan := c.EventStreamChan() +// eventChan := make(chan touka.Event) // -// // 必须在独立的goroutine中处理错误和连接断开. +// // 在独立的 goroutine 中异步发送事件. // go func() { -// if err := <-errChan; err != nil { -// c.Errorf("SSE channel error: %v", err) -// } -// }() -// -// // 在另一个goroutine中异步发送事件. -// go func() { -// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. -// defer close(eventChan) +// defer close(eventChan) // 完成后关闭 channel 以结束事件流. // // for i := 1; i <= 5; i++ { // select { @@ -146,11 +141,11 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { // } // } // }() +// +// // 阻塞直到事件流结束. +// c.EventStreamChan(eventChan) // }) -func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { - eventChan := make(chan Event) - errChan := make(chan error, 1) - +func (c *Context) EventStreamChan(eventChan <-chan Event) { c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") c.Writer.Header().Del("Connection") @@ -159,8 +154,15 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { c.Writer.WriteHeader(http.StatusOK) c.Writer.Flush() + // 捕获稳定的引用, 不持有 *Context 指针, 以免 Context 被 pool 回收后出现竞态. + fl, _ := c.Writer.(http.Flusher) + reqCtx := c.Request.Context() + + goroutineExited := make(chan struct{}) + + // 写入 goroutine: 从 eventChan 消费事件并写入响应. go func() { - defer close(errChan) + defer close(goroutineExited) for { select { @@ -169,16 +171,22 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { return } if err := event.Render(c.Writer); err != nil { - errChan <- err return } - c.Writer.Flush() - case <-c.Request.Context().Done(): - errChan <- c.Request.Context().Err() + if fl != nil { + fl.Flush() + } + case <-reqCtx.Done(): return } } }() - return eventChan, errChan + // 阻塞直到: + // 1. 写入 goroutine 退出 (eventChan 关闭或写入失败) + // 2. 客户端断开连接 (reqCtx 取消) + select { + case <-goroutineExited: + case <-reqCtx.Done(): + } } diff --git a/sse_test.go b/sse_test.go new file mode 100644 index 0000000..f50859a --- /dev/null +++ b/sse_test.go @@ -0,0 +1,143 @@ +package touka + +import ( + "context" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// TestEventStreamChanBlocksHandler verifies that EventStreamChan blocks until +// the event channel is closed. +func TestEventStreamChanBlocksHandler(t *testing.T) { + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil) + c, _ := CreateTestContextWithRequest(rr, req) + + handlerReturned := make(chan struct{}) + eventChan := make(chan Event) + + // Start producer goroutine before EventStreamChan blocks + go func() { + defer close(eventChan) + time.Sleep(30 * time.Millisecond) + eventChan <- Event{Data: "hello"} + time.Sleep(30 * time.Millisecond) + }() + + go func() { + c.EventStreamChan(eventChan) + close(handlerReturned) + }() + + // Wait for goroutine to start + time.Sleep(10 * time.Millisecond) + + // Handler should NOT have returned (eventChan not closed) + select { + case <-handlerReturned: + t.Fatal("Handler returned before eventChan was closed - EventStreamChan is not blocking") + case <-time.After(40 * time.Millisecond): + // good, still blocking + } + + // Wait for producer to finish (30+30ms + margin) + select { + case <-handlerReturned: + // good, handler returned + case <-time.After(200 * time.Millisecond): + t.Fatal("Handler did not return after eventChan was closed") + } +} + +// TestEventStreamChanUnblocksOnClientDisconnect verifies the handler returns +// when the request context is cancelled, even if eventChan is never closed. +func TestEventStreamChanUnblocksOnClientDisconnect(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil).WithContext(ctx) + c, _ := CreateTestContextWithRequest(rr, req) + + eventChan := make(chan Event) + handlerReturned := make(chan struct{}) + + // Producer never closes eventChan + go func() { + for { + select { + case <-ctx.Done(): + return + default: + eventChan <- Event{Data: "tick"} + time.Sleep(10 * time.Millisecond) + } + } + }() + + go func() { + c.EventStreamChan(eventChan) + close(handlerReturned) + }() + + // Handler should NOT have returned + select { + case <-handlerReturned: + t.Fatal("Handler returned before stream ended") + case <-time.After(60 * time.Millisecond): + // good, still blocked + } + + // Cancel context to simulate client disconnect + cancel() + + select { + case <-handlerReturned: + // good + case <-time.After(200 * time.Millisecond): + t.Fatal("Handler did not return after client disconnect") + } +} + +// TestEventStreamChanWritesEvents verifies the SSE event format is correct. +func TestEventStreamChanWritesEvents(t *testing.T) { + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil) + c, _ := CreateTestContextWithRequest(rr, req) + + eventChan := make(chan Event) + + go func() { + defer close(eventChan) + eventChan <- Event{Id: "1", Event: "tick", Data: "hello\nworld"} + eventChan <- Event{Id: "2", Data: "second"} + }() + + c.EventStreamChan(eventChan) + + body := rr.Body.String() + + ct := rr.Header().Get("Content-Type") + if !strings.Contains(ct, "text/event-stream") { + t.Fatalf("expected text/event-stream content type, got %q", ct) + } + + if !strings.Contains(body, "id: 1") { + t.Fatal("missing id field in first event") + } + if !strings.Contains(body, "event: tick") { + t.Fatal("missing event field in first event") + } + if !strings.Contains(body, "data: hello") { + t.Fatal("missing data line 1 in first event") + } + if !strings.Contains(body, "data: world") { + t.Fatal("missing data line 2 in first event") + } + if !strings.Contains(body, "id: 2") { + t.Fatal("missing id field in second event") + } + if !strings.Contains(body, "data: second") { + t.Fatal("missing data in second event") + } +}