diff --git a/README.md b/README.md index eccaf412..b460d2d4 100644 --- a/README.md +++ b/README.md @@ -229,36 +229,49 @@ The logged metric is a json dictonary like: ```json { - "connected_users_anonymous_connections": 1, - "connected_users_average_connections": 3, - "connected_users_current": 3, - "connected_users_current_local": 3, - "connected_users_total": 3, - "connected_users_total_local": 3, - "current_connections": 8, - "current_connections_local": 8, - "datastore_cache_key_len": 343, - "datastore_cache_size": 3114, - "runtime_goroutines": 42 + "connections_longpolling_connected_users_anonymous_connections": 0, + "connections_longpolling_connected_users_average_connections": 3, + "connections_longpolling_connected_users_current": 1, + "connections_longpolling_connected_users_current_local": 1, + "connections_longpolling_connected_users_total": 1, + "connections_longpolling_connected_users_total_local": 1, + "connections_longpolling_current_connections": 3, + "connections_longpolling_current_connections_local": 3, + "connections_stream_connected_users_anonymous_connections": 0, + "connections_stream_connected_users_average_connections": 6, + "connections_stream_connected_users_current": 2, + "connections_stream_connected_users_current_local": 2, + "connections_stream_connected_users_total": 3, + "connections_stream_connected_users_total_local": 3, + "connections_stream_current_connections": 13, + "connections_stream_current_connections_local": 13, + "datastore_cache_key_len": 236478, + "datastore_cache_size": 1722987, + "runtime_goroutines": 68 } ``` -The values are: - -* `connected_users_anonymous_connections`: Number of connections from the - anonymous users from all autoupdate instances. -* `connected_users_average_connections`: Average connection count for each user - except for anonymous user. -* `connected_users_current`: Amount of connected users that have at least one - open connection. -* `connected_users_current_local`: Amount of connected users that have at least - one open connection of this instance. -* `connected_users_total`: Amount of different users that are currently - connected or were connected since the autoupdate service was started. -* `connected_users_total_local`: Same as `connected_users_total`, but only for this -instance. -* `current_connections`: Amount of all connections. -* `current_connections_local`: Amount of all connections of this instance. +The prefix `connections_stream` are for "normal" connections. +`connections_longpolling` are for connections, that use the longpolling +fallback. + + + +* `connections_stream_connected_users_anonymous_connections`: Number of + connections from the anonymous users from all autoupdate instances. +* `connections_stream_connected_users_average_connections`: Average connection + count for each user except for anonymous user. +* `connections_stream_connected_users_current`: Amount of connected users that + have at least one open connection. +* `connections_stream_connected_users_current_local`: Amount of connected users + that have at least one open connection of this instance. +* `connections_stream_connected_users_total`: Amount of different users that are + currently connected or were connected since the autoupdate service was + started. +* `connections_stream_connected_users_total_local`: Same as +`connected_users_total`, but only for this instance. +* `connections_stream_current_connections`: Amount of all connections. +* `connections_stream_current_connections_local`: Amount of all connections of this instance. * `datastore_cache_key_len`: Amount of keys in the cache. * `datastore_cache_size`: Combined size of all values in the cache. * `runtime_goroutines`: Current goroutines used by the instance. diff --git a/internal/http/connection_count.go b/internal/http/connection_count.go index 2098943a..ca13ea8d 100644 --- a/internal/http/connection_count.go +++ b/internal/http/connection_count.go @@ -18,25 +18,27 @@ type RedisMetric interface { Get(ctx context.Context) (map[int]int, error) } -// connectionCount counts, how many connections a user has. +// ConnectionCount counts, how many connections a user has. // -// It holds a local counter and saves it to redis after a connection is created -// or closed. +// It holds a local counter and saves it to redis from time to time. The +// argument `saveInterval` defines, how oftem it is saved. // -// It also pings redis from time to time to show, that this instance is -// still running. -type connectionCount struct { +// It also pings redis from time to time to show, that this instance is still +// running. +type ConnectionCount struct { metric RedisMetric + name string mu sync.Mutex connections map[int]int } -func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration) *connectionCount { - redisMetric := redis.NewMetric[map[int]int](r, "autoupdate_connection_count", mapIntCombiner{}, saveInterval*2, time.Now) +func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration, name string) *ConnectionCount { + redisMetric := redis.NewMetric[map[int]int](r, name, mapIntCombiner{}, saveInterval*2, time.Now) - c := connectionCount{ + c := ConnectionCount{ metric: redisMetric, + name: name, connections: make(map[int]int), } @@ -60,7 +62,7 @@ func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.D return &c } -func (c *connectionCount) save(ctx context.Context) error { +func (c *ConnectionCount) save(ctx context.Context) error { c.mu.Lock() converted, err := json.Marshal(c.connections) c.mu.Unlock() @@ -75,31 +77,38 @@ func (c *connectionCount) save(ctx context.Context) error { return nil } -func (c *connectionCount) increment(uid int, increment int) { +func (c *ConnectionCount) increment(uid int, increment int) { c.mu.Lock() c.connections[uid] += increment c.mu.Unlock() } -func (c *connectionCount) Add(uid int) { +// Add adds one connection to the counter. +func (c *ConnectionCount) Add(uid int) { c.increment(uid, 1) } -func (c *connectionCount) Done(uid int) { +// Done removes one connection from the counter. +func (c *ConnectionCount) Done(uid int) { c.increment(uid, -1) } -func (c *connectionCount) Show(ctx context.Context) (map[int]int, error) { +// Show shoes the counter. +func (c *ConnectionCount) Show(ctx context.Context, filter func(ctx context.Context, count map[int]int) error) (map[int]int, error) { data, err := c.metric.Get(ctx) if err != nil { return nil, fmt.Errorf("getting counter from redis: %w", err) } + if err := filter(ctx, data); err != nil { + return nil, fmt.Errorf("filtering counter: %w", err) + } + return data, nil } // Metric is a function needed my the openslides metric system to fetch some values. -func (c *connectionCount) Metric(con metric.Container) { +func (c *ConnectionCount) Metric(con metric.Container) { ctx := context.Background() data, err := c.metric.Get(ctx) @@ -145,17 +154,15 @@ func (c *connectionCount) Metric(con metric.Container) { average = averageSum / averageCount } - prefix := "connected_users" - con.Add(prefix+"_current", currentConnectedUsers) - con.Add(prefix+"_total", len(data)) - con.Add(prefix+"_current_local", localCurrentUsers) - con.Add(prefix+"_total_local", totalCurrentConnections) - con.Add(prefix+"_average_connections", average) - con.Add(prefix+"_anonymous_connections", data[0]) - - prefix = "current_connections" - con.Add(prefix, currentConnections) - con.Add(prefix+"_local", localCurrentConnections) + con.Add(c.name+"_connected_users_current", currentConnectedUsers) + con.Add(c.name+"_connected_users_total", len(data)) + con.Add(c.name+"_connected_users_current_local", localCurrentUsers) + con.Add(c.name+"_connected_users_total_local", totalCurrentConnections) + con.Add(c.name+"_connected_users_average_connections", average) + con.Add(c.name+"_connected_users_anonymous_connections", data[0]) + + con.Add(c.name+"_current_connections", currentConnections) + con.Add(c.name+"_current_connections_local", localCurrentConnections) } // mapIntCombiner tells the redis Metric, how to combine the metric values. diff --git a/internal/http/http.go b/internal/http/http.go index 810c82f6..869c1046 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -43,10 +43,12 @@ func Run( redisConnection *redis.Redis, saveIntercal time.Duration, ) error { - var connectionCount *connectionCount + var connectionCount [2]*ConnectionCount if redisConnection != nil { - connectionCount = newConnectionCount(ctx, redisConnection, saveIntercal) - metric.Register(connectionCount.Metric) + connectionCount[0] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_stream") + connectionCount[1] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_longpolling") + metric.Register(connectionCount[0].Metric) + metric.Register(connectionCount[1].Metric) } mux := http.NewServeMux() @@ -244,7 +246,7 @@ func parseBodyNormal(r *http.Request) ([]byte, string, bool, error) { // HandleAutoupdate builds the requested keys from the body of a request. The // body has to be in the format specified in the keysbuilder package. -func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount *connectionCount) { +func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount [2]*ConnectionCount) { mux.Handle( prefixPublic, validRequest( @@ -303,9 +305,9 @@ func writeData(w io.Writer, data map[dskey.Key][]byte, compress bool) error { } // HandleShowConnectionCount adds a handler to show the result of the connection counter. -func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount *connectionCount) { +func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount [2]*ConnectionCount) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if connectionCount == nil { + if connectionCount[0] == nil { oserror.Handle(fmt.Errorf("Error connection count is not initialized")) http.Error(w, "Counting not possible", 500) return @@ -326,20 +328,31 @@ func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoup return } - val, err := connectionCount.Show(ctx) + filter := func(ctx context.Context, count map[int]int) error { + return autoupdate.FilterConnectionCount(ctx, meetingIDs, count) + } + + val1, err := connectionCount[0].Show(ctx, filter) + if err != nil { + oserror.Handle(fmt.Errorf("Error counting normal connection: %w", err)) + http.Error(w, "Counting not possible", 500) + return + } + + val2, err := connectionCount[1].Show(ctx, filter) if err != nil { - oserror.Handle(fmt.Errorf("Error counting connection: %w", err)) + oserror.Handle(fmt.Errorf("Error counting longpolling connection: %w", err)) http.Error(w, "Counting not possible", 500) return } - if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val); err != nil { + if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val2); err != nil { oserror.Handle(fmt.Errorf("Error filtering connection count: %w", err)) http.Error(w, "Counting not possible", 500) return } - if err := json.NewEncoder(w).Encode(val); err != nil { + if err := json.NewEncoder(w).Encode([2]map[int]int{val1, val2}); err != nil { oserror.Handle(fmt.Errorf("Error decoding counter %w", err)) http.Error(w, "Counting not possible", 500) return @@ -563,18 +576,32 @@ func validRequest(next http.Handler) http.Handler { }) } -func connectionCountMiddleware(next http.Handler, auth Authenticater, counter *connectionCount) http.Handler { - if counter == nil { +// isLongPollingRequest returns, if the request is a longpolling fallback +// request. +// +// This is the case, if it has the argument "longpolling" or if the body is +// multipart. +func isLongPollingRequest(r *http.Request) bool { + return r.URL.Query().Has("longpolling") || strings.HasPrefix(strings.ToLower(r.Header.Get("Content-Type")), "multipart/") +} + +func connectionCountMiddleware(next http.Handler, auth Authenticater, counter [2]*ConnectionCount) http.Handler { + if counter[0] == nil { return next } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() uid := auth.FromContext(ctx) - counter.Add(uid) + count := counter[0] + if isLongPollingRequest(r) { + count = counter[1] + } + + count.Add(uid) defer func() { - counter.Done(uid) + count.Done(uid) }() next.ServeHTTP(w, r) diff --git a/internal/http/http_test.go b/internal/http/http_test.go index 81c31b0d..3db4dd35 100644 --- a/internal/http/http_test.go +++ b/internal/http/http_test.go @@ -62,7 +62,7 @@ func TestKeysHandler(t *testing.T) { f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true }, } - ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil) + ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{}) req := httptest.NewRequest("GET", "/system/autoupdate?k=user/1/username,user/2/username", nil).WithContext(ctx) rec := httptest.NewRecorder() @@ -98,7 +98,7 @@ func TestComplexHandler(t *testing.T) { f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true }, } - ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil) + ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{}) req := httptest.NewRequest( "GET", @@ -153,7 +153,7 @@ func TestErrors(t *testing.T) { f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true }, } - ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil) + ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{}) for _, tt := range []struct { name string