Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split connection count metirc for longpolling #929

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,36 +229,49 @@ The logged metric is a json dictonary like:

```json
{
"connected_users_anonymous_connections": 1,
"connected_users_average_connections": 3,
"connected_users_current": 3,
"connected_users_current_local": 3,
"connected_users_total": 3,
"connected_users_total_local": 3,
"current_connections": 8,
"current_connections_local": 8,
"datastore_cache_key_len": 343,
"datastore_cache_size": 3114,
"runtime_goroutines": 42
"connections_longpolling_connected_users_anonymous_connections": 0,
"connections_longpolling_connected_users_average_connections": 3,
"connections_longpolling_connected_users_current": 1,
"connections_longpolling_connected_users_current_local": 1,
"connections_longpolling_connected_users_total": 1,
"connections_longpolling_connected_users_total_local": 1,
"connections_longpolling_current_connections": 3,
"connections_longpolling_current_connections_local": 3,
"connections_stream_connected_users_anonymous_connections": 0,
"connections_stream_connected_users_average_connections": 6,
"connections_stream_connected_users_current": 2,
"connections_stream_connected_users_current_local": 2,
"connections_stream_connected_users_total": 3,
"connections_stream_connected_users_total_local": 3,
"connections_stream_current_connections": 13,
"connections_stream_current_connections_local": 13,
"datastore_cache_key_len": 236478,
"datastore_cache_size": 1722987,
"runtime_goroutines": 68
}
```

The values are:

* `connected_users_anonymous_connections`: Number of connections from the
anonymous users from all autoupdate instances.
* `connected_users_average_connections`: Average connection count for each user
except for anonymous user.
* `connected_users_current`: Amount of connected users that have at least one
open connection.
* `connected_users_current_local`: Amount of connected users that have at least
one open connection of this instance.
* `connected_users_total`: Amount of different users that are currently
connected or were connected since the autoupdate service was started.
* `connected_users_total_local`: Same as `connected_users_total`, but only for this
instance.
* `current_connections`: Amount of all connections.
* `current_connections_local`: Amount of all connections of this instance.
The prefix `connections_stream` are for "normal" connections.
`connections_longpolling` are for connections, that use the longpolling
fallback.



* `connections_stream_connected_users_anonymous_connections`: Number of
connections from the anonymous users from all autoupdate instances.
* `connections_stream_connected_users_average_connections`: Average connection
count for each user except for anonymous user.
* `connections_stream_connected_users_current`: Amount of connected users that
have at least one open connection.
* `connections_stream_connected_users_current_local`: Amount of connected users
that have at least one open connection of this instance.
* `connections_stream_connected_users_total`: Amount of different users that are
currently connected or were connected since the autoupdate service was
started.
* `connections_stream_connected_users_total_local`: Same as
`connected_users_total`, but only for this instance.
* `connections_stream_current_connections`: Amount of all connections.
* `connections_stream_current_connections_local`: Amount of all connections of this instance.
* `datastore_cache_key_len`: Amount of keys in the cache.
* `datastore_cache_size`: Combined size of all values in the cache.
* `runtime_goroutines`: Current goroutines used by the instance.
Expand Down
59 changes: 33 additions & 26 deletions internal/http/connection_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@ type RedisMetric interface {
Get(ctx context.Context) (map[int]int, error)
}

