diff --git a/include/dpp/discordvoiceclient.h b/include/dpp/discordvoiceclient.h index e35569548a..d79b14954a 100644 --- a/include/dpp/discordvoiceclient.h +++ b/include/dpp/discordvoiceclient.h @@ -549,6 +549,13 @@ class DPP_EXPORT discord_voice_client : public websocket_client dave_version_t dave_version; /** + * @brief Our public IP address + */ + static std::string external_ip; + + + + /** * @brief Send data to UDP socket immediately. * * @param data data to send diff --git a/src/dpp/discordvoiceclient.cpp b/src/dpp/discordvoiceclient.cpp index c92eeaf5bd..417a96ec5c 100644 --- a/src/dpp/discordvoiceclient.cpp +++ b/src/dpp/discordvoiceclient.cpp @@ -31,7 +31,6 @@ #endif #include #include -#include #include #include #include @@ -47,11 +46,6 @@ namespace dpp { -/** - * @brief Our public IP address - */ -static std::string external_ip; - moving_averager::moving_averager(uint64_t collection_count_new) { collectionCount = collection_count_new; } @@ -77,195 +71,13 @@ moving_averager::operator float() { } } -/** - * @brief Represents an RTP packet. Size should always be exactly 12. - */ -struct rtp_header { - uint16_t constant; - uint16_t sequence; - uint32_t timestamp; - uint32_t ssrc; - - rtp_header(uint16_t _seq, uint32_t _ts, uint32_t _ssrc) : constant(htons(0x8078)), sequence(htons(_seq)), timestamp(htonl(_ts)), ssrc(htonl(_ssrc)) { - } -}; - bool discord_voice_client::sodium_initialised = false; -void discord_voice_client::voice_courier_loop(discord_voice_client& client, courier_shared_state_t& shared_state) { -#ifdef HAVE_VOICE - utility::set_thread_name(std::string("vcourier/") + std::to_string(client.server_id)); - while (true) { - std::this_thread::sleep_for(std::chrono::milliseconds{client.iteration_interval}); - - struct flush_data_t { - snowflake user_id; - rtp_seq_t min_seq; - std::priority_queue parked_payloads; - std::vector> pending_decoder_ctls; - std::shared_ptr decoder; - }; - std::vector flush_data; - - /* - * Transport the payloads onto this thread, and - * release the lock as soon as possible. - */ - { - std::unique_lock lk(shared_state.mtx); - - /* mitigates vector resizing while holding the mutex */ - flush_data.reserve(shared_state.parked_voice_payloads.size()); - - bool has_payload_to_deliver = false; - for (auto& [user_id, parking_lot] : shared_state.parked_voice_payloads) { - has_payload_to_deliver = has_payload_to_deliver || !parking_lot.parked_payloads.empty(); - flush_data.push_back(flush_data_t{user_id, - parking_lot.range.min_seq, - std::move(parking_lot.parked_payloads), - /* Quickly check if we already have a decoder and only take the pending ctls if so. */ - parking_lot.decoder ? std::move(parking_lot.pending_decoder_ctls) - : decltype(parking_lot.pending_decoder_ctls){}, - parking_lot.decoder}); - parking_lot.range.min_seq = parking_lot.range.max_seq + 1; - parking_lot.range.min_timestamp = parking_lot.range.max_timestamp + 1; - } - - if (!has_payload_to_deliver) { - if (shared_state.terminating) { - /* We have delivered all data to handlers. Terminate now. */ - break; - } - - shared_state.signal_iteration.wait(lk); - /* - * More data came or about to terminate, or just a spurious wake. - * We need to collect the payloads again to determine what to do next. - */ - continue; - } - } - - if (client.creator->on_voice_receive.empty() && client.creator->on_voice_receive_combined.empty()) { - /* - * We do this check late, to ensure this thread drains the data - * and prevents accumulating them even when there are no handlers. - */ - continue; - } - - /* This 32 bit PCM audio buffer is an upmixed version of the streams - * combined for all users. This is a wider width audio buffer so that - * there is no clipping when there are many loud audio sources at once. - */ - opus_int32 pcm_mix[23040] = { 0 }; - size_t park_count = 0; - int max_samples = 0; - int samples = 0; - - for (auto& d : flush_data) { - if (!d.decoder) { - continue; - } - for (const auto& decoder_ctl : d.pending_decoder_ctls) { - decoder_ctl(*d.decoder); - } - for (rtp_seq_t seq = d.min_seq; !d.parked_payloads.empty(); ++seq) { - opus_int16 pcm[23040]; - if (d.parked_payloads.top().seq != seq) { - /* - * Lost a packet with sequence number "seq", - * But Opus decoder might be able to guess something. - */ - if (int samples = opus_decode(d.decoder.get(), nullptr, 0, pcm, 5760, 0); - samples >= 0) { - /* - * Since this sample comes from a lost packet, - * we can only pretend there is an event, without any raw payload byte. - */ - voice_receive_t vr(nullptr, "", &client, d.user_id, reinterpret_cast(pcm), - samples * opus_channel_count * sizeof(opus_int16)); - - park_count = audio_mix(client, *client.mixer, pcm_mix, pcm, park_count, samples, max_samples); - client.creator->on_voice_receive.call(vr); - } - } else { - voice_receive_t& vr = *d.parked_payloads.top().vr; - if (vr.audio_data.size() > 0x7FFFFFFF) { - throw dpp::length_exception(err_massive_audio, "audio_data > 2GB! This should never happen!"); - } - if (samples = opus_decode(d.decoder.get(), vr.audio_data.data(), - static_cast(vr.audio_data.size() & 0x7FFFFFFF), pcm, 5760, 0); - samples >= 0) { - vr.reassign(&client, d.user_id, reinterpret_cast(pcm), - samples * opus_channel_count * sizeof(opus_int16)); - client.end_gain = 1.0f / client.moving_average; - park_count = audio_mix(client, *client.mixer, pcm_mix, pcm, park_count, samples, max_samples); - client.creator->on_voice_receive.call(vr); - } - - d.parked_payloads.pop(); - } - } - } - - /* If combined receive is bound, dispatch it */ - if (park_count) { - - /* Downsample the 32 bit samples back to 16 bit */ - opus_int16 pcm_downsample[23040] = { 0 }; - opus_int16* pcm_downsample_ptr = pcm_downsample; - opus_int32* pcm_mix_ptr = pcm_mix; - client.increment = (client.end_gain - client.current_gain) / static_cast(samples); - for (int64_t x = 0; x < (samples * opus_channel_count) / client.mixer->byte_blocks_per_register; ++x) { - client.mixer->collect_single_register(pcm_mix_ptr, pcm_downsample_ptr, client.current_gain, client.increment); - client.current_gain += client.increment * static_cast(client.mixer->byte_blocks_per_register); - pcm_mix_ptr += client.mixer->byte_blocks_per_register; - pcm_downsample_ptr += client.mixer->byte_blocks_per_register; - } - - voice_receive_t vr(nullptr, "", &client, 0, reinterpret_cast(pcm_downsample), - max_samples * opus_channel_count * sizeof(opus_int16)); - - client.creator->on_voice_receive_combined.call(vr); - } - } -#endif -} - discord_voice_client::~discord_voice_client() { cleanup(); } -void discord_voice_client::cleanup() -{ - if (runner) { - this->terminating = true; - runner->join(); - delete runner; - runner = nullptr; - } -#if HAVE_VOICE - if (encoder) { - opus_encoder_destroy(encoder); - encoder = nullptr; - } - if (repacketizer) { - opus_repacketizer_destroy(repacketizer); - repacketizer = nullptr; - } - if (voice_courier.joinable()) { - { - std::lock_guard lk(voice_courier_shared_state.mtx); - voice_courier_shared_state.terminating = true; - } - voice_courier_shared_state.signal_iteration.notify_one(); - voice_courier.join(); - } -#endif -} - bool discord_voice_client::is_ready() { return has_secret_key; } @@ -275,65 +87,6 @@ bool discord_voice_client::is_playing() { return (!this->outbuf.empty()); } -void discord_voice_client::thread_run() -{ -#ifdef HAVE_VOICE - - utility::set_thread_name(std::string("vc/") + std::to_string(server_id)); - - size_t times_looped = 0; - time_t last_loop_time = time(nullptr); - - do { - bool error = false; - ssl_client::read_loop(); - ssl_client::close(); - - time_t current_time = time(nullptr); - /* Here, we check if it's been longer than 3 seconds since the previous loop, - * this gives us time to see if it's an actual disconnect, or an error. - * This will prevent us from looping too much, meaning error codes do not cause an infinite loop. - */ - if (current_time - last_loop_time >= 3) - times_looped = 0; - - /* This does mean we'll always have times_looped at a minimum of 1, this is intended. */ - times_looped++; - /* If we've looped 5 or more times, abort the loop. */ - if (times_looped >= 5) { - log(dpp::ll_warning, "Reached max loops whilst attempting to read from the websocket. Aborting websocket."); - break; - } - - last_loop_time = current_time; - - if (!terminating) { - log(dpp::ll_debug, "Attempting to reconnect the websocket..."); - do { - try { - ssl_client::connect(); - websocket_client::connect(); - } - catch (const std::exception &e) { - log(dpp::ll_error, std::string("Error establishing voice websocket connection, retry in 5 seconds: ") + e.what()); - ssl_client::close(); - std::this_thread::sleep_for(std::chrono::seconds(5)); - error = true; - } - } while (error && !terminating); - } - } while(!terminating); -#endif -} - -void discord_voice_client::run() -{ -#ifdef HAVE_VOICE - this->runner = new std::thread(&discord_voice_client::thread_run, this); - this->thread_id = runner->native_handle(); -#endif -} - int discord_voice_client::udp_send(const char* data, size_t length) { sockaddr_in servaddr; @@ -394,379 +147,6 @@ bool discord_voice_client::is_end_to_end_encrypted() const { #endif } -bool discord_voice_client::handle_frame(const std::string &data, ws_opcode opcode) { -#ifdef HAVE_VOICE - json j; - - /** - * MLS frames come in as type OP_BINARY, we can also reply to them as type OP_BINARY. - */ - if (opcode == OP_BINARY && data.size() >= sizeof(dave_binary_header_t)) { - - auto* dave_header = reinterpret_cast(data.data()); - - switch (dave_header->opcode) { - case voice_client_dave_mls_external_sender: { - log(ll_debug, "voice_client_dave_mls_external_sender"); - - mls_state->dave_session->SetExternalSender(dave_header->get_data(data.length())); - - mls_state->encryptor = std::make_unique(); - mls_state->decryptors.clear(); - } - break; - case voice_client_dave_mls_proposals: { - log(ll_debug, "voice_client_dave_mls_proposals"); - - std::optional> response = mls_state->dave_session->ProcessProposals(dave_header->get_data(data.length()), dave_mls_user_list); - if (response.has_value()) { - auto r = response.value(); - mls_state->cached_commit = r; - r.insert(r.begin(), voice_client_dave_mls_commit_message); - this->write(std::string_view(reinterpret_cast(r.data()), r.size()), OP_BINARY); - } - } - break; - case voice_client_dave_announce_commit_transaction: { - log(ll_debug, "voice_client_dave_announce_commit_transaction"); - auto r = mls_state->dave_session->ProcessCommit(mls_state->cached_commit); - for (const auto& user : dave_mls_user_list) { - log(ll_debug, "Setting decryptor key ratchet for user: " + user + ", protocol version: " + std::to_string(mls_state->dave_session->GetProtocolVersion())); - dpp::snowflake u{user}; - mls_state->decryptors.emplace(u, std::make_unique()); - mls_state->decryptors.find(u)->second->TransitionToKeyRatchet(mls_state->dave_session->GetKeyRatchet(user)); - } - mls_state->encryptor->SetKeyRatchet(mls_state->dave_session->GetKeyRatchet(creator->me.id.str())); - - /** - * https://www.ietf.org/archive/id/draft-ietf-mls-protocol-14.html#name-epoch-authenticators - * 9.7. Epoch Authenticators - * The main MLS key schedule provides a per-epoch epoch_authenticator. If one member of the group is being impersonated by an active attacker, - * the epoch_authenticator computed by their client will differ from those computed by the other group members. - */ - mls_state->privacy_code = generate_displayable_code(mls_state->dave_session->GetLastEpochAuthenticator()); - log(ll_debug, "E2EE Privacy Code: " + mls_state->privacy_code); - } - break; - case voice_client_dave_mls_welcome: { - this->mls_state->transition_id = dave_header->get_welcome_transition_id(); - log(ll_debug, "voice_client_dave_mls_welcome with transition id " + std::to_string(this->mls_state->transition_id)); - auto r = mls_state->dave_session->ProcessWelcome(dave_header->get_welcome_data(data.length()), dave_mls_user_list); - if (r.has_value()) { - for (const auto& user : dave_mls_user_list) { - log(ll_debug, "Setting decryptor key ratchet for user: " + user + ", protocol version: " + std::to_string(mls_state->dave_session->GetProtocolVersion())); - dpp::snowflake u{user}; - mls_state->decryptors.emplace(u, std::make_unique()); - mls_state->decryptors.find(u)->second->TransitionToKeyRatchet(mls_state->dave_session->GetKeyRatchet(user)); - } - mls_state->encryptor->SetKeyRatchet(mls_state->dave_session->GetKeyRatchet(creator->me.id.str())); - } - mls_state->privacy_code = generate_displayable_code(mls_state->dave_session->GetLastEpochAuthenticator()); - log(ll_debug, "E2EE Privacy Code: " + mls_state->privacy_code); - } - break; - default: - log(ll_debug, "Unexpected DAVE frame opcode"); - log(dpp::ll_trace, "R: " + dpp::utility::debug_dump((uint8_t*)(data.data()), data.length())); - break; - } - - return true; - } - - try { - log(dpp::ll_trace, std::string("R: ") + data); - j = json::parse(data); - } - catch (const std::exception &e) { - log(dpp::ll_error, std::string("discord_voice_client::handle_frame ") + e.what() + ": " + data); - return true; - } - - if (j.find("seq") != j.end() && j["seq"].is_number()) { - /** - * Save the sequence number needed for heartbeat and resume payload. - * - * NOTE: Contrary to the documentation, discord does not seem to send messages with sequence number - * in order, should we only save the sequence if it's larger number? - */ - receive_sequence = j["seq"].get(); - } - - if (j.find("op") != j.end()) { - uint32_t op = j["op"]; - - switch (op) { - /* Ping acknowledgement */ - case voice_opcode_connection_heartbeat_ack: - /* These opcodes do not require a response or further action */ - break; - case voice_opcode_media_sink: - case voice_client_flags: { - } - break; - case voice_client_platform: { - voice_client_platform_t vcp(nullptr, data); - vcp.voice_client = this; - vcp.user_id = snowflake_not_null(&j["d"], "user_id"); - vcp.platform = static_cast(int8_not_null(&j["d"], "platform")); - creator->on_voice_client_platform.call(vcp); - - } - break; - case voice_opcode_multiple_clients_connect: { - dave_mls_user_list = j["d"]["user_ids"]; - log(ll_debug, "Number of clients in voice channel: " + std::to_string(dave_mls_user_list.size())); - } - break; - case voice_client_dave_mls_invalid_commit_welcome: { - this->mls_state->transition_id = j["d"]["transition_id"]; - log(ll_debug, "voice_client_dave_mls_invalid_commit_welcome transition id " + std::to_string(this->mls_state->transition_id)); - } - break; - case voice_client_dave_execute_transition: { - log(ll_debug, "voice_client_dave_execute_transition"); - this->mls_state->transition_id = j["d"]["transition_id"]; - json obj = { - { "op", voice_client_dave_transition_ready }, - { - "d", - { - { "transition_id", this->mls_state->transition_id }, - } - } - }; - this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); - } - break; - /* "The protocol only uses this opcode to indicate when a downgrade to protocol version 0 is upcoming." */ - case voice_client_dave_prepare_transition: { - uint64_t transition_id = j["d"]["transition_id"]; - uint64_t protocol_version = j["d"]["protocol_version"]; - log(ll_debug, "voice_client_dave_prepare_transition version=" + std::to_string(protocol_version) + " for transition " + std::to_string(transition_id)); - } - break; - case voice_client_dave_prepare_epoch: { - uint64_t protocol_version = j["d"]["protocol_version"]; - uint64_t epoch = j["d"]["epoch"]; - log(ll_debug, "voice_client_dave_prepare_epoch version=" + std::to_string(protocol_version) + " for epoch " + std::to_string(epoch)); - if (epoch == 1) { - mls_state->dave_session->Reset(); - mls_state->dave_session->Init(dave::MaxSupportedProtocolVersion(), channel_id, creator->me.id.str(), mls_state->mls_key); - } - } - break; - /* Client Disconnect */ - case voice_opcode_client_disconnect: { - if (j.find("d") != j.end() && j["d"].find("user_id") != j["d"].end() && !j["d"]["user_id"].is_null()) { - snowflake u_id = snowflake_not_null(&j["d"], "user_id"); - auto it = std::find_if(ssrc_map.begin(), ssrc_map.end(), - [&u_id](const auto & p) { return p.second == u_id; }); - - if (it != ssrc_map.end()) { - ssrc_map.erase(it); - } - - if (!creator->on_voice_client_disconnect.empty()) { - voice_client_disconnect_t vcd(nullptr, data); - vcd.voice_client = this; - vcd.user_id = u_id; - creator->on_voice_client_disconnect.call(vcd); - } - } - } - break; - /* Speaking */ - case voice_opcode_client_speaking: - /* Client Connect (doesn't seem to work) */ - case voice_opcode_client_connect: { - if (j.find("d") != j.end() - && j["d"].find("user_id") != j["d"].end() && !j["d"]["user_id"].is_null() - && j["d"].find("ssrc") != j["d"].end() && !j["d"]["ssrc"].is_null() && j["d"]["ssrc"].is_number_integer()) { - uint32_t u_ssrc = j["d"]["ssrc"].get(); - snowflake u_id = snowflake_not_null(&j["d"], "user_id"); - ssrc_map[u_ssrc] = u_id; - - if (!creator->on_voice_client_speaking.empty()) { - voice_client_speaking_t vcs(nullptr, data); - vcs.voice_client = this; - vcs.user_id = u_id; - vcs.ssrc = u_ssrc; - creator->on_voice_client_speaking.call(vcs); - } - } - } - break; - /* Voice resume */ - case voice_opcode_connection_resumed: - log(ll_debug, "Voice connection resumed"); - break; - /* Voice HELLO */ - case voice_opcode_connection_hello: { - if (j.find("d") != j.end() && j["d"].find("heartbeat_interval") != j["d"].end() && !j["d"]["heartbeat_interval"].is_null()) { - this->heartbeat_interval = j["d"]["heartbeat_interval"].get(); - } - - /* Reset receive_sequence on HELLO */ - receive_sequence = -1; - - if (!modes.empty()) { - log(dpp::ll_debug, "Resuming voice session " + this->sessionid + "..."); - json obj = { - { "op", voice_opcode_connection_resume }, - { - "d", - { - { "server_id", std::to_string(this->server_id) }, - { "session_id", this->sessionid }, - { "token", this->token }, - { "seq_ack", this->receive_sequence }, - } - } - }; - this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); - } else { - log(dpp::ll_debug, "Connecting new voice session (DAVE: " + std::string(dave_version == dave_version_1 ? "Enabled" : "Disabled") + ")..."); - json obj = { - { "op", voice_opcode_connection_identify }, - { - "d", - { - { "user_id", creator->me.id }, - { "server_id", std::to_string(this->server_id) }, - { "session_id", this->sessionid }, - { "token", this->token }, - { "max_dave_protocol_version", dave_version }, - } - } - }; - this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); - } - this->connect_time = time(nullptr); - } - break; - /* Session description */ - case voice_opcode_connection_description: { - json &d = j["d"]; - size_t ofs = 0; - for (auto & c : d["secret_key"]) { - secret_key[ofs++] = (uint8_t)c; - if (ofs > secret_key.size() - 1) { - break; - } - } - has_secret_key = true; - - if (dave_version != dave_version_none) { - if (j["d"]["dave_protocol_version"] != static_cast(dave_version)) { - log(ll_error, "We requested DAVE E2EE but didn't receive it from the server, downgrading..."); - dave_version = dave_version_none; - send_silence(20); - } - - mls_state = std::make_unique(); - mls_state->dave_session = std::make_unique( - nullptr, "" /* sessionid */, [this](std::string const& s1, std::string const& s2) { - log(ll_debug, "Dave session constructor callback: " + s1 + ", " + s2); - }); - mls_state->dave_session->Init(dave::MaxSupportedProtocolVersion(), channel_id, creator->me.id.str(), mls_state->mls_key); - auto key_response = mls_state->dave_session->GetMarshalledKeyPackage(); - key_response.insert(key_response.begin(), voice_client_dave_mls_key_package); - this->write(std::string_view(reinterpret_cast(key_response.data()), key_response.size()), OP_BINARY); - - } else { - /* This is needed to start voice receiving and make sure that the start of sending isn't cut off */ - send_silence(20); - } - - /* Fire on_voice_ready */ - if (!creator->on_voice_ready.empty()) { - voice_ready_t rdy(nullptr, data); - rdy.voice_client = this; - rdy.voice_channel_id = this->channel_id; - creator->on_voice_ready.call(rdy); - } - - /* Reset packet_nonce */ - packet_nonce = 1; - } - break; - /* Voice ready */ - case voice_opcode_connection_ready: { - /* Video stream stuff comes in this frame too, but we can't use it (YET!) */ - json &d = j["d"]; - this->ip = d["ip"].get(); - this->port = d["port"].get(); - this->ssrc = d["ssrc"].get(); - // Modes - for (auto & m : d["modes"]) { - this->modes.push_back(m.get()); - } - log(ll_debug, "Voice websocket established; UDP endpoint: " + ip + ":" + std::to_string(port) + " [ssrc=" + std::to_string(ssrc) + "] with " + std::to_string(modes.size()) + " modes"); - - external_ip = discover_ip(); - - dpp::socket newfd; - if ((newfd = ::socket(AF_INET, SOCK_DGRAM, 0)) >= 0) { - - sockaddr_in servaddr{}; - memset(&servaddr, 0, sizeof(sockaddr_in)); - servaddr.sin_family = AF_INET; - servaddr.sin_addr.s_addr = htonl(INADDR_ANY); - servaddr.sin_port = htons(0); - - if (bind(newfd, (sockaddr*)&servaddr, sizeof(servaddr)) < 0) { - throw dpp::connection_exception(err_bind_failure, "Can't bind() client UDP socket"); - } - - if (!set_nonblocking(newfd, true)) { - throw dpp::connection_exception(err_nonblocking_failure, "Can't switch voice UDP socket to non-blocking mode!"); - } - - /* Hook poll() in the ssl_client to add a new file descriptor */ - this->fd = newfd; - this->custom_writeable_fd = [this] { return want_write(); }; - this->custom_readable_fd = [this] { return want_read(); }; - this->custom_writeable_ready = [this] { write_ready(); }; - this->custom_readable_ready = [this] { read_ready(); }; - - int bound_port = 0; - sockaddr_in sin{}; - socklen_t len = sizeof(sin); - if (getsockname(this->fd, (sockaddr *)&sin, &len) > -1) { - bound_port = ntohs(sin.sin_port); - } - - log(ll_debug, "External IP address: " + external_ip); - - this->write(json({ - { "op", voice_opcode_connection_select_protocol }, - { "d", { - { "protocol", "udp" }, - { "data", { - { "address", external_ip }, - { "port", bound_port }, - { "mode", transport_encryption_protocol } - } - } - } - } - }).dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); - } - } - break; - default: { - log(ll_debug, "Unknown voice opcode " + std::to_string(op) + ": " + data); - } - break; - } - } - return true; -#else - return false; -#endif -} - discord_voice_client& discord_voice_client::pause_audio(bool pause) { this->paused = pause; return *this; @@ -808,252 +188,6 @@ void discord_voice_client::send(const char* packet, size_t len, uint64_t duratio outbuf.emplace_back(frame); } -void discord_voice_client::read_ready() -{ -#ifdef HAVE_VOICE - uint8_t buffer[65535]; - int packet_size = this->udp_recv((char*)buffer, sizeof(buffer)); - - bool receive_handler_is_empty = creator->on_voice_receive.empty() && creator->on_voice_receive_combined.empty(); - if (packet_size <= 0 || receive_handler_is_empty) { - /* Nothing to do */ - return; - } - - constexpr size_t header_size = 12; - if (static_cast(packet_size) < header_size) { - /* Invalid RTP payload */ - return; - } - - /* It's a "silence packet" - throw it away. */ - if (packet_size < 44) { - return; - } - - if (uint8_t payload_type = buffer[1] & 0b0111'1111; - 72 <= payload_type && payload_type <= 76) { - /* - * This is an RTCP payload. Discord is known to send - * RTCP Receiver Reports. - * - * See https://datatracker.ietf.org/doc/html/rfc3551#section-6 - */ - return; - } - - voice_payload vp{0, // seq, populate later - 0, // timestamp, populate later - std::make_unique(nullptr, std::string((char*)buffer, packet_size))}; - - vp.vr->voice_client = this; - - uint32_t speaker_ssrc; - { /* Get the User ID of the speaker */ - std::memcpy(&speaker_ssrc, &buffer[8], sizeof(uint32_t)); - speaker_ssrc = ntohl(speaker_ssrc); - vp.vr->user_id = ssrc_map[speaker_ssrc]; - } - - /* Get the sequence number of the voice UDP packet */ - std::memcpy(&vp.seq, &buffer[2], sizeof(rtp_seq_t)); - vp.seq = ntohs(vp.seq); - - /* Get the timestamp of the voice UDP packet */ - std::memcpy(&vp.timestamp, &buffer[4], sizeof(rtp_timestamp_t)); - vp.timestamp = ntohl(vp.timestamp); - - constexpr size_t nonce_size = sizeof(uint32_t); - /* Nonce is 4 byte at the end of payload with zero padding */ - uint8_t nonce[24] = { 0 }; - std::memcpy(nonce, buffer + packet_size - nonce_size, nonce_size); - - /* Get the number of CSRC in header */ - const size_t csrc_count = buffer[0] & 0b0000'1111; - /* Skip to the encrypted voice data */ - const ptrdiff_t offset_to_data = header_size + sizeof(uint32_t) * csrc_count; - size_t total_header_len = offset_to_data; - - uint8_t* ciphertext = buffer + offset_to_data; - size_t ciphertext_len = packet_size - offset_to_data - nonce_size; - - size_t ext_len = 0; - if ([[maybe_unused]] const bool uses_extension = (buffer[0] >> 4) & 0b0001) { - /** - * Get the RTP Extensions size, we only get the size here because - * the extension itself is encrypted along with the opus packet - */ - { - uint16_t ext_len_in_words; - memcpy(&ext_len_in_words, &ciphertext[2], sizeof(uint16_t)); - ext_len_in_words = ntohs(ext_len_in_words); - ext_len = sizeof(uint32_t) * ext_len_in_words; - } - constexpr size_t ext_header_len = sizeof(uint16_t) * 2; - ciphertext += ext_header_len; - ciphertext_len -= ext_header_len; - total_header_len += ext_header_len; - } - - uint8_t decrypted[65535] = { 0 }; - unsigned long long opus_packet_len = 0; - if (crypto_aead_xchacha20poly1305_ietf_decrypt( - decrypted, &opus_packet_len, - nullptr, - ciphertext, ciphertext_len, - buffer, - /** - * Additional Data: - * The whole header (including csrc list) + - * 4 byte extension header (magic 0xBEDE + 16-bit denoting extension length) - */ - total_header_len, - nonce, secret_key.data()) != 0) { - /* Invalid Discord RTP payload. */ - return; - } - - uint8_t *opus_packet = decrypted; - if (ext_len > 0) { - /* Skip previously encrypted RTP Header Extension */ - opus_packet += ext_len; - opus_packet_len -= ext_len; - } - - /* - * We're left with the decrypted, opus-encoded data. - * Park the payload and decode on the voice courier thread. - */ - vp.vr->audio_data.assign(opus_packet, opus_packet + opus_packet_len); - - { - std::lock_guard lk(voice_courier_shared_state.mtx); - auto& [range, payload_queue, pending_decoder_ctls, decoder] = voice_courier_shared_state.parked_voice_payloads[vp.vr->user_id]; - - if (!decoder) { - /* - * Most likely this is the first time we encounter this speaker. - * Do some initialization for not only the decoder but also the range. - */ - range.min_seq = vp.seq; - range.min_timestamp = vp.timestamp; - - int opus_error = 0; - decoder.reset(opus_decoder_create(opus_sample_rate_hz, opus_channel_count, &opus_error), - &opus_decoder_destroy); - if (opus_error) { - /** - * NOTE: The -10 here makes the opus_error match up with values of exception_error_code, - * which would otherwise conflict as every C library loves to use values from -1 downwards. - */ - throw dpp::voice_exception((exception_error_code)(opus_error - 10), "discord_voice_client::discord_voice_client; opus_decoder_create() failed"); - } - } - - if (vp.seq < range.min_seq && vp.timestamp < range.min_timestamp) { - /* This packet arrived too late. We can only discard it. */ - return; - } - range.max_seq = vp.seq; - range.max_timestamp = vp.timestamp; - payload_queue.push(std::move(vp)); - } - - voice_courier_shared_state.signal_iteration.notify_one(); - - if (!voice_courier.joinable()) { - /* Courier thread is not running, start it */ - voice_courier = std::thread(&voice_courier_loop, - std::ref(*this), - std::ref(voice_courier_shared_state)); - } -#else - throw dpp::voice_exception(err_no_voice_support, "Voice support not enabled in this build of D++"); -#endif -} - -void discord_voice_client::write_ready() -{ - uint64_t duration = 0; - bool track_marker_found = false; - uint64_t bufsize = 0; - send_audio_type_t type = satype_recorded_audio; - { - std::lock_guard lock(this->stream_mutex); - if (!this->paused && outbuf.size()) { - type = send_audio_type; - if (outbuf[0].packet.size() == sizeof(uint16_t) && (*((uint16_t*)(outbuf[0].packet.data()))) == AUDIO_TRACK_MARKER) { - outbuf.erase(outbuf.begin()); - track_marker_found = true; - if (tracks > 0) { - tracks--; - } - } - if (outbuf.size()) { - if (this->udp_send(outbuf[0].packet.data(), outbuf[0].packet.length()) == (int)outbuf[0].packet.length()) { - duration = outbuf[0].duration * timescale; - bufsize = outbuf[0].packet.length(); - outbuf.erase(outbuf.begin()); - } - } - } - } - if (duration) { - if (type == satype_recorded_audio) { - std::chrono::nanoseconds latency = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last_timestamp); - std::chrono::nanoseconds sleep_time = std::chrono::nanoseconds(duration) - latency; - if (sleep_time.count() > 0) { - std::this_thread::sleep_for(sleep_time); - } - } - else if (type == satype_overlap_audio) { - std::chrono::nanoseconds latency = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last_timestamp); - std::chrono::nanoseconds sleep_time = std::chrono::nanoseconds(duration) + last_sleep_remainder - latency; - std::chrono::nanoseconds sleep_increment = (std::chrono::nanoseconds(duration) - latency) / AUDIO_OVERLAP_SLEEP_SAMPLES; - if (sleep_time.count() > 0) { - uint16_t samples_count = 0; - std::chrono::nanoseconds overshoot_accumulator{}; - - do { - std::chrono::high_resolution_clock::time_point start_sleep = std::chrono::high_resolution_clock::now(); - std::this_thread::sleep_for(sleep_increment); - std::chrono::high_resolution_clock::time_point end_sleep = std::chrono::high_resolution_clock::now(); - - samples_count++; - overshoot_accumulator += std::chrono::duration_cast(end_sleep - start_sleep) - sleep_increment; - sleep_time -= std::chrono::duration_cast(end_sleep - start_sleep); - } while (std::chrono::nanoseconds(overshoot_accumulator.count() / samples_count) + sleep_increment < sleep_time); - last_sleep_remainder = sleep_time; - } else { - last_sleep_remainder = std::chrono::nanoseconds(0); - } - } - - last_timestamp = std::chrono::high_resolution_clock::now(); - if (!creator->on_voice_buffer_send.empty()) { - voice_buffer_send_t snd(nullptr, ""); - snd.buffer_size = bufsize; - snd.packets_left = outbuf.size(); - snd.voice_client = this; - creator->on_voice_buffer_send.call(snd); - } - } - if (track_marker_found) { - if (!creator->on_voice_track_marker.empty()) { - voice_track_marker_t vtm(nullptr, ""); - vtm.voice_client = this; - { - std::lock_guard lock(this->stream_mutex); - if (!track_meta.empty()) { - vtm.track_meta = track_meta[0]; - track_meta.erase(track_meta.begin()); - } - } - creator->on_voice_track_marker.call(vtm); - } - } -} - dpp::utility::uptime discord_voice_client::get_uptime() { return dpp::utility::uptime(time(nullptr) - connect_time); @@ -1343,125 +477,8 @@ discord_voice_client& discord_voice_client::send_silence(const uint64_t duration discord_voice_client& discord_voice_client::set_send_audio_type(send_audio_type_t type) { - { - std::lock_guard lock(this->stream_mutex); - send_audio_type = type; - } - return *this; -} - -discord_voice_client& discord_voice_client::send_audio_raw(uint16_t* audio_data, const size_t length) { -#if HAVE_VOICE - if (length < 4) { - throw dpp::voice_exception(err_invalid_voice_packet_length, "Raw audio packet size can't be less than 4"); - } - - if ((length % 4) != 0) { - throw dpp::voice_exception(err_invalid_voice_packet_length, "Raw audio packet size should be divisible by 4"); - } - - if (length > send_audio_raw_max_length) { - std::string s_audio_data((const char*)audio_data, length); - - while (s_audio_data.length() > send_audio_raw_max_length) { - std::string packet(s_audio_data.substr(0, send_audio_raw_max_length)); - const auto packet_size = static_cast(packet.size()); - - s_audio_data.erase(s_audio_data.begin(), s_audio_data.begin() + packet_size); - - send_audio_raw((uint16_t*)packet.data(), packet_size); - } - - return *this; - } - - if (length < send_audio_raw_max_length) { - std::string packet((const char*)audio_data, length); - packet.resize(send_audio_raw_max_length, 0); - - return send_audio_raw((uint16_t*)packet.data(), packet.size()); - } - - opus_int32 encoded_audio_max_length = (opus_int32)length; - std::vector encoded_audio(encoded_audio_max_length); - size_t encoded_audio_length = encoded_audio_max_length; - - encoded_audio_length = this->encode((uint8_t*)audio_data, length, encoded_audio.data(), encoded_audio_length); - - send_audio_opus(encoded_audio.data(), encoded_audio_length); -#else - throw dpp::voice_exception(err_no_voice_support, "Voice support not enabled in this build of D++"); -#endif - return *this; -} - -discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length) { -#if HAVE_VOICE - int samples = opus_packet_get_nb_samples(opus_packet, (opus_int32)length, opus_sample_rate_hz); - uint64_t duration = (samples / 48) / (timescale / 1000000); - send_audio_opus(opus_packet, length, duration); -#else - throw dpp::voice_exception(err_no_voice_support, "Voice support not enabled in this build of D++"); -#endif - return *this; -} - -discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length, uint64_t duration) { -#if HAVE_VOICE - int frame_size = (int)(48 * duration * (timescale / 1000000)); - opus_int32 encoded_audio_max_length = (opus_int32)length; - std::vector encoded_audio(encoded_audio_max_length); - size_t encoded_audio_length = encoded_audio_max_length; - - encoded_audio_length = length; - encoded_audio.reserve(length); - memcpy(encoded_audio.data(), opus_packet, length); - - ++sequence; - rtp_header header(sequence, timestamp, (uint32_t)ssrc); - - /* Expected payload size is unencrypted header + encrypted opus packet + unencrypted 32 bit nonce */ - size_t packet_siz = sizeof(header) + (encoded_audio_length + crypto_aead_xchacha20poly1305_IETF_ABYTES) + sizeof(packet_nonce); - - std::vector payload(packet_siz); - - /* Set RTP header */ - std::memcpy(payload.data(), &header, sizeof(header)); - - /* Convert nonce to big-endian */ - uint32_t noncel = htonl(packet_nonce); - - /* 24 byte is needed for encrypting, discord just want 4 byte so just fill up the rest with null */ - unsigned char encrypt_nonce[crypto_aead_xchacha20poly1305_ietf_NPUBBYTES] = { '\0' }; - memcpy(encrypt_nonce, &noncel, sizeof(noncel)); - - /* Execute */ - crypto_aead_xchacha20poly1305_ietf_encrypt( - payload.data() + sizeof(header), - nullptr, - encoded_audio.data(), - encoded_audio_length, - /* The RTP Header as Additional Data */ - reinterpret_cast(&header), - sizeof(header), - nullptr, - static_cast(encrypt_nonce), - secret_key.data() - ); - - /* Append the 4 byte nonce to the resulting payload */ - std::memcpy(payload.data() + payload.size() - sizeof(noncel), &noncel, sizeof(noncel)); - - this->send(reinterpret_cast(payload.data()), payload.size(), duration); - timestamp += frame_size; - - /* Increment for next packet */ - packet_nonce++; - - speak(); -#else - throw dpp::voice_exception(err_no_voice_support, "Voice support not enabled in this build of D++"); -#endif + std::lock_guard lock(this->stream_mutex); + send_audio_type = type; return *this; } diff --git a/src/dpp/voice/stub/voice_payload.cpp b/src/dpp/voice/enabled/cleanup.cpp similarity index 63% rename from src/dpp/voice/stub/voice_payload.cpp rename to src/dpp/voice/enabled/cleanup.cpp index 6cc0856316..a1154378f8 100644 --- a/src/dpp/voice/stub/voice_payload.cpp +++ b/src/dpp/voice/enabled/cleanup.cpp @@ -21,19 +21,42 @@ ************************************************************************************/ #include -#include #include -#include -#include #include #include #include -#include + +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" namespace dpp { -bool discord_voice_client::voice_payload::operator<(const voice_payload& other) const { - return false; +void discord_voice_client::cleanup() +{ + if (runner) { + this->terminating = true; + runner->join(); + delete runner; + runner = nullptr; + } + if (encoder) { + opus_encoder_destroy(encoder); + encoder = nullptr; + } + if (repacketizer) { + opus_repacketizer_destroy(repacketizer); + repacketizer = nullptr; + } + if (voice_courier.joinable()) { + { + std::lock_guard lk(voice_courier_shared_state.mtx); + voice_courier_shared_state.terminating = true; + } + voice_courier_shared_state.signal_iteration.notify_one(); + voice_courier.join(); + } } } \ No newline at end of file diff --git a/src/dpp/voice/enabled/courier_loop.cpp b/src/dpp/voice/enabled/courier_loop.cpp new file mode 100644 index 0000000000..e448a51565 --- /dev/null +++ b/src/dpp/voice/enabled/courier_loop.cpp @@ -0,0 +1,174 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include + +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +void discord_voice_client::voice_courier_loop(discord_voice_client& client, courier_shared_state_t& shared_state) { + utility::set_thread_name(std::string("vcourier/") + std::to_string(client.server_id)); + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds{client.iteration_interval}); + + struct flush_data_t { + snowflake user_id; + rtp_seq_t min_seq; + std::priority_queue parked_payloads; + std::vector> pending_decoder_ctls; + std::shared_ptr decoder; + }; + std::vector flush_data; + + /* + * Transport the payloads onto this thread, and + * release the lock as soon as possible. + */ + { + std::unique_lock lk(shared_state.mtx); + + /* mitigates vector resizing while holding the mutex */ + flush_data.reserve(shared_state.parked_voice_payloads.size()); + + bool has_payload_to_deliver = false; + for (auto &[user_id, parking_lot]: shared_state.parked_voice_payloads) { + has_payload_to_deliver = has_payload_to_deliver || !parking_lot.parked_payloads.empty(); + flush_data.push_back(flush_data_t{user_id, + parking_lot.range.min_seq, + std::move(parking_lot.parked_payloads), + /* Quickly check if we already have a decoder and only take the pending ctls if so. */ + parking_lot.decoder ? std::move(parking_lot.pending_decoder_ctls) + : decltype(parking_lot.pending_decoder_ctls){}, + parking_lot.decoder}); + parking_lot.range.min_seq = parking_lot.range.max_seq + 1; + parking_lot.range.min_timestamp = parking_lot.range.max_timestamp + 1; + } + + if (!has_payload_to_deliver) { + if (shared_state.terminating) { + /* We have delivered all data to handlers. Terminate now. */ + break; + } + + shared_state.signal_iteration.wait(lk); + /* + * More data came or about to terminate, or just a spurious wake. + * We need to collect the payloads again to determine what to do next. + */ + continue; + } + } + + if (client.creator->on_voice_receive.empty() && client.creator->on_voice_receive_combined.empty()) { + /* + * We do this check late, to ensure this thread drains the data + * and prevents accumulating them even when there are no handlers. + */ + continue; + } + + /* This 32 bit PCM audio buffer is an upmixed version of the streams + * combined for all users. This is a wider width audio buffer so that + * there is no clipping when there are many loud audio sources at once. + */ + opus_int32 pcm_mix[23040] = {0}; + size_t park_count = 0; + int max_samples = 0; + int samples = 0; + + for (auto &d: flush_data) { + if (!d.decoder) { + continue; + } + for (const auto &decoder_ctl: d.pending_decoder_ctls) { + decoder_ctl(*d.decoder); + } + for (rtp_seq_t seq = d.min_seq; !d.parked_payloads.empty(); ++seq) { + opus_int16 pcm[23040]; + if (d.parked_payloads.top().seq != seq) { + /* + * Lost a packet with sequence number "seq", + * But Opus decoder might be able to guess something. + */ + if (int samples = opus_decode(d.decoder.get(), nullptr, 0, pcm, 5760, 0); + samples >= 0) { + /* + * Since this sample comes from a lost packet, + * we can only pretend there is an event, without any raw payload byte. + */ + voice_receive_t vr(nullptr, "", &client, d.user_id, reinterpret_cast(pcm), + samples * opus_channel_count * sizeof(opus_int16)); + + park_count = audio_mix(client, *client.mixer, pcm_mix, pcm, park_count, samples, max_samples); + client.creator->on_voice_receive.call(vr); + } + } else { + voice_receive_t &vr = *d.parked_payloads.top().vr; + if (vr.audio_data.size() > 0x7FFFFFFF) { + throw dpp::length_exception(err_massive_audio, "audio_data > 2GB! This should never happen!"); + } + if (samples = opus_decode(d.decoder.get(), vr.audio_data.data(), + static_cast(vr.audio_data.size() & 0x7FFFFFFF), pcm, 5760, 0); + samples >= 0) { + vr.reassign(&client, d.user_id, reinterpret_cast(pcm), + samples * opus_channel_count * sizeof(opus_int16)); + client.end_gain = 1.0f / client.moving_average; + park_count = audio_mix(client, *client.mixer, pcm_mix, pcm, park_count, samples, max_samples); + client.creator->on_voice_receive.call(vr); + } + + d.parked_payloads.pop(); + } + } + } + + /* If combined receive is bound, dispatch it */ + if (park_count) { + + /* Downsample the 32 bit samples back to 16 bit */ + opus_int16 pcm_downsample[23040] = {0}; + opus_int16 *pcm_downsample_ptr = pcm_downsample; + opus_int32 *pcm_mix_ptr = pcm_mix; + client.increment = (client.end_gain - client.current_gain) / static_cast(samples); + for (int64_t x = 0; x < (samples * opus_channel_count) / client.mixer->byte_blocks_per_register; ++x) { + client.mixer->collect_single_register(pcm_mix_ptr, pcm_downsample_ptr, client.current_gain, client.increment); + client.current_gain += client.increment * static_cast(client.mixer->byte_blocks_per_register); + pcm_mix_ptr += client.mixer->byte_blocks_per_register; + pcm_downsample_ptr += client.mixer->byte_blocks_per_register; + } + + voice_receive_t vr(nullptr, "", &client, 0, reinterpret_cast(pcm_downsample), + max_samples * opus_channel_count * sizeof(opus_int16)); + + client.creator->on_voice_receive_combined.call(vr); + } + } +} + +} \ No newline at end of file diff --git a/src/dpp/voice/enabled/enabled.h b/src/dpp/voice/enabled/enabled.h index 03c7575231..c08ca8416f 100644 --- a/src/dpp/voice/enabled/enabled.h +++ b/src/dpp/voice/enabled/enabled.h @@ -56,6 +56,16 @@ #include "../../dave/decryptor.h" #include "../../dave/encryptor.h" +#ifdef _WIN32 +#include + #include + #include +#else + #include + #include + #include +#endif + namespace dpp { struct dave_state { @@ -68,6 +78,19 @@ struct dave_state { std::string privacy_code; }; +/** + * @brief Represents an RTP packet. Size should always be exactly 12. + */ +struct rtp_header { + uint16_t constant; + uint16_t sequence; + uint32_t timestamp; + uint32_t ssrc; + + rtp_header(uint16_t _seq, uint32_t _ts, uint32_t _ssrc) : constant(htons(0x8078)), sequence(htons(_seq)), timestamp(htonl(_ts)), ssrc(htonl(_ssrc)) { + } +}; + /** * @brief Transport encryption type (libsodium) */ diff --git a/src/dpp/voice/enabled/handle_frame.cpp b/src/dpp/voice/enabled/handle_frame.cpp new file mode 100644 index 0000000000..3ecaecca4d --- /dev/null +++ b/src/dpp/voice/enabled/handle_frame.cpp @@ -0,0 +1,404 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +bool discord_voice_client::handle_frame(const std::string &data, ws_opcode opcode) { + json j; + + /** + * MLS frames come in as type OP_BINARY, we can also reply to them as type OP_BINARY. + */ + if (opcode == OP_BINARY && data.size() >= sizeof(dave_binary_header_t)) { + + auto* dave_header = reinterpret_cast(data.data()); + + switch (dave_header->opcode) { + case voice_client_dave_mls_external_sender: { + log(ll_debug, "voice_client_dave_mls_external_sender"); + + mls_state->dave_session->SetExternalSender(dave_header->get_data(data.length())); + + mls_state->encryptor = std::make_unique(); + mls_state->decryptors.clear(); + } + break; + case voice_client_dave_mls_proposals: { + log(ll_debug, "voice_client_dave_mls_proposals"); + + std::optional> response = mls_state->dave_session->ProcessProposals(dave_header->get_data(data.length()), dave_mls_user_list); + if (response.has_value()) { + auto r = response.value(); + mls_state->cached_commit = r; + r.insert(r.begin(), voice_client_dave_mls_commit_message); + this->write(std::string_view(reinterpret_cast(r.data()), r.size()), OP_BINARY); + } + } + break; + case voice_client_dave_announce_commit_transaction: { + log(ll_debug, "voice_client_dave_announce_commit_transaction"); + auto r = mls_state->dave_session->ProcessCommit(mls_state->cached_commit); + for (const auto& user : dave_mls_user_list) { + log(ll_debug, "Setting decryptor key ratchet for user: " + user + ", protocol version: " + std::to_string(mls_state->dave_session->GetProtocolVersion())); + dpp::snowflake u{user}; + mls_state->decryptors.emplace(u, std::make_unique()); + mls_state->decryptors.find(u)->second->TransitionToKeyRatchet(mls_state->dave_session->GetKeyRatchet(user)); + } + mls_state->encryptor->SetKeyRatchet(mls_state->dave_session->GetKeyRatchet(creator->me.id.str())); + + /** + * https://www.ietf.org/archive/id/draft-ietf-mls-protocol-14.html#name-epoch-authenticators + * 9.7. Epoch Authenticators + * The main MLS key schedule provides a per-epoch epoch_authenticator. If one member of the group is being impersonated by an active attacker, + * the epoch_authenticator computed by their client will differ from those computed by the other group members. + */ + mls_state->privacy_code = generate_displayable_code(mls_state->dave_session->GetLastEpochAuthenticator()); + log(ll_debug, "E2EE Privacy Code: " + mls_state->privacy_code); + } + break; + case voice_client_dave_mls_welcome: { + this->mls_state->transition_id = dave_header->get_welcome_transition_id(); + log(ll_debug, "voice_client_dave_mls_welcome with transition id " + std::to_string(this->mls_state->transition_id)); + auto r = mls_state->dave_session->ProcessWelcome(dave_header->get_welcome_data(data.length()), dave_mls_user_list); + if (r.has_value()) { + for (const auto& user : dave_mls_user_list) { + log(ll_debug, "Setting decryptor key ratchet for user: " + user + ", protocol version: " + std::to_string(mls_state->dave_session->GetProtocolVersion())); + dpp::snowflake u{user}; + mls_state->decryptors.emplace(u, std::make_unique()); + mls_state->decryptors.find(u)->second->TransitionToKeyRatchet(mls_state->dave_session->GetKeyRatchet(user)); + } + mls_state->encryptor->SetKeyRatchet(mls_state->dave_session->GetKeyRatchet(creator->me.id.str())); + } + mls_state->privacy_code = generate_displayable_code(mls_state->dave_session->GetLastEpochAuthenticator()); + log(ll_debug, "E2EE Privacy Code: " + mls_state->privacy_code); + } + break; + default: + log(ll_debug, "Unexpected DAVE frame opcode"); + log(dpp::ll_trace, "R: " + dpp::utility::debug_dump((uint8_t*)(data.data()), data.length())); + break; + } + + return true; + } + + try { + log(dpp::ll_trace, std::string("R: ") + data); + j = json::parse(data); + } + catch (const std::exception &e) { + log(dpp::ll_error, std::string("discord_voice_client::handle_frame ") + e.what() + ": " + data); + return true; + } + + if (j.find("seq") != j.end() && j["seq"].is_number()) { + /** + * Save the sequence number needed for heartbeat and resume payload. + * + * NOTE: Contrary to the documentation, discord does not seem to send messages with sequence number + * in order, should we only save the sequence if it's larger number? + */ + receive_sequence = j["seq"].get(); + } + + if (j.find("op") != j.end()) { + uint32_t op = j["op"]; + + switch (op) { + /* Ping acknowledgement */ + case voice_opcode_connection_heartbeat_ack: + /* These opcodes do not require a response or further action */ + break; + case voice_opcode_media_sink: + case voice_client_flags: { + } + break; + case voice_client_platform: { + voice_client_platform_t vcp(nullptr, data); + vcp.voice_client = this; + vcp.user_id = snowflake_not_null(&j["d"], "user_id"); + vcp.platform = static_cast(int8_not_null(&j["d"], "platform")); + creator->on_voice_client_platform.call(vcp); + + } + break; + case voice_opcode_multiple_clients_connect: { + dave_mls_user_list = j["d"]["user_ids"]; + log(ll_debug, "Number of clients in voice channel: " + std::to_string(dave_mls_user_list.size())); + } + break; + case voice_client_dave_mls_invalid_commit_welcome: { + this->mls_state->transition_id = j["d"]["transition_id"]; + log(ll_debug, "voice_client_dave_mls_invalid_commit_welcome transition id " + std::to_string(this->mls_state->transition_id)); + } + break; + case voice_client_dave_execute_transition: { + log(ll_debug, "voice_client_dave_execute_transition"); + this->mls_state->transition_id = j["d"]["transition_id"]; + json obj = { + { "op", voice_client_dave_transition_ready }, + { + "d", + { + { "transition_id", this->mls_state->transition_id }, + } + } + }; + this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); + } + break; + /* "The protocol only uses this opcode to indicate when a downgrade to protocol version 0 is upcoming." */ + case voice_client_dave_prepare_transition: { + uint64_t transition_id = j["d"]["transition_id"]; + uint64_t protocol_version = j["d"]["protocol_version"]; + log(ll_debug, "voice_client_dave_prepare_transition version=" + std::to_string(protocol_version) + " for transition " + std::to_string(transition_id)); + } + break; + case voice_client_dave_prepare_epoch: { + uint64_t protocol_version = j["d"]["protocol_version"]; + uint64_t epoch = j["d"]["epoch"]; + log(ll_debug, "voice_client_dave_prepare_epoch version=" + std::to_string(protocol_version) + " for epoch " + std::to_string(epoch)); + if (epoch == 1) { + mls_state->dave_session->Reset(); + mls_state->dave_session->Init(dave::MaxSupportedProtocolVersion(), channel_id, creator->me.id.str(), mls_state->mls_key); + } + } + break; + /* Client Disconnect */ + case voice_opcode_client_disconnect: { + if (j.find("d") != j.end() && j["d"].find("user_id") != j["d"].end() && !j["d"]["user_id"].is_null()) { + snowflake u_id = snowflake_not_null(&j["d"], "user_id"); + auto it = std::find_if(ssrc_map.begin(), ssrc_map.end(), + [&u_id](const auto & p) { return p.second == u_id; }); + + if (it != ssrc_map.end()) { + ssrc_map.erase(it); + } + + if (!creator->on_voice_client_disconnect.empty()) { + voice_client_disconnect_t vcd(nullptr, data); + vcd.voice_client = this; + vcd.user_id = u_id; + creator->on_voice_client_disconnect.call(vcd); + } + } + } + break; + /* Speaking */ + case voice_opcode_client_speaking: + /* Client Connect (doesn't seem to work) */ + case voice_opcode_client_connect: { + if (j.find("d") != j.end() + && j["d"].find("user_id") != j["d"].end() && !j["d"]["user_id"].is_null() + && j["d"].find("ssrc") != j["d"].end() && !j["d"]["ssrc"].is_null() && j["d"]["ssrc"].is_number_integer()) { + uint32_t u_ssrc = j["d"]["ssrc"].get(); + snowflake u_id = snowflake_not_null(&j["d"], "user_id"); + ssrc_map[u_ssrc] = u_id; + + if (!creator->on_voice_client_speaking.empty()) { + voice_client_speaking_t vcs(nullptr, data); + vcs.voice_client = this; + vcs.user_id = u_id; + vcs.ssrc = u_ssrc; + creator->on_voice_client_speaking.call(vcs); + } + } + } + break; + /* Voice resume */ + case voice_opcode_connection_resumed: + log(ll_debug, "Voice connection resumed"); + break; + /* Voice HELLO */ + case voice_opcode_connection_hello: { + if (j.find("d") != j.end() && j["d"].find("heartbeat_interval") != j["d"].end() && !j["d"]["heartbeat_interval"].is_null()) { + this->heartbeat_interval = j["d"]["heartbeat_interval"].get(); + } + + /* Reset receive_sequence on HELLO */ + receive_sequence = -1; + + if (!modes.empty()) { + log(dpp::ll_debug, "Resuming voice session " + this->sessionid + "..."); + json obj = { + { "op", voice_opcode_connection_resume }, + { + "d", + { + { "server_id", std::to_string(this->server_id) }, + { "session_id", this->sessionid }, + { "token", this->token }, + { "seq_ack", this->receive_sequence }, + } + } + }; + this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); + } else { + log(dpp::ll_debug, "Connecting new voice session (DAVE: " + std::string(dave_version == dave_version_1 ? "Enabled" : "Disabled") + ")..."); + json obj = { + { "op", voice_opcode_connection_identify }, + { + "d", + { + { "user_id", creator->me.id }, + { "server_id", std::to_string(this->server_id) }, + { "session_id", this->sessionid }, + { "token", this->token }, + { "max_dave_protocol_version", dave_version }, + } + } + }; + this->write(obj.dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); + } + this->connect_time = time(nullptr); + } + break; + /* Session description */ + case voice_opcode_connection_description: { + json &d = j["d"]; + size_t ofs = 0; + for (auto & c : d["secret_key"]) { + secret_key[ofs++] = (uint8_t)c; + if (ofs > secret_key.size() - 1) { + break; + } + } + has_secret_key = true; + + if (dave_version != dave_version_none) { + if (j["d"]["dave_protocol_version"] != static_cast(dave_version)) { + log(ll_error, "We requested DAVE E2EE but didn't receive it from the server, downgrading..."); + dave_version = dave_version_none; + send_silence(20); + } + + mls_state = std::make_unique(); + mls_state->dave_session = std::make_unique( + nullptr, "" /* sessionid */, [this](std::string const& s1, std::string const& s2) { + log(ll_debug, "Dave session constructor callback: " + s1 + ", " + s2); + }); + mls_state->dave_session->Init(dave::MaxSupportedProtocolVersion(), channel_id, creator->me.id.str(), mls_state->mls_key); + auto key_response = mls_state->dave_session->GetMarshalledKeyPackage(); + key_response.insert(key_response.begin(), voice_client_dave_mls_key_package); + this->write(std::string_view(reinterpret_cast(key_response.data()), key_response.size()), OP_BINARY); + + } else { + /* This is needed to start voice receiving and make sure that the start of sending isn't cut off */ + send_silence(20); + } + + /* Fire on_voice_ready */ + if (!creator->on_voice_ready.empty()) { + voice_ready_t rdy(nullptr, data); + rdy.voice_client = this; + rdy.voice_channel_id = this->channel_id; + creator->on_voice_ready.call(rdy); + } + + /* Reset packet_nonce */ + packet_nonce = 1; + } + break; + /* Voice ready */ + case voice_opcode_connection_ready: { + /* Video stream stuff comes in this frame too, but we can't use it (YET!) */ + json &d = j["d"]; + this->ip = d["ip"].get(); + this->port = d["port"].get(); + this->ssrc = d["ssrc"].get(); + // Modes + for (auto & m : d["modes"]) { + this->modes.push_back(m.get()); + } + log(ll_debug, "Voice websocket established; UDP endpoint: " + ip + ":" + std::to_string(port) + " [ssrc=" + std::to_string(ssrc) + "] with " + std::to_string(modes.size()) + " modes"); + + external_ip = discover_ip(); + + dpp::socket newfd; + if ((newfd = ::socket(AF_INET, SOCK_DGRAM, 0)) >= 0) { + + sockaddr_in servaddr{}; + memset(&servaddr, 0, sizeof(sockaddr_in)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(0); + + if (bind(newfd, (sockaddr*)&servaddr, sizeof(servaddr)) < 0) { + throw dpp::connection_exception(err_bind_failure, "Can't bind() client UDP socket"); + } + + if (!set_nonblocking(newfd, true)) { + throw dpp::connection_exception(err_nonblocking_failure, "Can't switch voice UDP socket to non-blocking mode!"); + } + + /* Hook poll() in the ssl_client to add a new file descriptor */ + this->fd = newfd; + this->custom_writeable_fd = [this] { return want_write(); }; + this->custom_readable_fd = [this] { return want_read(); }; + this->custom_writeable_ready = [this] { write_ready(); }; + this->custom_readable_ready = [this] { read_ready(); }; + + int bound_port = 0; + sockaddr_in sin{}; + socklen_t len = sizeof(sin); + if (getsockname(this->fd, (sockaddr *)&sin, &len) > -1) { + bound_port = ntohs(sin.sin_port); + } + + log(ll_debug, "External IP address: " + external_ip); + + this->write(json({ + { "op", voice_opcode_connection_select_protocol }, + { "d", { + { "protocol", "udp" }, + { "data", { + { "address", external_ip }, + { "port", bound_port }, + { "mode", transport_encryption_protocol } + } + } + } + } + }).dump(-1, ' ', false, json::error_handler_t::replace), OP_TEXT); + } + } + break; + default: { + log(ll_debug, "Unknown voice opcode " + std::to_string(op) + ": " + data); + } + break; + } + } + return true; +} + + +} \ No newline at end of file diff --git a/src/dpp/voice/enabled/opus.cpp b/src/dpp/voice/enabled/opus.cpp new file mode 100644 index 0000000000..37a2b7eb69 --- /dev/null +++ b/src/dpp/voice/enabled/opus.cpp @@ -0,0 +1,138 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include + +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +discord_voice_client& discord_voice_client::send_audio_raw(uint16_t* audio_data, const size_t length) { + if (length < 4) { + throw dpp::voice_exception(err_invalid_voice_packet_length, "Raw audio packet size can't be less than 4"); + } + + if ((length % 4) != 0) { + throw dpp::voice_exception(err_invalid_voice_packet_length, "Raw audio packet size should be divisible by 4"); + } + + if (length > send_audio_raw_max_length) { + std::string s_audio_data((const char*)audio_data, length); + + while (s_audio_data.length() > send_audio_raw_max_length) { + std::string packet(s_audio_data.substr(0, send_audio_raw_max_length)); + const auto packet_size = static_cast(packet.size()); + + s_audio_data.erase(s_audio_data.begin(), s_audio_data.begin() + packet_size); + + send_audio_raw((uint16_t*)packet.data(), packet_size); + } + + return *this; + } + + if (length < send_audio_raw_max_length) { + std::string packet((const char*)audio_data, length); + packet.resize(send_audio_raw_max_length, 0); + + return send_audio_raw((uint16_t*)packet.data(), packet.size()); + } + + opus_int32 encoded_audio_max_length = (opus_int32)length; + std::vector encoded_audio(encoded_audio_max_length); + size_t encoded_audio_length = encoded_audio_max_length; + + encoded_audio_length = this->encode((uint8_t*)audio_data, length, encoded_audio.data(), encoded_audio_length); + + send_audio_opus(encoded_audio.data(), encoded_audio_length); + return *this; +} + +discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length) { + int samples = opus_packet_get_nb_samples(opus_packet, (opus_int32)length, opus_sample_rate_hz); + uint64_t duration = (samples / 48) / (timescale / 1000000); + send_audio_opus(opus_packet, length, duration); + return *this; +} + +discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length, uint64_t duration) { + int frame_size = (int)(48 * duration * (timescale / 1000000)); + opus_int32 encoded_audio_max_length = (opus_int32)length; + std::vector encoded_audio(encoded_audio_max_length); + size_t encoded_audio_length = encoded_audio_max_length; + + encoded_audio_length = length; + encoded_audio.reserve(length); + memcpy(encoded_audio.data(), opus_packet, length); + + ++sequence; + rtp_header header(sequence, timestamp, (uint32_t)ssrc); + + /* Expected payload size is unencrypted header + encrypted opus packet + unencrypted 32 bit nonce */ + size_t packet_siz = sizeof(header) + (encoded_audio_length + crypto_aead_xchacha20poly1305_IETF_ABYTES) + sizeof(packet_nonce); + + std::vector payload(packet_siz); + + /* Set RTP header */ + std::memcpy(payload.data(), &header, sizeof(header)); + + /* Convert nonce to big-endian */ + uint32_t noncel = htonl(packet_nonce); + + /* 24 byte is needed for encrypting, discord just want 4 byte so just fill up the rest with null */ + unsigned char encrypt_nonce[crypto_aead_xchacha20poly1305_ietf_NPUBBYTES] = { '\0' }; + memcpy(encrypt_nonce, &noncel, sizeof(noncel)); + + /* Execute */ + crypto_aead_xchacha20poly1305_ietf_encrypt( + payload.data() + sizeof(header), + nullptr, + encoded_audio.data(), + encoded_audio_length, + /* The RTP Header as Additional Data */ + reinterpret_cast(&header), + sizeof(header), + nullptr, + static_cast(encrypt_nonce), + secret_key.data() + ); + + /* Append the 4 byte nonce to the resulting payload */ + std::memcpy(payload.data() + payload.size() - sizeof(noncel), &noncel, sizeof(noncel)); + + this->send(reinterpret_cast(payload.data()), payload.size(), duration); + timestamp += frame_size; + + /* Increment for next packet */ + packet_nonce++; + + speak(); + return *this; +} + +} \ No newline at end of file diff --git a/src/dpp/voice/enabled/read_ready.cpp b/src/dpp/voice/enabled/read_ready.cpp new file mode 100644 index 0000000000..db4375d2d3 --- /dev/null +++ b/src/dpp/voice/enabled/read_ready.cpp @@ -0,0 +1,195 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include + +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +void discord_voice_client::read_ready() +{ + uint8_t buffer[65535]; + int packet_size = this->udp_recv((char*)buffer, sizeof(buffer)); + + bool receive_handler_is_empty = creator->on_voice_receive.empty() && creator->on_voice_receive_combined.empty(); + if (packet_size <= 0 || receive_handler_is_empty) { + /* Nothing to do */ + return; + } + + constexpr size_t header_size = 12; + if (static_cast(packet_size) < header_size) { + /* Invalid RTP payload */ + return; + } + + /* It's a "silence packet" - throw it away. */ + if (packet_size < 44) { + return; + } + + if (uint8_t payload_type = buffer[1] & 0b0111'1111; + 72 <= payload_type && payload_type <= 76) { + /* + * This is an RTCP payload. Discord is known to send + * RTCP Receiver Reports. + * + * See https://datatracker.ietf.org/doc/html/rfc3551#section-6 + */ + return; + } + + voice_payload vp{0, // seq, populate later + 0, // timestamp, populate later + std::make_unique(nullptr, std::string((char*)buffer, packet_size))}; + + vp.vr->voice_client = this; + + uint32_t speaker_ssrc; + { /* Get the User ID of the speaker */ + std::memcpy(&speaker_ssrc, &buffer[8], sizeof(uint32_t)); + speaker_ssrc = ntohl(speaker_ssrc); + vp.vr->user_id = ssrc_map[speaker_ssrc]; + } + + /* Get the sequence number of the voice UDP packet */ + std::memcpy(&vp.seq, &buffer[2], sizeof(rtp_seq_t)); + vp.seq = ntohs(vp.seq); + + /* Get the timestamp of the voice UDP packet */ + std::memcpy(&vp.timestamp, &buffer[4], sizeof(rtp_timestamp_t)); + vp.timestamp = ntohl(vp.timestamp); + + constexpr size_t nonce_size = sizeof(uint32_t); + /* Nonce is 4 byte at the end of payload with zero padding */ + uint8_t nonce[24] = { 0 }; + std::memcpy(nonce, buffer + packet_size - nonce_size, nonce_size); + + /* Get the number of CSRC in header */ + const size_t csrc_count = buffer[0] & 0b0000'1111; + /* Skip to the encrypted voice data */ + const ptrdiff_t offset_to_data = header_size + sizeof(uint32_t) * csrc_count; + size_t total_header_len = offset_to_data; + + uint8_t* ciphertext = buffer + offset_to_data; + size_t ciphertext_len = packet_size - offset_to_data - nonce_size; + + size_t ext_len = 0; + if ([[maybe_unused]] const bool uses_extension = (buffer[0] >> 4) & 0b0001) { + /** + * Get the RTP Extensions size, we only get the size here because + * the extension itself is encrypted along with the opus packet + */ + { + uint16_t ext_len_in_words; + memcpy(&ext_len_in_words, &ciphertext[2], sizeof(uint16_t)); + ext_len_in_words = ntohs(ext_len_in_words); + ext_len = sizeof(uint32_t) * ext_len_in_words; + } + constexpr size_t ext_header_len = sizeof(uint16_t) * 2; + ciphertext += ext_header_len; + ciphertext_len -= ext_header_len; + total_header_len += ext_header_len; + } + + uint8_t decrypted[65535] = { 0 }; + unsigned long long opus_packet_len = 0; + if (crypto_aead_xchacha20poly1305_ietf_decrypt( + decrypted, &opus_packet_len, + nullptr, + ciphertext, ciphertext_len, + buffer, + /** + * Additional Data: + * The whole header (including csrc list) + + * 4 byte extension header (magic 0xBEDE + 16-bit denoting extension length) + */ + total_header_len, + nonce, secret_key.data()) != 0) { + /* Invalid Discord RTP payload. */ + return; + } + + uint8_t *opus_packet = decrypted; + if (ext_len > 0) { + /* Skip previously encrypted RTP Header Extension */ + opus_packet += ext_len; + opus_packet_len -= ext_len; + } + + /* + * We're left with the decrypted, opus-encoded data. + * Park the payload and decode on the voice courier thread. + */ + vp.vr->audio_data.assign(opus_packet, opus_packet + opus_packet_len); + + { + std::lock_guard lk(voice_courier_shared_state.mtx); + auto& [range, payload_queue, pending_decoder_ctls, decoder] = voice_courier_shared_state.parked_voice_payloads[vp.vr->user_id]; + + if (!decoder) { + /* + * Most likely this is the first time we encounter this speaker. + * Do some initialization for not only the decoder but also the range. + */ + range.min_seq = vp.seq; + range.min_timestamp = vp.timestamp; + + int opus_error = 0; + decoder.reset(opus_decoder_create(opus_sample_rate_hz, opus_channel_count, &opus_error), + &opus_decoder_destroy); + if (opus_error) { + /** + * NOTE: The -10 here makes the opus_error match up with values of exception_error_code, + * which would otherwise conflict as every C library loves to use values from -1 downwards. + */ + throw dpp::voice_exception((exception_error_code)(opus_error - 10), "discord_voice_client::discord_voice_client; opus_decoder_create() failed"); + } + } + + if (vp.seq < range.min_seq && vp.timestamp < range.min_timestamp) { + /* This packet arrived too late. We can only discard it. */ + return; + } + range.max_seq = vp.seq; + range.max_timestamp = vp.timestamp; + payload_queue.push(std::move(vp)); + } + + voice_courier_shared_state.signal_iteration.notify_one(); + + if (!voice_courier.joinable()) { + /* Courier thread is not running, start it */ + voice_courier = std::thread(&voice_courier_loop, + std::ref(*this), + std::ref(voice_courier_shared_state)); + } +} + +} \ No newline at end of file diff --git a/src/dpp/voice/enabled/thread.cpp b/src/dpp/voice/enabled/thread.cpp new file mode 100644 index 0000000000..69a105e768 --- /dev/null +++ b/src/dpp/voice/enabled/thread.cpp @@ -0,0 +1,90 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include + +#include +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +void discord_voice_client::thread_run() +{ + utility::set_thread_name(std::string("vc/") + std::to_string(server_id)); + + size_t times_looped = 0; + time_t last_loop_time = time(nullptr); + + do { + bool error = false; + ssl_client::read_loop(); + ssl_client::close(); + + time_t current_time = time(nullptr); + /* Here, we check if it's been longer than 3 seconds since the previous loop, + * this gives us time to see if it's an actual disconnect, or an error. + * This will prevent us from looping too much, meaning error codes do not cause an infinite loop. + */ + if (current_time - last_loop_time >= 3) + times_looped = 0; + + /* This does mean we'll always have times_looped at a minimum of 1, this is intended. */ + times_looped++; + /* If we've looped 5 or more times, abort the loop. */ + if (times_looped >= 5) { + log(dpp::ll_warning, "Reached max loops whilst attempting to read from the websocket. Aborting websocket."); + break; + } + + last_loop_time = current_time; + + if (!terminating) { + log(dpp::ll_debug, "Attempting to reconnect the websocket..."); + do { + try { + ssl_client::connect(); + websocket_client::connect(); + } + catch (const std::exception &e) { + log(dpp::ll_error, std::string("Error establishing voice websocket connection, retry in 5 seconds: ") + e.what()); + ssl_client::close(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + error = true; + } + } while (error && !terminating); + } + } while(!terminating); +} + +void discord_voice_client::run() +{ + this->runner = new std::thread(&discord_voice_client::thread_run, this); + this->thread_id = runner->native_handle(); +} + + +} \ No newline at end of file diff --git a/src/dpp/voice/enabled/write_ready.cpp b/src/dpp/voice/enabled/write_ready.cpp new file mode 100644 index 0000000000..839f347f49 --- /dev/null +++ b/src/dpp/voice/enabled/write_ready.cpp @@ -0,0 +1,115 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include + +#include "../../dave/encryptor.h" + +#include "enabled.h" + +namespace dpp { + +void discord_voice_client::write_ready() { + uint64_t duration = 0; + bool track_marker_found = false; + uint64_t bufsize = 0; + send_audio_type_t type = satype_recorded_audio; + { + std::lock_guard lock(this->stream_mutex); + if (!this->paused && outbuf.size()) { + type = send_audio_type; + if (outbuf[0].packet.size() == sizeof(uint16_t) && (*((uint16_t*)(outbuf[0].packet.data()))) == AUDIO_TRACK_MARKER) { + outbuf.erase(outbuf.begin()); + track_marker_found = true; + if (tracks > 0) { + tracks--; + } + } + if (outbuf.size()) { + if (this->udp_send(outbuf[0].packet.data(), outbuf[0].packet.length()) == (int)outbuf[0].packet.length()) { + duration = outbuf[0].duration * timescale; + bufsize = outbuf[0].packet.length(); + outbuf.erase(outbuf.begin()); + } + } + } + } + if (duration) { + if (type == satype_recorded_audio) { + std::chrono::nanoseconds latency = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last_timestamp); + std::chrono::nanoseconds sleep_time = std::chrono::nanoseconds(duration) - latency; + if (sleep_time.count() > 0) { + std::this_thread::sleep_for(sleep_time); + } + } + else if (type == satype_overlap_audio) { + std::chrono::nanoseconds latency = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - last_timestamp); + std::chrono::nanoseconds sleep_time = std::chrono::nanoseconds(duration) + last_sleep_remainder - latency; + std::chrono::nanoseconds sleep_increment = (std::chrono::nanoseconds(duration) - latency) / AUDIO_OVERLAP_SLEEP_SAMPLES; + if (sleep_time.count() > 0) { + uint16_t samples_count = 0; + std::chrono::nanoseconds overshoot_accumulator{}; + + do { + std::chrono::high_resolution_clock::time_point start_sleep = std::chrono::high_resolution_clock::now(); + std::this_thread::sleep_for(sleep_increment); + std::chrono::high_resolution_clock::time_point end_sleep = std::chrono::high_resolution_clock::now(); + + samples_count++; + overshoot_accumulator += std::chrono::duration_cast(end_sleep - start_sleep) - sleep_increment; + sleep_time -= std::chrono::duration_cast(end_sleep - start_sleep); + } while (std::chrono::nanoseconds(overshoot_accumulator.count() / samples_count) + sleep_increment < sleep_time); + last_sleep_remainder = sleep_time; + } else { + last_sleep_remainder = std::chrono::nanoseconds(0); + } + } + + last_timestamp = std::chrono::high_resolution_clock::now(); + if (!creator->on_voice_buffer_send.empty()) { + voice_buffer_send_t snd(nullptr, ""); + snd.buffer_size = bufsize; + snd.packets_left = outbuf.size(); + snd.voice_client = this; + creator->on_voice_buffer_send.call(snd); + } + } + if (track_marker_found) { + if (!creator->on_voice_track_marker.empty()) { + voice_track_marker_t vtm(nullptr, ""); + vtm.voice_client = this; + { + std::lock_guard lock(this->stream_mutex); + if (!track_meta.empty()) { + vtm.track_meta = track_meta[0]; + track_meta.erase(track_meta.begin()); + } + } + creator->on_voice_track_marker.call(vtm); + } + } +} + + +} \ No newline at end of file diff --git a/src/dpp/voice/stub/stubs.cpp b/src/dpp/voice/stub/stubs.cpp new file mode 100644 index 0000000000..429945c02f --- /dev/null +++ b/src/dpp/voice/stub/stubs.cpp @@ -0,0 +1,70 @@ +/************************************************************************************ + * + * D++, A Lightweight C++ library for Discord + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2021 Craig Edwards and D++ contributors + * (https://github.com/brainboxdotcc/DPP/graphs/contributors) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ************************************************************************************/ + +#include +#include +#include +#include + +#include "stub.h" + +namespace dpp { + + void discord_voice_client::voice_courier_loop(discord_voice_client& client, courier_shared_state_t& shared_state) { + } + + void discord_voice_client::cleanup(){ + } + + void discord_voice_client::run() { + } + + void discord_voice_client::thread_run() { + } + + bool discord_voice_client::voice_payload::operator<(const voice_payload& other) const { + return false; + } + + bool discord_voice_client::handle_frame(const std::string &data, ws_opcode opcode) { + return false; + } + + void discord_voice_client::read_ready() { + } + + void discord_voice_client::write_ready() { + } + + discord_voice_client& discord_voice_client::send_audio_raw(uint16_t* audio_data, const size_t length) { + return *this; + } + + discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length) { + return *this; + } + + discord_voice_client& discord_voice_client::send_audio_opus(uint8_t* opus_packet, const size_t length, uint64_t duration) { + return *this; + } + +} \ No newline at end of file