Skip to content

Commit

Permalink
Split connection count metirc for longpolling (#929) (#930)
Browse files Browse the repository at this point in the history
Co-authored-by: Oskar Hahn <[email protected]>
  • Loading branch information
peb-adr and ostcar authored May 2, 2024
1 parent c819b7d commit 0963365
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 70 deletions.
67 changes: 40 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,36 +229,49 @@ The logged metric is a json dictonary like:

```json
{
"connected_users_anonymous_connections": 1,
"connected_users_average_connections": 3,
"connected_users_current": 3,
"connected_users_current_local": 3,
"connected_users_total": 3,
"connected_users_total_local": 3,
"current_connections": 8,
"current_connections_local": 8,
"datastore_cache_key_len": 343,
"datastore_cache_size": 3114,
"runtime_goroutines": 42
"connections_longpolling_connected_users_anonymous_connections": 0,
"connections_longpolling_connected_users_average_connections": 3,
"connections_longpolling_connected_users_current": 1,
"connections_longpolling_connected_users_current_local": 1,
"connections_longpolling_connected_users_total": 1,
"connections_longpolling_connected_users_total_local": 1,
"connections_longpolling_current_connections": 3,
"connections_longpolling_current_connections_local": 3,
"connections_stream_connected_users_anonymous_connections": 0,
"connections_stream_connected_users_average_connections": 6,
"connections_stream_connected_users_current": 2,
"connections_stream_connected_users_current_local": 2,
"connections_stream_connected_users_total": 3,
"connections_stream_connected_users_total_local": 3,
"connections_stream_current_connections": 13,
"connections_stream_current_connections_local": 13,
"datastore_cache_key_len": 236478,
"datastore_cache_size": 1722987,
"runtime_goroutines": 68
}
```

The values are:

* `connected_users_anonymous_connections`: Number of connections from the
anonymous users from all autoupdate instances.
* `connected_users_average_connections`: Average connection count for each user
except for anonymous user.
* `connected_users_current`: Amount of connected users that have at least one
open connection.
* `connected_users_current_local`: Amount of connected users that have at least
one open connection of this instance.
* `connected_users_total`: Amount of different users that are currently
connected or were connected since the autoupdate service was started.
* `connected_users_total_local`: Same as `connected_users_total`, but only for this
instance.
* `current_connections`: Amount of all connections.
* `current_connections_local`: Amount of all connections of this instance.
The prefix `connections_stream` are for "normal" connections.
`connections_longpolling` are for connections, that use the longpolling
fallback.



* `connections_stream_connected_users_anonymous_connections`: Number of
connections from the anonymous users from all autoupdate instances.
* `connections_stream_connected_users_average_connections`: Average connection
count for each user except for anonymous user.
* `connections_stream_connected_users_current`: Amount of connected users that
have at least one open connection.
* `connections_stream_connected_users_current_local`: Amount of connected users
that have at least one open connection of this instance.
* `connections_stream_connected_users_total`: Amount of different users that are
currently connected or were connected since the autoupdate service was
started.
* `connections_stream_connected_users_total_local`: Same as
`connected_users_total`, but only for this instance.
* `connections_stream_current_connections`: Amount of all connections.
* `connections_stream_current_connections_local`: Amount of all connections of this instance.
* `datastore_cache_key_len`: Amount of keys in the cache.
* `datastore_cache_size`: Combined size of all values in the cache.
* `runtime_goroutines`: Current goroutines used by the instance.
Expand Down
59 changes: 33 additions & 26 deletions internal/http/connection_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@ type RedisMetric interface {
Get(ctx context.Context) (map[int]int, error)
}

// connectionCount counts, how many connections a user has.
// ConnectionCount counts, how many connections a user has.
//
// It holds a local counter and saves it to redis after a connection is created
// or closed.
// It holds a local counter and saves it to redis from time to time. The
// argument `saveInterval` defines, how oftem it is saved.
//
// It also pings redis from time to time to show, that this instance is
// still running.
type connectionCount struct {
// It also pings redis from time to time to show, that this instance is still
// running.
type ConnectionCount struct {
metric RedisMetric
name string

mu sync.Mutex
connections map[int]int
}

func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration) *connectionCount {
redisMetric := redis.NewMetric[map[int]int](r, "autoupdate_connection_count", mapIntCombiner{}, saveInterval*2, time.Now)
func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration, name string) *ConnectionCount {
redisMetric := redis.NewMetric[map[int]int](r, name, mapIntCombiner{}, saveInterval*2, time.Now)

c := connectionCount{
c := ConnectionCount{
metric: redisMetric,
name: name,
connections: make(map[int]int),
}

Expand All @@ -60,7 +62,7 @@ func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.D
return &c
}

func (c *connectionCount) save(ctx context.Context) error {
func (c *ConnectionCount) save(ctx context.Context) error {
c.mu.Lock()
converted, err := json.Marshal(c.connections)
c.mu.Unlock()
Expand All @@ -75,31 +77,38 @@ func (c *connectionCount) save(ctx context.Context) error {
return nil
}

func (c *connectionCount) increment(uid int, increment int) {
func (c *ConnectionCount) increment(uid int, increment int) {
c.mu.Lock()
c.connections[uid] += increment
c.mu.Unlock()
}

func (c *connectionCount) Add(uid int) {
// Add adds one connection to the counter.
func (c *ConnectionCount) Add(uid int) {
c.increment(uid, 1)
}

func (c *connectionCount) Done(uid int) {
// Done removes one connection from the counter.
func (c *ConnectionCount) Done(uid int) {
c.increment(uid, -1)
}

func (c *connectionCount) Show(ctx context.Context) (map[int]int, error) {
// Show shoes the counter.
func (c *ConnectionCount) Show(ctx context.Context, filter func(ctx context.Context, count map[int]int) error) (map[int]int, error) {
data, err := c.metric.Get(ctx)
if err != nil {
return nil, fmt.Errorf("getting counter from redis: %w", err)
}

if err := filter(ctx, data); err != nil {
return nil, fmt.Errorf("filtering counter: %w", err)
}

return data, nil
}

// Metric is a function needed my the openslides metric system to fetch some values.
func (c *connectionCount) Metric(con metric.Container) {
func (c *ConnectionCount) Metric(con metric.Container) {
ctx := context.Background()

data, err := c.metric.Get(ctx)
Expand Down Expand Up @@ -145,17 +154,15 @@ func (c *connectionCount) Metric(con metric.Container) {
average = averageSum / averageCount
}

prefix := "connected_users"
con.Add(prefix+"_current", currentConnectedUsers)
con.Add(prefix+"_total", len(data))
con.Add(prefix+"_current_local", localCurrentUsers)
con.Add(prefix+"_total_local", totalCurrentConnections)
con.Add(prefix+"_average_connections", average)
con.Add(prefix+"_anonymous_connections", data[0])

prefix = "current_connections"
con.Add(prefix, currentConnections)
con.Add(prefix+"_local", localCurrentConnections)
con.Add(c.name+"_connected_users_current", currentConnectedUsers)
con.Add(c.name+"_connected_users_total", len(data))
con.Add(c.name+"_connected_users_current_local", localCurrentUsers)
con.Add(c.name+"_connected_users_total_local", totalCurrentConnections)
con.Add(c.name+"_connected_users_average_connections", average)
con.Add(c.name+"_connected_users_anonymous_connections", data[0])

con.Add(c.name+"_current_connections", currentConnections)
con.Add(c.name+"_current_connections_local", localCurrentConnections)
}

// mapIntCombiner tells the redis Metric, how to combine the metric values.
Expand Down
55 changes: 41 additions & 14 deletions internal/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func Run(
redisConnection *redis.Redis,
saveIntercal time.Duration,
) error {
var connectionCount *connectionCount
var connectionCount [2]*ConnectionCount
if redisConnection != nil {
connectionCount = newConnectionCount(ctx, redisConnection, saveIntercal)
metric.Register(connectionCount.Metric)
connectionCount[0] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_stream")
connectionCount[1] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_longpolling")
metric.Register(connectionCount[0].Metric)
metric.Register(connectionCount[1].Metric)
}

mux := http.NewServeMux()
Expand Down Expand Up @@ -244,7 +246,7 @@ func parseBodyNormal(r *http.Request) ([]byte, string, bool, error) {

// HandleAutoupdate builds the requested keys from the body of a request. The
// body has to be in the format specified in the keysbuilder package.
func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount *connectionCount) {
func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount [2]*ConnectionCount) {
mux.Handle(
prefixPublic,
validRequest(
Expand Down Expand Up @@ -303,9 +305,9 @@ func writeData(w io.Writer, data map[dskey.Key][]byte, compress bool) error {
}

// HandleShowConnectionCount adds a handler to show the result of the connection counter.
func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount *connectionCount) {
func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount [2]*ConnectionCount) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if connectionCount == nil {
if connectionCount[0] == nil {
oserror.Handle(fmt.Errorf("Error connection count is not initialized"))
http.Error(w, "Counting not possible", 500)
return
Expand All @@ -326,20 +328,31 @@ func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoup
return
}

val, err := connectionCount.Show(ctx)
filter := func(ctx context.Context, count map[int]int) error {
return autoupdate.FilterConnectionCount(ctx, meetingIDs, count)
}

val1, err := connectionCount[0].Show(ctx, filter)
if err != nil {
oserror.Handle(fmt.Errorf("Error counting normal connection: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

val2, err := connectionCount[1].Show(ctx, filter)
if err != nil {
oserror.Handle(fmt.Errorf("Error counting connection: %w", err))
oserror.Handle(fmt.Errorf("Error counting longpolling connection: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val); err != nil {
if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val2); err != nil {
oserror.Handle(fmt.Errorf("Error filtering connection count: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

if err := json.NewEncoder(w).Encode(val); err != nil {
if err := json.NewEncoder(w).Encode([2]map[int]int{val1, val2}); err != nil {
oserror.Handle(fmt.Errorf("Error decoding counter %w", err))
http.Error(w, "Counting not possible", 500)
return
Expand Down Expand Up @@ -563,18 +576,32 @@ func validRequest(next http.Handler) http.Handler {
})
}

func connectionCountMiddleware(next http.Handler, auth Authenticater, counter *connectionCount) http.Handler {
if counter == nil {
// isLongPollingRequest returns, if the request is a longpolling fallback
// request.
//
// This is the case, if it has the argument "longpolling" or if the body is
// multipart.
func isLongPollingRequest(r *http.Request) bool {
return r.URL.Query().Has("longpolling") || strings.HasPrefix(strings.ToLower(r.Header.Get("Content-Type")), "multipart/")
}

func connectionCountMiddleware(next http.Handler, auth Authenticater, counter [2]*ConnectionCount) http.Handler {
if counter[0] == nil {
return next
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uid := auth.FromContext(ctx)
counter.Add(uid)
count := counter[0]
if isLongPollingRequest(r) {
count = counter[1]
}

count.Add(uid)

defer func() {
counter.Done(uid)
count.Done(uid)
}()

next.ServeHTTP(w, r)
Expand Down
6 changes: 3 additions & 3 deletions internal/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestKeysHandler(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

req := httptest.NewRequest("GET", "/system/autoupdate?k=user/1/username,user/2/username", nil).WithContext(ctx)
rec := httptest.NewRecorder()
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestComplexHandler(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

req := httptest.NewRequest(
"GET",
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestErrors(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

for _, tt := range []struct {
name string
Expand Down

0 comments on commit 0963365

Please sign in to comment.