Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2219 Add test to verify upload progress notifications during client reset #7958

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

### Internals
* Update TestAppSession to allow scope-based usage for restarting the local app resources. ([PR #7672](https://github.com/realm/realm-core/pull/7672))
* Added test to verify upload progress notification reporting during a client reset. [PR #7958](https://github.com/realm/realm-core/pull/7958)

----------------------------------------------

Expand Down
248 changes: 239 additions & 9 deletions test/object-store/sync/session/progress_notifications.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@

#include <realm/util/scope_exit.hpp>

#if REALM_ENABLE_AUTH_TESTS
#ifdef REALM_ENABLE_AUTH_TESTS
#include "util/test_file.hpp"
#include "util/sync/flx_sync_harness.hpp"
#include "util/sync/sync_test_utils.hpp"

#include <realm/object-store/impl/object_accessor_impl.hpp>
#include <realm/object-store/sync/async_open_task.hpp>
#include <realm/object-store/util/scheduler.hpp>
#include <realm/sync/noinst/client_reset.hpp>

using namespace realm::app;
#endif
Expand Down Expand Up @@ -1051,28 +1052,35 @@ TEST_CASE("progress notification", "[sync][session][progress]") {
}
}

#if REALM_ENABLE_AUTH_TESTS
#ifdef REALM_ENABLE_AUTH_TESTS

struct TestSetup {
TableRef get_table(const SharedRealm& r)
{
return r->read_group().get_table("class_" + table_name);
}

size_t add_objects(SharedRealm& r, int num)
size_t add_objects(SharedRealm& r, int num, size_t data_size = 1024 * 1024)
{
CppContext ctx(r);
for (int i = 0; i < num; ++i) {
// use specifically separate transactions for a bit of history
r->begin_transaction();
Object::create(ctx, r, StringData(table_name), std::any(make_one(i)));
Object::create(ctx, r, StringData(table_name), std::any(make_one(i, data_size)));
r->commit_transaction();
}
return get_table(r)->size();
}

AutoVerifiedEmailCredentials create_user_and_log_in()
{
return ::create_user_and_log_in(app());
}

virtual SyncTestFile make_config() = 0;
virtual AnyDict make_one(int64_t idx) = 0;
virtual AnyDict make_one(int64_t idx, size_t data_size) = 0;
virtual SharedApp app() const = 0;
virtual const AppSession& app_session() const = 0;

std::string table_name;
};
Expand All @@ -1088,17 +1096,32 @@ struct PBS : TestSetup {
return SyncTestFile(session.app()->current_user(), partition, get_default_schema());
}

AnyDict make_one(int64_t /* idx */) override
AnyDict make_one(int64_t /* idx */, size_t data_size) override
{
return AnyDict{{"_id", std::any(ObjectId::gen())},
{"breed", std::string("bulldog")},
{"name", random_string(1024 * 1024)}};
{"name", random_string(data_size)}};
}

SharedApp app() const override
{
return session.app();
}

const AppSession& app_session() const override
{
return session.app_session();
}

TestAppSession session;
const std::string partition = random_string(100);
};

static std::ostream& operator<<(std::ostream& os, const PBS&)
{
return os << "PBS";
}

struct FLX : TestSetup {
FLX(const std::string& app_id = "flx_sync_progress")
: harness(app_id)
Expand Down Expand Up @@ -1128,16 +1151,31 @@ struct FLX : TestSetup {
sub.commit();
}

AnyDict make_one(int64_t idx) override
AnyDict make_one(int64_t idx, size_t data_size) override
{
return AnyDict{{"_id", ObjectId::gen()},
{"queryable_int_field", idx},
{"queryable_str_field", random_string(1024 * 1024)}};
{"queryable_str_field", random_string(data_size)}};
}

SharedApp app() const override
{
return harness.app();
}

const AppSession& app_session() const override
{
return harness.session().app_session();
}

FLXSyncTestHarness harness;
};

static std::ostream& operator<<(std::ostream& os, const FLX&)
{
return os << "FLX";
}

struct ProgressIncreasesMatcher : Catch::Matchers::MatcherGenericBase {
enum MatchMode { ByteCountOnly, All };
ProgressIncreasesMatcher() = default;
Expand Down Expand Up @@ -1558,4 +1596,196 @@ TEST_CASE("sync progress: flx download progress", "[sync][baas][progress]") {
}
}

