Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 71 additions & 62 deletions sdk/rust/src/filesystem/agentfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ impl File for AgentFSFile {
let end_chunk = (offset + size).saturating_sub(1) / chunk_size;

let mut stmt = conn
.prepare_cached("SELECT chunk_index, data FROM fs_data WHERE ino = ? AND chunk_index >= ? AND chunk_index <= ? ORDER BY chunk_index")
.prepare_cached("SELECT ino_and_chunk_index, data FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index <= ? ORDER BY ino_and_chunk_index")
.await?;
let mut rows = stmt
.query((self.ino, start_chunk as i64, end_chunk as i64))
.query((
(self.ino << 32) | start_chunk as i64,
(self.ino << 32) | end_chunk as i64,
))
.await?;

let mut result = Vec::with_capacity(size as usize);
Expand All @@ -107,6 +110,7 @@ impl File for AgentFSFile {
.ok()
.and_then(|v| v.as_integer().copied())
.unwrap_or(0) as u64;
let chunk_index = chunk_index & 0xffffffff;

// Fill gaps with zeros for sparse files
while next_expected_chunk < chunk_index && result.len() < size as usize {
Expand Down Expand Up @@ -183,12 +187,12 @@ impl File for AgentFSFile {
// If writing beyond current size, extend with zeros first
if offset > current_size {
let zeros = vec![0u8; (offset - current_size) as usize];
self.write_data_at_offset_with_conn(&conn, current_size, &zeros)
self.write_data_at_offset_with_conn(&conn, current_size, current_size, &zeros)
.await?;
}

// Write the actual data
self.write_data_at_offset_with_conn(&conn, offset, data)
self.write_data_at_offset_with_conn(&conn, offset, current_size, data)
.await?;

// Update file size and mtime
Expand Down Expand Up @@ -228,36 +232,36 @@ impl File for AgentFSFile {
if new_size == 0 {
// Special case: truncate to zero - just delete all chunks
let mut stmt = conn
.prepare_cached("DELETE FROM fs_data WHERE ino = ?")
.prepare_cached("DELETE FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?")
.await?;
stmt.execute((self.ino,)).await?;
stmt.execute((self.ino << 32, (self.ino + 1) << 32)).await?;
} else if new_size < current_size {
// Shrinking: delete excess chunks and truncate last chunk if needed
let last_chunk_idx = (new_size - 1) / chunk_size;

// Delete all chunks beyond the last one we need
conn.execute(
"DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?",
(self.ino, last_chunk_idx as i64),
"DELETE FROM fs_data WHERE ino_and_chunk_index > ? AND ino_and_chunk_index < ?",
((self.ino << 32) | last_chunk_idx as i64, ((self.ino + 1) << 32)),
)
.await?;

// Truncate the last chunk if needed
let offset_in_chunk = (new_size % chunk_size) as usize;
if offset_in_chunk > 0 {
let mut stmt = conn
.prepare_cached("SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?")
.prepare_cached("SELECT data FROM fs_data WHERE ino_and_chunk_index = ?")
.await?;
let mut rows = stmt.query((self.ino, last_chunk_idx as i64)).await?;
let mut rows = stmt.query(((self.ino << 32) | last_chunk_idx as i64,)).await?;

if let Some(row) = rows.next().await? {
if let Ok(Value::Blob(mut chunk_data)) = row.get_value(0) {
if chunk_data.len() > offset_in_chunk {
chunk_data.truncate(offset_in_chunk);
let mut stmt = conn
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?")
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino_and_chunk_index = ?")
.await?;
stmt.execute((Value::Blob(chunk_data), self.ino, last_chunk_idx as i64)).await?;
stmt.execute((Value::Blob(chunk_data), (self.ino << 32) | last_chunk_idx as i64)).await?;
}
}
}
Expand Down Expand Up @@ -322,6 +326,7 @@ impl AgentFSFile {
&self,
conn: &Connection,
offset: u64,
size: u64,
data: &[u8],
) -> Result<()> {
let chunk_size = self.chunk_size as u64;
Expand All @@ -333,11 +338,11 @@ impl AgentFSFile {

// get statements only once (in order to avoid heavy clone on every while iteration)
let mut select_stmt = conn
.prepare_cached("SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?")
.prepare_cached("SELECT data FROM fs_data WHERE ino_and_chunk_index = ?")
.await?;
let mut insert_stmt = conn
.prepare_cached(
"INSERT OR REPLACE INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)",
"INSERT OR REPLACE INTO fs_data (ino_and_chunk_index, data) VALUES (?, ?)",
)
.await?;
while written < data.len() {
Expand All @@ -351,9 +356,9 @@ impl AgentFSFile {
let to_write = std::cmp::min(remaining_in_chunk, remaining_data);

let mut chunk_data;
if to_write != chunk_size as usize {
if current_offset < size && to_write != chunk_size as usize {
// Get existing chunk data (if any)
let mut rows = select_stmt.query((self.ino, chunk_index)).await?;
let mut rows = select_stmt.query(((self.ino << 32) | chunk_index,)).await?;

chunk_data = if let Some(row) = rows.next().await? {
row.get_value(0)
Expand All @@ -380,12 +385,18 @@ impl AgentFSFile {
chunk_data[offset_in_chunk..offset_in_chunk + to_write]
.copy_from_slice(&data[written..written + to_write]);
} else {
chunk_data = data[written..written + to_write].to_vec();
if offset_in_chunk != 0 {
chunk_data = vec![0u8; offset_in_chunk + to_write];
chunk_data[offset_in_chunk..offset_in_chunk + to_write]
.copy_from_slice(&data[written..written + to_write]);
} else {
chunk_data = data[written..written + to_write].to_vec();
}
}

// Save chunk
insert_stmt
.execute((self.ino, chunk_index, Value::Blob(chunk_data)))
.execute(((self.ino << 32) | chunk_index, Value::Blob(chunk_data)))
.await?;
insert_stmt.reset()?;

Expand Down Expand Up @@ -496,10 +507,8 @@ impl AgentFS {
// Create data chunks table
conn.execute(
"CREATE TABLE IF NOT EXISTS fs_data (
ino INTEGER NOT NULL,
chunk_index INTEGER NOT NULL,
data BLOB NOT NULL,
PRIMARY KEY (ino, chunk_index)
ino_and_chunk_index INTEGER PRIMARY KEY,
data BLOB NOT NULL
)",
(),
)
Expand Down Expand Up @@ -1029,9 +1038,9 @@ impl AgentFS {
let ino = if let Some(ino) = self.lookup_child(&conn, parent_ino, name).await? {
// Delete existing data
let mut stmt = conn
.prepare_cached("DELETE FROM fs_data WHERE ino = ?")
.prepare_cached("DELETE FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?")
.await?;
stmt.execute((ino,)).await?;
stmt.execute((ino << 32, (ino + 1) << 32)).await?;
ino
} else {
// Create new inode
Expand Down Expand Up @@ -1075,8 +1084,8 @@ impl AgentFS {
// Write data in chunks
for (chunk_index, chunk) in data.chunks(self.chunk_size).enumerate() {
conn.execute(
"INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)",
(ino, chunk_index as i64, chunk),
"INSERT INTO fs_data (ino_and_chunk_index, data) VALUES (?, ?)",
((ino << 32) | chunk_index as i64, chunk),
)
.await?;
}
Expand Down Expand Up @@ -1200,8 +1209,8 @@ impl AgentFS {

let mut rows = conn
.query(
"SELECT data FROM fs_data WHERE ino = ? ORDER BY chunk_index",
(ino,),
"SELECT data FROM fs_data WHERE ino_and_chunk_index >= ? and ino_and_chunk_index < ? ORDER BY ino_and_chunk_index",
(ino << 32, (ino + 1) << 32),
)
.await?;

Expand Down Expand Up @@ -1235,16 +1244,16 @@ impl AgentFS {

let mut rows = conn
.query(
"SELECT chunk_index, data FROM fs_data WHERE ino = ? AND chunk_index >= ? AND chunk_index <= ? ORDER BY chunk_index",
(ino, start_chunk as i64, end_chunk as i64),
"SELECT data FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index <= ? ORDER BY ino_and_chunk_index",
((ino << 32) | start_chunk as i64, (ino << 32) | end_chunk as i64),
)
.await?;

let mut result = Vec::with_capacity(size as usize);
let start_offset_in_chunk = (offset % chunk_size) as usize;

while let Some(row) = rows.next().await? {
if let Ok(Value::Blob(chunk_data)) = row.get_value(1) {
if let Ok(Value::Blob(chunk_data)) = row.get_value(0) {
let skip = if result.is_empty() {
start_offset_in_chunk
} else {
Expand Down Expand Up @@ -1390,8 +1399,8 @@ impl AgentFS {
let mut chunk_data = if needs_read {
let mut rows = conn
.query(
"SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?",
(ino, chunk_idx as i64),
"SELECT data FROM fs_data WHERE ino_and_chunk_index = ?",
((ino << 32) | chunk_idx as i64,),
)
.await?;
if let Some(row) = rows.next().await? {
Expand Down Expand Up @@ -1427,13 +1436,13 @@ impl AgentFS {

// Write the chunk - delete existing then insert
conn.execute(
"DELETE FROM fs_data WHERE ino = ? AND chunk_index = ?",
(ino, chunk_idx as i64),
"DELETE FROM fs_data WHERE ino_and_chunk_index = ?",
((ino << 32) | chunk_idx as i64,),
)
.await?;
conn.execute(
"INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)",
(ino, chunk_idx as i64, &chunk_data[..actual_len]),
"INSERT INTO fs_data (ino_and_chunk_index, data) VALUES (?, ?)",
((ino << 32) | chunk_idx as i64, &chunk_data[..actual_len]),
)
.await?;
}
Expand Down Expand Up @@ -1497,17 +1506,17 @@ impl AgentFS {
if new_size == 0 {
// Special case: truncate to zero - just delete all chunks
let mut stmt = conn
.prepare_cached("DELETE FROM fs_data WHERE ino = ?")
.prepare_cached("DELETE FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?")
.await?;
stmt.execute((ino,)).await?;
stmt.execute((ino << 32, (ino + 1) << 32)).await?;
} else if new_size < current_size {
// Shrinking: delete excess chunks and truncate last chunk if needed
let last_chunk_idx = (new_size - 1) / chunk_size;

// Delete all chunks beyond the last one we need
conn.execute(
"DELETE FROM fs_data WHERE ino = ? AND chunk_index > ?",
(ino, last_chunk_idx as i64),
"DELETE FROM fs_data WHERE ino_and_chunk_index > ? AND ino_and_chunk_index < ?",
((ino << 32) | last_chunk_idx as i64, ((ino + 1) << 32)),
)
.await?;

Expand All @@ -1518,18 +1527,18 @@ impl AgentFS {
// read it, truncate, and rewrite
if end_in_last_chunk < chunk_size {
let mut stmt = conn
.prepare_cached("SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?")
.prepare_cached("SELECT data FROM fs_data WHERE ino_and_chunk_index = ?")
.await?;
let mut rows = stmt.query((ino, last_chunk_idx as i64)).await?;
let mut rows = stmt.query(((ino << 32) | last_chunk_idx as i64, )).await?;

if let Some(row) = rows.next().await? {
if let Ok(Value::Blob(chunk_data)) = row.get_value(0) {
if chunk_data.len() > end_in_last_chunk as usize {
let truncated = &chunk_data[..end_in_last_chunk as usize];
let mut stmt = conn
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?")
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino_and_chunk_index = ?")
.await?;
stmt.execute((truncated, ino, last_chunk_idx as i64)).await?;
stmt.execute((truncated, (ino << 32) | last_chunk_idx as i64)).await?;
}
}
}
Expand All @@ -1546,9 +1555,9 @@ impl AgentFS {
// Pad the last existing chunk with zeros if it's not full
if let Some(last_idx) = last_existing_chunk {
let mut stmt = conn
.prepare_cached("SELECT data FROM fs_data WHERE ino = ? AND chunk_index = ?")
.prepare_cached("SELECT data FROM fs_data WHERE ino_and_chunk_index = ?")
.await?;
let mut rows = stmt.query((ino, last_idx as i64)).await?;
let mut rows = stmt.query(((ino << 32) | last_idx as i64,)).await?;

if let Some(row) = rows.next().await? {
if let Ok(Value::Blob(chunk_data)) = row.get_value(0) {
Expand All @@ -1565,9 +1574,9 @@ impl AgentFS {
let mut padded = chunk_data.clone();
padded.resize(needed_len, 0);
let mut stmt = conn
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino = ? AND chunk_index = ?")
.prepare_cached("UPDATE fs_data SET data = ? WHERE ino_and_chunk_index = ?")
.await?;
stmt.execute((&padded[..], ino, last_idx as i64)).await?;
stmt.execute((&padded[..], (ino << 32) | last_idx as i64)).await?;
}
}
}
Expand All @@ -1583,8 +1592,8 @@ impl AgentFS {
};
let zeros = vec![0u8; chunk_len];
conn.execute(
"INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, ?, ?)",
(ino, chunk_idx as i64, &zeros[..]),
"INSERT INTO fs_data (ino_and_chunk_index, data) VALUES (?, ?)",
((ino << 32) | chunk_idx as i64, &zeros[..]),
)
.await?;
}
Expand Down Expand Up @@ -2030,9 +2039,9 @@ impl AgentFS {
// Manually handle cascading deletes since we don't use foreign keys
// Delete data blocks
let mut stmt = conn
.prepare_cached("DELETE FROM fs_data WHERE ino = ?")
.prepare_cached("DELETE FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?")
.await?;
stmt.execute((ino,)).await?;
stmt.execute((ino << 32, (ino + 1) << 32)).await?;

// Delete symlink if exists
let mut stmt = conn
Expand Down Expand Up @@ -2209,9 +2218,9 @@ impl AgentFS {
let link_count = self.get_link_count(&conn, dst_ino).await?;
if link_count == 0 {
let mut stmt = conn
.prepare_cached("DELETE FROM fs_data WHERE ino = ?")
.prepare_cached("DELETE FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?")
.await?;
stmt.execute((dst_ino,)).await?;
stmt.execute((dst_ino << 32, (dst_ino + 1) << 32)).await?;
let mut stmt = conn
.prepare_cached("DELETE FROM fs_symlink WHERE ino = ?")
.await?;
Expand Down Expand Up @@ -2350,7 +2359,7 @@ impl AgentFS {
async fn get_chunk_count(&self, ino: i64) -> Result<i64> {
let conn = self.pool.get_connection().await?;
let mut rows = conn
.query("SELECT COUNT(*) FROM fs_data WHERE ino = ?", (ino,))
.query("SELECT COUNT(*) FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?", (ino << 32, (ino + 1) << 32))
.await?;

if let Some(row) = rows.next().await? {
Expand Down Expand Up @@ -2778,8 +2787,8 @@ mod tests {
let conn = fs.pool.get_connection().await?;
let result = conn
.execute(
"INSERT INTO fs_data (ino, chunk_index, data) VALUES (?, 0, ?)",
(ino, vec![1u8; 10]),
"INSERT INTO fs_data (ino_and_chunk_index, data) VALUES (?, ?)",
(ino << 32, vec![1u8; 10]),
)
.await;

Expand All @@ -2804,8 +2813,8 @@ mod tests {
let conn = fs.pool.get_connection().await?;
let mut rows = conn
.query(
"SELECT chunk_index FROM fs_data WHERE ino = ? ORDER BY chunk_index",
(ino,),
"SELECT ino_and_chunk_index FROM fs_data WHERE ino_and_chunk_index >= ? and ino_and_chunk_index < ? ORDER BY ino_and_chunk_index",
(ino << 32, (ino + 1) << 32),
)
.await?;

Expand All @@ -2816,7 +2825,7 @@ mod tests {
.ok()
.and_then(|v| v.as_integer().copied())
.unwrap_or(-1);
indices.push(idx);
indices.push(idx & 0xffffffff);
}

assert_eq!(indices, vec![0, 1, 2, 3, 4]);
Expand Down Expand Up @@ -2844,7 +2853,7 @@ mod tests {
// Verify all chunks are gone
let conn = fs.pool.get_connection().await?;
let mut rows = conn
.query("SELECT COUNT(*) FROM fs_data WHERE ino = ?", (ino,))
.query("SELECT COUNT(*) FROM fs_data WHERE ino_and_chunk_index >= ? AND ino_and_chunk_index < ?", (ino << 32, (ino + 1) << 32))
.await?;

let count = rows
Expand Down