Skip to content

Commit

Permalink
Go: Compose MAC addresses sent by multiple devices (#59)
Browse files Browse the repository at this point in the history
With this change you'll be able to send MAC addresses from multiple devices, because MAC addresses are accumulated in special asynchronous data structure: macs.SetTTL.

macs.SetTTL will keep unique mac addresses and delete them after specified amount of time (TTL aka time to live). If long-season will receive more MAC addresses, macs.SetTTL will reset TTL of MAC address that occurred again during update process.

TTL of single mac address can be configure with LS_SINGLE_ADDR_TTL environment variable. You can also modify mac addresses refresh time with LA_REFRESH_TIME environmental variable.
  • Loading branch information
thinkofher authored May 21, 2021
2 parents f727eaf + b668ef6 commit 185e8de
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 34 deletions.
11 changes: 6 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ func main() {
}

ctx := context.Background()
macChannel, macDeamon := status.NewDaemon(
ctx,
factoryStorage.StatusIterator(),
factoryStorage.StatusTx(),
)
macChannel, macDeamon := status.NewDaemon(ctx, status.DaemonArgs{
Iter: factoryStorage.StatusIterator(),
Counters: factoryStorage.StatusTx(),
RefreshTime: config.RefreshTime,
SingleAddrTTL: config.SingleAddrTTL,
})

// CORS (Cross-Origin Resource Sharing) middleware that enables public
// access to GET/OPTIONS requests. Used to expose APIs to XHR consumers in
Expand Down
17 changes: 10 additions & 7 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"encoding/gob"
"fmt"
"time"

"github.com/cristalhq/jwt/v3"
)
Expand Down Expand Up @@ -55,13 +56,15 @@ type DevicePublicData struct {
// Config represents configuration that is
// being used by server.
type Config struct {
Debug bool
Host string
Port string
DatabasePath string
JWTSecret string
UpdateSecret string
AppName string
Debug bool
Host string
Port string
DatabasePath string
JWTSecret string
UpdateSecret string
AppName string
RefreshTime time.Duration
SingleAddrTTL time.Duration
}

// Address returns address string that is compatible
Expand Down
41 changes: 34 additions & 7 deletions pkg/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package config

import (
"os"
"strconv"
"strings"
"time"

"github.com/hakierspejs/long-season/pkg/models"
)
Expand Down Expand Up @@ -31,20 +33,28 @@ const (

debugEnv = "LS_DEBUG"
defaultDebug = "0"

refreshTimeEnv = "LS_REFRESH_TIME"
defaultRefreshTime = time.Duration(60) // seconds

singleAddrTTLEnv = "LS_SINGLE_ADDR_TTL"
defaultSingleAddrTTL = time.Duration(60 * 5) // seconds
)

// Env returns pointer to models.Config which is
// parsed from environmental variables. Cannot be nil.
// Unset variables will be
func Env() *models.Config {
return &models.Config{
Debug: parseBoolEnv(DefaultEnv(debugEnv, defaultDebug)),
Host: DefaultEnv(hostEnv, defaultHost),
Port: DefaultEnv(portEnv, defaultPort),
DatabasePath: DefaultEnv(boltENV, defaultBoltDB),
JWTSecret: DefaultEnv(jwtSecretEnv, defaultJWTSecret),
UpdateSecret: DefaultEnv(updateSecretEnv, defaultUpdateSecret),
AppName: DefaultEnv(appNameEnv, defaultAppName),
Debug: parseBoolEnv(DefaultEnv(debugEnv, defaultDebug)),
Host: DefaultEnv(hostEnv, defaultHost),
Port: DefaultEnv(portEnv, defaultPort),
DatabasePath: DefaultEnv(boltENV, defaultBoltDB),
JWTSecret: DefaultEnv(jwtSecretEnv, defaultJWTSecret),
UpdateSecret: DefaultEnv(updateSecretEnv, defaultUpdateSecret),
AppName: DefaultEnv(appNameEnv, defaultAppName),
RefreshTime: time.Second * DefaultDurationEnv(refreshTimeEnv, defaultRefreshTime),
SingleAddrTTL: time.Second * DefaultDurationEnv(singleAddrTTLEnv, defaultSingleAddrTTL),
}
}

Expand All @@ -59,6 +69,23 @@ func DefaultEnv(key, fallback string) string {
return res
}

// DefaultIntEnv returns content of shell variable
// assigned to given key. If result is empty or
// parsing process failed, returns fallback value.
func DefaultDurationEnv(key string, fallback time.Duration) time.Duration {
res := os.Getenv(key)
if res == "" {
return fallback
}

parsed, err := strconv.ParseInt(res, 10, 64)
if err != nil {
return fallback
}

return time.Duration(parsed)
}

func parseBoolEnv(env string) bool {
return !(env == "" || env == "0" || strings.ToLower(env) == "false")
}
3 changes: 3 additions & 0 deletions pkg/services/macs/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package macs contains functions, procedures and data types
// for manipulating hardware addresses.
package macs
130 changes: 130 additions & 0 deletions pkg/services/macs/macs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package macs

import (
"context"
"net"
"time"
)

// SetTTL is set for mac addresses with special daemon running
// in the background that will delete given mac addresses after
// specified amount of time (TTL).
//
// SetTTL is completely thread safe. Probably.
type SetTTL struct {
m map[string]*time.Timer
toAdd chan setItem
toDel chan string
retrieveSignal chan struct{}
macSlice chan []net.HardwareAddr
}

type setItem struct {
value string
ttl time.Duration
}

// NewSetTTL returns initialised pointer to SetTTL.
func NewSetTTL(ctx context.Context) *SetTTL {
res := &SetTTL{
m: map[string]*time.Timer{},
toAdd: make(chan setItem),
toDel: make(chan string),
retrieveSignal: make(chan struct{}),
macSlice: make(chan []net.HardwareAddr),
}

// start daemon in new goroutine
go res.daemon(ctx)

return res
}

// Push adds given HardwareAddr to set and setup its TTL.
// If given HardwareAddr is already in the set, it's reset
// its TTL to given amount of time duration.
func (s *SetTTL) Push(addr net.HardwareAddr, ttl time.Duration) {
s.toAdd <- setItem{
value: string(addr),
ttl: ttl,
}
}

// Slice returns slice of current Hardware addresses.
func (s *SetTTL) Slice() []net.HardwareAddr {
s.retrieveSignal <- struct{}{}
return <-s.macSlice
}

func delMac(val string, c chan string) func() {
return func() {
c <- val
}
}

// daemon runs infinite loop that will end when
// given context will be done.
func (s *SetTTL) daemon(ctx context.Context) {
// oh boy, here starts fun: infinite select loop
// with different branches
for {
// lets select from multiple channels
// attached to given set, this way
// we can ensure that everything
// will be synced together
select {
case newMac := <-s.toAdd:
// first scenario, client want to
// ad new mac address to set

// lets check if new mac address is already in the
// map
if timer, contains := s.m[newMac.value]; contains {
// if it is, reset timer with given ttl value
timer.Reset(newMac.ttl)
} else {
// if given mac address is not present
// at the map, lets create new timer
// that will send delete signal to our
// daemon
s.m[newMac.value] = time.AfterFunc(
newMac.ttl,
delMac(newMac.value, s.toDel),
)
}
case toDel := <-s.toDel:
// simple scenario: delete received mac
// from our map and go on
delete(s.m, toDel)
case <-s.retrieveSignal:
// we've just received retrieveSignal signal!
// lets allocate new slice that we will
// send to our client
res := make([]net.HardwareAddr, len(s.m), len(s.m))

// loop over collection of addresses.
index := 0
for k := range s.m {
// we have to cast every key to net.HardwareAddr
// we can omit net.ParseMAC, because SetTTL permits
// only net.HardwareAddr to Push, so we assume here
// that client properly parsed HardwareAddr
res[index] = net.HardwareAddr(k)
index += 1
}

// send result to client
s.macSlice <- res
case <-ctx.Done():
// context is Done, so we're closing channel and
// return to escape from loop
close(s.toAdd)
close(s.toDel)
close(s.retrieveSignal)
close(s.macSlice)
return
}

// there are no mutexes lol
}
}
47 changes: 32 additions & 15 deletions pkg/services/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,56 @@ import (
"net"
"time"

"github.com/hakierspejs/long-season/pkg/services/macs"
"github.com/hakierspejs/long-season/pkg/storage"
)

// Daemon is function to be runned in the background.
// Daemon is a background function
type Daemon func()

// NewDeamon returns channel for communicating with deamon and deamon
// to be run in the background in the separate gourtine.
func NewDaemon(ctx context.Context,
iter storage.StatusIterator, counters storage.StatusTx,
) (chan<- []net.HardwareAddr, Daemon) {
// DaemonArgs contains list of arguments for NewDeamon constructor.
type DaemonArgs struct {
Iter storage.StatusIterator

Counters storage.StatusTx

// RefreshTime is duration, that every time when passes, users
// get their online status updated.
RefreshTime time.Duration

// SingleAddrTTL represents time to live for single
// mac address. After this period of time user with
// given mac address will be marked as offline
// during next status update.
SingleAddrTTL time.Duration
}

// NewDeamon returns channel for communicating with daemon and daemon
// to be run in the background in the separate gourtine .
func NewDaemon(ctx context.Context, args DaemonArgs) (chan<- []net.HardwareAddr, Daemon) {
ch := make(chan []net.HardwareAddr)

daemon := func() {
// Slice with newest mac addresse
macs := []net.HardwareAddr{}
macs := macs.NewSetTTL(ctx)

// TODO(thinkofher) make time period configurable
ticker := time.NewTicker(time.Minute)
// Update users every t, t = args.RefreshTime
ticker := time.NewTicker(args.RefreshTime)

for {
select {
case <-ctx.Done():
break
case newMacs := <-ch: // Update mac addresses
log.Println("Received new macs")
macs = newMacs
case <-ticker.C: // Update users every minute with newest mac addresses
for _, newMac := range newMacs {
macs.Push(newMac, args.SingleAddrTTL)
}
case <-ticker.C:
// Update online status for every user in db
err := storage.UpdateStatuses(ctx, storage.UpdateStatusesArgs{
Addresses: macs,
Iter: iter,
Counters: counters,
Addresses: macs.Slice(),
Iter: args.Iter,
Counters: args.Counters,
})
if err != nil {
log.Println("Failed to update statuses, reason: ", err.Error())
Expand Down

0 comments on commit 185e8de

Please sign in to comment.