Skip to content

Commit

Permalink
add internal client id for use in hypothetical changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedehopf committed Jan 10, 2025
1 parent f11a5b8 commit 722e325
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
3 changes: 2 additions & 1 deletion gdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mkdir -p /run/${instance}
chown readsb /run/${instance}
source /etc/default/${instance}

runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' --args /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@
runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' \
--args /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@
#/usr/bin/sudo -u readsb /usr/bin/readsb --quiet $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --write-json /run/${instance} $@

83 changes: 77 additions & 6 deletions net_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,55 @@ static void setBuffers(int fd, int sndsize, int rcvsize) {
}
}

static struct client *getClientById(int32_t id) {
return Modes.clientsById[id];
}

static int32_t getClientId(struct client *c) {
//fprintf(stderr, "getClientId called, clientCount: %d clientsByIdAlloc: %d\n", (int) Modes.modesClientCount, (int) Modes.clientsByIdAlloc);

// due to the somewhat crude approach to find a free slot, keep plenty of slots free
if (Modes.modesClientCount * 2 > Modes.clientsByIdAlloc) {
if (0) {
getClientById(0);
for (int i = 0; i < Modes.clientsByIdAlloc; i++) {
fprintf(stderr, "%p\n", Modes.clientsById[i]);
}
}
int oldBytes = Modes.clientsByIdAlloc * sizeof(*Modes.clientsById);
int newSize = Modes.clientsByIdAlloc * 2;
int newBytes = newSize * sizeof(*Modes.clientsById);
int addedBytes = newBytes - oldBytes;
Modes.clientsById = realloc(Modes.clientsById, newBytes);
if (Modes.clientsById == NULL) {
fprintf(stderr, "FATAL: realloc clientsById: out of memory\n");
exit(1);
}

memset((char *)Modes.clientsById + oldBytes, 0, addedBytes);
Modes.clientsByIdAlloc = newSize;

}

int32_t stride = 16;
for (int k = 0; k < Modes.clientsByIdAlloc / stride; k++) {
int32_t startValue = ((random() % Modes.clientsByIdAlloc) / stride) * stride;
int32_t endValue = startValue + stride;
if (endValue > Modes.clientsByIdAlloc) {
endValue = Modes.clientsByIdAlloc;
}
for (int32_t i = startValue; i < endValue; i++) {
if (Modes.clientsById[i] == NULL) {
Modes.clientsById[i] = c;
return i;
}
}
}

fprintf(stderr, "warning: getClientId no available slots found, this should be rare!\n");
return -1;
}

// Create a client attached to the given service using the provided socket FD ... not a socket in some exceptions
static struct client *createSocketClient(struct net_service *service, int fd) {
struct client *c;
Expand All @@ -290,6 +339,11 @@ static struct client *createSocketClient(struct net_service *service, int fd) {
}
memset(c, 0, sizeof (struct client));

c->id = getClientId(c);
if (c->id < 0) {
sfree(c);
return NULL;
}
c->service = service;
c->fd = fd;
c->last_flush = now;
Expand Down Expand Up @@ -889,6 +943,11 @@ void modesInitNet(void) {

if (!Modes.net)
return;

Modes.clientsByIdAlloc = 32; // initial
Modes.clientsById = cmalloc(Modes.clientsByIdAlloc * sizeof(*Modes.clientsById));
memset(Modes.clientsById, 0, Modes.clientsByIdAlloc * sizeof(*Modes.clientsById));

struct net_service *beast_out;
struct net_service *beast_reduce_out;
struct net_service *garbage_out;
Expand Down Expand Up @@ -1135,19 +1194,30 @@ static void modesAcceptClients(struct client *c, int64_t now) {

if (Modes.modesClientCount > Modes.max_fds_net) {
// drop new modes clients if the count gets near resource limits
anetCloseSocket(c->fd);
anetCloseSocket(fd);
static int64_t antiSpam;
if (now > antiSpam) {
antiSpam = now + 30 * SECONDS;
fprintf(stderr, "<3> Can't accept new connection, limited to %d clients, consider increasing ulimit!\n", Modes.max_fds_net);
}
continue;
}

c = createSocketClient(s, fd);
if (s->unixSocket && c) {
if (!c) {
anetCloseSocket(fd);
static int64_t antiSpam;
if (now > antiSpam) {
antiSpam = now + 30 * SECONDS;
fprintf(stderr, "%s: Error accepting new connection (createSocketClient failed)\n", s->descr);
}
continue;
}

if (s->unixSocket) {
strcpy(c->host, s->unixSocket);
fprintf(stderr, "%s: new c at %s\n", c->service->descr, s->unixSocket);
} else if (c) {
} else {
// We created the client, save the sockaddr info and 'hostport'
getnameinfo(saddr, slen,
c->host, sizeof(c->host),
Expand All @@ -1161,9 +1231,6 @@ static void modesAcceptClients(struct client *c, int64_t now) {
}
if (anetTcpKeepAlive(Modes.aneterr, fd) != ANET_OK)
fprintf(stderr, "%s: Unable to set keepalive on connection from %s port %s (fd %d)\n", c->service->descr, c->host, c->port, fd);
} else {
fprintf(stderr, "%s: Fatal: createSocketClient shouldn't fail!\n", s->descr);
exit(1);
}


Expand Down Expand Up @@ -1240,6 +1307,8 @@ static void modesCloseClient(struct client *c) {
c->service = NULL;
c->modeac_requested = 0;

Modes.clientsById[c->id] = NULL;

if (Modes.mode_ac_auto)
autoset_modeac();
}
Expand Down Expand Up @@ -5742,6 +5811,8 @@ void cleanupNetwork(void) {
sfree(Modes.net_connectors);
sfree(Modes.net_events);

sfree(Modes.clientsById);

Modes.net_connectors_count = 0;

}
Expand Down
1 change: 1 addition & 0 deletions net_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ struct client
char *buf; // read buffer
char *som;
char *eod;
int32_t id;
int buflen; // Amount of data on read buffer
int bufmax; // size of the read buffer
int fd; // File descriptor
Expand Down
3 changes: 3 additions & 0 deletions readsb.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,9 @@ struct _Modes
pthread_mutex_t trackLock;
pthread_mutex_t outputLock;

struct client **clientsById;
int clientsByIdAlloc;

int max_fds;
int max_fds_api;
int max_fds_net;
Expand Down
3 changes: 3 additions & 0 deletions valgrind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ MEM="--show-leak-kinds=all --leak-check=full"
MEM="--show-leak-kinds=all --track-origins=yes --leak-check=full"
MEM=""

#runuser -u readsb -- gdb -batch -ex 'set confirm off' -ex 'handle SIGTERM nostop print pass' -ex 'handle SIGINT nostop print pass' -ex run -ex 'bt full' \
# --args /tmp/test123 --net --net-bo-port 1234 $@
#valgrind $MASSIF $FIRST $MEM /tmp/test123 --net --net-bo-port 1234 $@
valgrind $MASSIF $FIRST $MEM /tmp/test123 $RECEIVER_OPTIONS $DECODER_OPTIONS $NET_OPTIONS $JSON_OPTIONS --quiet --db-file=none $@

0 comments on commit 722e325

Please sign in to comment.