TEMPLATE_TEST_CASE("sync progress: upload progress during client reset", "[sync][baas][progress][client reset]", PBS,
FLX)
{
std::mutex progress_mutex;
std::vector<ProgressEntry> streaming_progress;
std::vector<ProgressEntry> non_streaming_progress;

enum TestMode { NO_CHANGES, LOCAL_CHANGES, REMOTE_CHANGES, BOTH_CHANGED, BOTH_CHANGED_W_DISCARD };
auto xlate_test_mode = [](TestMode tm) -> std::string_view {
switch (tm) {
case NO_CHANGES:
return "no local or remote changes";
case LOCAL_CHANGES:
return "local changes only";
case REMOTE_CHANGES:
return "remote changes only";
case BOTH_CHANGED:
return "both local and remote changes";
case BOTH_CHANGED_W_DISCARD:
return "both local and remote changes";
}
FAIL(util::format("Missing case for unhandled TestMode value: ", static_cast<int>(tm)));
REALM_UNREACHABLE();
};

auto logger = util::Logger::get_default_logger();
TestType setup;
auto test_mode = GENERATE(TestMode::NO_CHANGES, TestMode::LOCAL_CHANGES, TestMode::REMOTE_CHANGES,
TestMode::BOTH_CHANGED, TestMode::BOTH_CHANGED_W_DISCARD);

// Set up the main realm for the test
auto config = setup.make_config();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config.sync_config->notify_after_client_reset = reset_handler;
if (test_mode == TestMode::BOTH_CHANGED_W_DISCARD) {
config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
}
else {
config.sync_config->client_resync_mode = ClientResyncMode::Recover;
}

logger->debug("PROGRESS TEST: %1 upload progress notifications after %2 client reset with %3", setup,
config.sync_config->client_resync_mode, xlate_test_mode(test_mode));

// Functions to create the progress notification callbacks
auto make_streaming_cb = [&](std::string_view desc) {
return [&, desc](uint64_t transferred, uint64_t transferrable, double estimate) {
logger->debug("PROGRESS TEST: %1 Progress callback called xferred: %2, xferrable: %3, estimate: %4", desc,
transferred, transferrable, estimate_to_string(estimate));
std::lock_guard lk(progress_mutex);
streaming_progress.push_back(ProgressEntry{transferred, transferrable, estimate});
};
};
auto make_non_streaming_cb = [&](std::string_view desc) {
return [&, desc](uint64_t transferred, uint64_t transferrable, double estimate) {
logger->debug("PROGRESS TEST: %1 Progress callback called xferred: %2, xferrable: %3, estimate: %4", desc,
transferred, transferrable, estimate_to_string(estimate));
std::lock_guard lk(progress_mutex);
non_streaming_progress.push_back(ProgressEntry{transferred, transferrable, estimate});
};
};

auto wait_for_sync = [](SharedRealm& r) {
// If a FLX session, also wait for the subscription to complete
if (r->config().sync_config->flx_sync_requested) {
auto sub = r->get_latest_subscription_set();
REQUIRE(sub.state() != sync::SubscriptionSet::State::Error);
if (sub.state() != sync::SubscriptionSet::State::Complete) {
auto result =
sub.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
REQUIRE(result.is_ok());
REQUIRE(result.get_value() == sync::SubscriptionSet::State::Complete);
}
}
wait_for_download(*r);
wait_for_upload(*r);
};

{
// Setup the realm and add some data (FLX subscription is added during initialization)
auto realm = Realm::get_shared_realm(config);

// For FLX sessions, don't create any more subscriptions for future realms
config.sync_config->rerun_init_subscription_on_open = false;
config.sync_config->subscription_initializer = nullptr;

// Add some data and wait for upload
setup.add_objects(realm, 10, 100);
wait_for_sync(realm); // wait for sync/subs to complete
realm->sync_session()->shutdown_and_wait(); // Close the sync session

// Set up some local changes if the test calls for it
if (test_mode == TestMode::LOCAL_CHANGES || test_mode == TestMode::BOTH_CHANGED ||
test_mode == TestMode::BOTH_CHANGED_W_DISCARD) {
logger->trace("PROGRESS TEST: adding local objects");
setup.add_objects(realm, 5, 100); // Add some local objects while offline
}

// Set up some remote changes if the test calls for it
if (test_mode == TestMode::REMOTE_CHANGES || test_mode == TestMode::BOTH_CHANGED ||
test_mode == TestMode::BOTH_CHANGED_W_DISCARD) {
logger->trace("PROGRESS TEST: adding remote objects");
// Make a new config for a different user
setup.create_user_and_log_in();
auto remote_config = setup.make_config(); // Includes the new user just created
auto remote_realm = Realm::get_shared_realm(remote_config);
setup.add_objects(remote_realm, 5, 100); // Add some objects remotely
wait_for_sync(remote_realm); // wait for sync/subs to complete
}
reset_utils::trigger_client_reset(setup.app_session(), realm);
}
auto realm = Realm::get_shared_realm(config);
// Register progress notifiers
realm->sync_session()->register_progress_notifier(make_non_streaming_cb("Non-Streaming Upload"),
NotifierType::upload, false);
realm->sync_session()->register_progress_notifier(make_streaming_cb("Streaming Upload"), NotifierType::upload,
true);

// Wait for the client reset to complete
auto status = wait_for_future(std::move(reset_future)).get_no_throw();
if (!status.is_ok()) {
FAIL(status.get_status());
}

// Progress notifications may not have been sent yet - wait for sync after client reset
wait_for_download(*realm);
wait_for_upload(*realm);

{
std::lock_guard<std::mutex> lk(progress_mutex);
logger->debug("PROGRESS TEST: retrieved progress calls: streaming - %1, non-streaming - %2",
streaming_progress.size(), non_streaming_progress.size());

auto print_progress = [&logger](const std::vector<ProgressEntry>& entries) {
if (!logger->would_log(util::Logger::Level::trace))
return; // don't print if wouldn't log
for (size_t i = 0; i < entries.size(); ++i) {
auto& entry = entries[i];
logger->trace("PROGRESS TEST: entry[%1] - transferrable: %2 - transferred: %3 - estimate: %4", i,
entry.transferrable, entry.transferred, estimate_to_string(entry.estimate));
}
};
logger->trace("PROGRESS TEST: streaming progress size: %1", streaming_progress.size());
print_progress(streaming_progress);
logger->trace("PROGRESS TEST: non-streaming progress size: %1", non_streaming_progress.size());
print_progress(non_streaming_progress);

// Validations for no changes, remote only changes, or both changes with discard local client reset
if (test_mode == TestMode::NO_CHANGES || test_mode == TestMode::REMOTE_CHANGES ||
test_mode == TestMode::BOTH_CHANGED_W_DISCARD) {
// Sometimes a second upload would be sent, resulting in a size of 2
REQUIRE(streaming_progress.size() > 0);
REQUIRE(streaming_progress[0] == ProgressEntry{0, 0, 1.0});
REQUIRE(non_streaming_progress.size() > 0);
// Needs to be changed to 1.0 after PR #7957 is merged
REQUIRE(non_streaming_progress[0] == ProgressEntry{0, 0, 0.0});
}
// Validations for local changes only or both local and remote changes
else if (test_mode == TestMode::LOCAL_CHANGES || test_mode == TestMode::BOTH_CHANGED) {
// Multiple notifications may sent for the changes to upload after client reset
if (config.sync_config->flx_sync_requested) {
// FLX sessions report upload progress as a single notification
REQUIRE(streaming_progress.size() > 0);
REQUIRE(non_streaming_progress.size() > 0);
}
else {
// PBS sessions report upload progress when changes are uploaded and when upload is acked
REQUIRE(streaming_progress.size() > 1);
REQUIRE(non_streaming_progress.size() > 1);
}
REQUIRE(streaming_progress.back().estimate == 1.0); // should end with progress of 1.0
REQUIRE(non_streaming_progress.back().estimate == 1.0); // should end with progress of 1.0
}
else {
// Unhandled TestMode case
FAIL(util::format("Unhandled TestMode case: ", static_cast<int>(test_mode)));
}
}

streaming_progress.clear();
non_streaming_progress.clear();

// Verify the streaming notifications are still received and non-streaming notifications have expired
setup.add_objects(realm, 5, 100);
wait_for_upload(*realm);

// More streaming upload notifications were received
REQUIRE(streaming_progress.size() > 0);
// Non-streaming upload notification callback was expired and no more were received
REQUIRE(non_streaming_progress.size() == 0);
}

#endif
Loading