// connectionCount counts, how many connections a user has.
// ConnectionCount counts, how many connections a user has.
//
// It holds a local counter and saves it to redis after a connection is created
// or closed.
// It holds a local counter and saves it to redis from time to time. The
// argument `saveInterval` defines, how oftem it is saved.
//
// It also pings redis from time to time to show, that this instance is
// still running.
type connectionCount struct {
// It also pings redis from time to time to show, that this instance is still
// running.
type ConnectionCount struct {
metric RedisMetric
name string

mu sync.Mutex
connections map[int]int
}

func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration) *connectionCount {
redisMetric := redis.NewMetric[map[int]int](r, "autoupdate_connection_count", mapIntCombiner{}, saveInterval*2, time.Now)
func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.Duration, name string) *ConnectionCount {
redisMetric := redis.NewMetric[map[int]int](r, name, mapIntCombiner{}, saveInterval*2, time.Now)

c := connectionCount{
c := ConnectionCount{
metric: redisMetric,
name: name,
connections: make(map[int]int),
}

Expand All @@ -60,7 +62,7 @@ func newConnectionCount(ctx context.Context, r *redis.Redis, saveInterval time.D
return &c
}

func (c *connectionCount) save(ctx context.Context) error {
func (c *ConnectionCount) save(ctx context.Context) error {
c.mu.Lock()
converted, err := json.Marshal(c.connections)
c.mu.Unlock()
Expand All @@ -75,31 +77,38 @@ func (c *connectionCount) save(ctx context.Context) error {
return nil
}

func (c *connectionCount) increment(uid int, increment int) {
func (c *ConnectionCount) increment(uid int, increment int) {
c.mu.Lock()
c.connections[uid] += increment
c.mu.Unlock()
}

func (c *connectionCount) Add(uid int) {
// Add adds one connection to the counter.
func (c *ConnectionCount) Add(uid int) {
c.increment(uid, 1)
}

func (c *connectionCount) Done(uid int) {
// Done removes one connection from the counter.
func (c *ConnectionCount) Done(uid int) {
c.increment(uid, -1)
}

func (c *connectionCount) Show(ctx context.Context) (map[int]int, error) {
// Show shoes the counter.
func (c *ConnectionCount) Show(ctx context.Context, filter func(ctx context.Context, count map[int]int) error) (map[int]int, error) {
data, err := c.metric.Get(ctx)
if err != nil {
return nil, fmt.Errorf("getting counter from redis: %w", err)
}

if err := filter(ctx, data); err != nil {
return nil, fmt.Errorf("filtering counter: %w", err)
}

return data, nil
}

// Metric is a function needed my the openslides metric system to fetch some values.
func (c *connectionCount) Metric(con metric.Container) {
func (c *ConnectionCount) Metric(con metric.Container) {
ctx := context.Background()

data, err := c.metric.Get(ctx)
Expand Down Expand Up @@ -145,17 +154,15 @@ func (c *connectionCount) Metric(con metric.Container) {
average = averageSum / averageCount
}

prefix := "connected_users"
con.Add(prefix+"_current", currentConnectedUsers)
con.Add(prefix+"_total", len(data))
con.Add(prefix+"_current_local", localCurrentUsers)
con.Add(prefix+"_total_local", totalCurrentConnections)
con.Add(prefix+"_average_connections", average)
con.Add(prefix+"_anonymous_connections", data[0])

prefix = "current_connections"
con.Add(prefix, currentConnections)
con.Add(prefix+"_local", localCurrentConnections)
con.Add(c.name+"_connected_users_current", currentConnectedUsers)
con.Add(c.name+"_connected_users_total", len(data))
con.Add(c.name+"_connected_users_current_local", localCurrentUsers)
con.Add(c.name+"_connected_users_total_local", totalCurrentConnections)
con.Add(c.name+"_connected_users_average_connections", average)
con.Add(c.name+"_connected_users_anonymous_connections", data[0])

con.Add(c.name+"_current_connections", currentConnections)
con.Add(c.name+"_current_connections_local", localCurrentConnections)
}

// mapIntCombiner tells the redis Metric, how to combine the metric values.
Expand Down
55 changes: 41 additions & 14 deletions internal/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func Run(
redisConnection *redis.Redis,
saveIntercal time.Duration,
) error {
var connectionCount *connectionCount
var connectionCount [2]*ConnectionCount
if redisConnection != nil {
connectionCount = newConnectionCount(ctx, redisConnection, saveIntercal)
metric.Register(connectionCount.Metric)
connectionCount[0] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_stream")
connectionCount[1] = newConnectionCount(ctx, redisConnection, saveIntercal, "connections_longpolling")
metric.Register(connectionCount[0].Metric)
metric.Register(connectionCount[1].Metric)
}

mux := http.NewServeMux()
Expand Down Expand Up @@ -244,7 +246,7 @@ func parseBodyNormal(r *http.Request) ([]byte, string, bool, error) {

// HandleAutoupdate builds the requested keys from the body of a request. The
// body has to be in the format specified in the keysbuilder package.
func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount *connectionCount) {
func HandleAutoupdate(mux *http.ServeMux, auth Authenticater, connecter Connecter, history History, connectionCount [2]*ConnectionCount) {
mux.Handle(
prefixPublic,
validRequest(
Expand Down Expand Up @@ -303,9 +305,9 @@ func writeData(w io.Writer, data map[dskey.Key][]byte, compress bool) error {
}

// HandleShowConnectionCount adds a handler to show the result of the connection counter.
func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount *connectionCount) {
func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoupdate, auth Authenticater, connectionCount [2]*ConnectionCount) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if connectionCount == nil {
if connectionCount[0] == nil {
oserror.Handle(fmt.Errorf("Error connection count is not initialized"))
http.Error(w, "Counting not possible", 500)
return
Expand All @@ -326,20 +328,31 @@ func HandleShowConnectionCount(mux *http.ServeMux, autoupdate *autoupdate.Autoup
return
}

val, err := connectionCount.Show(ctx)
filter := func(ctx context.Context, count map[int]int) error {
return autoupdate.FilterConnectionCount(ctx, meetingIDs, count)
}

val1, err := connectionCount[0].Show(ctx, filter)
if err != nil {
oserror.Handle(fmt.Errorf("Error counting normal connection: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

val2, err := connectionCount[1].Show(ctx, filter)
if err != nil {
oserror.Handle(fmt.Errorf("Error counting connection: %w", err))
oserror.Handle(fmt.Errorf("Error counting longpolling connection: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val); err != nil {
if err := autoupdate.FilterConnectionCount(ctx, meetingIDs, val2); err != nil {
oserror.Handle(fmt.Errorf("Error filtering connection count: %w", err))
http.Error(w, "Counting not possible", 500)
return
}

if err := json.NewEncoder(w).Encode(val); err != nil {
if err := json.NewEncoder(w).Encode([2]map[int]int{val1, val2}); err != nil {
oserror.Handle(fmt.Errorf("Error decoding counter %w", err))
http.Error(w, "Counting not possible", 500)
return
Expand Down Expand Up @@ -563,18 +576,32 @@ func validRequest(next http.Handler) http.Handler {
})
}

func connectionCountMiddleware(next http.Handler, auth Authenticater, counter *connectionCount) http.Handler {
if counter == nil {
// isLongPollingRequest returns, if the request is a longpolling fallback
// request.
//
// This is the case, if it has the argument "longpolling" or if the body is
// multipart.
func isLongPollingRequest(r *http.Request) bool {
return r.URL.Query().Has("longpolling") || strings.HasPrefix(strings.ToLower(r.Header.Get("Content-Type")), "multipart/")
}

func connectionCountMiddleware(next http.Handler, auth Authenticater, counter [2]*ConnectionCount) http.Handler {
if counter[0] == nil {
return next
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
uid := auth.FromContext(ctx)
counter.Add(uid)
count := counter[0]
if isLongPollingRequest(r) {
count = counter[1]
}

count.Add(uid)

defer func() {
counter.Done(uid)
count.Done(uid)
}()

next.ServeHTTP(w, r)
Expand Down
6 changes: 3 additions & 3 deletions internal/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestKeysHandler(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

req := httptest.NewRequest("GET", "/system/autoupdate?k=user/1/username,user/2/username", nil).WithContext(ctx)
rec := httptest.NewRecorder()
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestComplexHandler(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

req := httptest.NewRequest(
"GET",
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestErrors(t *testing.T) {
f: func() (func(ctx context.Context) (map[dskey.Key][]byte, error), bool) { return f, true },
}

ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, nil)
ahttp.HandleAutoupdate(mux, fakeAuth(1), connecter, nil, [2]*ahttp.ConnectionCount{})

for _, tt := range []struct {
name string
Expand Down
Loading