From 2d693e3b13bfc075c32281401d34fb88568dd268 Mon Sep 17 00:00:00 2001 From: wjqserver <114663932+WJQSERVER@users.noreply.github.com> Date: Wed, 22 Apr 2026 09:27:53 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20mergectx=20=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E7=BB=93=E6=9E=84=EF=BC=8C=E4=BF=AE=E5=A4=8D=20Gemini=20?= =?UTF-8?q?=E5=AE=A1=E6=9F=A5=E6=84=8F=E8=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - deadlineCtx 改为 cancelCtx 的子 context,建立父子层级关系 - 嵌入 cancelCtx/context.Context 直接提供 Done()/Err()/Deadline(),移除冗余方法 - orDone 中加入 cancelCtx,防止手动 cancel() 时 goroutine 泄漏 - 移除 cancelCtx/deadlineCtx/done/doneOnce 字段,struct 简化为 Context + parents - 移除冗余 Cause() 方法(context.Cause 用 Value(&cancelCtxKey) 机制) - 移除 Done()/Err() 显式实现,由嵌入 context 自动提供 Alina Agent生成 --- mergectx.go | 122 +++++++++++++++------------------------------------- 1 file changed, 34 insertions(+), 88 deletions(-) diff --git a/mergectx.go b/mergectx.go index 9aab5bb..9c30b92 100644 --- a/mergectx.go +++ b/mergectx.go @@ -6,23 +6,15 @@ package touka import ( "context" - "sync" "time" ) // mergedContext 实现了 context.Context 接口, 是 Merge 函数返回的实际类型. +// 嵌入 cancelCtx 作为基础 context, 支持 cause 传播. +// deadlineCtx 作为 cancelCtx 的子 context, 确保 deadline 到期时 cancelCtx 也被取消. type mergedContext struct { - // 嵌入一个基础 context, 用于 Deadline() 和 Value() 查找. context.Context - // 保存了所有的父 context, 用于 Value() 方法的查找. parents []context.Context - // cancelCtx 由 CancelCause 管理, 当 cause 取消时其 Done() 关闭. - cancelCtx context.Context - // deadlineCtx 仅在有 deadline 时非 nil, 用于检测 deadline 到期. - deadlineCtx context.Context - // done 缓存 Done() 的 channel, 避免重复创建 orDone goroutine. - done <-chan struct{} - doneOnce sync.Once } // MergeCtx 创建并返回一个新的 context.Context. @@ -50,70 +42,63 @@ func MergeCtx(parents ...context.Context) (ctx context.Context, cancel context.C } } - // baseCtx 提供 CancelCauseFunc 以支持 cause 传播. - baseCtx, baseCancel := context.WithCancelCause(context.Background()) + // cancelCtx 作为基础 context, 提供 CancelCauseFunc 以支持 cause 传播. + cancelCtx, cancelCause := context.WithCancelCause(context.Background()) - // deadlineCtx 仅用于监听 deadline 到期信号. + // deadlineCtx 作为 cancelCtx 的子 context (如果有 deadline). + // 当 cancelCtx 被取消时, deadlineCtx 也会被取消; + // 当 deadline 到期时, deadlineCtx 自行取消, watcher 负责关闭 cancelCtx. var deadlineCtx context.Context var deadlineCancel context.CancelFunc if !earliestDeadline.IsZero() { - deadlineCtx, deadlineCancel = context.WithDeadlineCause(context.Background(), earliestDeadline, context.DeadlineExceeded) + deadlineCtx, deadlineCancel = context.WithDeadlineCause(cancelCtx, earliestDeadline, context.DeadlineExceeded) } - // 嵌入的 context: 有 deadline 时用 deadlineCtx, 否则用 baseCtx. - embedCtx := baseCtx + // 嵌入的 context: 有 deadline 时用 deadlineCtx (以返回正确的 Deadline), + // 否则用 cancelCtx. + embedCtx := cancelCtx if deadlineCtx != nil { embedCtx = deadlineCtx } mc := &mergedContext{ - Context: embedCtx, - parents: parents, - cancelCtx: baseCtx, - deadlineCtx: deadlineCtx, + Context: embedCtx, + parents: parents, } - // 启动监控 goroutine. + // 启动监控 goroutine, 监听 parent 取消或 deadline 到期. go func() { - var once sync.Once - doCancel := func(cause error) { - once.Do(func() { baseCancel(cause) }) - } - defer doCancel(nil) - - parentDone := orDone(mc.parents...) + // 将 cancelCtx 加入 orDone, 确保手动 cancel() 时 orDone goroutine 能退出, 防止泄漏. + parentDone := orDone(append(mc.parents, cancelCtx)...) if deadlineCtx != nil { defer deadlineCancel() select { case <-parentDone: + // parent 取消或手动 cancel() for _, p := range mc.parents { if p.Err() != nil { - doCancel(context.Cause(p)) + cancelCause(context.Cause(p)) return } } - doCancel(nil) + // 手动 cancel(), cause 已由 cancelCause() 设置 case <-deadlineCtx.Done(): - doCancel(context.DeadlineExceeded) - case <-baseCtx.Done(): + // deadline 到期, 需要关闭 cancelCtx 并设置 cause + cancelCause(context.DeadlineExceeded) } } else { - select { - case <-parentDone: - for _, p := range mc.parents { - if p.Err() != nil { - doCancel(context.Cause(p)) - return - } + <-parentDone + for _, p := range mc.parents { + if p.Err() != nil { + cancelCause(context.Cause(p)) + return } - doCancel(nil) - case <-baseCtx.Done(): } } }() - return mc, func() { baseCancel(nil) } + return mc, func() { cancelCause(nil) } } // Value 返回当前Ctx Value. 先检查嵌入的 context (以支持 context.Cause), @@ -130,62 +115,23 @@ func (mc *mergedContext) Value(key any) any { return nil } -// Deadline 实现了 context.Context 的 Deadline 方法. -func (mc *mergedContext) Deadline() (deadline time.Time, ok bool) { - return mc.Context.Deadline() -} +// Deadline, Done, Err 均由嵌入的 context.Context 提供. -// Done 实现了 context.Context 的 Done 方法. -func (mc *mergedContext) Done() <-chan struct{} { - if mc.deadlineCtx != nil { - mc.doneOnce.Do(func() { - mc.done = orDone(mc.cancelCtx, mc.deadlineCtx) - }) - return mc.done - } - return mc.cancelCtx.Done() -} - -// Err 实现了 context.Context 的 Err 方法. -func (mc *mergedContext) Err() error { - if mc.cancelCtx.Err() != nil { - return mc.cancelCtx.Err() - } - if mc.deadlineCtx != nil { - return mc.deadlineCtx.Err() - } - return nil -} - -// Cause 返回取消原因, 使 context.Cause() 能正确传播 cause. -func (mc *mergedContext) Cause() error { - return context.Cause(mc.cancelCtx) -} - -// orDone 是一个辅助函数, 返回一个 channel. -// 当任意一个输入 context 的 Done() channel 关闭时, orDone 返回的 channel 也会关闭. -// 这是一个非阻塞的、不会泄漏 goroutine 的实现. +// orDone 返回一个 channel, 当任意一个输入 context 的 Done() channel 关闭时关闭. func orDone(contexts ...context.Context) <-chan struct{} { done := make(chan struct{}) - - var once sync.Once - closeDone := func() { - once.Do(func() { - close(done) - }) - } - - // 为每个父 context 启动一个 goroutine. for _, ctx := range contexts { go func(c context.Context) { select { case <-c.Done(): - closeDone() + select { + case <-done: + default: + close(done) + } case <-done: - // orDone 已经被其他 goroutine 关闭了, 当前 goroutine 可以安全退出. } }(ctx) } - return done }