fix(SSE): capture Writer before goroutine, use select for channel send

Address PR review feedback:
- Capture w := c.Writer before goroutine start, use w (not c.Writer)
  inside the goroutine to avoid holding *Context reference
- Move channel send into select alongside context cancellation in all
  examples and tests, preventing goroutine leak when client disconnects
  while blocked on unbuffered send
This commit is contained in:
wjqserver 2026-03-29 16:05:09 +08:00
parent 2f94763c65
commit 559aefeb85
3 changed files with 16 additions and 17 deletions

View file

@ -45,6 +45,7 @@ r.GET("/events", func(c *touka.Context) {
```go ```go
r.GET("/events-chan", func(c *touka.Context) { r.GET("/events-chan", func(c *touka.Context) {
eventChan := make(chan touka.Event) eventChan := make(chan touka.Event)
ctx := c.Request.Context()
// 在独立的 goroutine 中发送事件. // 在独立的 goroutine 中发送事件.
go func() { go func() {
@ -52,15 +53,14 @@ r.GET("/events-chan", func(c *touka.Context) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
select { select {
case <-c.Request.Context().Done(): case <-ctx.Done():
return // 客户端已断开, 退出 goroutine. return // 客户端已断开, 退出 goroutine.
default: case eventChan <- touka.Event{
eventChan <- touka.Event{
Data: fmt.Sprintf("消息 #%d", i), Data: fmt.Sprintf("消息 #%d", i),
}:
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
}
}() }()
// EventStreamChan 会阻塞直到流结束. // EventStreamChan 会阻塞直到流结束.
@ -70,7 +70,7 @@ r.GET("/events-chan", func(c *touka.Context) {
## 最佳实践 ## 最佳实践
1. **资源回收**: `EventStreamChan` 是阻塞的handler 在事件流结束前不会返回。请确保生产者 goroutine 在 `select` 中监听 `c.Request.Context().Done()`响应客户端断开。 1. **资源回收**: `EventStreamChan` 是阻塞的handler 在事件流结束前不会返回。`c.Request.Context().Done()``eventChan <- ...` 作为同一个 `select` 的两个分支,确保发送操作本身能够响应客户端断开。
2. **关闭 Channel**: 生产者完成发送后必须 `close(eventChan)`,否则 handler 会永远阻塞。 2. **关闭 Channel**: 生产者完成发送后必须 `close(eventChan)`,否则 handler 会永远阻塞。
3. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。 3. **数据格式**: SSE 协议要求数据为 UTF-8。Touka 的 `Render` 方法会自动处理多行数据并加上必要的 `data:` 前缀。
4. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx配置了足够大的写超时时间。 4. **超时管理**: SSE 连接通常是长连接,请确保您的反向代理(如 Nginx配置了足够大的写超时时间。

10
sse.go
View file

@ -132,14 +132,13 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) {
// select { // select {
// case <-c.Request.Context().Done(): // case <-c.Request.Context().Done():
// return // 客户端已断开, 退出 goroutine. // return // 客户端已断开, 退出 goroutine.
// default: // case eventChan <- touka.Event{
// eventChan <- touka.Event{
// Id: fmt.Sprintf("%d", i), // Id: fmt.Sprintf("%d", i),
// Data: "hello from channel", // Data: "hello from channel",
// }:
// } // }
// time.Sleep(2 * time.Second) // time.Sleep(2 * time.Second)
// } // }
// }
// }() // }()
// //
// // 阻塞直到事件流结束. // // 阻塞直到事件流结束.
@ -155,7 +154,8 @@ func (c *Context) EventStreamChan(eventChan <-chan Event) {
c.Writer.Flush() c.Writer.Flush()
// 捕获稳定的引用, 不持有 *Context 指针, 以免 Context 被 pool 回收后出现竞态. // 捕获稳定的引用, 不持有 *Context 指针, 以免 Context 被 pool 回收后出现竞态.
fl, _ := c.Writer.(http.Flusher) w := c.Writer
fl, _ := w.(http.Flusher)
reqCtx := c.Request.Context() reqCtx := c.Request.Context()
goroutineExited := make(chan struct{}) goroutineExited := make(chan struct{})
@ -170,7 +170,7 @@ func (c *Context) EventStreamChan(eventChan <-chan Event) {
if !ok { if !ok {
return return
} }
if err := event.Render(c.Writer); err != nil { if err := event.Render(w); err != nil {
return return
} }
if fl != nil { if fl != nil {

View file

@ -68,8 +68,7 @@ func TestEventStreamChanUnblocksOnClientDisconnect(t *testing.T) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case eventChan <- Event{Data: "tick"}:
eventChan <- Event{Data: "tick"}
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
} }
} }