From f9d7bdde5565082073905f17291880f05fe94fd7 Mon Sep 17 00:00:00 2001 From: Matthias Wirth Date: Fri, 13 Dec 2024 14:51:41 +0000 Subject: [PATCH] add internal client id for use in hypothetical changes --- gdb.sh | 3 +- net_io.c | 82 +++++++++++++++++++++++++++++++++++++++++++++++++---- net_io.h | 1 + readsb.h | 3 ++ valgrind.sh | 3 ++ 5 files changed, 85 insertions(+), 7 deletions(-) diff --git a/gdb.sh b/gdb.sh index 89b864f7..acc2e12f 100755 --- a/gdb.sh +++ b/gdb.sh @@ -6,6 +6,7 @@ mkdir -p /run/${instance} chown readsb /run/${instance} source /etc/default/${instance} -runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' --args /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@ +runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' \ + --args /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@ #/usr/bin/sudo -u readsb /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@ diff --git a/net_io.c b/net_io.c index c926df2b..c47ad9f2 100644 --- a/net_io.c +++ b/net_io.c @@ -275,6 +275,54 @@ static void setBuffers(int fd, int sndsize, int rcvsize) { } } +static struct client *getClientById(int32_t id) { + return Modes.clientsById[id]; +} + +static int32_t getClientId(struct client *c) { + //fprintf(stderr, "getClientId called, clientCount: %d clientsByIdAlloc: %d\n", (int) Modes.modesClientCount, (int) Modes.clientsByIdAlloc); + + if (Modes.modesClientCount * 2 > Modes.clientsByIdAlloc) { + if (0) { + getClientById(0); + for (int i = 0; i < Modes.clientsByIdAlloc; i++) { + fprintf(stderr, "%p\n", Modes.clientsById[i]); + } + } + int oldBytes = Modes.clientsByIdAlloc * sizeof(*Modes.clientsById); + int newSize = Modes.clientsByIdAlloc * 2; + int newBytes = newSize * sizeof(*Modes.clientsById); + int addedBytes = newBytes - oldBytes; + Modes.clientsById = realloc(Modes.clientsById, newBytes); + if (Modes.clientsById == NULL) { + fprintf(stderr, "FATAL: realloc clientsById: out of memory\n"); + exit(1); + } + + memset((char *)Modes.clientsById + oldBytes, 0, addedBytes); + Modes.clientsByIdAlloc = newSize; + + } + + int32_t stride = 16; + for (int k = 0; k < Modes.clientsByIdAlloc / stride; k++) { + int32_t startValue = ((random() % Modes.clientsByIdAlloc) / stride) * stride; + int32_t endValue = startValue + stride; + if (endValue > Modes.clientsByIdAlloc) { + endValue = Modes.clientsByIdAlloc; + } + for (int32_t i = startValue; i < endValue; i++) { + if (Modes.clientsById[i] == NULL) { + Modes.clientsById[i] = c; + return i; + } + } + } + + fprintf(stderr, "warning: getClientId no available slots found, this should be rare!\n"); + return -1; +} + // Create a client attached to the given service using the provided socket FD ... not a socket in some exceptions static struct client *createSocketClient(struct net_service *service, int fd) { struct client *c; @@ -290,6 +338,11 @@ static struct client *createSocketClient(struct net_service *service, int fd) { } memset(c, 0, sizeof (struct client)); + c->id = getClientId(c); + if (c->id < 0) { + sfree(c); + return NULL; + } c->service = service; c->fd = fd; c->last_flush = now; @@ -889,6 +942,11 @@ void modesInitNet(void) { if (!Modes.net) return; + + Modes.clientsByIdAlloc = 32; // initial + Modes.clientsById = cmalloc(Modes.clientsByIdAlloc * sizeof(*Modes.clientsById)); + memset(Modes.clientsById, 0, Modes.clientsByIdAlloc * sizeof(*Modes.clientsById)); + struct net_service *beast_out; struct net_service *beast_reduce_out; struct net_service *garbage_out; @@ -1135,19 +1193,30 @@ static void modesAcceptClients(struct client *c, int64_t now) { if (Modes.modesClientCount > Modes.max_fds_net) { // drop new modes clients if the count gets near resource limits - anetCloseSocket(c->fd); + anetCloseSocket(fd); static int64_t antiSpam; if (now > antiSpam) { antiSpam = now + 30 * SECONDS; fprintf(stderr, "<3> Can't accept new connection, limited to %d clients, consider increasing ulimit!\n", Modes.max_fds_net); } + continue; } c = createSocketClient(s, fd); - if (s->unixSocket && c) { + if (!c) { + anetCloseSocket(fd); + static int64_t antiSpam; + if (now > antiSpam) { + antiSpam = now + 30 * SECONDS; + fprintf(stderr, "%s: Error accepting new connection (createSocketClient failed)\n", s->descr); + } + continue; + } + + if (s->unixSocket) { strcpy(c->host, s->unixSocket); fprintf(stderr, "%s: new c at %s\n", c->service->descr, s->unixSocket); - } else if (c) { + } else { // We created the client, save the sockaddr info and 'hostport' getnameinfo(saddr, slen, c->host, sizeof(c->host), @@ -1161,9 +1230,6 @@ static void modesAcceptClients(struct client *c, int64_t now) { } if (anetTcpKeepAlive(Modes.aneterr, fd) != ANET_OK) fprintf(stderr, "%s: Unable to set keepalive on connection from %s port %s (fd %d)\n", c->service->descr, c->host, c->port, fd); - } else { - fprintf(stderr, "%s: Fatal: createSocketClient shouldn't fail!\n", s->descr); - exit(1); } @@ -1240,6 +1306,8 @@ static void modesCloseClient(struct client *c) { c->service = NULL; c->modeac_requested = 0; + Modes.clientsById[c->id] = NULL; + if (Modes.mode_ac_auto) autoset_modeac(); } @@ -5732,6 +5800,8 @@ void cleanupNetwork(void) { sfree(Modes.net_connectors); sfree(Modes.net_events); + sfree(Modes.clientsById); + Modes.net_connectors_count = 0; } diff --git a/net_io.h b/net_io.h index 7eb5e92f..2ec34e7c 100644 --- a/net_io.h +++ b/net_io.h @@ -95,6 +95,7 @@ struct client char *buf; // read buffer char *som; char *eod; + int32_t id; int buflen; // Amount of data on read buffer int bufmax; // size of the read buffer int fd; // File descriptor diff --git a/readsb.h b/readsb.h index ad7d739c..358d0b44 100644 --- a/readsb.h +++ b/readsb.h @@ -570,6 +570,9 @@ struct _Modes pthread_mutex_t trackLock; pthread_mutex_t outputLock; + struct client **clientsById; + int clientsByIdAlloc; + int max_fds; int max_fds_api; int max_fds_net; diff --git a/valgrind.sh b/valgrind.sh index 551f1f3b..7911f616 100755 --- a/valgrind.sh +++ b/valgrind.sh @@ -14,5 +14,8 @@ MEM="--show-leak-kinds=all --leak-check=full" MEM="--show-leak-kinds=all --track-origins=yes --leak-check=full" MEM="" +#runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' \ +# --args /tmp/test123 --net --net-bo-port 1234 $@ +#valgrind $MASSIF $FIRST $MEM /tmp/test123 --net --net-bo-port 1234 $@ valgrind $MASSIF $FIRST $MEM /tmp/test123 $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --quiet --db-file=none $@