diff --git a/docs/sse.md b/docs/sse.md index bafc553..b0224e8 100644 --- a/docs/sse.md +++ b/docs/sse.md @@ -40,27 +40,20 @@ r.GET("/events", func(c *touka.Context) { ## 模式二:通道模式 (EventStreamChan) -如果您需要更高级的并发控制(例如从多个异步源接收数据),可以使用通道模式。 +如果您需要更高级的并发控制(例如从多个异步源接收数据),可以使用通道模式。与回调模式类似,此方法是**阻塞的**:handler 会在此方法中停留,直到事件 channel 被关闭或客户端断开连接。 ```go r.GET("/events-chan", func(c *touka.Context) { - eventChan, errChan := c.EventStreamChan() + eventChan := make(chan touka.Event) - // 监听错误/断开连接 + // 在独立的 goroutine 中发送事件. go func() { - if err := <-errChan; err != nil { - log.Printf("SSE 错误: %v", err) - } - }() - - // 发送数据 - go func() { - defer close(eventChan) // 务必在结束时关闭 + defer close(eventChan) // 务必在结束时关闭以结束事件流. for i := 0; i < 10; i++ { select { case <-c.Request.Context().Done(): - return + return // 客户端已断开, 退出 goroutine. default: eventChan <- touka.Event{ Data: fmt.Sprintf("消息 #%d", i), @@ -69,14 +62,18 @@ r.GET("/events-chan", func(c *touka.Context) { } } }() + + // EventStreamChan 会阻塞直到流结束. + c.EventStreamChan(eventChan) }) ``` ## 最佳实践 -1. **资源回收**: 确保在 `EventStreamChan` 模式下正确监听 `c.Request.Context().Done()` 以避免 Goroutine 泄漏。 -2. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 -3. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx)配置了足够大的写超时时间。 +1. **资源回收**: `EventStreamChan` 是阻塞的,handler 在事件流结束前不会返回。请确保生产者 goroutine 在 `select` 中监听 `c.Request.Context().Done()` 以响应客户端断开。 +2. **关闭 Channel**: 生产者完成发送后必须 `close(eventChan)`,否则 handler 会永远阻塞。 +3. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 +4. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx)配置了足够大的写超时时间。 ## 优雅关闭与资源清理 diff --git a/sse.go b/sse.go index ab6c226..3ce050a 100644 --- a/sse.go +++ b/sse.go @@ -111,27 +111,22 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { // EventStreamChan 返回用于 SSE 事件流的 channel. // 这是为高级并发场景设计的、更灵活的API. // -// 重要: -// - 调用者必须 close(eventChan) 来结束事件流. -// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开. -// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done(). +// 与 EventStream 回调模式类似, 此方法是阻塞的: handler 会在此方法中停留, +// 直到事件 channel 被关闭 (close eventChan) 或客户端断开连接. +// 这保证了 Context 不会在 SSE 流期间被 pool 回收. +// +// eventChan 必须在调用此方法之前创建, 以便调用者可以在独立的 goroutine 中发送事件. +// 调用者必须在完成后 close(eventChan) 来结束流. +// 生产者 goroutine 必须在 select 中监听 c.Request.Context().Done(), 否则在客户端断开时会产生 goroutine 泄漏. // // 详细用法: // // r.GET("/sse/channel", func(c *touka.Context) { -// eventChan, errChan := c.EventStreamChan() +// eventChan := make(chan touka.Event) // -// // 必须在独立的goroutine中处理错误和连接断开. +// // 在独立的 goroutine 中异步发送事件. // go func() { -// if err := <-errChan; err != nil { -// c.Errorf("SSE channel error: %v", err) -// } -// }() -// -// // 在另一个goroutine中异步发送事件. -// go func() { -// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. -// defer close(eventChan) +// defer close(eventChan) // 完成后关闭 channel 以结束事件流. // // for i := 1; i <= 5; i++ { // select { @@ -146,11 +141,11 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) { // } // } // }() +// +// // 阻塞直到事件流结束. +// c.EventStreamChan(eventChan) // }) -func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { - eventChan := make(chan Event) - errChan := make(chan error, 1) - +func (c *Context) EventStreamChan(eventChan <-chan Event) { c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") c.Writer.Header().Del("Connection") @@ -159,8 +154,15 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { c.Writer.WriteHeader(http.StatusOK) c.Writer.Flush() + // 捕获稳定的引用, 不持有 *Context 指针, 以免 Context 被 pool 回收后出现竞态. + fl, _ := c.Writer.(http.Flusher) + reqCtx := c.Request.Context() + + goroutineExited := make(chan struct{}) + + // 写入 goroutine: 从 eventChan 消费事件并写入响应. go func() { - defer close(errChan) + defer close(goroutineExited) for { select { @@ -169,16 +171,22 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { return } if err := event.Render(c.Writer); err != nil { - errChan <- err return } - c.Writer.Flush() - case <-c.Request.Context().Done(): - errChan <- c.Request.Context().Err() + if fl != nil { + fl.Flush() + } + case <-reqCtx.Done(): return } } }() - return eventChan, errChan + // 阻塞直到: + // 1. 写入 goroutine 退出 (eventChan 关闭或写入失败) + // 2. 客户端断开连接 (reqCtx 取消) + select { + case <-goroutineExited: + case <-reqCtx.Done(): + } } diff --git a/sse_test.go b/sse_test.go new file mode 100644 index 0000000..f50859a --- /dev/null +++ b/sse_test.go @@ -0,0 +1,143 @@ +package touka + +import ( + "context" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// TestEventStreamChanBlocksHandler verifies that EventStreamChan blocks until +// the event channel is closed. +func TestEventStreamChanBlocksHandler(t *testing.T) { + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil) + c, _ := CreateTestContextWithRequest(rr, req) + + handlerReturned := make(chan struct{}) + eventChan := make(chan Event) + + // Start producer goroutine before EventStreamChan blocks + go func() { + defer close(eventChan) + time.Sleep(30 * time.Millisecond) + eventChan <- Event{Data: "hello"} + time.Sleep(30 * time.Millisecond) + }() + + go func() { + c.EventStreamChan(eventChan) + close(handlerReturned) + }() + + // Wait for goroutine to start + time.Sleep(10 * time.Millisecond) + + // Handler should NOT have returned (eventChan not closed) + select { + case <-handlerReturned: + t.Fatal("Handler returned before eventChan was closed - EventStreamChan is not blocking") + case <-time.After(40 * time.Millisecond): + // good, still blocking + } + + // Wait for producer to finish (30+30ms + margin) + select { + case <-handlerReturned: + // good, handler returned + case <-time.After(200 * time.Millisecond): + t.Fatal("Handler did not return after eventChan was closed") + } +} + +// TestEventStreamChanUnblocksOnClientDisconnect verifies the handler returns +// when the request context is cancelled, even if eventChan is never closed. +func TestEventStreamChanUnblocksOnClientDisconnect(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil).WithContext(ctx) + c, _ := CreateTestContextWithRequest(rr, req) + + eventChan := make(chan Event) + handlerReturned := make(chan struct{}) + + // Producer never closes eventChan + go func() { + for { + select { + case <-ctx.Done(): + return + default: + eventChan <- Event{Data: "tick"} + time.Sleep(10 * time.Millisecond) + } + } + }() + + go func() { + c.EventStreamChan(eventChan) + close(handlerReturned) + }() + + // Handler should NOT have returned + select { + case <-handlerReturned: + t.Fatal("Handler returned before stream ended") + case <-time.After(60 * time.Millisecond): + // good, still blocked + } + + // Cancel context to simulate client disconnect + cancel() + + select { + case <-handlerReturned: + // good + case <-time.After(200 * time.Millisecond): + t.Fatal("Handler did not return after client disconnect") + } +} + +// TestEventStreamChanWritesEvents verifies the SSE event format is correct. +func TestEventStreamChanWritesEvents(t *testing.T) { + rr := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/sse", nil) + c, _ := CreateTestContextWithRequest(rr, req) + + eventChan := make(chan Event) + + go func() { + defer close(eventChan) + eventChan <- Event{Id: "1", Event: "tick", Data: "hello\nworld"} + eventChan <- Event{Id: "2", Data: "second"} + }() + + c.EventStreamChan(eventChan) + + body := rr.Body.String() + + ct := rr.Header().Get("Content-Type") + if !strings.Contains(ct, "text/event-stream") { + t.Fatalf("expected text/event-stream content type, got %q", ct) + } + + if !strings.Contains(body, "id: 1") { + t.Fatal("missing id field in first event") + } + if !strings.Contains(body, "event: tick") { + t.Fatal("missing event field in first event") + } + if !strings.Contains(body, "data: hello") { + t.Fatal("missing data line 1 in first event") + } + if !strings.Contains(body, "data: world") { + t.Fatal("missing data line 2 in first event") + } + if !strings.Contains(body, "id: 2") { + t.Fatal("missing id field in second event") + } + if !strings.Contains(body, "data: second") { + t.Fatal("missing data in second event") + } +}