diff --git a/src/i2p.cpp b/src/i2p.cpp index 5a3dde54ced..05a5dde3966 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -153,27 +154,59 @@ bool Session::Listen(Connection& conn) bool Session::Accept(Connection& conn) { - try { - while (!*m_interrupt) { - Sock::Event occurred; - if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) { - throw std::runtime_error("wait on socket failed"); - } + AssertLockNotHeld(m_mutex); - if (occurred == 0) { - // Timeout, no incoming connections or errors within MAX_WAIT_FOR_IO. - continue; - } + std::string errmsg; + bool disconnect{false}; - const std::string& peer_dest = - conn.sock->RecvUntilTerminator('\n', MAX_WAIT_FOR_IO, *m_interrupt, MAX_MSG_SIZE); + while (!*m_interrupt) { + Sock::Event occurred; + if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) { + errmsg = "wait on socket failed"; + break; + } - conn.peer = CService(DestB64ToAddr(peer_dest), I2P_SAM31_PORT); + if (occurred == 0) { + // Timeout, no incoming connections or errors within MAX_WAIT_FOR_IO. + continue; + } - return true; + std::string peer_dest; + try { + peer_dest = conn.sock->RecvUntilTerminator('\n', MAX_WAIT_FOR_IO, *m_interrupt, MAX_MSG_SIZE); + } catch (const std::runtime_error& e) { + errmsg = e.what(); + break; } - } catch (const std::runtime_error& e) { - Log("Error accepting: %s", e.what()); + + CNetAddr peer_addr; + try { + peer_addr = DestB64ToAddr(peer_dest); + } catch (const std::runtime_error& e) { + // The I2P router is expected to send the Base64 of the connecting peer, + // but it may happen that something like this is sent instead: + // STREAM STATUS RESULT=I2P_ERROR MESSAGE="Session was closed" + // In that case consider the session damaged and close it right away, + // even if the control socket is alive. + if (peer_dest.find("RESULT=I2P_ERROR") != std::string::npos) { + errmsg = strprintf("unexpected reply that hints the session is unusable: %s", peer_dest); + disconnect = true; + } else { + errmsg = e.what(); + } + break; + } + + conn.peer = CService(peer_addr, I2P_SAM31_PORT); + + return true; + } + + Log("Error accepting%s: %s", disconnect ? " (will close the session)" : "", errmsg); + if (disconnect) { + LOCK(m_mutex); + Disconnect(); + } else { CheckControlSock(); } return false; diff --git a/src/i2p.h b/src/i2p.h index cb9da64816f..375abaccfcc 100644 --- a/src/i2p.h +++ b/src/i2p.h @@ -105,7 +105,7 @@ class Session * completion the `peer` member will be set to the address of the incoming peer. * @return true on success */ - bool Accept(Connection& conn); + bool Accept(Connection& conn) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); /** * Connect to an I2P peer. diff --git a/src/net.cpp b/src/net.cpp index fecb4205ffa..09a3d8617a7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2928,6 +2928,13 @@ void CConnman::ThreadI2PAcceptIncoming() bool advertising_listen_addr = false; i2p::Connection conn; + auto SleepOnFailure = [&]() { + interruptNet.sleep_for(err_wait); + if (err_wait < err_wait_cap) { + err_wait += 1s; + } + }; + while (!interruptNet) { if (!m_i2p_sam_session->Listen(conn)) { @@ -2935,12 +2942,7 @@ void CConnman::ThreadI2PAcceptIncoming() RemoveLocal(conn.me); advertising_listen_addr = false; } - - interruptNet.sleep_for(err_wait); - if (err_wait < err_wait_cap) { - err_wait *= 2; - } - + SleepOnFailure(); continue; } @@ -2950,11 +2952,14 @@ void CConnman::ThreadI2PAcceptIncoming() } if (!m_i2p_sam_session->Accept(conn)) { + SleepOnFailure(); continue; } CreateNodeFromAcceptedSocket(std::move(conn.sock), NetPermissionFlags::None, CAddress{conn.me, NODE_NONE}, CAddress{conn.peer, NODE_NONE}); + + err_wait = err_wait_begin; } } diff --git a/src/test/i2p_tests.cpp b/src/test/i2p_tests.cpp index b2e1ae43bed..5b8b0e92158 100644 --- a/src/test/i2p_tests.cpp +++ b/src/test/i2p_tests.cpp @@ -16,14 +16,34 @@ #include #include -BOOST_FIXTURE_TEST_SUITE(i2p_tests, BasicTestingSetup) +/// Save the log level and the value of CreateSock and restore them when the test ends. +class EnvTestingSetup : public BasicTestingSetup +{ +public: + explicit EnvTestingSetup(const ChainType chainType = ChainType::MAIN, + const std::vector& extra_args = {}) + : BasicTestingSetup{chainType, extra_args}, + m_prev_log_level{LogInstance().LogLevel()}, + m_create_sock_orig{CreateSock} + { + LogInstance().SetLogLevel(BCLog::Level::Trace); + } + + ~EnvTestingSetup() + { + CreateSock = m_create_sock_orig; + LogInstance().SetLogLevel(m_prev_log_level); + } + +private: + const BCLog::Level m_prev_log_level; + const std::function(const CService&)> m_create_sock_orig; +}; + +BOOST_FIXTURE_TEST_SUITE(i2p_tests, EnvTestingSetup) BOOST_AUTO_TEST_CASE(unlimited_recv) { - const auto prev_log_level{LogInstance().LogLevel()}; - LogInstance().SetLogLevel(BCLog::Level::Trace); - auto CreateSockOrig = CreateSock; - // Mock CreateSock() to create MockSock. CreateSock = [](const CService&) { return std::make_unique(std::string(i2p::sam::MAX_MSG_SIZE + 1, 'a')); @@ -40,9 +60,69 @@ BOOST_AUTO_TEST_CASE(unlimited_recv) bool proxy_error; BOOST_REQUIRE(!session.Connect(CService{}, conn, proxy_error)); } +} + +BOOST_AUTO_TEST_CASE(listen_ok_accept_fail) +{ + size_t num_sockets{0}; + CreateSock = [&num_sockets](const CService&) { + // clang-format off + ++num_sockets; + // First socket is the control socket for creating the session. + if (num_sockets == 1) { + return std::make_unique( + // reply to HELLO + "HELLO REPLY RESULT=OK VERSION=3.1\n" + // reply to DEST GENERATE + "DEST REPLY PUB=WnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqLE4SD-yjT48UNI7qiTUfIPiDitCoiTTz2cr4QGfw89rBQAEAAcAAA== PRIV=WnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqFpxji10Qah0IXVYxZVqkcScM~Yccf9v8BnNlaZbWtSoWnGOLXRBqHQhdVjFlWqRxJwz9hxx~2~wGc2Vplta1KhacY4tdEGodCF1WMWVapHEnDP2HHH~b~AZzZWmW1rUqLE4SD-yjT48UNI7qiTUfIPiDitCoiTTz2cr4QGfw89rBQAEAAcAAOvuCIKTyv5f~1QgGq7XQl-IqBULTB5WzB3gw5yGPtd1p0AeoADrq1ccZggLPQ4ZLUsGK-HVw373rcTfvxrcuwenqVjiN4tbbYLWtP7xXGWj6fM6HyORhU63GphrjEePpMUHDHXd3o7pWGM-ieVVQSK~1MzF9P93pQWI3Do52EeNAayz4HbpPjNhVBzG1hUEFwznfPmUZBPuaOR4-uBm1NEWEuONlNOCctE4-U0Ukh94z-Qb55U5vXjR5G4apmBblr68t6Wm1TKlzpgFHzSqLryh3stWqrOKY1H0z9eZ2z1EkHFOpD5LyF6nf51e-lV7HLMl44TYzoEHK8RRVodtLcW9lacVdBpv~tOzlZERIiDziZODPETENZMz5oy9DQ7UUw==\n" + // reply to SESSION CREATE + "SESSION STATUS RESULT=OK\n" + // dummy to avoid reporting EOF on the socket + "a" + ); + } + // Subsequent sockets are for recreating the session or for listening and accepting incoming connections. + if (num_sockets % 2 == 0) { + // Replies to Listen() and Accept() + return std::make_unique( + // reply to HELLO + "HELLO REPLY RESULT=OK VERSION=3.1\n" + // reply to STREAM ACCEPT + "STREAM STATUS RESULT=OK\n" + // continued reply to STREAM ACCEPT, violating the protocol described at + // https://geti2p.net/en/docs/api/samv3#Accept%20Response + // should be base64, something like + // "IchV608baDoXbqzQKSqFDmTXPVgoDbPAhZJvNRXXxi4hyFXrTxtoOhdurNApKoUOZNc9WCgNs8CFkm81FdfGLiHIVetPG2g6F26s0CkqhQ5k1z1YKA2zwIWSbzUV18YuIchV608baDoXbqzQKSqFDmTXPVgoDbPAhZJvNRXXxi4hyFXrTxtoOhdurNApKoUOZNc9WCgNs8CFkm81FdfGLiHIVetPG2g6F26s0CkqhQ5k1z1YKA2zwIWSbzUV18YuIchV608baDoXbqzQKSqFDmTXPVgoDbPAhZJvNRXXxi4hyFXrTxtoOhdurNApKoUOZNc9WCgNs8CFkm81FdfGLiHIVetPG2g6F26s0CkqhQ5k1z1YKA2zwIWSbzUV18YuIchV608baDoXbqzQKSqFDmTXPVgoDbPAhZJvNRXXxi4hyFXrTxtoOhdurNApKoUOZNc9WCgNs8CFkm81FdfGLlSreVaCuCS5sdb-8ToWULWP7kt~lRPDeUNxQMq3cRSBBQAEAAcAAA==\n" + "STREAM STATUS RESULT=I2P_ERROR MESSAGE=\"Session was closed\"\n" + ); + } else { + // Another control socket, but without creating a destination (it is cached in the session). + return std::make_unique( + // reply to HELLO + "HELLO REPLY RESULT=OK VERSION=3.1\n" + // reply to SESSION CREATE + "SESSION STATUS RESULT=OK\n" + // dummy to avoid reporting EOF on the socket + "a" + ); + } + // clang-format on + }; - CreateSock = CreateSockOrig; - LogInstance().SetLogLevel(prev_log_level); + CThreadInterrupt interrupt; + i2p::sam::Session session(gArgs.GetDataDirNet() / "test_i2p_private_key", + CService{in6_addr(IN6ADDR_LOOPBACK_INIT), /*port=*/7656}, + &interrupt); + + i2p::Connection conn; + for (size_t i = 0; i < 5; ++i) { + ASSERT_DEBUG_LOG("Creating persistent SAM session"); + ASSERT_DEBUG_LOG("Persistent SAM session" /* ... created */); + ASSERT_DEBUG_LOG("Error accepting"); + ASSERT_DEBUG_LOG("Destroying SAM session"); + BOOST_REQUIRE(session.Listen(conn)); + BOOST_REQUIRE(!session.Accept(conn)); + } } BOOST_AUTO_TEST_SUITE_END()