diff --git a/go.mod b/go.mod index 0b8d97b..f1c81da 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/infinite-iroha/touka -go 1.25.1 +go 1.24.5 require ( github.com/WJQSERVER-STUDIO/go-utils/iox v0.0.2 diff --git a/sse.go b/sse.go deleted file mode 100644 index 597e930..0000000 --- a/sse.go +++ /dev/null @@ -1,183 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2025 WJQSERVER. All rights reserved. -// All rights reserved by WJQSERVER, related rights can be exercised by the infinite-iroha organization. -package touka - -import ( - "bytes" - "fmt" - "io" - "net/http" - "strings" -) - -// Event 代表一个服务器发送事件(SSE). -type Event struct { - // Event 是事件的名称. - Event string - // Data 是事件的内容, 可以是多行文本. - Data string - // Id 是事件的唯一标识符. - Id string - // Retry 是指定客户端在连接丢失后应等待多少毫秒后尝试重新连接. - Retry string -} - -// Render 将事件格式化并写入给定的 writer. -// 通过逐行处理数据, 此方法可防止因数据中包含换行符而导致的CRLF注入问题. -// 为了性能, 它使用 bytes.Buffer 并通过 WriteTo 直接写入, 以避免不必要的内存分配. -func (e *Event) Render(w io.Writer) error { - var buf bytes.Buffer - - if len(e.Id) > 0 { - buf.WriteString("id: ") - buf.WriteString(e.Id) - buf.WriteString("\n") - } - if len(e.Event) > 0 { - buf.WriteString("event: ") - buf.WriteString(e.Event) - buf.WriteString("\n") - } - if len(e.Data) > 0 { - lines := strings.Split(e.Data, "\n") - for _, line := range lines { - buf.WriteString("data: ") - buf.WriteString(line) - buf.WriteString("\n") - } - } - if len(e.Retry) > 0 { - buf.WriteString("retry: ") - buf.WriteString(e.Retry) - buf.WriteString("\n") - } - - // 每个事件都以一个额外的换行符结尾. - buf.WriteString("\n") - - // 直接将 buffer 的内容写入 writer, 避免生成中间字符串. - _, err := buf.WriteTo(w) - return err -} - -// EventStream 启动一个 SSE 事件流. -// 这是推荐的、更简单安全的方式, 采用阻塞和回调的设计, 框架负责管理连接生命周期. -// -// 详细用法: -// r.GET("/sse/callback", func(c *touka.Context) { -// // streamer 回调函数会在一个循环中被调用. -// c.EventStream(func(w io.Writer) bool { -// event := touka.Event{ -// Event: "time-tick", -// Data: time.Now().Format(time.RFC1123), -// } -// -// if err := event.Render(w); err != nil { -// // 发生写入错误, 停止发送. -// return false // 返回 false 结束事件流. -// } -// -// time.Sleep(2 * time.Second) -// return true // 返回 true 继续事件流. -// }) -// // 当事件流结束后(例如客户端关闭页面), 这行代码会被执行. -// fmt.Println("Client disconnected from /sse/callback") -// }) -func (c *Context) EventStream(streamer func(w io.Writer) bool) { - // 为现代网络协议优化头部. - c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") - c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") - c.Writer.Header().Del("Connection") - c.Writer.Header().Del("Transfer-Encoding") - - c.Writer.WriteHeader(http.StatusOK) - c.Writer.Flush() // 直接调用, ResponseWriter 接口保证了 Flush 方法的存在. - - for { - select { - case <-c.Request.Context().Done(): - return - default: - if !streamer(c.Writer) { - return - } - c.Writer.Flush() - } - } -} - -// EventStreamChan 返回用于 SSE 事件流的 channel. -// 这是为高级并发场景设计的、更灵活的API. -// -// 重要: -// - 调用者必须 close(eventChan) 来结束事件流. -// - 调用者必须在独立的 goroutine 中消费 errChan 来处理错误和连接断开. -// - 为防止 goroutine 泄漏, 建议发送方在 select 中同时监听 c.Request.Context().Done(). -// -// 详细用法: -// r.GET("/sse/channel", func(c *touka.Context) { -// eventChan, errChan := c.EventStreamChan() -// -// // 必须在独立的goroutine中处理错误和连接断开. -// go func() { -// if err := <-errChan; err != nil { -// c.Errorf("SSE channel error: %v", err) -// } -// }() -// -// // 在另一个goroutine中异步发送事件. -// go func() { -// // 重要: 必须在逻辑结束时关闭channel, 以通知框架. -// defer close(eventChan) -// -// for i := 1; i <= 5; i++ { -// select { -// case <-c.Request.Context().Done(): -// return // 客户端已断开, 退出 goroutine. -// default: -// eventChan <- touka.Event{ -// Id: fmt.Sprintf("%d", i), -// Data: "hello from channel", -// } -// time.Sleep(2 * time.Second) -// } -// } -// }() -// }) -func (c *Context) EventStreamChan() (chan<- Event, <-chan error) { - eventChan := make(chan Event) - errChan := make(chan error, 1) - - c.Writer.Header().Set("Content-Type", "text/event-stream; charset=utf-8") - c.Writer.Header().Set("Cache-Control", "no-cache, no-transform") - c.Writer.Header().Del("Connection") - c.Writer.Header().Del("Transfer-Encoding") - - c.Writer.WriteHeader(http.StatusOK) - c.Writer.Flush() - - go func() { - defer close(errChan) - - for { - select { - case event, ok := <-eventChan: - if !ok { - return - } - if err := event.Render(c.Writer); err != nil { - errChan <- err - return - } - c.Writer.Flush() - case <-c.Request.Context().Done(): - errChan <- c.Request.Context().Err() - return - } - } - }() - - return eventChan, errChan -}