diff --git a/README.md b/README.md index ffad56eb..33f14977 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,18 @@ **Curvine** is a high-performance, concurrent distributed cache system written in Rust, designed for low-latency and high-throughput workloads. +

+ English | + 简体中文 | + Deutsch | + Español | + français | + 日本語 | + 한국어 | + Português | + Русский +

+ ## 📚 Documentation Resources For more detailed information, please refer to: diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 00000000..006e2718 --- /dev/null +++ b/README_zh.md @@ -0,0 +1,218 @@ +
+ +
+ +![curvine-font-dark](https://raw.githubusercontent.com/CurvineIO/curvine-doc/refs/heads/main/static/img/curvine_font_dark.svg#gh-light-mode-only) +![curvine-font-light](https://raw.githubusercontent.com/CurvineIO/curvine-doc/refs/heads/main/static/img/curvine_font_white.svg#gh-dark-mode-only) + +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![Rust](https://img.shields.io/badge/Rust-1.80%2B-orange)](https://www.rust-lang.org) + +**Curvine** 是一个用 Rust 编写的高性能、并发分布式缓存系统,专为低延迟和高吞吐量工作负载设计。 + +

+ English | + 简体中文 | + Deutsch | + Español | + français | + 日本語 | + 한국어 | + Português | + Русский +

+
+## 📚 文档资源
+
+更多详细信息,请参阅:
+
+- [官方文档](https://curvineio.github.io/docs/Overview/instroduction)
+- [快速入门](https://curvineio.github.io/docs/Deploy/quick-start)
+- [用户手册](https://curvineio.github.io/docs/category/user-manuals)
+- [性能基准](https://curvineio.github.io/docs/category/benchmark)
+- [DeepWiki](https://deepwiki.com/CurvineIO/curvine)
+
+## 应用场景
+
+![use_case](https://raw.githubusercontent.com/CurvineIO/curvine-doc/refs/heads/main/docs/1-Overview/img/curvine-scene.jpg)
+
+- **场景1**: 数据洗牌加速
+- **场景2**: 热表数据加速
+- **场景3**: 训练加速
+- **场景4**: 模型分发
+- **场景5**: 多云数据缓存
+
+## 🚀 核心特性
+
+- **高性能 RPC 框架**:基于 Tokio 的异步通信框架,支持高并发请求处理。
+- **分布式架构**:采用 Master-Worker 架构设计,支持水平扩展。
+- **多级缓存**:支持内存、SSD 和 HDD 的多级缓存策略。
+- **FUSE 接口**:提供 FUSE 文件系统接口,可无缝集成到现有系统中。
+- **底层存储集成**:支持与多种底层存储系统集成。
+- **Raft 共识**:采用 Raft 算法确保数据一致性与高可用性。
+- **监控与指标**:内置监控与性能指标收集功能。
+- **Web 界面**:提供 Web 管理界面,便于系统监控与管理。
+
+## 🧩 模块化架构
+
+Curvine 采用模块化设计,主要由以下核心组件构成:
+
+- **orpc**: 一个支持异步 RPC 调用的高性能网络通信框架
+- **curvine-common**: 包含协议定义、错误处理和通用工具的共享库
+- **curvine-server**: 服务端组件,包含 Master 和 Worker 实现
+- **curvine-client**: 提供与服务器交互 API 的客户端库
+- **curvine-fuse**: FUSE 文件系统接口,支持将 Curvine 挂载为本地文件系统
+- **curvine-libsdk**: 支持多语言访问的 SDK 库
+- **curvine-web**: Web 管理界面和 API
+- **curvine-tests**: 测试框架与性能基准测试工具
+
+## 📦 系统要求
+
+- Rust 1.80+
+- Linux 或 macOS (Windows 支持有限)
+- FUSE 库 (用于文件系统功能)
+
+## 🗂️ 缓存文件系统访问
+
+### 🦀 Rust API (原生集成推荐)
+
+```rust
+use curvine_common::conf::ClusterConf;
+use curvine_common::fs::Path;
+use std::sync::Arc;
+
+let conf = ClusterConf::from(conf_path);
+let rt = Arc::new(conf.client_rpc_conf().create_runtime());
+let fs = CurvineFileSystem::with_rt(conf, rt)?;
+
+let path = Path::from_str("/dir")?;
+fs.mkdir(&path).await?;
+```
+
+### 📌 FUSE (用户空间文件系统)
+
+```bash
+ls /curvine-fuse
+```
+
+**官方支持的 Linux 发行版**
+
+| 操作系统发行版 | 内核要求 | 测试版本 | 依赖项 |
+|---------------------|-------------------|---------------|--------------|
+| **CentOS 7** | ≥3.10.0 | 7.6 | fuse2-2.9.2 |
+| **CentOS 8** | ≥4.18.0 | 8.5 | fuse3-3.9.1 |
+| **Rocky Linux 9** | ≥5.14.0 | 9.5 | fuse3-3.10.2 |
+| **RHEL 9** | ≥5.14.0 | 9.5 | fuse3-3.10.2 |
+| **Ubuntu 22.04** | ≥5.15.0 | 22.04 | fuse3-3.10.5 |
+
+### 🐘 Hadoop 兼容 API
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import java.net.URI;
+
+Configuration conf = new Configuration();
+conf.set("fs.cv.impl", "io.curvine.CurvineFileSystem");
+
+FileSystem fs = FileSystem.get(URI.create("cv://master:8995"), conf);
+FSDataInputStream in = fs.open(new Path("/user/test/file.txt"));
+```
+
+## 🛠 构建指南
+
+本项目需要以下依赖项,请确保在继续之前已安装:
+
+### 📋 先决条件
+
+- **Rust**: 1.80 或更高版本 ([安装指南](https://www.rust-lang.org/tools/install))
+- **Protobuf**: 2.x 版本
+- **Maven**: 3.8 或更高版本 ([安装指南](https://maven.apache.org/install.html))
+- **LLVM**: 12 或更高版本 ([安装指南](https://llvm.org/docs/GettingStarted.html))
+- **FUSE**: libfuse2 或 libfuse3 开发包
+- **JDK**: 1.8 或更高版本 (OpenJDK 或 Oracle JDK)
+- **npm**: 9 或更高版本 ([Node.js 安装](https://nodejs.org/))
+
+您可以选择:
+
+1. 使用预配置的 `curvine-docker/compile/Dockerfile_rocky9` 来构建编译镜像
+2. 
参考此 Dockerfile 为其他操作系统版本创建编译镜像 + +### 🚀 构建步骤 (Linux - Ubuntu/Debian 示例) + +```bash +# Compiled files are in build/dist +sh build/build.sh +``` + +编译成功后,目标文件将生成在 build/dist 目录中。该文件是可用于部署或构建镜像的 Curvine 安装包。 + +### 🖥️ 启动单节点集群 + +```bash +cd build/dist + +# Start the master node +bin/curvine-master.sh start + +# Start the worker node +bin/curvine-worker.sh start +``` + +挂载文件系统 + +```bash +# The default mount point is /curvine-fuse +bin/curvine-fuse.sh start +``` + +查看集群概览: + +```bash +bin/curvine report +``` + +使用兼容的 HDFS 命令访问文件系统: + +```bash +bin/curvine fs -mkdir /a +bin/curvine fs -ls / +``` + +访问 Web 界面: + +``` +http://your-hostname:9000 +``` + +Curvine 使用 TOML 格式的配置文件。示例配置位于 conf/curvine-cluster.toml,主要配置项包括: + +- 网络设置(端口、地址等) +- 存储策略(缓存大小、存储类型) +- 集群配置(节点数量、副本因子) +- 性能调优参数 + +## 🏗️ 架构设计 + +Curvine 采用主从架构: + +- **主节点**:负责元数据管理、工作节点协调和负载均衡 +- **工作节点**:负责数据存储和处理 +- **客户端**:通过 RPC 与主节点和工作节点通信 + +该系统使用 Raft 共识算法确保元数据一致性,并支持多种存储策略(内存、SSD、HDD)以优化性能和成本。 + +## 📈 性能表现 + +Curvine 在高并发场景下表现优异,支持: + +- 高吞吐量数据读写 +- 低延迟操作 +- 大规模并发连接 + +## 📜 许可证 + +Curvine 采用 **[Apache License 2.0](LICENSE)** 开源协议授权。 + +## 星标历史 + +[![Star History Chart](https://api.star-history.com/svg?repos=CurvineIO/curvine&type=Date)](https://www.star-history.com/#CurvineIO/curvine&Date) diff --git a/curvine-cli/Cargo.toml b/curvine-cli/Cargo.toml index fa8c0a8d..a33356fa 100644 --- a/curvine-cli/Cargo.toml +++ b/curvine-cli/Cargo.toml @@ -15,4 +15,5 @@ serde_with = { workspace = true } num-bigint = { workspace = true } bigdecimal = { workspace = true } chrono = { workspace = true } -bytes = { workspace = true } \ No newline at end of file +bytes = { workspace = true } +reqwest = { version = "0.11", features = ["json"] } \ No newline at end of file diff --git a/curvine-cli/src/cmds/mod.rs b/curvine-cli/src/cmds/mod.rs index aaeaec8b..f691cfd3 100644 --- a/curvine-cli/src/cmds/mod.rs +++ b/curvine-cli/src/cmds/mod.rs @@ -17,6 +17,7 @@ mod load; mod load_cancel; mod load_status; mod mount; +mod node; mod 
report; mod umount; @@ -25,5 +26,6 @@ pub use load::LoadCommand; pub use load_cancel::CancelLoadCommand; pub use load_status::LoadStatusCommand; pub use mount::MountCommand; +pub use node::NodeCommand; pub use report::ReportCommand; pub use umount::UnMountCommand; diff --git a/curvine-cli/src/cmds/node.rs b/curvine-cli/src/cmds/node.rs new file mode 100644 index 00000000..35ed82a1 --- /dev/null +++ b/curvine-cli/src/cmds/node.rs @@ -0,0 +1,283 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use crate::util::*;
+use clap::Parser;
+use curvine_client::file::FsClient;
+use curvine_common::conf::ClusterConf;
+use orpc::{err_box, CommonResult, common::ByteUnit};
+use std::collections::HashMap;
+use reqwest::Client;
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Parser, Debug)]
+#[command(arg_required_else_help = true)]
+pub struct NodeCommand {
+    /// list all nodes
+    #[arg(long, short = 'l')]
+    list: bool,
+
+    /// add decommission node
+    #[arg(long)]
+    add_decommission: bool,
+
+    /// remove decommission node
+    #[arg(long)]
+    remove_decommission: bool,
+
+    /// node list with port (format: hostname:port), comma separated for multiple nodes
+    #[arg(
+        last = true,
+        required_if_eq("add_decommission", "true"),
+        required_if_eq("remove_decommission", "true")
+    )]
+    nodes: Vec<String>,
+}
+
+impl NodeCommand {
+    // Send HTTP GET request to master server
+    async fn http_get(
+        &self,
+        client: Arc<FsClient>,
+        conf: &ClusterConf,
+        path: &str,
+    ) -> CommonResult<String> {
+        // Get master address and web port
+        let master_info = client.get_master_info().await?;
+        let master_parts: Vec<&str> = master_info.active_master.split(':').collect();
+        let master_host = master_parts[0];
+        let web_port = conf.master.web_port;
+
+        // Build complete URL
+        let url = format!("http://{}:{}{}", master_host, web_port, path);
+
+        // Create HTTP client and send request
+        let http_client = match Client::builder().timeout(Duration::from_secs(30)).build() {
+            Ok(client) => client,
+            Err(e) => return err_box!("Failed to create HTTP client: {}", e),
+        };
+
+        let response = match http_client.get(&url).send().await {
+            Ok(resp) => resp,
+            Err(e) => return err_box!("Failed to send HTTP request: {}", e),
+        };
+
+        let status = response.status();
+        if !status.is_success() {
+            return err_box!("HTTP request failed with status: {}", status);
+        }
+
+        let body = match response.text().await {
+            Ok(text) => text,
+            Err(e) => return err_box!("Failed to read response body: {}", e),
+        };
+
+        Ok(body)
+    }
+
+    // 
Handle listing all worker nodes
+    async fn handle_list(&self, client: Arc<FsClient>, conf: &ClusterConf) -> CommonResult<()> {
+        // Get worker list
+        let url = "/api/workers";
+        let response = handle_rpc_result(self.http_get(client.clone(), conf, url)).await;
+
+        // Parse JSON response
+        let workers_map: HashMap<String, Vec<serde_json::Value>> = serde_json::from_str(&response)?;
+
+        // Get live_workers and lost_workers
+        let live_workers = workers_map.get("live_workers").cloned().unwrap_or_default();
+        let lost_workers = workers_map.get("lost_workers").cloned().unwrap_or_default();
+
+        // Print worker information
+        println!("Worker Nodes:");
+        println!("{:<25} {:<15} {:<15} {:<15}", "Address", "Status", "Capacity", "Available");
+        println!("{}", "-".repeat(70));
+
+        // Process live_workers
+        for worker in live_workers {
+            let hostname = worker["address"]["hostname"].as_str().unwrap_or("Unknown");
+            let rpc_port = worker["address"]["rpc_port"].as_u64().unwrap_or(0);
+            let address = format!("{hostname}:{rpc_port}");
+            let status = worker["status"].as_str().unwrap_or("Unknown");
+            let capacity = worker["capacity"].as_i64().unwrap_or(0);
+            let available = worker["available"].as_i64().unwrap_or(0);
+
+            println!("{:<25} {:<15} {:<15} {:<15}",
+                address,
+                status,
+                ByteUnit::byte_to_string(capacity as u64),
+                ByteUnit::byte_to_string(available as u64));
+        }
+
+        // Process lost_workers
+        for worker in lost_workers {
+            let hostname = worker["address"]["hostname"].as_str().unwrap_or("Unknown");
+            let rpc_port = worker["address"]["rpc_port"].as_u64().unwrap_or(0);
+            let address = format!("{hostname}:{rpc_port}");
+            let status = worker["status"].as_str().unwrap_or("Unknown");
+            let capacity = worker["capacity"].as_i64().unwrap_or(0);
+            let available = worker["available"].as_i64().unwrap_or(0);
+
+            println!("{:<25} {:<15} {:<15} {:<15}",
+                address,
+                status,
+                ByteUnit::byte_to_string(capacity as u64),
+                ByteUnit::byte_to_string(available as u64));
+        }
+
+        Ok(())
+    }
+
+    // Extract hostnames from node addresses 
(hostname:port format)
+    fn extract_hostnames(&self, nodes: &[String]) -> Vec<String> {
+        nodes.iter()
+            .map(|node| {
+                // Split by ':' and take the first part (hostname)
+                node.split(':').next().unwrap_or(node).to_string()
+            })
+            .collect()
+    }
+
+    // Process decommission operation results
+    fn process_decommission_results(
+        &self,
+        result: &[String],
+        requested_nodes: &[String],
+        operation_type: &str
+    ) {
+        if result.is_empty() {
+            println!("No worker was {} the decommission list", operation_type);
+            return;
+        }
+
+        // Create a map of requested nodes for quick lookup
+        let mut requested_map = HashMap::new();
+        for node in requested_nodes {
+            let hostname = node.split(':').next().unwrap_or(node);
+            requested_map.insert(hostname.to_string(), node.clone());
+        }
+
+        // Track successful and failed operations
+        let mut successful = Vec::new();
+        let mut failed = Vec::new();
+
+        // Process successful operations
+        for worker in result {
+            // Extract worker_id and hostname:port from worker info
+            let parts: Vec<&str> = worker.split(',').collect();
+            if parts.len() >= 2 {
+                let addr_part = parts[1].to_string();
+                successful.push(addr_part);
+            }
+        }
+
+        // Identify failed operations
+        for (hostname, full_node) in &requested_map {
+            let found = result.iter().any(|worker| {
+                let parts: Vec<&str> = worker.split(',').collect();
+                if parts.len() >= 2 {
+                    let worker_hostname = parts[1].split(':').next().unwrap_or("");
+                    worker_hostname == hostname
+                } else {
+                    false
+                }
+            });
+
+            if !found {
+                failed.push(full_node.clone());
+            }
+        }
+
+        // Print successful operations
+        if !successful.is_empty() {
+            println!("Workers {} the decommission list:", operation_type);
+            for addr in &successful {
+                println!("  {}", addr);
+            }
+            println!("Total: {} worker(s)", successful.len());
+        }
+
+        // Print failed operations
+        if !failed.is_empty() {
+            println!("Workers that could not be {} the decommission list:", operation_type);
+            for addr in &failed {
+                println!("  {}", addr);
+            }
+            println!("Total: {} worker(s)", failed.len());
+        }
+    }
+
+    
// Handle adding workers to decommission list
+    async fn handle_add_decommission(
+        &self,
+        client: Arc<FsClient>,
+        conf: &ClusterConf
+    ) -> CommonResult<()> {
+        // Extract hostname from hostname:port format
+        let worker_hostnames = self.extract_hostnames(&self.nodes);
+        let workers = worker_hostnames.join(",");
+
+        // Call add-dcm API
+        let url = format!("/add-dcm?workers={}", workers);
+        let response = handle_rpc_result(self.http_get(client.clone(), conf, &url)).await;
+
+        // Parse response
+        let result: Vec<String> = serde_json::from_str(&response)?;
+
+        // Process and display results
+        self.process_decommission_results(&result, &self.nodes, "added to");
+
+        Ok(())
+    }
+
+    // Handle removing workers from decommission list
+    async fn handle_remove_decommission(
+        &self,
+        client: Arc<FsClient>,
+        conf: &ClusterConf
+    ) -> CommonResult<()> {
+        // Extract hostname from hostname:port format
+        let worker_hostnames = self.extract_hostnames(&self.nodes);
+        let workers = worker_hostnames.join(",");
+
+        // Call remove-dcm API
+        let url = format!("/remove-dcm?workers={}", workers);
+        let response = handle_rpc_result(self.http_get(client.clone(), conf, &url)).await;
+
+        // Parse response
+        let result: Vec<String> = serde_json::from_str(&response)?;
+
+        // Process and display results
+        self.process_decommission_results(&result, &self.nodes, "removed from");
+
+        Ok(())
+    }
+
+    pub async fn execute(&self, client: Arc<FsClient>, conf: ClusterConf) -> CommonResult<()> {
+        if self.list {
+            return self.handle_list(client, &conf).await;
+        }
+
+        if self.add_decommission {
+            return self.handle_add_decommission(client, &conf).await;
+        }
+
+        if self.remove_decommission {
+            return self.handle_remove_decommission(client, &conf).await;
+        }
+
+        Ok(())
+    }
+}
diff --git a/curvine-cli/src/commands.rs b/curvine-cli/src/commands.rs index 1a446d02..ded945b8 100644 --- a/curvine-cli/src/commands.rs +++ b/curvine-cli/src/commands.rs @@ -43,6 +43,10 @@ pub enum Commands { #[command(name = "umount")] UnMount(UnMountCommand), + /// Node 
command + #[command(name = "node")] + Node(NodeCommand), + /// show cli version #[command(name = "version")] Version, diff --git a/curvine-cli/src/main.rs b/curvine-cli/src/main.rs index 3bf7594e..a760824b 100644 --- a/curvine-cli/src/main.rs +++ b/curvine-cli/src/main.rs @@ -60,6 +60,7 @@ fn main() -> CommonResult<()> { Commands::CancelLoad(cmd) => cmd.execute(load_client).await, Commands::Mount(cmd) => cmd.execute(fs_client).await, Commands::UnMount(cmd) => cmd.execute(fs_client).await, + Commands::Node(cmd) => cmd.execute(fs_client, conf.clone()).await, Commands::Version => { println!("Curvine version: {}", version::GIT_VERSION); Ok(())
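The `http_get` helper in the patch derives the master's web endpoint by splitting the hostname out of the active master's `host:rpc_port` address and pairing it with the separately configured web port. That derivation can be sketched as a standalone function; `build_master_url` and the addresses below are illustrative, not part of the patch:

```rust
// Derive the master web URL the way the new `node` subcommand does:
// keep the hostname from the "host:rpc_port" string and pair it with
// the web port taken from the cluster configuration.
fn build_master_url(active_master: &str, web_port: u16, path: &str) -> String {
    let host = active_master.split(':').next().unwrap_or(active_master);
    format!("http://{}:{}{}", host, web_port, path)
}

fn main() {
    // Hypothetical cluster addresses, for illustration only.
    assert_eq!(
        build_master_url("master1:8995", 9000, "/api/workers"),
        "http://master1:9000/api/workers"
    );
    println!("ok");
}
```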
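The decommission handlers rely on two small string conventions: positional arguments arrive as `hostname:port`, and each entry in the master's JSON reply is shaped like `worker_id,hostname:port`. A standalone sketch of both parsers (free functions that mirror the methods in the patch; an illustrative reconstruction, not the compiled module):

```rust
// Parsing rules assumed by `curvine node --add-decommission` /
// `--remove-decommission` (sketch; function names are illustrative).

// "hostname:port" -> "hostname"; a bare hostname passes through unchanged.
fn extract_hostnames(nodes: &[String]) -> Vec<String> {
    nodes
        .iter()
        .map(|node| node.split(':').next().unwrap_or(node).to_string())
        .collect()
}

// "worker_id,hostname:port" -> "hostname:port"; entries without a comma are skipped.
fn result_addresses(result: &[String]) -> Vec<String> {
    result
        .iter()
        .filter_map(|entry| entry.split(',').nth(1).map(str::to_string))
        .collect()
}

fn main() {
    let nodes = vec!["w1:8995".to_string(), "w2".to_string()];
    assert_eq!(extract_hostnames(&nodes), vec!["w1", "w2"]);

    let reply = vec!["1001,w1:8995".to_string(), "malformed".to_string()];
    assert_eq!(result_addresses(&reply), vec!["w1:8995"]);
    println!("ok");
}
```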