Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

branch-3.0: [fix](restore) Add a local snapshot lock to protect snapshot dir #47279 #47292

Open
wants to merge 1 commit into
base: branch-3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ using std::vector;
namespace doris {
using namespace ErrorCode;

// Blocks the caller until the lock for `path` is free, marks it as held and
// returns a guard carrying `path`.
LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) {
    std::unique_lock<std::mutex> map_lock(_lock);
    // operator[] creates the context the first time this path is seen.
    LocalSnapshotContext& context = _local_snapshot_contexts[path];
    if (context._is_locked) {
        // Register as a waiter: release() only erases a context whose waiting
        // count is zero, and notifies the condition variable otherwise.
        ++context._waiting_count;
        context._cv.wait(map_lock, [&context] { return !context._is_locked; });
        --context._waiting_count;
    }

    context._is_locked = true;
    return {path};
}

void LocalSnapshotLock::release(const std::string& path) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_snapshot_contexts.find(path);
if (iter == _local_snapshot_contexts.end()) {
return;
}

auto& ctx = iter->second;
ctx._is_locked = false;
if (ctx._waiting_count > 0) {
ctx._cv.notify_one();
} else {
_local_snapshot_contexts.erase(iter);
}
}

SnapshotManager::SnapshotManager(StorageEngine& engine) : _engine(engine) {
_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "SnapshotManager");
Expand Down Expand Up @@ -118,6 +147,8 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
}

Status SnapshotManager::release_snapshot(const string& snapshot_path) {
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);

// If the requested snapshot_path is located in the root/snapshot folder, it is considered legal and can be deleted.
// Otherwise, it is considered an illegal request and returns an error result.
SCOPED_ATTACH_TASK(_mem_tracker);
Expand Down Expand Up @@ -448,7 +479,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
}
}
// BE would definitely set it to true regardless of whether any version is missed,
// but it would take no effets on the following range loop
// but it would take no effects on the following range loop
if (!is_single_rowset_clone && request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
Expand Down
51 changes: 51 additions & 0 deletions be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand All @@ -33,6 +35,55 @@ struct RowsetId;
class StorageEngine;
class MemTrackerLimiter;

class LocalSnapshotLockGuard;

// A simple per-path lock used to protect local snapshot paths.
//
// Each path gets its own context (held flag, waiter count, condition variable)
// stored in a map; the map itself is protected by a single mutex. Callers take
// a lock through acquire() and give it back by destroying the returned guard.
class LocalSnapshotLock {
    friend class LocalSnapshotLockGuard;

public:
    LocalSnapshotLock() = default;
    ~LocalSnapshotLock() = default;

    // Not copyable.
    LocalSnapshotLock(const LocalSnapshotLock&) = delete;
    LocalSnapshotLock& operator=(const LocalSnapshotLock&) = delete;

    // Returns the shared instance (a function-local static).
    static LocalSnapshotLock& instance() {
        static LocalSnapshotLock singleton;
        return singleton;
    }

    // Acquire the lock for the specified path. Blocks while the lock for that
    // path is held by someone else.
    LocalSnapshotLockGuard acquire(const std::string& path);

private:
    // Only reachable by LocalSnapshotLockGuard (a friend), which calls it from
    // its destructor.
    void release(const std::string& path);

    // Bookkeeping for a single path.
    struct LocalSnapshotContext {
        bool _is_locked = false;    // true while a guard for the path is alive
        size_t _waiting_count = 0;  // number of threads blocked in acquire()
        std::condition_variable _cv;

        LocalSnapshotContext() = default;
        LocalSnapshotContext(const LocalSnapshotContext&) = delete;
        LocalSnapshotContext& operator=(const LocalSnapshotContext&) = delete;
    };

    // Protects _local_snapshot_contexts and every context stored in it.
    std::mutex _lock;
    std::unordered_map<std::string, LocalSnapshotContext> _local_snapshot_contexts;
};

// Scoped holder for a path lock handed out by LocalSnapshotLock::acquire().
// When the guard is destroyed it releases the lock of the stored path on
// LocalSnapshotLock::instance().
//
// The destructor releases unconditionally, so a guard should only be obtained
// through acquire(); a guard constructed by hand would release a lock it never
// took.
class LocalSnapshotLockGuard {
public:
    // Intentionally not `explicit`: acquire() returns `{path}`, which needs an
    // implicit constructor.
    LocalSnapshotLockGuard(std::string path) : _snapshot_path {std::move(path)} {}

    ~LocalSnapshotLockGuard() {
        LocalSnapshotLock::instance().release(_snapshot_path);
    }

    // Not copyable, so the lock is released exactly once per guard.
    LocalSnapshotLockGuard(const LocalSnapshotLockGuard&) = delete;
    LocalSnapshotLockGuard& operator=(const LocalSnapshotLockGuard&) = delete;

private:
    // Path whose lock is released on destruction.
    std::string _snapshot_path;
};

class SnapshotManager {
public:
SnapshotManager(StorageEngine& engine);
Expand Down
100 changes: 49 additions & 51 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <gen_cpp/Types_types.h>

#include <algorithm>
#include <condition_variable>
#include <cstring>
#include <filesystem>
#include <istream>
Expand Down Expand Up @@ -146,6 +147,9 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
const std::string& src_path = iter.first;
const std::string& dest_path = iter.second;

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path);

int64_t tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(
Expand Down Expand Up @@ -242,6 +246,9 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
const std::string& remote_path = iter.first;
const std::string& local_path = iter.second;

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);

int64_t local_tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id,
Expand Down Expand Up @@ -397,8 +404,6 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
_task_id);
constexpr uint32_t kListRemoteFileTimeout = 15;
constexpr uint32_t kDownloadFileMaxRetry = 3;
constexpr uint32_t kGetLengthTimeout = 10;
Expand All @@ -408,35 +413,39 @@ Status SnapshotLoader::remote_http_download(
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
Status status = Status::OK();

// Step before, validate all remote

// Step 1: Validate local tablet snapshot paths
int report_counter = 0;
int finished_num = 0;
int total_num = remote_tablet_snapshots.size();
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& path = remote_tablet_snapshot.local_snapshot_path;
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
LOG(INFO) << fmt::format(
"download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}",
_job_id, _task_id, local_path, remote_path);

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);

