mirror of
https://github.com/infinite-iroha/touka.git
synced 2026-06-13 15:47:38 +08:00
Merge pull request #84 from infinite-iroha/perf/go126-memory-pass
Perf/go126 memory pass
This commit is contained in:
commit
271e54eb4d
5 changed files with 364 additions and 7 deletions
|
|
@ -23,7 +23,7 @@ func TestContextResetKeepsKeysNilUntilSet(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to build request: %v", err)
|
t.Fatalf("failed to build request: %v", err)
|
||||||
}
|
}
|
||||||
c.reset(c.Writer, req)
|
c.reset(UnwrapResponseWriter(c.Writer), req)
|
||||||
|
|
||||||
if c.Keys != nil {
|
if c.Keys != nil {
|
||||||
t.Fatalf("expected reset to clear Keys without allocating a new map")
|
t.Fatalf("expected reset to clear Keys without allocating a new map")
|
||||||
|
|
@ -47,6 +47,7 @@ func TestContextResetKeepsKeysNilUntilSet(t *testing.T) {
|
||||||
func BenchmarkContextReset(b *testing.B) {
|
func BenchmarkContextReset(b *testing.B) {
|
||||||
b.Run("NoKeysUse", func(b *testing.B) {
|
b.Run("NoKeysUse", func(b *testing.B) {
|
||||||
c, _ := CreateTestContext(nil)
|
c, _ := CreateTestContext(nil)
|
||||||
|
rawWriter := UnwrapResponseWriter(c.Writer)
|
||||||
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("failed to build request: %v", err)
|
b.Fatalf("failed to build request: %v", err)
|
||||||
|
|
@ -56,12 +57,13 @@ func BenchmarkContextReset(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
c.reset(c.Writer, req)
|
c.reset(rawWriter, req)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
b.Run("WithKeysUse", func(b *testing.B) {
|
b.Run("WithKeysUse", func(b *testing.B) {
|
||||||
c, _ := CreateTestContext(nil)
|
c, _ := CreateTestContext(nil)
|
||||||
|
rawWriter := UnwrapResponseWriter(c.Writer)
|
||||||
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("failed to build request: %v", err)
|
b.Fatalf("failed to build request: %v", err)
|
||||||
|
|
@ -71,8 +73,9 @@ func BenchmarkContextReset(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
c.reset(c.Writer, req)
|
c.reset(rawWriter, req)
|
||||||
c.Set("request-id", i)
|
c.Set("request-id", i)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
59
ecw_benchmark_test.go
Normal file
59
ecw_benchmark_test.go
Normal file
|
|
@ -0,0 +1,59 @@
|
||||||
|
package touka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestErrorCapturingResponseWriterResetClearsHeaderSnapshot(t *testing.T) {
|
||||||
|
c, _ := CreateTestContext(nil)
|
||||||
|
ecw := AcquireErrorCapturingResponseWriter(c)
|
||||||
|
defer ReleaseErrorCapturingResponseWriter(ecw)
|
||||||
|
|
||||||
|
ecw.capturedErrorSignal = true
|
||||||
|
ecw.Header().Set("Content-Type", "text/plain")
|
||||||
|
ecw.Header().Add("X-Test", "one")
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to build request: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ecw.reset(httptest.NewRecorder(), req, c, c.engine.errorHandle.handler)
|
||||||
|
|
||||||
|
if len(ecw.headerSnapshot) != 0 {
|
||||||
|
t.Fatalf("expected header snapshot to be empty after reset, got %#v", ecw.headerSnapshot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkErrorCapturingResponseWriterReset(b *testing.B) {
|
||||||
|
c, _ := CreateTestContext(nil)
|
||||||
|
ecw := AcquireErrorCapturingResponseWriter(c)
|
||||||
|
defer ReleaseErrorCapturingResponseWriter(ecw)
|
||||||
|
|
||||||
|
rawWriter := httptest.NewRecorder()
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "/", nil)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to build request: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys := make([]string, 16)
|
||||||
|
for i := range keys {
|
||||||
|
keys[i] = http.CanonicalHeaderKey("X-Test-" + string(rune('A'+i)))
|
||||||
|
}
|
||||||
|
values := []string{"one", "two", "three"}
|
||||||
|
for _, key := range keys {
|
||||||
|
ecw.headerSnapshot[key] = values
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
ecw.reset(rawWriter, req, c, c.engine.errorHandle.handler)
|
||||||
|
for _, key := range keys {
|
||||||
|
ecw.headerSnapshot[key] = values
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -84,6 +84,20 @@ type reverseProxyHandler struct {
|
||||||
roundRobin atomic.Uint64
|
roundRobin atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var reverseProxyCopyBufferPool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
buf := make([]byte, 32*1024)
|
||||||
|
return &buf
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reverseProxyCandidatePool = sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
s := make([]*reverseProxyUpstream, 0, 8)
|
||||||
|
return &s
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
type reverseProxyStatusError struct {
|
type reverseProxyStatusError struct {
|
||||||
status int
|
status int
|
||||||
err error
|
err error
|
||||||
|
|
@ -1153,6 +1167,10 @@ func (p *reverseProxyHandler) copyResponse(dst ResponseWriter, src io.Reader, fl
|
||||||
if p.config.BufferPool != nil {
|
if p.config.BufferPool != nil {
|
||||||
buf = p.config.BufferPool.Get()
|
buf = p.config.BufferPool.Get()
|
||||||
defer p.config.BufferPool.Put(buf)
|
defer p.config.BufferPool.Put(buf)
|
||||||
|
} else {
|
||||||
|
bufp := reverseProxyCopyBufferPool.Get().(*[]byte)
|
||||||
|
buf = *bufp
|
||||||
|
defer reverseProxyCopyBufferPool.Put(bufp)
|
||||||
}
|
}
|
||||||
_, err := p.copyBuffer(writer, src, buf)
|
_, err := p.copyBuffer(writer, src, buf)
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
263
reverseproxy_benchmark_test.go
Normal file
263
reverseproxy_benchmark_test.go
Normal file
|
|
@ -0,0 +1,263 @@
|
||||||
|
package touka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type benchmarkReadSeeker struct {
|
||||||
|
data []byte
|
||||||
|
off int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *benchmarkReadSeeker) Read(p []byte) (int, error) {
|
||||||
|
if r.off >= len(r.data) {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n := copy(p, r.data[r.off:])
|
||||||
|
r.off += n
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *benchmarkReadSeeker) Reset() {
|
||||||
|
r.off = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type benchmarkResponseWriter struct {
|
||||||
|
header http.Header
|
||||||
|
status int
|
||||||
|
size int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBenchmarkResponseWriter() *benchmarkResponseWriter {
|
||||||
|
return &benchmarkResponseWriter{header: make(http.Header)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Header() http.Header {
|
||||||
|
return w.header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) WriteHeader(statusCode int) {
|
||||||
|
if w.status == 0 {
|
||||||
|
w.status = statusCode
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Write(p []byte) (int, error) {
|
||||||
|
if w.status == 0 {
|
||||||
|
w.status = http.StatusOK
|
||||||
|
}
|
||||||
|
w.size += len(p)
|
||||||
|
return len(p), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Flush() {}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Status() int {
|
||||||
|
return w.status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Size() int {
|
||||||
|
return w.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Written() bool {
|
||||||
|
return w.status != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) IsHijacked() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||||
|
return nil, nil, http.ErrNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *benchmarkResponseWriter) reset() {
|
||||||
|
clear(w.header)
|
||||||
|
w.status = 0
|
||||||
|
w.size = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var benchmarkReverseProxySink int
|
||||||
|
|
||||||
|
func BenchmarkReverseProxyCopyResponse(b *testing.B) {
|
||||||
|
body := bytes.Repeat([]byte("0123456789abcdef"), 4096)
|
||||||
|
proxy := newReverseProxyHandler(ReverseProxyConfig{})
|
||||||
|
dst := newBenchmarkResponseWriter()
|
||||||
|
src := &benchmarkReadSeeker{data: body}
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
dst.reset()
|
||||||
|
src.Reset()
|
||||||
|
if err := proxy.copyResponse(dst, src, 0); err != nil {
|
||||||
|
b.Fatalf("copyResponse failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
benchmarkReverseProxySink = dst.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkReverseProxyAvailableUpstreams(b *testing.B) {
|
||||||
|
proxy := &reverseProxyHandler{
|
||||||
|
upstreams: []*reverseProxyUpstream{
|
||||||
|
{key: "a", index: 0},
|
||||||
|
{key: "b", index: 1},
|
||||||
|
{key: "c", index: 2},
|
||||||
|
{key: "d", index: 3},
|
||||||
|
},
|
||||||
|
config: ReverseProxyConfig{
|
||||||
|
PassiveHealth: ReverseProxyPassiveHealthConfig{
|
||||||
|
FailDuration: time.Minute,
|
||||||
|
MaxFails: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
proxy.upstreams[0].failures = []time.Time{now.Add(-30 * time.Second)}
|
||||||
|
proxy.upstreams[1].failures = []time.Time{now.Add(-20 * time.Second), now.Add(-10 * time.Second)}
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
benchmarkReverseProxySink = len(proxy.availableUpstreams(now, nil))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkReverseProxySelectUpstream(b *testing.B) {
|
||||||
|
proxy := &reverseProxyHandler{
|
||||||
|
upstreams: []*reverseProxyUpstream{
|
||||||
|
{key: "a", index: 0},
|
||||||
|
{key: "b", index: 1},
|
||||||
|
{key: "c", index: 2},
|
||||||
|
{key: "d", index: 3},
|
||||||
|
},
|
||||||
|
config: ReverseProxyConfig{
|
||||||
|
LoadBalancing: ReverseProxyLoadBalancingConfig{Policy: LBRoundRobin()},
|
||||||
|
PassiveHealth: ReverseProxyPassiveHealthConfig{
|
||||||
|
FailDuration: time.Minute,
|
||||||
|
MaxFails: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
proxy.upstreams[0].failures = []time.Time{time.Now().Add(-30 * time.Second)}
|
||||||
|
|
||||||
|
c, _ := CreateTestContext(nil)
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
selected, err := proxy.selectUpstream(c, nil)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("selectUpstream failed: %v", err)
|
||||||
|
}
|
||||||
|
benchmarkReverseProxySink = selected.index
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReverseProxyCopyResponseWithoutBufferPool(t *testing.T) {
|
||||||
|
proxy := newReverseProxyHandler(ReverseProxyConfig{})
|
||||||
|
dst := newBenchmarkResponseWriter()
|
||||||
|
src := bytes.NewBufferString("hello, reverse proxy")
|
||||||
|
|
||||||
|
if err := proxy.copyResponse(dst, src, 0); err != nil {
|
||||||
|
t.Fatalf("copyResponse failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := dst.Size(), len("hello, reverse proxy"); got != want {
|
||||||
|
t.Fatalf("expected %d bytes copied, got %d", want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type fixedLenBufferPool struct {
|
||||||
|
buf []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *fixedLenBufferPool) Get() []byte {
|
||||||
|
return p.buf
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *fixedLenBufferPool) Put(buf []byte) {
|
||||||
|
p.buf = buf
|
||||||
|
}
|
||||||
|
|
||||||
|
type recordingReader struct {
|
||||||
|
chunk int
|
||||||
|
reads []int
|
||||||
|
left int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingReader) Read(p []byte) (int, error) {
|
||||||
|
if r.left == 0 {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n := min(r.chunk, len(p), r.left)
|
||||||
|
if n == 0 {
|
||||||
|
return 0, errors.New("reader received zero-length buffer")
|
||||||
|
}
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
p[i] = 'x'
|
||||||
|
}
|
||||||
|
r.left -= n
|
||||||
|
r.reads = append(r.reads, len(p))
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReverseProxyCopyResponseRespectsCustomBufferLength(t *testing.T) {
|
||||||
|
pool := &fixedLenBufferPool{buf: make([]byte, 8, 32*1024)}
|
||||||
|
proxy := newReverseProxyHandler(ReverseProxyConfig{BufferPool: pool})
|
||||||
|
dst := newBenchmarkResponseWriter()
|
||||||
|
src := &recordingReader{chunk: 8, left: 24}
|
||||||
|
|
||||||
|
if err := proxy.copyResponse(dst, src, 0); err != nil {
|
||||||
|
t.Fatalf("copyResponse failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(src.reads) == 0 {
|
||||||
|
t.Fatal("expected reader to be used")
|
||||||
|
}
|
||||||
|
for _, size := range src.reads {
|
||||||
|
if size != 8 {
|
||||||
|
t.Fatalf("expected custom buffer length 8 to be preserved, got read size %d", size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReverseProxyAvailableUpstreamsFiltersExcludedAndUnhealthy(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
proxy := &reverseProxyHandler{
|
||||||
|
upstreams: []*reverseProxyUpstream{
|
||||||
|
{key: "a"},
|
||||||
|
{key: "b", failures: []time.Time{now.Add(-20 * time.Second), now.Add(-10 * time.Second)}},
|
||||||
|
{key: "c"},
|
||||||
|
},
|
||||||
|
config: ReverseProxyConfig{
|
||||||
|
PassiveHealth: ReverseProxyPassiveHealthConfig{
|
||||||
|
FailDuration: time.Minute,
|
||||||
|
MaxFails: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
available := proxy.availableUpstreams(now, map[string]struct{}{"c": {}})
|
||||||
|
if len(available) != 1 {
|
||||||
|
t.Fatalf("expected only one available upstream, got %d", len(available))
|
||||||
|
}
|
||||||
|
if available[0].key != "a" {
|
||||||
|
t.Fatalf("expected upstream 'a', got %q", available[0].key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ io.Writer = (*benchmarkResponseWriter)(nil)
|
||||||
|
|
@ -137,18 +137,32 @@ func validateReverseProxyLBPolicy(policy ReverseProxyLBPolicy) error {
|
||||||
func (p *reverseProxyHandler) selectUpstream(c *Context, excluded map[string]struct{}) (*reverseProxyUpstream, error) {
|
func (p *reverseProxyHandler) selectUpstream(c *Context, excluded map[string]struct{}) (*reverseProxyUpstream, error) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
policy := p.config.LoadBalancing.Policy
|
policy := p.config.LoadBalancing.Policy
|
||||||
candidates := p.availableUpstreams(now, excluded)
|
candidateBuf := reverseProxyCandidatePool.Get().(*[]*reverseProxyUpstream)
|
||||||
|
candidates := p.availableUpstreamsInto(now, excluded, *candidateBuf)
|
||||||
if len(candidates) == 0 && len(excluded) > 0 {
|
if len(candidates) == 0 && len(excluded) > 0 {
|
||||||
candidates = p.availableUpstreams(now, nil)
|
candidates = p.availableUpstreamsInto(now, nil, candidates[:0])
|
||||||
}
|
}
|
||||||
if len(candidates) == 0 {
|
if len(candidates) == 0 {
|
||||||
|
*candidateBuf = candidates[:0]
|
||||||
|
reverseProxyCandidatePool.Put(candidateBuf)
|
||||||
return nil, errReverseProxyNoAvailableUpstreams
|
return nil, errReverseProxyNoAvailableUpstreams
|
||||||
}
|
}
|
||||||
return p.selectUpstreamWithPolicy(c, candidates, policy), nil
|
selected := p.selectUpstreamWithPolicy(c, candidates, policy)
|
||||||
|
*candidateBuf = candidates[:0]
|
||||||
|
reverseProxyCandidatePool.Put(candidateBuf)
|
||||||
|
return selected, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *reverseProxyHandler) availableUpstreams(now time.Time, excluded map[string]struct{}) []*reverseProxyUpstream {
|
func (p *reverseProxyHandler) availableUpstreams(now time.Time, excluded map[string]struct{}) []*reverseProxyUpstream {
|
||||||
candidates := make([]*reverseProxyUpstream, 0, len(p.upstreams))
|
return p.availableUpstreamsInto(now, excluded, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *reverseProxyHandler) availableUpstreamsInto(now time.Time, excluded map[string]struct{}, candidates []*reverseProxyUpstream) []*reverseProxyUpstream {
|
||||||
|
if cap(candidates) < len(p.upstreams) {
|
||||||
|
candidates = make([]*reverseProxyUpstream, 0, len(p.upstreams))
|
||||||
|
} else {
|
||||||
|
candidates = candidates[:0]
|
||||||
|
}
|
||||||
for _, upstream := range p.upstreams {
|
for _, upstream := range p.upstreams {
|
||||||
if _, skip := excluded[upstream.key]; skip {
|
if _, skip := excluded[upstream.key]; skip {
|
||||||
continue
|
continue
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue