This commit is contained in:
wjqserver 2025-09-10 02:40:41 +08:00
parent b4e073ae2f
commit 38ff5126e3

97
sse.go
View file

@ -6,7 +6,6 @@ package touka
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"net/http" "net/http"
"strings" "strings"
@ -66,25 +65,26 @@ func (e *Event) Render(w io.Writer) error {
// 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期. // 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期.
// //
// 详细用法: // 详细用法:
// r.GET("/sse/callback", func(c *touka.Context) {
// // streamer 回调函数会在一个循环中被调用.
// c.EventStream(func(w io.Writer) bool {
// event := touka.Event{
// Event: "time-tick",
// Data: time.Now().Format(time.RFC1123),
// }
// //
// if err := event.Render(w); err != nil { // r.GET("/sse/callback", func(c *touka.Context) {
// // 发生写入错误, 停止发送. // // streamer 回调函数会在一个循环中被调用.
// return false // 返回 false 结束事件流. // c.EventStream(func(w io.Writer) bool {
// } // event := touka.Event{
// Event: "time-tick",
// Data: time.Now().Format(time.RFC1123),
// }
// //
// time.Sleep(2 * time.Second) // if err := event.Render(w); err != nil {
// return true // 返回 true 继续事件流. // // 发生写入错误, 停止发送.
// }) // return false // 返回 false 结束事件流.
// // 当事件流结束后(例如客户端关闭页面), 这行代码会被执行. // }
// fmt.Println("Client disconnected from /sse/callback") //
// }) // time.Sleep(2 * time.Second)
// return true // 返回 true 继续事件流.
// })
// // 当事件流结束后(例如客户端关闭页面), 这行代码会被执行.
// fmt.Println("Client disconnected from /sse/callback")
// })
func (c *Context) EventStream(streamer func(w io.Writer) bool) { func (c *Context) EventStream(streamer func(w io.Writer) bool) {
// 为现代网络协议优化头部. // 为现代网络协议优化头部.
c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
@ -112,40 +112,41 @@ func (c *Context) EventStream(streamer func(w io.Writer) bool) {
// 这是为高级并发场景设计的、更灵活的API. // 这是为高级并发场景设计的、更灵活的API.
// //
// 重要: // 重要:
// - 调用者必须 close(eventChan) 来结束事件流. // - 调用者必须 close(eventChan) 来结束事件流.
// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开. // - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开.
// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done(). // - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done().
// //
// 详细用法: // 详细用法:
// r.GET("/sse/channel", func(c *touka.Context) {
// eventChan, errChan := c.EventStreamChan()
// //
// // 必须在独立的goroutine中处理错误和连接断开. // r.GET("/sse/channel", func(c *touka.Context) {
// go func() { // eventChan, errChan := c.EventStreamChan()
// if err := <-errChan; err != nil {
// c.Errorf("SSE channel error: %v", err)
// }
// }()
// //
// // 在另一个goroutine中异步发送事件. // // 必须在独立的goroutine中处理错误和连接断开.
// go func() { // go func() {
// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. // if err := <-errChan; err != nil {
// defer close(eventChan) // c.Errorf("SSE channel error: %v", err)
// }
// }()
// //
// for i := 1; i <= 5; i++ { // // 在另一个goroutine中异步发送事件.
// select { // go func() {
// case <-c.Request.Context().Done(): // // 重要: 必须在逻辑结束时关闭channel, 以通知框架.
// return // 客户端已断开, 退出 goroutine. // defer close(eventChan)
// default: //
// eventChan <- touka.Event{ // for i := 1; i <= 5; i++ {
// Id: fmt.Sprintf("%d", i), // select {
// Data: "hello from channel", // case <-c.Request.Context().Done():
// } // return // 客户端已断开, 退出 goroutine.
// time.Sleep(2 * time.Second) // default:
// } // eventChan <- touka.Event{
// } // Id: fmt.Sprintf("%d", i),
// }() // Data: "hello from channel",
// }) // }
// time.Sleep(2 * time.Second)
// }
// }
// }()
// })
func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { func (c *Context) EventStreamChan() (chan<- Event, <-chan error) {
eventChan := make(chan Event) eventChan := make(chan Event)
errChan := make(chan error, 1) errChan := make(chan error, 1)
@ -156,7 +157,7 @@ func (c *Context) EventStreamChan() (chan<- Event, <-chan error) {
c.Writer.Header().Del("Transfer-Encoding") c.Writer.Header().Del("Transfer-Encoding")
c.Writer.WriteHeader(http.StatusOK) c.Writer.WriteHeader(http.StatusOK)
c.Writer.Flush() c.Writer.Flush()
go func() { go func() {
defer close(errChan) defer close(errChan)