read entries using AIO #286
base: master
Check the contributing guide here. You need to sign the commit and run make format & make clippy.
src/engine.rs
Outdated
            new_block_flags.push(false);
        }
    }
    let mut a_list: Vec<aiocb> = Vec::with_capacity(block_sum);
Hide aiocb inside AioContext (it holds Vec<aiocb> and creates new ones when a new read request is issued via fs::read_async). No one other than the DefaultFileSystem should have access to aiocb.
Do you mean using one AioContext to manage all read requests? Then I need to redesign the AioContext, like this:
pub struct AioContext {
    fd_vec: Vec<Arc<LogFd>>,
    aio_vec: Vec<aiocb>,
    buf_vec: Vec<Vec<u8>>,
}
I'm not sure if there's a performance difference between waiting on each aiocb separately and waiting on all of them in one single syscall (maybe you can benchmark it too).
If there's no difference, then it makes sense to create a new AioContext for each request. But either way, all the details must be put inside the file system implementation. E.g.
impl AioContext {
    pub fn wait(&mut self) -> Result<()>;
    // UB unless `wait()` has been called and returned `Ok(())`.
    pub fn data(&self) -> &[u8];
}
76f4403 to 193b92e
src/env/default.rs
Outdated
@@ -97,6 +102,21 @@ impl LogFd {
        Ok(readed)
    }

    pub fn read_aio(&self, seq: usize, ctx: &mut AioContext, offset: u64) {
        let mut buf = ctx.buf_vec.last().unwrap().lock().unwrap();
The lock is meaningless. The mutable reference (&mut AioContext) ensures there's only one thread accessing the ctx.
Of course, due to the unsafe block, the buf_vec is in fact "leaked" to aio code. That's why we must manually guarantee there's no one reading or writing to buf_vec after calling aio_read.
src/env/default.rs
Outdated
impl AsyncContext for AioContext {
    fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
        let buf_len = self.buf_vec[seq].lock().unwrap().len();
Ditto, the &mut self has the same effect.
src/env/default.rs
Outdated
        &self,
        handle: Arc<Self::Handle>,
        seq: usize,
        ctx: &mut AioContext,
AioContext should be a generic type; different file system implementations can have different contexts.
The code will look like:
trait WaitData {
    fn wait(&mut self, index: usize) -> &[u8];
}
trait FileSystem {
    type AsyncContext: WaitData;
    fn new_async_context(&self) -> Self::AsyncContext;
}
impl FileSystem for DefaultFileSystem {
    type AsyncContext = AioContext;
    fn new_async_context(&self) -> Self::AsyncContext {
        AioContext::new()
    }
}
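To make the associated-type suggestion concrete, here is a minimal self-contained sketch. `MemContext`, `MemFileSystem`, and `total_len` are hypothetical names invented for illustration; the context simply holds pre-filled buffers instead of issuing real AIO requests.

```rust
/// Anything that can hand back the bytes of a finished read.
trait WaitData {
    fn wait(&mut self, index: usize) -> &[u8];
}

trait FileSystem {
    type AsyncContext: WaitData;
    fn new_async_context(&self) -> Self::AsyncContext;
}

/// Hypothetical stand-in for a real AIO context: buffers are already filled.
struct MemContext {
    bufs: Vec<Vec<u8>>,
}

impl WaitData for MemContext {
    fn wait(&mut self, index: usize) -> &[u8] {
        // A real implementation would block on the pending request here.
        &self.bufs[index]
    }
}

/// Hypothetical in-memory file system used only for this sketch.
struct MemFileSystem;

impl FileSystem for MemFileSystem {
    type AsyncContext = MemContext;
    fn new_async_context(&self) -> Self::AsyncContext {
        MemContext {
            bufs: vec![b"abc".to_vec(), b"de".to_vec()],
        }
    }
}

/// Generic caller: it only sees `F::AsyncContext`, never the concrete type.
fn total_len<F: FileSystem>(fs: &F, requests: usize) -> usize {
    let mut ctx = fs.new_async_context();
    (0..requests).map(|i| ctx.wait(i).len()).sum()
}
```

Calling `total_len(&MemFileSystem, 2)` waits on both requests through the trait only, which is the property the review asks for: callers never name `AioContext` directly.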
src/env/mod.rs
Outdated
        &self,
        handle: Arc<Self::Handle>,
        seq: usize,
        ctx: &mut AioContext,
You are still leaking details of ctx. Here you should emulate the use of Writer:
type AsyncContext: AioContext;
fn read_async(&self, handle: Arc<Self::Handle>, ctx: &mut Self::AsyncContext);
fn new_async_context(&self) -> Self::AsyncContext;
- Please run the checks locally before submitting a commit. As a contributor, the basic level of respect is to pass CI before requesting reviews.
- A good PR description should contain three parts: the problem, the solution, and the verification method (test plan).
@@ -257,12 +275,100 @@ impl WriteExt for LogFile {
    }
}

pub struct AioContext {
Don't use this name outside this file. Just like type Handle = <DefaultFileSystem as FileSystem>::Handle;, you can use the same syntax to reference the aio context of the base file system without needing to expose this struct.
src/pipe_log.rs
Outdated
    ) -> Result<()>;

    /// Reads bytes from multi blocks using 'Async IO'.
    fn async_read_bytes(
There's no need to create a function that isn't used anywhere. Now, you need to make fetch_entries_to_aio call this function and remove async_entry_read, because (1) the concept of an entry should not be exposed to the pipe log, and (2) we need the raw bytes to populate the block cache (this part can be implemented later, maybe in a different PR).
@tabokie My implementation idea now is to read all bytes through async_entry_read, and then parse them in turn, like this:
pub fn fetch_entries_to_aio() {
    ...
    let bytes = async_entry_read();
    for (idx, _) in ents_idx.iter().enumerate() {
        let entry = parse_from_bytes(&bytes[idx]);
        vec.push(entry);
    }
    ...
}
Is that OK?
Yes, that's what I meant. But the name should not be async_entry_read; the underlying pipe is not aware of the "entry" concept.
src/env/default.rs
Outdated
    }
}

impl AsyncContext for AioContext {
The current implementation wouldn't work with a custom file system such as ObfuscatedFileSystem. If you add a test that reads async with the obfuscated fs, the result would be wrong.
Also, another minor detail: it's not intuitive to have the async context be used both passively and actively, i.e. fs::new_reader(ctx, handle) should not coexist with ctx::wait().
Let's do this instead:
pub trait FileSystem {
    type AsyncContext;
    fn async_read(&self, ctx: &mut Self::AsyncContext, handle: Arc<Self::Handle>, block: FileBlockHandle) -> Result<()>;
    fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>>;
}
// for obfuscated.rs
pub struct ObfuscatedContext(<DefaultFileSystem as FileSystem>::AsyncContext);
impl FileSystem for ObfuscatedFileSystem {
    type AsyncContext = ObfuscatedContext;
    fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>> {
        let mut base = self.0.async_finish(ctx.0)?;
        for v in &mut base {
            // do obfuscation
            for c in v.iter_mut() {
                *c = c.wrapping_sub(1);
            }
        }
        Ok(base)
    }
}
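The wrapper design proposed above can be sketched end to end as follows. This is only an illustration under simplifying assumptions: `BaseFs`, `BaseContext`, and `ObfuscatedFs` are hypothetical stand-ins, and `async_read` takes the "on-disk" bytes directly rather than a file handle and block handle.

```rust
use std::io::Result;

trait FileSystem {
    type AsyncContext;
    fn new_async_context(&self) -> Self::AsyncContext;
    fn async_read(&self, ctx: &mut Self::AsyncContext, data: &[u8]) -> Result<()>;
    fn async_finish(&self, ctx: Self::AsyncContext) -> Result<Vec<Vec<u8>>>;
}

/// Hypothetical base file system: "reading" just copies the given bytes.
struct BaseFs;
struct BaseContext(Vec<Vec<u8>>);

impl FileSystem for BaseFs {
    type AsyncContext = BaseContext;
    fn new_async_context(&self) -> BaseContext {
        BaseContext(Vec::new())
    }
    fn async_read(&self, ctx: &mut BaseContext, data: &[u8]) -> Result<()> {
        ctx.0.push(data.to_vec());
        Ok(())
    }
    fn async_finish(&self, ctx: BaseContext) -> Result<Vec<Vec<u8>>> {
        Ok(ctx.0)
    }
}

/// Wrapper whose stored bytes are shifted by +1; reads must undo the shift.
struct ObfuscatedFs(BaseFs);
struct ObfuscatedContext(BaseContext);

impl FileSystem for ObfuscatedFs {
    type AsyncContext = ObfuscatedContext;
    fn new_async_context(&self) -> ObfuscatedContext {
        ObfuscatedContext(self.0.new_async_context())
    }
    fn async_read(&self, ctx: &mut ObfuscatedContext, data: &[u8]) -> Result<()> {
        // Delegate submission to the base file system.
        self.0.async_read(&mut ctx.0, data)
    }
    fn async_finish(&self, ctx: ObfuscatedContext) -> Result<Vec<Vec<u8>>> {
        // Collect the raw buffers, then de-obfuscate them in place.
        let mut base = self.0.async_finish(ctx.0)?;
        for v in &mut base {
            for c in v.iter_mut() {
                *c = c.wrapping_sub(1);
            }
        }
        Ok(base)
    }
}
```

Because the post-processing lives in `async_finish`, a caller that is generic over `FileSystem` gets de-obfuscated bytes without knowing which implementation it is talking to.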
@tabokie Is it necessary to consider the following situation? In a single fetch_entries_to_aio call, some handles belong to the Append queue and others belong to the Rewrite queue. They correspond to different file systems, so async_finish() would need to be called twice. In more complex cases they are interleaved, like Append, Rewrite, Append, Rewrite, Rewrite..., and then the design of async_finish() doesn't work.
Like this:
impl DualPipes {
    fn read_async(&self, handles: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>> {
        let mut ctx = fs.new_context();
        for handle in handles {
            fs.read_async(&mut ctx, handle)?;
        }
        fs.async_finish(ctx)
    }
}
@tabokie How can I determine which fs to use? Use self.pipes[LogQueue::Append].file_system, or self.pipes[LogQueue::Rewrite].file_system, or both?
Either one is fine. They are always the same.
src/env/default.rs
Outdated
    fn single_wait(&mut self, seq: usize) -> IoResult<usize> {
        let buf_len = self.buf_vec[seq].len();
        unsafe {
            loop {
What if the read hits EOF? Will it loop forever? This needs testing as well.
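The EOF concern can be illustrated with a synchronous analogue. The body of the PR's loop is not shown in this thread, so the sketch below uses `std::io::Read` rather than the AIO calls; `read_until_full_or_eof` is a hypothetical helper. The point it demonstrates is that a zero-byte result must terminate the loop instead of being retried.

```rust
use std::io::{self, ErrorKind, Read};

/// Fills `buf` from `reader`, stopping early at EOF.
/// Returns the number of bytes actually read.
fn read_until_full_or_eof<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            // `Ok(0)` with a non-empty buffer means EOF: break instead of retrying.
            Ok(0) => break,
            Ok(n) => filled += n,
            // Interrupted reads are safe to retry.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(filled)
}
```

A test along these lines, requesting more bytes than the file holds, would catch a loop that waits for the buffer to fill completely.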
Signed-off-by: root <[email protected]>
I have completed the code modifications according to your comments and run the unit tests locally; all of them passed. PTAL. @tabokie
@ustc-wxy format and clippy failed.
Codecov Report
@@            Coverage Diff             @@
##           master     #286      +/-   ##
==========================================
- Coverage   97.74%   97.40%   -0.34%
==========================================
  Files          30       30
  Lines       11287    11668     +381
==========================================
+ Hits        11032    11365     +333
- Misses        255      303      +48
... and 1 file with indirect coverage changes
src/pipe_log.rs
Outdated
@@ -172,6 +173,9 @@ pub trait PipeLog: Sized {
    /// Reads some bytes from the specified position.
    fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

    /// Reads bytes from multi blocks using 'Async IO'.
    fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>>;
As mentioned before, the pipe is not aware of the "entry" concept. This should use Vec<FileBlockHandle>.
src/file_pipe_log/pipe.rs
Outdated
                self.pipes[LogQueue::Append as usize].async_read(block, &mut ctx);
            }
            LogQueue::Rewrite => {
                self.pipes[LogQueue::Rewrite as usize].async_read(block, &mut ctx);
Change the function to async_read(ctx, buf). It's the common convention to pass the context first.
@tabokie Did you mean async_read(ctx, blocks)? Like this:
impl<F: FileSystem> PipeLog for DualPipes<F> {
    fn async_read_bytes(&self, blocks: Vec<FileBlockHandle>) -> Result<Vec<Vec<u8>>> {
        ...
        self.pipes[LogQueue::Append].async_read(ctx, blocks);
        let bytes = fs.async_finish(ctx);
        ...
    }
}
Yes
src/file_pipe_log/pipe.rs
Outdated
            }
        }
    }
    let res = fs.async_finish(&mut ctx).unwrap();
Change it to async_finish(ctx: Context) (instead of async_finish(ctx: &mut Context)). This makes sure the context isn't reused afterwards.
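A small sketch of why taking the context by value helps. `Context`, `submit`, and this free-standing `async_finish` are hypothetical simplifications of the PR's types; the behavior shown is ordinary Rust move semantics.

```rust
/// Hypothetical context that collects the buffers of submitted reads.
struct Context {
    bufs: Vec<Vec<u8>>,
}

impl Context {
    fn new() -> Self {
        Context { bufs: Vec::new() }
    }

    /// Records one "read"; a real context would queue an I/O request.
    fn submit(&mut self, data: &[u8]) {
        self.bufs.push(data.to_vec());
    }
}

/// Takes `ctx` by value: the caller's binding is moved and cannot be reused.
fn async_finish(ctx: Context) -> Vec<Vec<u8>> {
    ctx.bufs
}
```

After `let out = async_finish(ctx);`, a further `ctx.submit(..)` fails to compile because `ctx` has been moved, so accidental reuse is rejected at compile time rather than discovered at runtime.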
src/file_pipe_log/pipe.rs
Outdated
@@ -444,6 +451,32 @@ impl<F: FileSystem> PipeLog for DualPipes<F> {
        self.pipes[handle.id.queue as usize].read_bytes(handle)
    }

    #[inline]
    fn async_read_bytes(&self, ents_idx: &mut Vec<EntryIndex>) -> Result<Vec<Vec<u8>>> {
        let mut blocks: Vec<FileBlockHandle> = vec![];
Won't this compile? let mut blocks = Vec::new();
src/file_pipe_log/pipe.rs
Outdated
@@ -254,6 +254,13 @@ impl<F: FileSystem> SinglePipe<F> {
        reader.read(handle)
    }

    fn async_read(&self, block: &mut FileBlockHandle, ctx: &mut F::AsyncIoContext) {
There's no need for the block to be mutable.
src/file_pipe_log/pipe.rs
Outdated
        let fd = self.get_fd(block.id.seq).unwrap();
        let buf = vec![0_u8; block.len];

        self.file_system.async_read(ctx, fd, buf, block).unwrap();
Why not create the vector inside async_read? Then there's one less argument to pass in (and validate).
src/env/obfuscated.rs
Outdated
impl FileSystem for ObfuscatedFileSystem {
    type Handle = <DefaultFileSystem as FileSystem>::Handle;
    type Reader = ObfuscatedReader;
    type Writer = ObfuscatedWriter;
    type AsyncIoContext = ObfuscatedContext;
Why? Doesn't this work already?
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext
@tabokie I introduced the ObfuscatedContext struct based on your comments: #286 (comment)
Adding a wrapper layer is only useful if you have implemented something on top of the base context. Right now you haven't implemented anything, so there is no need to wrap it.
src/env/obfuscated.rs
Outdated
    fn wait(&mut self) -> IoResult<usize> {
        self.0.wait()
    }
This code is not covered, meaning you didn't test ObfuscatedFileSystem in unit tests.
src/env/default.rs
Outdated
        }
    }

    pub fn set_fd(&mut self, fd: Arc<LogFd>) {
I understand this is to hide the LogFd. But it isn't necessary if you treat AsyncContext as a data-only struct (inline all code into the file system, and remove the trait AsyncContext entirely), i.e.
impl SomeFileSystem {
    fn async_read(ctx: &mut Self::Context, handle: Self::Handle, block: FileBlockHandle) {
        handle.submit_async_read(ctx.buf[i], ...)?;
    }
}
src/env/default.rs
Outdated
        for _ in 0..block_sum {
            aio_vec.push(mem::zeroed::<libc::aiocb>());
        }
Why create them beforehand, instead of creating them at the same time as buf_vec?
@tabokie If libc::aiocb is created in multiple function calls, the compiler assigns the same address to the pointer every time, which leads to incorrect results. I'm not very familiar with Rust's memory allocation API; do you have any better approach?
I don't think it's possible to have multiple variables with the same address. It's possible that you mistakenly free some of them.
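The thread does not establish what actually went wrong here. One possible explanation, offered only as an assumption, is that the control blocks were moved (for example by a growing `Vec`) or dropped after their addresses had been handed out. A common way to keep addresses stable while creating blocks one at a time is to box each block. In the sketch below, `ControlBlock` is a hypothetical stand-in for `libc::aiocb` and `push_boxed` is an illustrative helper.

```rust
/// Hypothetical stand-in for `libc::aiocb`; only its address matters here.
#[derive(Default)]
struct ControlBlock {
    _offset: u64,
}

/// Pushes `n` boxed control blocks one at a time and records the heap address
/// of each block at the moment it is created.
fn push_boxed(n: usize) -> (Vec<Box<ControlBlock>>, Vec<*const ControlBlock>) {
    let mut blocks = Vec::new();
    let mut addrs = Vec::new();
    for _ in 0..n {
        let b = Box::new(ControlBlock::default());
        addrs.push(&*b as *const ControlBlock);
        // Moves the Box (a pointer), not the ControlBlock it points to.
        blocks.push(b);
    }
    (blocks, addrs)
}
```

Even if the outer `Vec` reallocates while growing, each boxed block stays where it was allocated, so every recorded address remains valid and distinct for as long as the boxes are kept alive.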
@tabokie PTAL
#286 (comment) and #286 (comment) are not addressed.
src/file_pipe_log/pipe.rs
Outdated
@@ -184,7 +185,6 @@ impl<F: FileSystem> SinglePipe<F> {
        }
        Ok(files[(file_seq - files[0].seq) as usize].handle.clone())
    }
Restore the newline.
src/file_pipe_log/pipe.rs
Outdated
@@ -254,6 +254,13 @@ impl<F: FileSystem> SinglePipe<F> {
        reader.read(handle)
    }

    fn async_read(&self, blocks: Vec<FileBlockHandle>, ctx: &mut F::AsyncIoContext) {
Move ctx first, as explained in #286 (comment).
src/file_pipe_log/pipe.rs
Outdated
            let fd = self.get_fd(block.id.seq).unwrap();
            self.file_system.async_read(ctx, fd, block).unwrap();
This shouldn't use unwrap.
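As a rough illustration of the alternative to `unwrap`, the sketch below propagates errors with `?`. Both `get_fd` and `submit_all` are hypothetical simplifications (plain integers instead of file handles), not the PR's actual functions.

```rust
use std::io::{Error, ErrorKind, Result};

/// Hypothetical lookup that fails for unknown file sequence numbers.
fn get_fd(seq: u64) -> Result<u64> {
    if seq < 3 {
        Ok(100 + seq)
    } else {
        Err(Error::new(ErrorKind::NotFound, "file seq out of range"))
    }
}

/// Resolves one descriptor per sequence number; the first failure is
/// returned to the caller instead of panicking.
fn submit_all(seqs: &[u64]) -> Result<Vec<u64>> {
    let mut fds = Vec::with_capacity(seqs.len());
    for &seq in seqs {
        fds.push(get_fd(seq)?); // `?` propagates the error
    }
    Ok(fds)
}
```

With this shape, the enclosing function has to return a `Result`, which lets the engine decide how to handle a missing file rather than aborting the process.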
src/env/obfuscated.rs
Outdated
        let mut res = vec![];
        for v in base {
            let mut temp = vec![];
            //do obfuscation.
- //do obfuscation.
+ // do obfuscation.
src/env/obfuscated.rs
Outdated
        let base = self.inner.async_finish(ctx).unwrap();
        let mut res = vec![];
        for v in base {
            let mut temp = vec![];
I don't know why you need to create another vector instead of modifying base directly.
src/env/mod.rs
Outdated
@@ -7,14 +7,25 @@ use std::sync::Arc;
mod default;
mod obfuscated;

pub use default::AioContext;
Remove this, as explained in #286 (comment).
@tabokie PTAL.
#286 (comment) is still not addressed. Is there any confusion?
@tabokie PTAL, thx!
src/engine.rs
Outdated
            return Ok(ents_idx.len());
        }
        Ok(0)
    }

    pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
- pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
+ pub fn fetch_entries_to_aio<M: MessageExt>(
src/env/mod.rs
Outdated
/// FileSystem
pub trait FileSystem: Send + Sync {
    type Handle: Send + Sync + Handle;
    type Reader: Seek + Read + Send;
    type Writer: Seek + Write + Send + WriteExt;
    type AsyncIoContext;

    fn async_read(
Since all the async happens inside the implementation, we can rename this to something like RocksDB's MultiGet, i.e. multi_read.
src/env/mod.rs
Outdated
/// FileSystem
pub trait FileSystem: Send + Sync {
    type Handle: Send + Sync + Handle;
    type Reader: Seek + Read + Send;
    type Writer: Seek + Write + Send + WriteExt;
    type AsyncIoContext;
Rename this to MultiReadContext.
src/engine.rs
Outdated
    }
    fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>(
Add a newline between functions.
src/engine.rs
Outdated
    pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
        &self,
        region_id: u64,
        begin: u64,
        end: u64,
        max_size: Option<usize>,
        vec: &mut Vec<M::Entry>,
    ) -> Result<usize> {
I think, based on the tests, you can automatically select single_read or multi_read and avoid creating two different engine methods, e.g. use aio when blocks.len() > 4 or something.
One issue, though, is that I'm not sure the aio syscall is portable enough. You might need to do some research on how to detect whether aio is available (maybe take a look at how RocksDB did it).
OK. Let me try.
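The automatic selection suggested above could look roughly like the following. Everything here is an assumption for illustration: `MULTI_READ_THRESHOLD`, `ReadPath`, and `choose_read_path` are hypothetical names, the cutoff of 4 is only the reviewer's "or something" placeholder that benchmarks would need to tune, and how AIO availability is detected is left open.

```rust
/// Hypothetical cutoff; the review only suggests "4 or something".
const MULTI_READ_THRESHOLD: usize = 4;

#[derive(Debug, PartialEq)]
enum ReadPath {
    Single,
    Multi,
}

/// Picks the read path for a batch of `block_count` blocks.
/// `aio_available` is assumed to be detected once, e.g. at startup.
fn choose_read_path(block_count: usize, aio_available: bool) -> ReadPath {
    if aio_available && block_count > MULTI_READ_THRESHOLD {
        ReadPath::Multi
    } else {
        ReadPath::Single
    }
}
```

Keeping the decision in one small function means the engine exposes a single fetch method, and platforms without AIO fall back to the synchronous path.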
What is changed and how it works?
Issue Number: Ref #248
What's Changed:
1. Add the fetch_entries_aio() interface, which reads entries using AIO.
Solution:
Manage single entry-read requests in a batch through AsyncIOContext. After submitting a read request for each file block, fetch the byte streams of all blocks from AsyncIOContext. Finally, parse them into Entry values in order.
Check List
Tests