// Step 1: Validate local tablet snapshot paths
bool res = true;
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
if (!res) {
std::stringstream ss;
auto err_msg =
fmt::format("snapshot path is not directory or does not exist: {}", path);
fmt::format("snapshot path is not directory or does not exist: {}", local_path);
LOG(WARNING) << err_msg;
return Status::RuntimeError(err_msg);
}
}

// Step 2: get all local files
struct LocalFileStat {
uint64_t size;
std::string md5;
};
std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
std::vector<std::string> local_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));

auto& local_filestat = local_files_map[local_path];
for (auto& local_file : local_files) {
// Step 2: get all local files
struct LocalFileStat {
uint64_t size;
std::string md5;
};
std::unordered_map<std::string, LocalFileStat> local_files;
std::vector<std::string> existing_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &existing_files));
for (auto& local_file : existing_files) {
// add file size
std::string local_file_path = local_path + "/" + local_file;
std::error_code ec;
Expand All @@ -453,27 +462,20 @@ Status SnapshotLoader::remote_http_download(
<< " md5sum: " << status.to_string();
return status;
}
local_filestat[local_file] = {local_file_size, md5};
local_files[local_file] = {local_file_size, md5};
}
}

// Step 3: Validate remote tablet snapshot paths && remote files map
// key is remote snapshot paths, value is filelist
// get all these use http download action
// http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
int report_counter = 0;
int total_num = remote_tablet_snapshots.size();
int finished_num = 0;
struct RemoteFileStat {
std::string url;
std::string md5;
uint64_t size;
};
std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
remote_files_map;
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
existing_files.clear();

// Step 3: Validate remote tablet snapshot paths && remote files map
// key is remote snapshot paths, value is filelist
// get all these use http download action
// http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
struct RemoteFileStat {
std::string url;
std::string md5;
uint64_t size;
};
std::unordered_map<std::string, RemoteFileStat> remote_files;
const auto& token = remote_tablet_snapshot.remote_token;
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;

Expand Down Expand Up @@ -516,19 +518,11 @@ Status SnapshotLoader::remote_http_download(

remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size};
}
}

// Step 4: Compare local and remote files && get all need download files
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
// Step 4: Compare local and remote files && get all need download files
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));

const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
auto& local_files = local_files_map[local_path];
auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;

// get all need download files
std::vector<std::string> need_download_files;
for (const auto& [remote_file, remote_filestat] : remote_files) {
Expand Down Expand Up @@ -656,6 +650,7 @@ Status SnapshotLoader::remote_http_download(
if (total_time_ms > 0) {
copy_rate = total_file_size / ((double)total_time_ms) / 1000;
}
auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
LOG(INFO) << fmt::format(
"succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
"{} ms, rate: {} MB/s",
Expand Down Expand Up @@ -705,6 +700,9 @@ Status SnapshotLoader::remote_http_download(
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet,
bool overwrite) {
// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);

auto tablet_path = tablet->tablet_path();
auto store_path = tablet->data_dir()->path();
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
Expand Down
Loading