Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/main/infinity_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,7 @@ void InfinityContext::StopThriftServers() {
// stop_servers_func_ {};
}
// close all thrift sessions
const auto removed_session_count = InfinityThriftService::ClearSessionMap();
LOG_INFO(fmt::format("Removed {} thrift sessions", removed_session_count));
LOG_INFO(fmt::format("Removed thrift sessions. cnt: {}", InfinityThriftService::ClearSessionMap()));
}

void InfinityContext::SetConfig(std::unique_ptr<Config> &&config) { config_ = std::move(config); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ BMPIndexFileWorker::BMPIndexFileWorker(std::shared_ptr<std::string> file_path,
size_t index_size)
: IndexFileWorker(std::move(file_path), std::move(index_base), std::move(column_def)) {
if (index_size == 0) {
std::string index_path = GetFilePath();
std::string index_path = GetPath();
auto [file_handle, status] = VirtualStore::Open(index_path, FileAccessMode::kReadWrite);
if (status.ok()) {
// When replay by checkpoint, the data is deleted, but catalog is recovered. Do not read file in recovery.
Expand Down
5 changes: 3 additions & 2 deletions src/storage/buffer/file_worker/data_file_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <sys/mman.h>
#include <unistd.h>

module infinity_core:data_file_worker.impl;

Expand Down Expand Up @@ -59,7 +60,7 @@ bool DataFileWorker::Write(std::span<char> data, std::unique_ptr<LocalFileHandle
}

auto fd = file_handle->fd();
VirtualStore::Truncate(GetFilePathTemp(), mmap_size_);
VirtualStore::Truncate(GetWorkingPath(), mmap_size_);
if (mmap_ == nullptr) {
mmap_ = mmap(nullptr, mmap_size_, PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0 /*align_offset*/);
size_t offset{};
Expand Down Expand Up @@ -146,7 +147,7 @@ bool DataFileWorker::WriteSnapshot(std::span<char> data,

void DataFileWorker::Read(std::shared_ptr<char[]> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
// data = std::make_shared_for_overwrite<char[]>(buffer_size_);
std::unique_lock l(mutex_);
// std::unique_lock l(mutex_);
data = std::make_shared<char[]>(buffer_size_);
if (!file_handle) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ bool EMVBIndexFileWorker::Write(std::span<EMVBIndex> data,
}

void EMVBIndexFileWorker::Read(std::shared_ptr<EMVBIndex> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
std::unique_lock l(mutex_);
// std::unique_lock l(mutex_);
const auto column_embedding_dim = GetEmbeddingInfo()->Dimension();
const auto *index_emvb = static_cast<IndexEMVB *>(index_base_.get());
const auto residual_pq_subspace_num = index_emvb->residual_pq_subspace_num_;
Expand Down
129 changes: 40 additions & 89 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

module;

#include <fcntl.h>
#include <unistd.h>

export module infinity_core:file_worker;
Expand All @@ -27,6 +28,7 @@ import :virtual_store;
// import :secondary_index_data;
import :block_version;
import :emvb_index;
import :virtual_store;
import :boost;

import std.compat;
Expand Down Expand Up @@ -90,8 +92,8 @@ public:
bool Write(auto data, const FileWorkerSaveCtx &ctx = {}) {
// boost::unique_lock l(boost_rw_mutex_);

[[maybe_unused]] auto tmp = GetFilePathTemp();
auto [file_handle, status] = VirtualStore::Open(GetFilePathTemp(), FileAccessMode::kReadWrite);
[[maybe_unused]] auto tmp = GetWorkingPath();
auto [file_handle, status] = VirtualStore::Open(GetWorkingPath(), FileAccessMode::kReadWrite);
if (!status.ok()) {
// fuck
// UnrecoverableError(status.message());
Expand All @@ -105,67 +107,13 @@ public:
}

void Read(auto &data) {
// boost::upgrade_lock l(boost_rw_mutex_);
// if (mmap_) {
// size_t file_size = 0;
//
// auto temp_path = GetFilePathTemp();
// auto data_path = GetFilePath();
// std::string file_path;
// if (VirtualStore::Exists(temp_path)) { // branchless
// file_path = temp_path;
// auto [file_handle, status] = VirtualStore::Open(file_path, FileAccessMode::kReadWrite);
// if (!status.ok()) {
// std::unique_ptr<LocalFileHandle> file_handle;
// Read(data, file_handle, file_size);
// // UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
// return;
// }
// file_size = file_handle->FileSize();
// Read(data, file_handle, file_size);
// close(file_handle->fd());
// } else if (persistence_manager_) {
// file_path = data_path;
// auto result = persistence_manager_->GetObjCache(file_path);
// obj_addr_ = result.obj_addr_;
// auto true_file_path = fmt::format("{}/{}", persistence_manager_->workspace(), obj_addr_.obj_key_);
// auto [file_handle, status] = VirtualStore::Open(true_file_path, FileAccessMode::kReadWrite);
// if (!status.ok()) {
// std::unique_ptr<LocalFileHandle> file_handle;
// Read(data, file_handle, file_size);
// // UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
// return;
// }
// file_handle->Seek(obj_addr_.part_offset_);
// file_size = obj_addr_.part_size_;
// Read(data, file_handle, file_size);
// close(file_handle->fd());
// } else if (VirtualStore::Exists(data_path, true)) {
// file_path = data_path;
// auto [file_handle, status] = VirtualStore::Open(file_path, FileAccessMode::kReadWrite);
// if (!status.ok()) {
// std::unique_ptr<LocalFileHandle> file_handle;
// Read(data, file_handle, file_size);
// // UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
// return;
// }
// file_size = file_handle->FileSize();
// Read(data, file_handle, file_size);
// close(file_handle->fd());
// } else {
// std::unique_ptr<LocalFileHandle> file_handle;
// Read(data, file_handle, file_size);
// }
// } else {
// boost::upgrade_to_unique_lock ll(l);
size_t file_size = 0;

auto temp_path = GetFilePathTemp();
auto data_path = GetFilePath();
std::string file_path;
if (VirtualStore::Exists(temp_path)) { // branchless
file_path = temp_path;
auto [file_handle, status] = VirtualStore::Open(file_path, FileAccessMode::kReadWrite);
std::unique_lock l(mutex_);
size_t file_size{};

auto working_path = GetWorkingPath();
auto data_path = GetPath();
if (VirtualStore::Exists(working_path)) {
auto [file_handle, status] = VirtualStore::Open(working_path, FileAccessMode::kReadWrite);
if (!status.ok()) {
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
Expand All @@ -175,51 +123,54 @@ public:
file_size = file_handle->FileSize();
Read(data, file_handle, file_size);
close(file_handle->fd());
} else if (persistence_manager_) {
file_path = data_path;
auto result = persistence_manager_->GetObjCache(file_path);
obj_addr_ = result.obj_addr_;
auto true_file_path = fmt::format("{}/{}", persistence_manager_->workspace(), obj_addr_.obj_key_);
auto [file_handle, status] = VirtualStore::Open(true_file_path, FileAccessMode::kReadWrite);
if (!status.ok()) {
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
// UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
return;
}
if (persistence_manager_) {
auto result = persistence_manager_->GetObjCache(data_path);
auto obj_addr = result.obj_addr_;
auto true_file_path = fmt::format("{}/{}", persistence_manager_->workspace(), obj_addr.obj_key_);
if (obj_addr.Valid()) {
VirtualStore::CopyRange(true_file_path, working_path, obj_addr.part_offset_, 0, obj_addr.part_size_);
obj_addr.obj_key_.clear();
auto [file_handle, status] = VirtualStore::Open(working_path, FileAccessMode::kReadWrite);
if (!status.ok()) {
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
// UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
return;
}
Read(data, file_handle, obj_addr.part_size_);
close(file_handle->fd());
return;
}
file_handle->Seek(obj_addr_.part_offset_);
file_size = obj_addr_.part_size_;
Read(data, file_handle, file_size);
close(file_handle->fd());
} else if (VirtualStore::Exists(data_path, true)) {
file_path = data_path;
auto [file_handle, status] = VirtualStore::Open(file_path, FileAccessMode::kReadWrite);
}
if (VirtualStore::Exists(data_path, true)) {
auto [file_handle, status] = VirtualStore::Open(data_path, FileAccessMode::kReadWrite);
if (!status.ok()) {
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
// UnrecoverableError("??????"); // AddSegmentVersion->GetData->Read
return;
}
file_size = file_handle->FileSize();

VirtualStore::CopyRange(data_path, working_path, 0, 0, file_size);
Read(data, file_handle, file_size);
close(file_handle->fd());
} else {
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
return;
}
std::unique_ptr<LocalFileHandle> file_handle;
Read(data, file_handle, file_size);
}
// }

// void PickForCleanup();

void MoveFile();

virtual FileWorkerType Type() const = 0;

// Get an absolute file path. As key of a buffer handle.
[[nodiscard]] std::string GetFilePath() const;
[[nodiscard]] std::string GetPath() const;

[[nodiscard]] std::string GetFilePathTemp() const;
[[nodiscard]] std::string GetWorkingPath() const;

Status CleanupFile() const;

Expand Down Expand Up @@ -295,7 +246,7 @@ public:
std::shared_ptr<std::string> rel_file_path_;
PersistenceManager *persistence_manager_{};
FileWorkerManager *file_worker_manager_{};
ObjAddr obj_addr_;
// ObjAddr obj_addr_;
void *mmap_{};
// std::atomic_uintptr_t mmap_;
size_t mmap_size_{};
Expand Down
43 changes: 22 additions & 21 deletions src/storage/buffer/file_worker/file_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ FileWorker::FileWorker(std::shared_ptr<std::string> rel_file_path) : rel_file_pa

void FileWorker::MoveFile() {
// boost::unique_lock l(boost_rw_mutex_);
boost::unique_lock l(mutex_);
std::unique_lock l(mutex_);
msync(mmap_, mmap_size_, MS_SYNC);
auto temp_path = GetFilePathTemp();
auto data_path = GetFilePath();
auto working_path = GetWorkingPath();
auto data_path = GetPath();

if (persistence_manager_) {
PersistResultHandler handler(persistence_manager_);
if (!VirtualStore::Exists(temp_path)) {
if (!VirtualStore::Exists(working_path)) {
return;
}
auto persist_result = persistence_manager_->Persist(data_path, temp_path);
auto persist_result = persistence_manager_->Persist(data_path, working_path);
handler.HandleWriteResult(persist_result);

obj_addr_ = persist_result.obj_addr_;
// obj_addr_ = persist_result.obj_addr_;

if (Type() == FileWorkerType::kRawFile) {
auto temp_dict_path = fmt::format("{}/{}.dic",
Expand All @@ -143,22 +143,22 @@ void FileWorker::MoveFile() {
auto persist_result3 = persistence_manager_->Persist(data_posting_path, temp_posting_path);
}
} else {
if (!VirtualStore::Exists(temp_path)) {
if (!VirtualStore::Exists(working_path)) {
return;
}
auto data_path_parent = VirtualStore::GetParentPath(data_path);
if (!VirtualStore::Exists(data_path_parent)) {
VirtualStore::MakeDirectory(data_path_parent);
}

VirtualStore::Copy(data_path, temp_path);
VirtualStore::Copy(working_path, data_path);
if (Type() == FileWorkerType::kRawFile) {
auto temp_dict_path = fmt::format("{}/{}.dic",
InfinityContext::instance().config()->TempDir(),
rel_file_path_->substr(0, rel_file_path_->find_first_of('.')));
auto temp_posting_path = fmt::format("{}/{}.pos",
auto working_dict_path = fmt::format("{}/{}.dic",
InfinityContext::instance().config()->TempDir(),
rel_file_path_->substr(0, rel_file_path_->find_first_of('.')));
auto working_posting_path = fmt::format("{}/{}.pos",
InfinityContext::instance().config()->TempDir(),
rel_file_path_->substr(0, rel_file_path_->find_first_of('.')));

auto data_dict_path = fmt::format("{}/{}.dic",
InfinityContext::instance().config()->DataDir(),
Expand All @@ -167,19 +167,20 @@ void FileWorker::MoveFile() {
InfinityContext::instance().config()->DataDir(),
rel_file_path_->substr(0, rel_file_path_->find_first_of('.')));

VirtualStore::Copy(data_dict_path, temp_dict_path);
VirtualStore::Copy(data_posting_path, temp_posting_path);
VirtualStore::Copy(working_dict_path, data_dict_path);
VirtualStore::Copy(working_posting_path, data_posting_path);
}
}
}

// Get absolute file path. As key of buffer handle.
std::string FileWorker::GetFilePath() const { return fmt::format("{}/{}", InfinityContext::instance().config()->DataDir(), *rel_file_path_); }
std::string FileWorker::GetPath() const { return fmt::format("{}/{}", InfinityContext::instance().config()->DataDir(), *rel_file_path_); }

std::string FileWorker::GetFilePathTemp() const { return fmt::format("{}/{}", InfinityContext::instance().config()->TempDir(), *rel_file_path_); }
std::string FileWorker::GetWorkingPath() const { return fmt::format("{}/{}", InfinityContext::instance().config()->TempDir(), *rel_file_path_); }

Status FileWorker::CleanupFile() const {
auto status = VirtualStore::DeleteFile(GetFilePathTemp());
std::unique_lock l(mutex_);
auto status = VirtualStore::DeleteFile(GetWorkingPath());
if (Type() == FileWorkerType::kRawFile) {
auto temp_dict_path =
fmt::format("{}/{}.dic", InfinityContext::instance().config()->TempDir(), rel_file_path_->substr(0, rel_file_path_->find_first_of('.')));
Expand All @@ -196,7 +197,7 @@ Status FileWorker::CleanupFile() const {
if (persistence_manager_) {
PersistResultHandler handler{persistence_manager_};
{
auto result_data = persistence_manager_->Cleanup(GetFilePath());
auto result_data = persistence_manager_->Cleanup(GetPath());
// if (!result_data.obj_addr_.Valid()) {
// return Status::OK();
// }
Expand All @@ -217,21 +218,21 @@ Status FileWorker::CleanupFile() const {
handler.HandleWriteResult(result_data);
}
} else {
status = VirtualStore::DeleteFile(GetFilePath());
status = VirtualStore::DeleteFile(GetPath());
status = VirtualStore::DeleteFile(data_dict_path);
status = VirtualStore::DeleteFile(data_posting_path);
}
} else {
if (persistence_manager_) {
PersistResultHandler handler{persistence_manager_};
auto result_data = persistence_manager_->Cleanup(GetFilePath());
auto result_data = persistence_manager_->Cleanup(GetPath());
// if (!result_data.obj_addr_.Valid()) {
// return Status::OK();
// }
handler.HandleWriteResult(result_data);

} else {
status = VirtualStore::DeleteFile(GetFilePath());
status = VirtualStore::DeleteFile(GetPath());
}
}
// if (Type() == FileWorkerType::kHNSWIndexFile) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/buffer/file_worker/hnsw_file_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ HnswFileWorker::HnswFileWorker(std::shared_ptr<std::string> file_path,
: IndexFileWorker(std::move(file_path), std::move(index_base), std::move(column_def)) {
if (index_size == 0) {

std::string index_path = GetFilePath();
std::string index_path = GetPath();
auto [file_handle, status] = VirtualStore::Open(index_path, FileAccessMode::kReadWrite);
if (status.ok()) {
// When replay by checkpoint, the data is deleted, but catalog is recovered. Do not read file in recovery.
Expand Down Expand Up @@ -87,7 +87,7 @@ bool HnswFileWorker::Write(std::shared_ptr<HnswHandler> &data,
}

void HnswFileWorker::Read(std::shared_ptr<HnswHandler> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
std::unique_lock l(mutex_);
// std::unique_lock l(mutex_);
if (!file_handle) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool IVFIndexFileWorker::Write(std::span<IVFIndexInChunk> data,
}

void IVFIndexFileWorker::Read(IVFIndexInChunk *&data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
std::unique_lock l(mutex_);
// std::unique_lock l(mutex_);
auto *index = IVFIndexInChunk::GetNewIVFIndexInChunk(index_base_.get(), column_def_.get());
// data = std::shared_ptr<IVFIndexInChunk>(index);
data = index;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ bool RawFileWorker::Write(std::span<char> data, std::unique_ptr<LocalFileHandle>
}

void RawFileWorker::Read(std::shared_ptr<char[]> &data, std::unique_ptr<LocalFileHandle> &file_handle, size_t file_size) {
std::unique_lock l(mutex_);
// std::unique_lock l(mutex_);
buffer_size_ = file_handle ? file_handle->FileSize() : 0;
data = std::make_shared<char[]>(buffer_size_);
if (!file_handle) {
Expand Down
Loading