perf: reuse reverse proxy candidate slices

This commit is contained in:
wjqserver 2026-04-10 06:18:52 +08:00
parent 71a344a3de
commit 017bb13295
3 changed files with 86 additions and 8 deletions

View file

@ -91,6 +91,13 @@ var reverseProxyCopyBufferPool = sync.Pool{
}, },
} }
var reverseProxyCandidatePool = sync.Pool{
New: func() any {
s := make([]*reverseProxyUpstream, 0, 8)
return &s
},
}
type reverseProxyStatusError struct { type reverseProxyStatusError struct {
status int status int
err error err error

View file

@ -110,10 +110,10 @@ func BenchmarkReverseProxyCopyResponse(b *testing.B) {
func BenchmarkReverseProxyAvailableUpstreams(b *testing.B) { func BenchmarkReverseProxyAvailableUpstreams(b *testing.B) {
proxy := &reverseProxyHandler{ proxy := &reverseProxyHandler{
upstreams: []*reverseProxyUpstream{ upstreams: []*reverseProxyUpstream{
{key: "a"}, {key: "a", index: 0},
{key: "b"}, {key: "b", index: 1},
{key: "c"}, {key: "c", index: 2},
{key: "d"}, {key: "d", index: 3},
}, },
config: ReverseProxyConfig{ config: ReverseProxyConfig{
PassiveHealth: ReverseProxyPassiveHealthConfig{ PassiveHealth: ReverseProxyPassiveHealthConfig{
@ -135,6 +135,38 @@ func BenchmarkReverseProxyAvailableUpstreams(b *testing.B) {
} }
} }
func BenchmarkReverseProxySelectUpstream(b *testing.B) {
proxy := &reverseProxyHandler{
upstreams: []*reverseProxyUpstream{
{key: "a", index: 0},
{key: "b", index: 1},
{key: "c", index: 2},
{key: "d", index: 3},
},
config: ReverseProxyConfig{
LoadBalancing: ReverseProxyLoadBalancingConfig{Policy: LBRoundRobin()},
PassiveHealth: ReverseProxyPassiveHealthConfig{
FailDuration: time.Minute,
MaxFails: 3,
},
},
}
proxy.upstreams[0].failures = []time.Time{time.Now().Add(-30 * time.Second)}
c, _ := CreateTestContext(nil)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
selected, err := proxy.selectUpstream(c, nil)
if err != nil {
b.Fatalf("selectUpstream failed: %v", err)
}
benchmarkReverseProxySink = selected.index
}
}
func TestReverseProxyCopyResponseWithoutBufferPool(t *testing.T) { func TestReverseProxyCopyResponseWithoutBufferPool(t *testing.T) {
proxy := newReverseProxyHandler(ReverseProxyConfig{}) proxy := newReverseProxyHandler(ReverseProxyConfig{})
dst := newBenchmarkResponseWriter() dst := newBenchmarkResponseWriter()
@ -203,4 +235,29 @@ func TestReverseProxyCopyResponseRespectsCustomBufferLength(t *testing.T) {
} }
} }
func TestReverseProxyAvailableUpstreamsFiltersExcludedAndUnhealthy(t *testing.T) {
now := time.Now()
proxy := &reverseProxyHandler{
upstreams: []*reverseProxyUpstream{
{key: "a"},
{key: "b", failures: []time.Time{now.Add(-20 * time.Second), now.Add(-10 * time.Second)}},
{key: "c"},
},
config: ReverseProxyConfig{
PassiveHealth: ReverseProxyPassiveHealthConfig{
FailDuration: time.Minute,
MaxFails: 2,
},
},
}
available := proxy.availableUpstreams(now, map[string]struct{}{"c": {}})
if len(available) != 1 {
t.Fatalf("expected only one available upstream, got %d", len(available))
}
if available[0].key != "a" {
t.Fatalf("expected upstream 'a', got %q", available[0].key)
}
}
var _ io.Writer = (*benchmarkResponseWriter)(nil) var _ io.Writer = (*benchmarkResponseWriter)(nil)

View file

@ -137,18 +137,32 @@ func validateReverseProxyLBPolicy(policy ReverseProxyLBPolicy) error {
func (p *reverseProxyHandler) selectUpstream(c *Context, excluded map[string]struct{}) (*reverseProxyUpstream, error) { func (p *reverseProxyHandler) selectUpstream(c *Context, excluded map[string]struct{}) (*reverseProxyUpstream, error) {
now := time.Now() now := time.Now()
policy := p.config.LoadBalancing.Policy policy := p.config.LoadBalancing.Policy
candidates := p.availableUpstreams(now, excluded) candidateBuf := reverseProxyCandidatePool.Get().(*[]*reverseProxyUpstream)
candidates := p.availableUpstreamsInto(now, excluded, *candidateBuf)
if len(candidates) == 0 && len(excluded) > 0 { if len(candidates) == 0 && len(excluded) > 0 {
candidates = p.availableUpstreams(now, nil) candidates = p.availableUpstreamsInto(now, nil, candidates[:0])
} }
if len(candidates) == 0 { if len(candidates) == 0 {
*candidateBuf = candidates[:0]
reverseProxyCandidatePool.Put(candidateBuf)
return nil, errReverseProxyNoAvailableUpstreams return nil, errReverseProxyNoAvailableUpstreams
} }
return p.selectUpstreamWithPolicy(c, candidates, policy), nil selected := p.selectUpstreamWithPolicy(c, candidates, policy)
*candidateBuf = candidates[:0]
reverseProxyCandidatePool.Put(candidateBuf)
return selected, nil
} }
func (p *reverseProxyHandler) availableUpstreams(now time.Time, excluded map[string]struct{}) []*reverseProxyUpstream { func (p *reverseProxyHandler) availableUpstreams(now time.Time, excluded map[string]struct{}) []*reverseProxyUpstream {
candidates := make([]*reverseProxyUpstream, 0, len(p.upstreams)) return p.availableUpstreamsInto(now, excluded, nil)
}
func (p *reverseProxyHandler) availableUpstreamsInto(now time.Time, excluded map[string]struct{}, candidates []*reverseProxyUpstream) []*reverseProxyUpstream {
if cap(candidates) < len(p.upstreams) {
candidates = make([]*reverseProxyUpstream, 0, len(p.upstreams))
} else {
candidates = candidates[:0]
}
for _, upstream := range p.upstreams { for _, upstream := range p.upstreams {
if _, skip := excluded[upstream.key]; skip { if _, skip := excluded[upstream.key]; skip {
continue continue