Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 52 additions & 29 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,25 +198,11 @@ srs_error_t SrsUdpListener::cycle()
return srs_error_wrap(err, "udp listener");
}

int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr*)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}

// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c
&& buf[19] == 0x63 && buf[20] == 0x6b) {
continue;
if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}

if ((err = handler->on_udp_packet((const sockaddr*)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}

if (SrsUdpPacketRecvCycleInterval > 0) {
srs_usleep(SrsUdpPacketRecvCycleInterval);
}
Expand All @@ -225,6 +211,31 @@ srs_error_t SrsUdpListener::cycle()
return err;
}

srs_error_t SrsUdpListener::do_cycle()
{
srs_error_t err = srs_success;

int nread = 0;
sockaddr_storage from;
int nb_from = sizeof(from);
if ((nread = srs_recvfrom(lfd, buf, nb_buf, (sockaddr *)&from, &nb_from, SRS_UTIME_NO_TIMEOUT)) <= 0) {
return srs_error_new(ERROR_SOCKET_READ, "udp read, nread=%d", nread);
}

// Drop UDP health check packet of Aliyun SLB.
// Healthcheck udp check
// @see https://help.aliyun.com/document_detail/27595.html
if (nread == 21 && buf[0] == 0x48 && buf[1] == 0x65 && buf[2] == 0x61 && buf[3] == 0x6c && buf[19] == 0x63 && buf[20] == 0x6b) {
return err;
}

if ((err = handler->on_udp_packet((const sockaddr *)&from, nb_from, buf, nread)) != srs_success) {
return srs_error_wrap(err, "handle packet %d bytes", nread);
}

return err;
}

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h)
{
handler = h;
Expand Down Expand Up @@ -303,24 +314,36 @@ srs_error_t SrsTcpListener::cycle()
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}

srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(fd == NULL){
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}

if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}

if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));

if ((err = do_cycle()) != srs_success) {
srs_warn("%s listener: Ignore error, %s", label_.c_str(), srs_error_desc(err).c_str());
srs_freep(err);
}
}

return err;
}

srs_error_t SrsTcpListener::do_cycle()
{
srs_error_t err = srs_success;

srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if (fd == NULL) {
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}

if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}

if ((err = handler->on_tcp_client(this, fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}

return err;
}

SrsMultipleTcpListeners::SrsMultipleTcpListeners(ISrsTcpHandler* h)
{
handler_ = h;
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class SrsUdpListener : public ISrsCoroutineHandler
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};

// Bind and listen tcp port, use handler to process the client.
Expand Down Expand Up @@ -130,6 +132,8 @@ class SrsTcpListener : public ISrsCoroutineHandler, public ISrsListener
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
private:
srs_error_t do_cycle();
};

// Bind and listen tcp port, use handler to process the client.
Expand Down
114 changes: 114 additions & 0 deletions trunk/src/utest/srs_utest_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1574,3 +1574,117 @@ VOID TEST(ThreadCriticalTest, FailIfCloseActiveFD)
h.fd = NULL;
}

class MockFailedTcpHandler : public ISrsTcpHandler
{
public:
int connection_count;
public:
MockFailedTcpHandler() {
connection_count = 0;
}
virtual ~MockFailedTcpHandler() {
}
public:
virtual srs_error_t on_tcp_client(ISrsListener *listener, srs_netfd_t stfd)
{
srs_close_stfd(stfd);
connection_count++;
return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing error handling");
}
};

VOID TEST(TCPServerTest, ListenerContinuesToAcceptAfterError)
{
srs_error_t err;

MockFailedTcpHandler h;
SrsTcpListener l(&h);
l.set_endpoint(_srs_tmp_host, _srs_tmp_port);

HELPER_EXPECT_SUCCESS(l.listen());

// Simulate multiple failed connection attempts
const int num_attempts = 3;
for (int i = 0; i < num_attempts; i++)
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ASSERT_TRUE(sockfd > 0);

struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str());
server_addr.sin_port = htons(_srs_tmp_port);

int ret = connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
ASSERT_TRUE(ret == 0);

close(sockfd);
srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the connection
}

EXPECT_EQ(h.connection_count, num_attempts);

l.close();

}

class MockFailedUdpHandler : public ISrsUdpHandler
{
public:
int packet_count;

public:
MockFailedUdpHandler()
{
packet_count = 0;
}
virtual ~MockFailedUdpHandler()
{
}

public:
virtual srs_error_t on_udp_packet(const sockaddr *from, const int fromlen, char *buf, int nb_buf)
{
packet_count++;
return srs_error_new(ERROR_SYSTEM_ASSERT_FAILED, "Intentional failure for testing UDP error handling");
}
};

VOID TEST(UDPServerTest, ListenerContinuesToReceiveAfterError)
{
srs_error_t err;

MockFailedUdpHandler h;
SrsUdpListener l(&h);
l.set_endpoint(_srs_tmp_host, _srs_tmp_port);

HELPER_EXPECT_SUCCESS(l.listen());

// Create a UDP socket to send packets to the listener
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(sockfd > 0);

struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(_srs_tmp_host.c_str());
server_addr.sin_port = htons(_srs_tmp_port);

// Send multiple failed packet attempts
const int num_attempts = 3;
const char *test_data = "test data";
int data_len = strlen(test_data);
for (int i = 0; i < num_attempts; i++)
{
int ret = sendto(sockfd, test_data, data_len, 0, (struct sockaddr *)&server_addr, sizeof(server_addr));
ASSERT_TRUE(ret == data_len);

srs_usleep(10 * SRS_UTIME_MILLISECONDS); // Wait for listener to process the packet
}

EXPECT_EQ(h.packet_count, num_attempts);

close(sockfd);
l.close();
}
Loading