diff --git a/.gitignore b/.gitignore index 02838fb..0ad54a9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ demo.toml *.bak list.json repos -pages \ No newline at end of file +pages +*_test \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b9103c..5d07c91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # 更新日志 +3.3.0 - 2025-05-15 +--- +- CHANGE: 为`httpc`加入`request builder`的`withcontext`选项 +- ADD: 加入带宽限制功能 +- ADD: 为`netpoll`模式开启探测客户端是否断开功能 + +25w36d - 2025-05-14 +--- +- PRE-RELEASE: 此版本是v3.3.0预发布版本,请勿在生产环境中使用; +- ADD: 为`netpoll`模式开启探测客户端是否断开功能 + +25w36c - 2025-05-14 +--- +- PRE-RELEASE: 此版本是v3.3.0预发布版本,请勿在生产环境中使用; +- ADD: 加入带宽限制功能 +- CHANGE: 将`httpc`切换回主分支, `25w36b`测试的部分已被合入`httpc`主线 + +25w36b - 2025-05-13 +--- +- PRE-RELEASE: 此版本是v3.3.0预发布版本,请勿在生产环境中使用; +- CHANGE: `httpc`切换到`dev`, 测试在retry前检查ctx状态 + +25w36a - 2025-05-13 +--- +- PRE-RELEASE: 此版本是v3.3.0预发布版本,请勿在生产环境中使用; +- CHANGE: 为`httpc`加入`request builder`的`withcontext`选项 + 3.2.4 - 2025-05-13 --- - CHANGE: 移除未使用的变量与相关计算 diff --git a/DEV-VERSION b/DEV-VERSION index ef0053d..c9f9483 100644 --- a/DEV-VERSION +++ b/DEV-VERSION @@ -1 +1 @@ -25w35a \ No newline at end of file +25w36d \ No newline at end of file diff --git a/VERSION b/VERSION index 9b7a431..0fa4ae4 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.2.4 \ No newline at end of file +3.3.0 \ No newline at end of file diff --git a/config/config.go b/config/config.go index 14c9301..c5b8aa0 100644 --- a/config/config.go +++ b/config/config.go @@ -129,11 +129,35 @@ type WhitelistConfig struct { WhitelistFile string `toml:"whitelistFile"` } +/* +[rateLimit] +enabled = false +rateMethod = "total" # "total" or "ip" +ratePerMinute = 100 +burst = 10 + + [rateLimit.bandwidthLimit] + enabled = false + totalLimit = "100mbps" + totalBurst = "100mbps" + singleLimit = "10mbps" + singleBurst = "10mbps" +*/ + type RateLimitConfig struct { - Enabled bool `toml:"enabled"` - RateMethod string `toml:"rateMethod"` - RatePerMinute int `toml:"ratePerMinute"` - Burst int `toml:"burst"` + Enabled bool `toml:"enabled"` + RateMethod string `toml:"rateMethod"` + RatePerMinute int `toml:"ratePerMinute"` + Burst int `toml:"burst"` + BandwidthLimit BandwidthLimitConfig +} + +type BandwidthLimitConfig struct { + Enabled bool `toml:"enabled"` + TotalLimit string `toml:"totalLimit"` + TotalBurst string `toml:"totalBurst"` + SingleLimit string `toml:"singleLimit"` + SingleBurst string `toml:"singleBurst"` } /* @@ -252,6 +276,13 @@ func DefaultConfig() *Config { RateMethod: "total", RatePerMinute: 100, Burst: 10, + BandwidthLimit: BandwidthLimitConfig{ + Enabled: false, + TotalLimit: "100mbps", + TotalBurst: "100mbps", + SingleLimit: "10mbps", + SingleBurst: "10mbps", + }, }, Outbound: OutboundConfig{ Enabled: false, diff --git a/config/config.toml b/config/config.toml index b43ebf3..ca7b80a 100644 --- a/config/config.toml +++ b/config/config.toml @@ -57,6 +57,13 @@ rateMethod = "total" # "ip" or "total" ratePerMinute = 180 burst = 5 +[rateLimit.bandwidthLimit] + enabled = false + totalLimit = "100mbps" + totalBurst = "100mbps" + singleLimit = "10mbps" + singleBurst = "10mbps" + [outbound] enabled = false url = "socks5://127.0.0.1:1080" # "http://127.0.0.1:7890" diff --git a/go.mod b/go.mod index 4f8bbbd..d20e90a 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.24.3 require ( github.com/BurntSushi/toml v1.5.0 - github.com/WJQSERVER-STUDIO/httpc v0.5.0 + github.com/WJQSERVER-STUDIO/httpc v0.5.1 github.com/WJQSERVER-STUDIO/logger v1.6.0 github.com/cloudwego/hertz v0.9.7 github.com/hertz-contrib/http2 v0.1.8 @@ -12,6 +12,8 @@ require ( golang.org/x/time v0.11.0 ) +require github.com/WJQSERVER-STUDIO/go-utils/limitreader v0.0.2 + require ( github.com/WJQSERVER-STUDIO/go-utils/copyb v0.0.4 // indirect github.com/WJQSERVER-STUDIO/go-utils/log v0.0.2 // indirect @@ -36,3 +38,5 @@ require ( golang.org/x/text v0.25.0 // indirect google.golang.org/protobuf v1.36.6 // indirect ) + +//replace github.com/WJQSERVER-STUDIO/httpc v0.5.1 => /data/github/WJQSERVER-STUDIO/httpc diff --git a/go.sum b/go.sum index 5b8252e..d906a16 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,12 @@ github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/WJQSERVER-STUDIO/go-utils/copyb v0.0.4 h1:JLtFd00AdFg/TP+dtvIzLkdHwKUGPOAijN1sMtEYoFg= github.com/WJQSERVER-STUDIO/go-utils/copyb v0.0.4/go.mod h1:FZ6XE+4TKy4MOfX1xWKe6Rwsg0ucYFCdNh1KLvyKTfc= +github.com/WJQSERVER-STUDIO/go-utils/limitreader v0.0.2 h1:8bBkKk6E2Zr+I5szL7gyc5f0DK8N9agIJCpM1Cqw2NE= +github.com/WJQSERVER-STUDIO/go-utils/limitreader v0.0.2/go.mod h1:yPX8xuZH+py7eLJwOYj3VVI/4/Yuy5+x8Mhq8qezcPg= github.com/WJQSERVER-STUDIO/go-utils/log v0.0.2 h1:9CSf+V0ZQPl2ijC/g6v/ObemmhpKcikKVIodsaLExTA= github.com/WJQSERVER-STUDIO/go-utils/log v0.0.2/go.mod h1:j9Q+xnwpOfve7/uJnZ2izRQw6NNoXjvJHz7vUQAaLZE= -github.com/WJQSERVER-STUDIO/httpc v0.5.0 h1:0yJA+dOgbnO3R/mAWPjlbUq5lIqaxRV38XfiX3jt6pg= -github.com/WJQSERVER-STUDIO/httpc v0.5.0/go.mod h1:M7KNUZjjhCkzzcg9lBPs9YfkImI+7vqjAyjdA19+joE= +github.com/WJQSERVER-STUDIO/httpc v0.5.1 h1:+TKCPYBuj7PAHuiduGCGAqsHAa4QtsUfoVwRN777q64= +github.com/WJQSERVER-STUDIO/httpc v0.5.1/go.mod h1:M7KNUZjjhCkzzcg9lBPs9YfkImI+7vqjAyjdA19+joE= github.com/WJQSERVER-STUDIO/logger v1.6.0 h1:xK2xV7hlkMXaWzvj4+cNoNWA+JfnJaHX6VU+RrPnr7Q= github.com/WJQSERVER-STUDIO/logger v1.6.0/go.mod h1:TICMsR7geROHBg6rxwkqUNGydo34XVsX93yeoxyfuyY= github.com/bytedance/gopkg v0.1.1/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= diff --git a/main.go b/main.go index 8d61589..2cb9ac4 100644 --- a/main.go +++ b/main.go @@ -181,7 +181,11 @@ func setupRateLimit(cfg *config.Config) { } func InitReq(cfg *config.Config) { - proxy.InitReq(cfg) + err := proxy.InitReq(cfg) + if err != nil { + fmt.Printf("Failed to initialize request: %v\n", err) + os.Exit(1) + } } // loadEmbeddedPages 加载嵌入式页面资源 @@ -397,11 +401,13 @@ func main() { r = server.New( server.WithH2C(true), server.WithHostPorts(addr), + server.WithSenseClientDisconnection(true), ) r.AddProtocol("h2", factory.NewServerFactory()) } else { r = server.New( server.WithHostPorts(addr), + server.WithSenseClientDisconnection(true), ) } } else { @@ -481,8 +487,7 @@ func main() { defer logger.Close() defer func() { if hertZfile != nil { - var err error - err = hertZfile.Close() + err := hertZfile.Close() if err != nil { logError("Failed to close hertz log file: %v", err) } diff --git a/proxy/bandwidth.go b/proxy/bandwidth.go new file mode 100644 index 0000000..a7591c2 --- /dev/null +++ b/proxy/bandwidth.go @@ -0,0 +1,64 @@ +package proxy + +import ( + "errors" + "ghproxy/config" + + "github.com/WJQSERVER-STUDIO/go-utils/limitreader" + "golang.org/x/time/rate" +) + +var ( + bandwidthLimit rate.Limit + bandwidthBurst rate.Limit +) + +func UnDefiendRateStringErrHandle(err error) error { + if errors.Is(err, &limitreader.UnDefiendRateStringErr{}) { + logWarning("UnDefiendRateStringErr: %s", err) + return nil + } + return err +} + +func SetGlobalRateLimit(cfg *config.Config) error { + if cfg.RateLimit.BandwidthLimit.Enabled { + var err error + var totalLimit rate.Limit + var totalBurst rate.Limit + totalLimit, err = limitreader.ParseRate(cfg.RateLimit.BandwidthLimit.TotalLimit) + if UnDefiendRateStringErrHandle(err) != nil { + logError("Failed to parse total bandwidth limit: %v", err) + return err + } + totalBurst, err = limitreader.ParseRate(cfg.RateLimit.BandwidthLimit.TotalBurst) + if UnDefiendRateStringErrHandle(err) != nil { + logError("Failed to parse total bandwidth burst: %v", err) + return err + } + limitreader.SetGlobalRateLimit(totalLimit, int(totalBurst)) + err = SetBandwidthLimit(cfg) + if UnDefiendRateStringErrHandle(err) != nil { + logError("Failed to set bandwidth limit: %v", err) + return err + } + } else { + limitreader.SetGlobalRateLimit(rate.Inf, 0) + } + return nil +} + +func SetBandwidthLimit(cfg *config.Config) error { + var err error + bandwidthLimit, err = limitreader.ParseRate(cfg.RateLimit.BandwidthLimit.SingleLimit) + if UnDefiendRateStringErrHandle(err) != nil { + logError("Failed to parse bandwidth limit: %v", err) + return err + } + bandwidthBurst, err = limitreader.ParseRate(cfg.RateLimit.BandwidthLimit.SingleBurst) + if UnDefiendRateStringErrHandle(err) != nil { + logError("Failed to parse bandwidth burst: %v", err) + return err + } + return nil +} diff --git a/proxy/chunkreq.go b/proxy/chunkreq.go index 22c7b61..c760b9a 100644 --- a/proxy/chunkreq.go +++ b/proxy/chunkreq.go @@ -8,21 +8,34 @@ import ( "net/http" "strconv" + "github.com/WJQSERVER-STUDIO/go-utils/limitreader" "github.com/cloudwego/hertz/pkg/app" ) func ChunkedProxyRequest(ctx context.Context, c *app.RequestContext, u string, cfg *config.Config, matcher string) { var ( - method []byte - req *http.Request - resp *http.Response - err error + req *http.Request + resp *http.Response + err error ) - method = c.Request.Method() + go func() { + <-ctx.Done() + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + if req != nil { + req.Body.Close() + } + }() - req, err = client.NewRequest(string(method), u, c.Request.BodyStream()) + rb := client.NewRequestBuilder(string(c.Request.Method()), u) + rb.NoDefaultHeaders() + rb.SetBody(c.Request.BodyStream()) + rb.WithContext(ctx) + + req, err = rb.Build() if err != nil { HandleError(c, fmt.Sprintf("Failed to create request: %v", err)) return @@ -58,8 +71,7 @@ func ChunkedProxyRequest(ctx context.Context, c *app.RequestContext, u string, c bodySize = -1 } if err == nil && bodySize > sizelimit { - var finalURL string - finalURL = resp.Request.URL.String() + finalURL := resp.Request.URL.String() err = resp.Body.Close() if err != nil { logError("Failed to close response body: %v", err) @@ -92,6 +104,12 @@ func ChunkedProxyRequest(ctx context.Context, c *app.RequestContext, u string, c c.Status(resp.StatusCode) + bodyReader := resp.Body + + if cfg.RateLimit.BandwidthLimit.Enabled { + bodyReader = limitreader.NewRateLimitedReader(bodyReader, bandwidthLimit, int(bandwidthBurst), ctx) + } + if MatcherShell(u) && matchString(matcher, matchedMatchers) && cfg.Shell.Editor { // 判断body是不是gzip var compress string @@ -99,24 +117,25 @@ func ChunkedProxyRequest(ctx context.Context, c *app.RequestContext, u string, c compress = "gzip" } - logDebug("Use Shell Editor: %s %s %s %s %s", c.ClientIP(), method, u, c.Request.Header.Get("User-Agent"), c.Request.Header.GetProtocol()) + logDebug("Use Shell Editor: %s %s %s %s %s", c.ClientIP(), c.Request.Method(), u, c.Request.Header.Get("User-Agent"), c.Request.Header.GetProtocol()) c.Header("Content-Length", "") var reader io.Reader - reader, _, err = processLinks(resp.Body, compress, string(c.Request.Host()), cfg) + reader, _, err = processLinks(bodyReader, compress, string(c.Request.Host()), cfg) c.SetBodyStream(reader, -1) if err != nil { - logError("%s %s %s %s %s Failed to copy response body: %v", c.ClientIP(), method, u, c.Request.Header.Get("User-Agent"), c.Request.Header.GetProtocol(), err) + logError("%s %s %s %s %s Failed to copy response body: %v", c.ClientIP(), c.Request.Method(), u, c.Request.Header.Get("User-Agent"), c.Request.Header.GetProtocol(), err) ErrorPage(c, NewErrorWithStatusLookup(500, fmt.Sprintf("Failed to copy response body: %v", err))) return } } else { + if contentLength != "" { - c.SetBodyStream(resp.Body, bodySize) + c.SetBodyStream(bodyReader, bodySize) return } - c.SetBodyStream(resp.Body, -1) + c.SetBodyStream(bodyReader, -1) } } diff --git a/proxy/docker.go b/proxy/docker.go index 6347f24..696de84 100644 --- a/proxy/docker.go +++ b/proxy/docker.go @@ -7,6 +7,7 @@ import ( "net/http" "strconv" + "github.com/WJQSERVER-STUDIO/go-utils/limitreader" "github.com/cloudwego/hertz/pkg/app" ) @@ -37,11 +38,22 @@ func GhcrRequest(ctx context.Context, c *app.RequestContext, u string, cfg *conf err error ) + go func() { + <-ctx.Done() + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + if req != nil { + req.Body.Close() + } + }() + method = c.Request.Method() rb := client.NewRequestBuilder(string(method), u) rb.NoDefaultHeaders() rb.SetBody(c.Request.BodyStream()) + rb.WithContext(ctx) //req, err = client.NewRequest(string(method), u, c.Request.BodyStream()) req, err = rb.Build() @@ -106,10 +118,16 @@ func GhcrRequest(ctx context.Context, c *app.RequestContext, u string, cfg *conf c.Status(resp.StatusCode) + bodyReader := resp.Body + + if cfg.RateLimit.BandwidthLimit.Enabled { + bodyReader = limitreader.NewRateLimitedReader(bodyReader, bandwidthLimit, int(bandwidthBurst), ctx) + } + if contentLength != "" { - c.SetBodyStream(resp.Body, bodySize) + c.SetBodyStream(bodyReader, bodySize) return } - c.SetBodyStream(resp.Body, -1) + c.SetBodyStream(bodyReader, -1) } diff --git a/proxy/gitreq.go b/proxy/gitreq.go index 81293cd..5667d21 100644 --- a/proxy/gitreq.go +++ b/proxy/gitreq.go @@ -8,15 +8,32 @@ import ( "net/http" "strconv" + "github.com/WJQSERVER-STUDIO/go-utils/limitreader" "github.com/cloudwego/hertz/pkg/app" ) func GitReq(ctx context.Context, c *app.RequestContext, u string, cfg *config.Config, mode string) { + + var ( + req *http.Request + resp *http.Response + ) + + go func() { + <-ctx.Done() + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + if req != nil { + req.Body.Close() + } + }() + method := string(c.Request.Method()) - bodyReader := bytes.NewBuffer(c.Request.Body()) + reqBodyReader := bytes.NewBuffer(c.Request.Body()) - //bodyReader := c.Request.BodyStream() + //bodyReader := c.Request.BodyStream() // 不可替换为此实现 if cfg.GitClone.Mode == "cache" { userPath, repoPath, remainingPath, queryParams, err := extractParts(u) @@ -28,14 +45,11 @@ func GitReq(ctx context.Context, c *app.RequestContext, u string, cfg *config.Co u = cfg.GitClone.SmartGitAddr + userPath + repoPath + remainingPath + "?" + queryParams.Encode() } - var ( - resp *http.Response - ) - if cfg.GitClone.Mode == "cache" { rb := gitclient.NewRequestBuilder(method, u) rb.NoDefaultHeaders() - rb.SetBody(bodyReader) + rb.SetBody(reqBodyReader) + rb.WithContext(ctx) req, err := rb.Build() if err != nil { @@ -54,7 +68,8 @@ func GitReq(ctx context.Context, c *app.RequestContext, u string, cfg *config.Co } else { rb := client.NewRequestBuilder(string(c.Request.Method()), u) rb.NoDefaultHeaders() - rb.SetBody(bodyReader) + rb.SetBody(reqBodyReader) + rb.WithContext(ctx) req, err := rb.Build() if err != nil { @@ -89,7 +104,6 @@ func GitReq(ctx context.Context, c *app.RequestContext, u string, cfg *config.Co for key, values := range resp.Header { for _, value := range values { - //c.Header(key, value) c.Response.Header.Add(key, value) } } @@ -122,5 +136,11 @@ func GitReq(ctx context.Context, c *app.RequestContext, u string, cfg *config.Co c.Response.Header.Set("Expires", "0") } - c.SetBodyStream(resp.Body, -1) + bodyReader := resp.Body + + if cfg.RateLimit.BandwidthLimit.Enabled { + bodyReader = limitreader.NewRateLimitedReader(bodyReader, bandwidthLimit, int(bandwidthBurst), ctx) + } + + c.SetBodyStream(bodyReader, -1) } diff --git a/proxy/httpc.go b/proxy/httpc.go index 120d8a7..83de29b 100644 --- a/proxy/httpc.go +++ b/proxy/httpc.go @@ -18,11 +18,16 @@ var ( gitclient *httpc.Client ) -func InitReq(cfg *config.Config) { +func InitReq(cfg *config.Config) error { initHTTPClient(cfg) if cfg.GitClone.Mode == "cache" { initGitHTTPClient(cfg) } + err := SetGlobalRateLimit(cfg) + if err != nil { + return err + } + return nil } func initHTTPClient(cfg *config.Config) {