Skip to content

Commit f9b3a30

Browse files
committed
shuffle: implement basic shuffle server functions (#16)
1 parent b59f98d commit f9b3a30

30 files changed

+2118
-3
lines changed

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ members = [
1919
"curvine-fuse",
2020
"curvine-web",
2121
"curvine-ufs",
22-
"curvine-cli"
22+
"curvine-cli",
23+
"curvine-shuffle"
2324
]
2425

2526
[workspace.package]
@@ -37,6 +38,7 @@ curvine-ufs = { path = "curvine-ufs" }
3738
curvine-tests = { path = "curvine-tests" }
3839
curvine-fuse = { path = "curvine-fuse" }
3940
curvine-web = { path = "curvine-web" }
41+
curvine-shuffle = { path = "curvine-shuffle" }
4042

4143
# The most important library start
4244

@@ -134,4 +136,5 @@ num_cpus = "1.16.0"
134136
slog = "2.7.0"
135137
slog-stdlog = "4.1.0"
136138
num-bigint = "0.4.6"
137-
bigdecimal = "0.4.8"
139+
bigdecimal = "0.4.8"
140+
bitvec = "1.0.1"

curvine-client/src/unified/unified_filesystem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use curvine_common::fs::{FileSystem, Path};
2121
use curvine_common::proto::MountOptions;
2222
use curvine_common::state::{FileStatus, MasterInfo, MountInfo};
2323
use curvine_common::FsResult;
24-
use log::{info, warn};
24+
use log::{debug, info, warn};
2525
use orpc::common::{FastHashMap, LocalTime};
2626
use orpc::runtime::Runtime;
2727
use orpc::sync::AtomicCounter;

curvine-shuffle/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[package]
2+
name = "curvine-shuffle"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
orpc = { workspace = true }
8+
tokio = { workspace = true }
9+
curvine-common = { workspace = true }
10+
curvine-server = { workspace = true }
11+
curvine-client = { workspace = true }
12+
curvine-web = { workspace = true }
13+
serde = { workspace = true }
14+
dashmap = { workspace = true }
15+
serde_json = { workspace = true }
16+
thiserror = { workspace = true }
17+
bytes = { workspace = true }
18+
num_enum = { workspace = true }
19+
prost = { workspace = true }
20+
log = { workspace = true }
21+
bitvec = { workspace = true }
22+
23+
[build-dependencies]
24+
prost-build = { workspace = true }
25+
26+
[features]
27+
default = ["server", "client"]
28+
server = []
29+
client = []

curvine-shuffle/build.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::{env, fs};
16+
17+
fn main() {
18+
let src = vec!["shuffle.proto"];
19+
20+
let base = env::var("OUT_DIR").unwrap_or_else(|_| ".".to_string());
21+
let output = format!("{}/protos", base);
22+
fs::create_dir_all(&output).unwrap();
23+
24+
let mut build = prost_build::Config::new();
25+
build.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
26+
27+
build
28+
.out_dir(&output)
29+
.compile_protos(&src, &["proto/"])
30+
.unwrap();
31+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use curvine_common::conf::ClusterConf;
16+
use curvine_shuffle::common::ShuffleConf;
17+
use curvine_shuffle::master::ShuffleMaster;
18+
use curvine_shuffle::worker::ShuffleWorker;
19+
use orpc::common::Logger;
20+
use orpc::CommonResult;
21+
use std::thread;
22+
use std::time::Duration;
23+
24+
fn main() -> CommonResult<()> {
25+
Logger::default();
26+
let mut conf = ClusterConf::from("etc/curvine-cluster.toml")?;
27+
conf.format_master = true;
28+
conf.format_worker = true;
29+
conf.journal.snapshot_interval = "10s".to_owned();
30+
conf.master.min_block_size = 1024 * 1024;
31+
32+
let shuffle_conf = ShuffleConf {
33+
cluster_conf: conf.clone(),
34+
..Default::default()
35+
};
36+
37+
let master = ShuffleMaster::with_conf(shuffle_conf.clone())?;
38+
39+
thread::spawn(move || {
40+
master.block_on_start();
41+
});
42+
43+
let worker = ShuffleWorker::with_conf(shuffle_conf)?;
44+
thread::spawn(move || {
45+
worker.block_on_start();
46+
});
47+
48+
thread::sleep(Duration::from_secs(u64::MAX));
49+
Ok(())
50+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
syntax = "proto2";
2+
package proto;
3+
4+
option java_package = "io.curvine.proto";
5+
option java_multiple_files = true;
6+
option java_outer_classname = "ShuffleProto";
7+
8+
message StageKeyProto {
9+
required string app_id = 1;
10+
required string stage_id = 2;
11+
}
12+
13+
message SplitKeyProto {
14+
required int32 part_id = 1;
15+
required int32 split_id = 2;
16+
}
17+
18+
message InetAddrProto {
19+
required string hostname = 1;
20+
required int32 port = 2;
21+
}
22+
23+
message SplitInfoProto {
24+
required int32 part_id = 1;
25+
required int32 split_id = 2;
26+
required int64 write_len = 3;
27+
required InetAddrProto worker_addr = 4;
28+
}
29+
30+
31+
message AllocWriterRequest {
32+
required StageKeyProto stage = 1;
33+
required int32 part_id = 2;
34+
required int64 split_size = 3;
35+
optional SplitInfoProto full_split = 4;
36+
repeated uint32 exclude_workers = 5;
37+
}
38+
39+
message AllocWriterResponse {
40+
required StageKeyProto stage = 1;
41+
required int32 part_id = 2;
42+
required SplitInfoProto cur_split = 3;
43+
}
44+
45+
message WriteDataRequest {
46+
required StageKeyProto stage = 1;
47+
required int32 part_id = 2;
48+
required int32 split_id = 3;
49+
required int64 split_size = 4;
50+
}
51+
52+
message WriteDataResponse {
53+
optional SplitInfoProto full_split = 1;
54+
}
55+
56+
message TaskCommitRequest {
57+
required StageKeyProto stage = 1;
58+
required int32 task_id = 2;
59+
required int32 num_tasks = 3;
60+
}
61+
62+
message TaskCommitResponse {
63+
}
64+
65+
// shuffle stage submission. This submission has two meanings:
66+
// 1. Aggregate shuffle, is_merge = true, submission can only be made after all tasks are completed
67+
// 2. Normal shuffle, is_merge = false, after the task is completed, the writer can submit immediately.
68+
message StageCommitRequest {
69+
required StageKeyProto stage = 1;
70+
}
71+
72+
message StageCommitResponse {
73+
repeated SplitKeyProto splits = 1;
74+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use orpc::CommonResult;
16+
17+
fn main() -> CommonResult<()> {
18+
Ok(())
19+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::common::ShuffleCode;
16+
use crate::proto::{
17+
AllocWriterRequest, AllocWriterResponse, TaskCommitRequest, TaskCommitResponse,
18+
};
19+
use crate::protocol::{ProtoUtils, SplitInfo, StageKey};
20+
use curvine_common::error::FsError;
21+
use curvine_common::FsResult;
22+
use orpc::client::ClusterConnector;
23+
use std::sync::Arc;
24+
25+
#[derive(Clone)]
26+
pub struct MasterClient {
27+
connector: Arc<ClusterConnector>,
28+
}
29+
30+
impl MasterClient {
31+
pub fn new(connector: Arc<ClusterConnector>) -> Self {
32+
Self { connector }
33+
}
34+
35+
pub async fn alloc_writer(
36+
&self,
37+
stage: &StageKey,
38+
part_id: i32,
39+
split_size: i64,
40+
full_split: Option<SplitInfo>,
41+
) -> FsResult<SplitInfo> {
42+
let header = AllocWriterRequest {
43+
stage: ProtoUtils::stage_key_to_proto(stage.clone()),
44+
part_id,
45+
split_size,
46+
full_split: full_split.map(ProtoUtils::split_info_to_proto),
47+
exclude_workers: vec![],
48+
};
49+
50+
let rep_header: AllocWriterResponse = self
51+
.connector
52+
.proto_rpc::<_, _, FsError>(ShuffleCode::AllocWriter, header)
53+
.await?;
54+
let split = ProtoUtils::split_info_from_proto(rep_header.cur_split);
55+
Ok(split)
56+
}
57+
58+
pub async fn task_commit(
59+
&self,
60+
stage: &StageKey,
61+
task_id: i32,
62+
num_tasks: i32,
63+
) -> FsResult<()> {
64+
let header = TaskCommitRequest {
65+
stage: ProtoUtils::stage_key_to_proto(stage.clone()),
66+
task_id,
67+
num_tasks,
68+
};
69+
let _: TaskCommitResponse = self
70+
.connector
71+
.proto_rpc::<_, _, FsError>(ShuffleCode::TaskCommit, header)
72+
.await?;
73+
Ok(())
74+
}
75+
}

curvine-shuffle/src/client/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod shuffle_writer;
16+
pub use self::shuffle_writer::ShuffleWriter;
17+
18+
mod master_client;
19+
pub use self::master_client::MasterClient;
20+
21+
mod worker_client;
22+
pub use self::worker_client::WorkerClient;
23+
24+
mod shuffle_context;
25+
pub use self::shuffle_context::ShuffleContext;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2025 OPPO.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::client::WorkerClient;
16+
use crate::common::ShuffleConf;
17+
use curvine_common::FsResult;
18+
use orpc::client::ClusterConnector;
19+
use orpc::io::net::InetAddr;
20+
use orpc::runtime::Runtime;
21+
use std::sync::Arc;
22+
23+
pub struct ShuffleContext {
24+
connector: Arc<ClusterConnector>,
25+
conf: ShuffleConf,
26+
}
27+
28+
impl ShuffleContext {
29+
pub fn new(conf: ShuffleConf, connector: Arc<ClusterConnector>) -> Self {
30+
Self { connector, conf }
31+
}
32+
33+
pub async fn worker_client(&self, addr: &InetAddr) -> FsResult<WorkerClient> {
34+
WorkerClient::new(&self.connector, addr.clone()).await
35+
}
36+
37+
pub fn conf(&self) -> &ShuffleConf {
38+
&self.conf
39+
}
40+
41+
pub fn connector(&self) -> Arc<ClusterConnector> {
42+
self.connector.clone()
43+
}
44+
45+
pub fn clone_runtime(&self) -> Arc<Runtime> {
46+
self.connector.clone_runtime()
47+
}
48+
49+
pub fn root_dir(&self) -> &str {
50+
&self.conf.root_dir
51+
}
52+
}

0 commit comments

Comments
 (0)