diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..041ca05 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,67 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ master, v1 ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ master ] + schedule: + - cron: '16 4 * * 1' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v1 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v1 + + # ℹī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 \ No newline at end of file diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..5abee19 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,26 @@ +name: Test + +on: [push, pull_request] + +jobs: + build: + name: Test + runs-on: ubuntu-latest + strategy: + matrix: + goVer: [1.22] + steps: + - name: Set up Go ${{ matrix.goVer }} + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.goVer }} + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: go get + + - name: Run tests + run: go test ./... \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..31df106 --- /dev/null +++ b/.gitignore @@ -0,0 +1,69 @@ +### GoLand+all template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/ + +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Go template +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..339635f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Tim Voronov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..43f75d7 --- /dev/null +++ b/README.md @@ -0,0 +1,39 @@ +# Throttle +> Dead-simple thread-safe throttler. + +``throttle`` provides a thread-safe mechanism to throttle function calls, ensuring that the execution rate does not exceed a specified limit. +Each instance operates independently, making it possible to control various functions or processes with different rate limits concurrently. + +## Install +```shell +go get github.com/ziflex/throttle +``` + +## Quick start + +```go +package myapp + +import ( + "net/http" + "github.com/ziflex/throttle" +) + +type ApiClient struct { + transport *http.Client + throttler *throttle.Throttler +} + +func NewApiClient(rps uint64) *ApiClient { + return &ApiClient{ + transport: &http.Client{}, + throttler: throttle.New(rps), + } +} + +func (c *ApiClient) Do(req *http.Request) (*http.Response, error) { + c.throttler.Wait() + + return c.transport.Do(req) +} +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a9fd40e --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/ziflex/throttle + +go 1.22 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ee6593c --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/throttle.go b/throttle.go new file mode 100644 index 0000000..0b26c4f --- /dev/null +++ b/throttle.go @@ -0,0 +1,65 @@ +package throttle + +import ( + "sync" + "time" +) + +type Throttler struct { + mu sync.Mutex + window time.Time + counter uint64 + limit uint64 +} + +func New(limit uint64) *Throttler { + t := new(Throttler) + t.limit = limit + + return t +} + +func (t *Throttler) Wait() { + t.mu.Lock() + defer t.mu.Unlock() + + now := time.Now() + + // if first call + if t.window.IsZero() { + t.window = now + } + + sinceLastCall := now.Sub(t.window) + + // if we are past the current window + // start a new one and exit + if sinceLastCall > time.Second { + t.reset(now) + + return + } + + nextCount := t.counter + 1 + + // if we are in the limit and there is enough time left to process next operation + // we increase the counter and move on + if t.limit >= nextCount { + t.counter = nextCount + + return + } + + leftInWindow := time.Second - sinceLastCall + + // otherwise wait for the next window + time.Sleep(leftInWindow) + + // new window + t.reset(time.Now()) +} + +func (t *Throttler) reset(window time.Time) { + t.window = window + t.counter = 1 +} diff --git a/throttle_test.go b/throttle_test.go new file mode 100644 index 0000000..3b7a45e --- /dev/null +++ b/throttle_test.go @@ -0,0 +1,201 @@ +package throttle_test + +import ( + "fmt" + "github.com/ziflex/throttle" + "math" + "sync" + "testing" + "time" +) + +func TestThrottler_Wait_Consistent(t *testing.T) { + useCases := []struct { + Limit uint64 + Calls int + }{ + { + Limit: 1, + Calls: 10, + }, + { + Limit: 5, + Calls: 10, + }, + { + Limit: 5, + Calls: 16, + }, + { + Limit: 5, + Calls: 14, + }, + } + + for _, useCase := range useCases { + t.Run(fmt.Sprintf("Consistent %d RPS within %d calls", useCase.Limit, useCase.Calls), func(t *testing.T) { + calls := make(chan time.Time, useCase.Calls) + call := func(t *throttle.Throttler) { + t.Wait() + calls <- time.Now() + } + + throttler := throttle.New(useCase.Limit) + ts := time.Now() + wg := sync.WaitGroup{} + wg.Add(useCase.Calls) + + for range useCase.Calls { + go func() { + call(throttler) + wg.Done() + }() + } + + wg.Wait() + close(calls) + + groups := map[float64]uint64{} + + for c := range calls { + diff := c.Sub(ts) + dur := math.Abs(math.Floor(diff.Seconds())) + groups[dur]++ + } + + expected := useCase.Limit + + for _, actual := range groups { + if actual > expected { + t.Fatal(fmt.Sprintf("Expected %d per second, but got %d", expected, actual)) + } + } + }) + } +} + +func TestThrottler_Wait_Sporadic(t *testing.T) { + type Burst struct { + Warmup time.Duration + Latency time.Duration + Calls int + } + + seconds := func(fraction float64) time.Duration { + return time.Duration(float64(time.Second) * fraction) + } + useCases := []struct { + Limit uint64 + Calls []Burst + Expected map[float64]uint64 + }{ + { + Limit: 10, + Calls: []Burst{ + { + Warmup: seconds(0.99), + Calls: 5, + }, + { + Warmup: seconds(0.99), + Calls: 2, + }, + { + Warmup: seconds(0.5), + Calls: 4, + }, + }, + Expected: map[float64]uint64{ + 0: 5, + 1: 2, + 2: 4, + }, + }, + { + Limit: 5, + Calls: []Burst{ + { + Calls: 5, + Latency: seconds(0.255), + }, + { + Warmup: seconds(0.2), + Calls: 6, + Latency: seconds(0.45), + }, + }, + Expected: map[float64]uint64{ + 0: 3, + 1: 3, + 2: 2, + 3: 2, + 4: 1, + }, + }, + } + + for _, useCase := range useCases { + t.Run(fmt.Sprintf("Sporadic %d RPS within %d calls", useCase.Limit, useCase.Calls), func(t *testing.T) { + var totalCalls int + + for _, tp := range useCase.Calls { + totalCalls += tp.Calls + } + + calls := make(chan time.Time, totalCalls) + call := func(t *throttle.Throttler, latency time.Duration) { + if latency > 0 { + time.Sleep(latency) + } + + t.Wait() + calls <- time.Now() + } + + throttler := throttle.New(useCase.Limit) + ts := time.Now() + var wg sync.WaitGroup + wg.Add(totalCalls) + + go func() { + for _, tpl := range useCase.Calls { + if tpl.Warmup > 0 { + time.Sleep(tpl.Warmup) + } + + for range tpl.Calls { + //ts := time.Now() + call(throttler, tpl.Latency) + wg.Done() + //fmt.Println(fmt.Sprintf("Call %dms", time.Since(ts).Milliseconds())) + } + } + }() + + wg.Wait() + close(calls) + + groups := map[float64]uint64{} + + for c := range calls { + diff := c.Sub(ts) + dur := math.Abs(math.Floor(diff.Seconds())) + groups[dur]++ + + //fmt.Println(fmt.Sprintf("Elapsed %ds", int64(dur))) + } + + for sec, actual := range groups { + expected, found := useCase.Expected[sec] + + if !found { + t.Fatal(fmt.Sprintf("Expected to have calls within %ds time range", int64(sec))) + } + + if actual != expected { + t.Fatal(fmt.Sprintf("Expected %d per second, but got %d", expected, actual)) + } + } + }) + } +}