Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cmd/goatak_server/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewAdminAPI(app *App, addr string, webtakRoot string) *AdminAPI {

engine.Delims("[[", "]]")

api.f = fiber.New(fiber.Config{EnablePrintRoutes: false, DisableStartupMessage: true, Views: engine})
api.f = fiber.New(fiber.Config{EnablePrintRoutes: false, DisableStartupMessage: true, Views: engine, BodyLimit: 64 * 1024 * 1024})

api.f.Use(log.NewFiberLogger(&log.LoggerConfig{Name: "admin_api", Level: slog.LevelDebug}))

Expand Down Expand Up @@ -71,9 +71,12 @@ func NewAdminAPI(app *App, addr string, webtakRoot string) *AdminAPI {
api.f.Get("/mission", getAllMissionHandler(app))
}

api.f.All("/webtak", webTakPathHandler())
if webtakRoot != "" {
api.f.Static("/webtak", webtakRoot)

addMartiRoutes(app, api.f)
} else {
staticfiles.EmbedWebTak(api.f)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want to use /webtak handle at all if it's not a webtacRoot

addMartiRoutes(app, api.f)
}

Expand All @@ -83,6 +86,15 @@ func NewAdminAPI(app *App, addr string, webtakRoot string) *AdminAPI {
return api
}

func webTakPathHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
if c.Path() == "/webtak" {
return c.Redirect("/webtak/", http.StatusMovedPermanently)
}
return c.Next()
}
}

func (api *AdminAPI) Address() string {
return api.addr
}
Expand Down Expand Up @@ -224,7 +236,7 @@ func getConnHandler(app *App) fiber.Handler {
Uids: ch.GetUids(),
User: ch.GetUser().GetLogin(),
Ver: ch.GetVersion(),
Addr: ch.GetName(),
Addr: ch.GetIdentifier(),
Scope: ch.GetUser().GetScope(),
LastSeen: ch.GetLastSeen(),
}
Expand Down Expand Up @@ -370,7 +382,7 @@ func getTakWsHandler(app *App) fiber.Handler {
app.AddClientHandler(w)
w.Listen()
app.logger.Info("ws disconnected")
app.RemoveClientHandler(w.GetName())
app.RemoveClientHandler(w.GetIdentifier())
})
}

Expand Down
53 changes: 53 additions & 0 deletions cmd/goatak_server/federationclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"context"
"fmt"
"github.com/kdudkov/goatak/internal/client"
"log/slog"
"sync"
"time"
)

// ConnectToFedServer 这是Fed服务器,只应该从服务器获取信息,但不要发送任何信息过去
func (app *App) ConnectToFedServer(ctx context.Context, fed *FedConfig) {
for ctx.Err() == nil {
addr := fmt.Sprintf("%s:%d:%s", fed.Host, fed.Port, fed.Proto) // localhost:8087:tcp
conn, err := app.connect(addr)
if err != nil {
app.logger.Error("Fed Server connect error", slog.Any("error", err))
time.Sleep(time.Second * 5)
continue
}

fedName := fmt.Sprintf("fed_%s:%v", fed.Host, fed.Port)
app.logger.Info(fmt.Sprintf("Federation to %s connected", fedName))

wg := &sync.WaitGroup{}
wg.Add(1)

h := client.NewConnClientHandler(addr, conn, &client.HandlerConfig{
MessageCb: app.NewCotMessage,
RemoveCb: func(ch client.ClientHandler) {
wg.Done()
app.handlers.Delete(addr)
app.logger.Info("disconnected")
},
NewContactCb: app.NewContactCb,
Name: fedName,
DisableSend: true,
IsClient: true,
UID: app.uid,
})

go h.Start()
app.AddClientHandler(h)

wg.Wait()
}
}

func (app *App) connToFed(ctx context.Context, fed *FedConfig) error {
app.ConnectToFedServer(ctx, fed)
return nil
}
58 changes: 58 additions & 0 deletions cmd/goatak_server/federationserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"context"
"fmt"
"github.com/kdudkov/goatak/internal/client"
"log/slog"
"net"
"strings"
)

// fed server 作为服务器,主要工作是为客户端提供数据,但客户端不会向服务端传输数据,如有此需求应当配置双向连接

