Skip to content

Commit

Permalink
fix: disk cache deduped get_ranges (#1218)
Browse files Browse the repository at this point in the history
## Rationale
Close #1215

## Detailed Changes
- Add MemoryStore to help test
- When persisting a file, first write to a tmp file, then rename it to the destination file.

## Test Plan
CI itself
  • Loading branch information
jiacai2050 authored Sep 21, 2023
1 parent 9414180 commit 41f6313
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 26 deletions.
99 changes: 73 additions & 26 deletions components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use chrono::{DateTime, Utc};
use crc::{Crc, CRC_32_ISCSI};
use futures::stream::BoxStream;
use hash_ext::SeaHasherBuilder;
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use lru::LruCache;
use notifier::notifier::{ExecutionGuard, RequestNotifiers};
use partitioned_lock::PartitionedMutex;
Expand Down Expand Up @@ -167,36 +167,83 @@ impl Manifest {
}
}

/// The encoder of the page file in the disk cache.
/// The writer of the page file in the disk cache.
///
/// Following the payload, a footer [`PageFileEncoder::MAGIC_FOOTER`] is
/// appended.
struct PageFileEncoder {
payload: Bytes,
struct PageFileWriter {
output: String,
tmp_file: String,
need_clean_tmpfile: bool,
}

impl PageFileEncoder {
impl Drop for PageFileWriter {
fn drop(&mut self) {
if self.need_clean_tmpfile {
if let Err(e) = std::fs::remove_file(&self.tmp_file) {
warn!(
"Disk cache remove page tmp file failed, file:{}, err:{e}",
&self.tmp_file
);
}
}
}
}

impl PageFileWriter {
const MAGIC_FOOTER: [u8; 8] = [0, 0, 0, 0, b'c', b'e', b'r', b'e'];

async fn encode_and_persist<W>(&self, writer: &mut W, name: &str) -> Result<()>
where
W: AsyncWrite + std::marker::Unpin,
{
fn new(output: String) -> Self {
let tmp_file = Self::tmp_file(&output);

Self {
output,
tmp_file,
need_clean_tmpfile: true,
}
}

fn tmp_file(input: &str) -> String {
format!("{}.tmp", input)
}

async fn write_inner(&self, bytes: Bytes) -> Result<()> {
let tmp_file = &self.tmp_file;
let mut writer = File::create(tmp_file)
.await
.context(Io { file: tmp_file })?;
writer
.write_all(&self.payload[..])
.write_all(&bytes)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer
.write_all(&Self::MAGIC_FOOTER)
.await
.context(Io { file: name })?;
.context(Io { file: tmp_file })?;

writer.flush().await.context(Io { file: tmp_file })?;

writer.flush().await.context(Io { file: name })?;
tokio::fs::rename(tmp_file, &self.output)
.await
.context(Io { file: &self.output })?;

Ok(())
}

// When write bytes to file, the cache lock is released, so when one thread is
// reading, another thread may update it, so we write to tmp file first,
// then rename to expected filename to avoid other threads see partial
// content.
async fn write_and_flush(mut self, bytes: Bytes) -> Result<()> {
let write_result = self.write_inner(bytes).await;
if write_result.is_ok() {
self.need_clean_tmpfile = false;
}

write_result
}

#[inline]
fn encoded_size(payload_len: usize) -> usize {
payload_len + Self::MAGIC_FOOTER.len()
Expand Down Expand Up @@ -262,7 +309,7 @@ impl DiskCache {

async fn insert_data(&self, filename: String, value: Bytes) {
let page_meta = {
let file_size = PageFileEncoder::encoded_size(value.len());
let file_size = PageFileWriter::encoded_size(value.len());
PageMeta { file_size }
};
let evicted_file = self.insert_page_meta(filename.clone(), page_meta);
Expand Down Expand Up @@ -357,18 +404,16 @@ impl DiskCache {
}

async fn persist_bytes(&self, filename: &str, payload: Bytes) -> Result<()> {
let file_path = std::path::Path::new(&self.root_dir)
let dest_filepath = std::path::Path::new(&self.root_dir)
.join(filename)
.into_os_string()
.into_string()
.unwrap();

let mut file = File::create(&file_path)
.await
.context(Io { file: &file_path })?;
let writer = PageFileWriter::new(dest_filepath);
writer.write_and_flush(payload).await?;

let encoding = PageFileEncoder { payload };
encoding.encode_and_persist(&mut file, filename).await
Ok(())
}

/// Read the bytes from the cached file.
Expand All @@ -381,7 +426,7 @@ impl DiskCache {
range: &Range<usize>,
expect_file_size: usize,
) -> std::io::Result<ReadBytesResult> {
if PageFileEncoder::encoded_size(range.len()) > expect_file_size {
if PageFileWriter::encoded_size(range.len()) > expect_file_size {
return Ok(ReadBytesResult::OutOfRange);
}

Expand Down Expand Up @@ -681,7 +726,7 @@ impl DiskCacheStore {
}
.fail(),
) {
error!("Failed to send notifier error result, err:{e:?}.");
warn!("Failed to send notifier error result, err:{e:?}.");
}
}
}
Expand All @@ -698,8 +743,10 @@ impl DiskCacheStore {
{
self.cache.insert_data(cache_key, bytes.clone()).await;
for notifier in notifiers {
if let Err(e) = notifier.send(Ok(bytes.clone())) {
error!("Failed to send notifier success result, err:{e:?}.");
if notifier.send(Ok(bytes.clone())).is_err() {
// The error contains sent bytes, which maybe very large,
// so we don't log error.
warn!("Failed to send notifier success result");
}
}
}
Expand Down Expand Up @@ -940,6 +987,7 @@ mod test {
use upstream::local::LocalFileSystem;

use super::*;
use crate::test_util::MemoryStore;

struct StoreWithCacheDir {
inner: DiskCacheStore,
Expand All @@ -951,8 +999,7 @@ mod test {
cap: usize,
partition_bits: usize,
) -> StoreWithCacheDir {
let local_path = tempdir().unwrap();
let local_store = Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
let local_store = Arc::new(MemoryStore::default());

let cache_dir = tempdir().unwrap();
let store = DiskCacheStore::try_new(
Expand Down
2 changes: 2 additions & 0 deletions components/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ pub mod multipart;
pub mod obkv;
pub mod prefix;
pub mod s3;
#[cfg(test)]
pub mod test_util;

pub type ObjectStoreRef = Arc<dyn ObjectStore>;
172 changes: 172 additions & 0 deletions components/object_store/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2023 The CeresDB Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, fmt::Display, ops::Range, sync::RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::{self, BoxStream};
use tokio::io::AsyncWrite;
use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};

#[derive(Debug)]
struct StoreError {
path: Path,
msg: String,
}

impl Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoreError")
.field("path", &self.path)
.field("msg", &self.msg)
.finish()
}
}

impl std::error::Error for StoreError {}

/// A memory based object store implementation, mainly used for testing.
#[derive(Debug, Default)]
pub struct MemoryStore {
files: RwLock<HashMap<Path, Bytes>>,
get_range_counts: RwLock<HashMap<Path, usize>>,
}

impl Display for MemoryStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MemoryStore")
.field("counts", &self.get_counts())
.finish()
}
}

impl MemoryStore {
pub fn get_counts(&self) -> HashMap<Path, usize> {
let counts = self.get_range_counts.read().unwrap();
counts.clone().into_iter().collect()
}
}

#[async_trait]
impl ObjectStore for MemoryStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let mut files = self.files.write().unwrap();
files.insert(location.clone(), bytes);
Ok(())
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
let bs = bs.clone();
Ok(GetResult::Stream(Box::pin(stream::once(
async move { Ok(bs) },
))))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get",
source,
})
}
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
{
let mut counts = self.get_range_counts.write().unwrap();
counts
.entry(location.clone())
.and_modify(|c| *c += 1)
.or_insert(1);
}

let files = self.files.read().unwrap();
if let Some(bs) = files.get(location) {
Ok(bs.slice(range))
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "get_range",
source,
})
}
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let files = self.files.read().unwrap();

if let Some(bs) = files.get(location) {
Ok(ObjectMeta {
location: location.clone(),
size: bs.len(),
last_modified: Default::default(),
})
} else {
let source = Box::new(StoreError {
msg: "not found".to_string(),
path: location.clone(),
});
Err(upstream::Error::Generic {
store: "head",
source,
})
}
}

async fn put_multipart(
&self,
_location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
unimplemented!()
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
unimplemented!()
}

async fn delete(&self, _location: &Path) -> Result<()> {
unimplemented!()
}

async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
unimplemented!()
}

async fn list_with_delimiter(&self, _prefix: Option<&Path>) -> Result<ListResult> {
unimplemented!()
}

async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}

async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
unimplemented!()
}
}

0 comments on commit 41f6313

Please sign in to comment.