func (app *App) ListenTcpFed(ctx context.Context, addr string) (err error) {
app.logger.Info("listening TCP Federation at " + addr)
defer func() {
if r := recover(); r != nil {
app.logger.Error("panic in ListenTCP", slog.Any("error", r))
}
}()

listener, err := net.Listen("tcp", addr)
if err != nil {
app.logger.Error("Failed to listen", slog.Any("error", err))

return err
}

defer listener.Close()

for ctx.Err() == nil {
conn, err := listener.Accept()
if err != nil {
app.logger.Error("Unable to accept connections", slog.Any("error", err))

return err
}

remoteAddr := conn.RemoteAddr().String()
localAddr := conn.LocalAddr().String()
app.logger.Info("TCP Federation connection from " + remoteAddr)
h := client.NewConnClientHandler(
conn.RemoteAddr().Network()+"_"+remoteAddr,
conn, &client.HandlerConfig{
// 创建一个处理客户端请求的功能,不要接收来自它的消息,但需要把消息发给它
MessageCb: app.DummyHandler,
RemoveCb: app.RemoveHandlerCb,
NewContactCb: app.NewContactCb,
DropMetric: dropMetric,
DisableRecv: true,
Name: fmt.Sprintf("fed_%s:%v", strings.Split(remoteAddr, ":")[0], strings.Split(localAddr, ":")[1]),
})
app.AddClientHandler(h)
h.Start()
}

return nil
}
127 changes: 114 additions & 13 deletions cmd/goatak_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,23 @@ var (
lastSeenOfflineTimeout = time.Minute * 5
)

type FedConfig struct {
Host string `mapstructure:"host"`
Proto string `mapstructure:"proto"`
Port int `mapstructure:"port"`
Name string `mapstructure:"name"`
}

type AppConfig struct {
udpAddr string
tcpAddr string
adminAddr string
apiAddr string
certAddr string
tlsAddr string
udpAddr string
tcpAddr string
tcpFedAddr string
adminAddr string
apiAddr string
certAddr string
tlsAddr string

feds *[]FedConfig

usersFile string

Expand Down Expand Up @@ -168,6 +178,25 @@ func (app *App) Run() {
}()
}

if app.config.tcpFedAddr != "" {
go func() {
if err := app.ListenTcpFed(ctx, app.config.tcpFedAddr); err != nil {
panic(err)
}
}()
}

// 这里配置连接到fed服务器的逻辑
if app.config.feds != nil {
for _, fed := range *app.config.feds {
go func() {
if err := app.connToFed(ctx, &fed); err != nil {
panic(err)
}
}()
}
}

if app.config.tlsCert != nil && app.config.tlsAddr != "" {
go func() {
if err := app.listenTLS(ctx, app.config.tlsAddr); err != nil {
Expand All @@ -193,6 +222,8 @@ func (app *App) Run() {
cancel()
}

func (app *App) DummyHandler(msg *cot.CotMessage) {}

func (app *App) NewCotMessage(msg *cot.CotMessage) {
if msg != nil {
t := msg.GetType()
Expand All @@ -212,7 +243,7 @@ func (app *App) NewCotMessage(msg *cot.CotMessage) {
}

func (app *App) AddClientHandler(ch client.ClientHandler) {
app.handlers.Store(ch.GetName(), ch)
app.handlers.Store(ch.GetIdentifier(), ch)
connectionsMetric.With(prometheus.Labels{"scope": ch.GetUser().GetScope()}).Inc()
}

Expand All @@ -227,13 +258,12 @@ func (app *App) RemoveClientHandler(name string) {
func (app *App) ForAllClients(f func(ch client.ClientHandler) bool) {
app.handlers.Range(func(_, value any) bool {
h := value.(client.ClientHandler)

return f(h)
})
}

func (app *App) RemoveHandlerCb(cl client.ClientHandler) {
app.RemoveClientHandler(cl.GetName())
app.RemoveClientHandler(cl.GetIdentifier())

for uid := range cl.GetUids() {
if c := app.items.Get(uid); c != nil {
Expand Down Expand Up @@ -461,40 +491,96 @@ func (app *App) cleanOldUnits() {
}
}

var msgLRU = NewLRUCache[cot.CotMessage](2 << 10)

func mayCauseBroadCastStorm(msg *cot.CotMessage) bool {
//fmt.Printf("msg [%s@%s], send time = %v", msg.GetType(), msg.GetUID(), msg.GetSendTime())
if cachedMsg, ok := msgLRU.get(msg.GetUID()); !ok {
// 这里设定了map的最大限制,不用担心内存泄露
msgLRU.put(msg.GetUID(), msg)
} else {
// 如果消息间隔小于1s,则进行二次判断确认是否转发
if msg.GetSendTime().Sub(cachedMsg.GetSendTime()) < time.Second {
lat, lon := msg.GetLatLon()
clat, clon := cachedMsg.GetLatLon()
if lat == clat && lon == clon {
// 如果此消息的经纬度与上次转发的消息相同,判定为重复,屏蔽转发
slog.Warn(fmt.Sprintf("drop message [%s@%s] to avoid broadcast storm",
msg.TakMessage.CotEvent.Type, msg.TakMessage.CotEvent.Uid))
return true
}
}
// 如果允许发送,要更新cot状态
msgLRU.put(msg.GetUID(), msg)
}
return false
}

func (app *App) sendBroadcast(msg *cot.CotMessage) {
if mayCauseBroadCastStorm(msg) {
return
}
app.ForAllClients(func(ch client.ClientHandler) bool {
if ch.GetName() != msg.From {
// 需要判断是否允许向当前接口发送消息,以及是否是消息来源
if (ch.CanSend() || msg.IsPing() || msg.IsControl()) &&
ch.GetName() != msg.From {
if err := ch.SendMsg(msg); err != nil {
app.logger.Error(fmt.Sprintf("error sending to %s: %v", ch.GetName(), err))
}
}

return true
})
}

func (app *App) sendToCallsign(callsign string, msg *cot.CotMessage) {
if mayCauseBroadCastStorm(msg) {
return
}

app.ForAllClients(func(ch client.ClientHandler) bool {
if !ch.CanSend() && !msg.IsPing() && !msg.IsControl() {
return true
}

// 对 fed 服务器的链路使用无条件广播,确保信息能传出
if strings.HasPrefix(ch.GetName(), "fed_") && ch.GetName() != msg.From {
if err := ch.SendMsg(msg); err != nil {
app.logger.Error("send error", slog.Any("error", err))
}
return true
}

for _, c := range ch.GetUids() {
if c == callsign {
if err := ch.SendMsg(msg); err != nil {
app.logger.Error("send error", slog.Any("error", err))
}
}
}

return true
})
}

func (app *App) sendToUID(uid string, msg *cot.CotMessage) {
if mayCauseBroadCastStorm(msg) {
return
}

app.ForAllClients(func(ch client.ClientHandler) bool {
if !ch.CanSend() && !msg.IsPing() && !msg.IsControl() {
return true
}
if strings.HasPrefix(ch.GetName(), "fed_") && ch.GetName() != msg.From {
if err := ch.SendMsg(msg); err != nil {
app.logger.Error("send error", slog.Any("error", err))
}
return true
}
if ch.HasUID(uid) {
if err := ch.SendMsg(msg); err != nil {
app.logger.Error("send error", slog.Any("error", err))
}
}

return true
})
}
Expand Down Expand Up @@ -608,6 +694,7 @@ func main() {
config := &AppConfig{
udpAddr: viper.GetString("udp_addr"),
tcpAddr: viper.GetString("tcp_addr"),
tcpFedAddr: viper.GetString("tcp_fed_addr"),
adminAddr: viper.GetString("admin_addr"),
apiAddr: viper.GetString("api_addr"),
certAddr: viper.GetString("cert_addr"),
Expand All @@ -621,6 +708,20 @@ func main() {
webtakRoot: viper.GetString("webtak_root"),
certTTLDays: viper.GetInt("ssl.cert_ttl_days"),
dataSync: viper.GetBool("datasync"),
feds: &[]FedConfig{},
}

feds, ok := viper.Get("feds").([]interface{})
if ok && feds != nil {
for _, fed := range feds {
var fedConf FedConfig
if err := decodeMapToStruct(&fed, &fedConf); err != nil {
slog.Default().Error(err.Error())
}
*config.feds = append(*config.feds, fedConf)
}
} else {
slog.Default().Info("no feds found in configuration")
}

if err := processCerts(config); err != nil {
Expand Down
Loading