Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 9f4faa3

Browse files
author
Matthew Fisher
committed
bump to krustlet 0.5
Signed-off-by: Matthew Fisher <[email protected]>
1 parent 63c6ed2 commit 9f4faa3

21 files changed

+1995
-478
lines changed

Cargo.lock

Lines changed: 684 additions & 229 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@ async-trait = "0.1"
1212
chrono = { version = "0.4", features = ["serde"] }
1313
env_logger = "0.7"
1414
futures = "0.3"
15-
k8s-openapi = { version = "0.7", features = ["v1_17"] }
16-
kube = "0.33"
17-
kubelet = {version = "0.2", features = ["cli"]}
15+
k8s-openapi = { version = "0.9", default-features = false, features = ["v1_17"] }
16+
kube = { version= "0.40", default-features = false, features = ["native-tls"] }
17+
kubelet = "0.5"
1818
log = "0.4"
19-
oci-distribution = "0.1"
19+
oci-distribution = "0.4"
20+
serde = "1.0"
21+
serde_derive = "1.0"
22+
serde_json = "1.0"
2023
tempfile = "3.1"
2124
tokio = { version = "0.2", features = ["fs", "stream", "macros", "io-util", "sync"] }
2225
wasm3 = { git = "https://github.com/Veykril/wasm3-rs.git", features = ["wasi"] }

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
# krustlet-wasm3
22

33
[Krustlet](https://github.com/deislabs/krustlet) provider for the [wasm3](https://github.com/wasm3/wasm3) runtime.
4+
5+
## Prerequisites
6+
7+
[Install Clang 3.9](https://rust-lang.github.io/rust-bindgen/requirements.html#installing-clang-39) and [rust-bindgen](https://rust-lang.github.io/rust-bindgen) prior to running `cargo build`:
8+
9+
```console
10+
$ apt install llvm-dev libclang-dev clang
11+
$ cargo install bindgen
12+
```

src/provider/runtime.rs renamed to runtime.rs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@ use tempfile::NamedTempFile;
55
use tokio::sync::watch::{self, Sender};
66
use tokio::task::JoinHandle;
77
use wasm3::{Environment, Module};
8-
use kubelet::handle::{RuntimeHandle, Stop};
9-
use kubelet::status::ContainerStatus;
8+
use kubelet::container::Handle as ContainerHandle;
9+
use kubelet::container::Status as ContainerStatus;
10+
use kubelet::handle::StopHandler;
1011

11-
pub struct HandleStopper {
12+
pub struct Runtime {
1213
handle: JoinHandle<anyhow::Result<()>>,
1314
}
1415

1516
#[async_trait::async_trait]
16-
impl Stop for HandleStopper {
17+
impl StopHandler for Runtime {
1718
async fn stop(&mut self) -> anyhow::Result<()> {
1819
// no nothing
1920
Ok(())
@@ -26,13 +27,13 @@ impl Stop for HandleStopper {
2627
}
2728

2829
/// A runtime context for running a wasm module with wasm3
29-
pub struct Runtime {
30+
pub struct Wasm3Runtime {
3031
module_bytes: Vec<u8>,
3132
stack_size: u32,
3233
output: Arc<NamedTempFile>,
3334
}
3435

35-
impl Runtime {
36+
impl Wasm3Runtime {
3637
pub async fn new<L: AsRef<Path> + Send + Sync + 'static>(module_bytes: Vec<u8>, stack_size: u32, log_dir: L) -> anyhow::Result<Self> {
3738
let temp = tokio::task::spawn_blocking(move || -> anyhow::Result<NamedTempFile> {
3839
Ok(NamedTempFile::new_in(log_dir)?)
@@ -46,7 +47,7 @@ impl Runtime {
4647
})
4748
}
4849

49-
pub async fn start(&mut self) -> anyhow::Result<RuntimeHandle<HandleStopper, LogHandleFactory>> {
50+
pub async fn start(&mut self) -> anyhow::Result<ContainerHandle<Runtime, LogHandleFactory>> {
5051
let temp = self.output.clone();
5152
let output_write = tokio::task::spawn_blocking(move || -> anyhow::Result<std::fs::File> {
5253
Ok(temp.reopen()?)
@@ -64,10 +65,9 @@ impl Runtime {
6465
temp: self.output.clone(),
6566
};
6667

67-
Ok(RuntimeHandle::new(
68-
HandleStopper{handle},
68+
Ok(ContainerHandle::new(
69+
Runtime{handle},
6970
log_handle_factory,
70-
status_recv,
7171
))
7272
}
7373
}
@@ -77,7 +77,7 @@ pub struct LogHandleFactory {
7777
temp: Arc<NamedTempFile>,
7878
}
7979

80-
impl kubelet::handle::LogHandleFactory<tokio::fs::File> for LogHandleFactory {
80+
impl kubelet::log::HandleFactory<tokio::fs::File> for LogHandleFactory {
8181
/// Creates `tokio::fs::File` on demand for log reading.
8282
fn new_handle(&self) -> tokio::fs::File {
8383
tokio::fs::File::from_std(self.temp.reopen().unwrap())
@@ -94,20 +94,46 @@ async fn spawn_wasm3(
9494
_output_write: std::fs::File, //TODO: hook this up such that log output will be written to the file
9595
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
9696
let handle = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
97-
let env = Environment::new().expect("cannot create environment");
97+
98+
let env = match Environment::new() {
99+
// We can't map errors here or it moves the send channel, so we
100+
// do it in a match
101+
Ok(m) => m,
102+
Err(e) => {
103+
let message = "cannot create environment";
104+
error!("{}: {:?}", message, e);
105+
send(
106+
status_sender.clone(),
107+
name,
108+
Status::Terminated {
109+
failed: true,
110+
message: message.into(),
111+
timestamp: chrono::Utc::now(),
112+
},
113+
&mut cx,
114+
);
115+
return Err(e);
116+
}
117+
}
118+
98119
let rt = env.create_runtime(stack_size).expect("cannot create runtime");
120+
99121
let module = Module::parse(&env, &module_bytes).expect("cannot parse module");
122+
100123
let mut module = rt.load_module(module).expect("cannot load module");
124+
101125
module.link_wasi().expect("cannot link WASI");
126+
102127
let func = module.find_function::<(), ()>("_start").expect("cannot find function '_start' in module");
128+
103129
func.call().expect("cannot call '_start' in module");
104-
status_sender
105-
.broadcast(ContainerStatus::Terminated {
130+
131+
status_sender.broadcast(ContainerStatus::Terminated {
106132
failed: false,
107133
message: "Module run completed".into(),
108134
timestamp: chrono::Utc::now(),
109-
})
110-
.expect("status should be able to send");
135+
}).expect("status should be able to send");
136+
111137
Ok(())
112138
});
113139

src/lib.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
//! A custom kubelet backend that can run [WASI](https://wasi.dev/) based workloads
2+
//!
3+
//! The crate provides the [`WasiProvider`] type which can be used
4+
//! as a provider with [`kubelet`].
5+
//!
6+
//! # Example
7+
//! ```rust,no_run
8+
//! use kubelet::{Kubelet, config::Config};
9+
//! use kubelet::store::oci::FileStore;
10+
//! use std::sync::Arc;
11+
//! use wasi_provider::WasiProvider;
12+
//!
13+
//! async {
14+
//! // Get a configuration for the Kubelet
15+
//! let kubelet_config = Config::default();
16+
//! let client = oci_distribution::Client::default();
17+
//! let store = Arc::new(FileStore::new(client, &std::path::PathBuf::from("")));
18+
//!
19+
//! // Load a kubernetes configuration
20+
//! let kubeconfig = kube::Config::infer().await.unwrap();
21+
//!
22+
//! // Instantiate the provider type
23+
//! let provider = WasiProvider::new(store, &kubelet_config, kubeconfig.clone()).await.unwrap();
24+
//!
25+
//! // Instantiate the Kubelet
26+
//! let kubelet = Kubelet::new(provider, kubeconfig, kubelet_config).await.unwrap();
27+
//! // Start the Kubelet and block on it
28+
//! kubelet.start().await.unwrap();
29+
//! };
30+
//! ```
31+
32+
#![deny(missing_docs)]
33+
34+
mod wasi_runtime;
35+
36+
use std::collections::HashMap;
37+
use std::path::PathBuf;
38+
use std::sync::Arc;
39+
40+
use async_trait::async_trait;
41+
use kubelet::node::Builder;
42+
use kubelet::pod::{key_from_pod, pod_key, Handle, Pod};
43+
use kubelet::provider::{Provider, ProviderError};
44+
use kubelet::store::Store;
45+
use kubelet::volume::Ref;
46+
use tokio::sync::mpsc::{self, Receiver, Sender};
47+
use tokio::sync::RwLock;
48+
use wasi_runtime::Runtime;
49+
50+
mod states;
51+
52+
use states::registered::Registered;
53+
use states::terminated::Terminated;
54+
55+
const TARGET_WASM32_WASI: &str = "wasm32-wasi";
56+
const LOG_DIR_NAME: &str = "wasi-logs";
57+
const VOLUME_DIR: &str = "volumes";
58+
59+
/// WasiProvider provides a Kubelet runtime implementation that executes WASM
60+
/// binaries conforming to the WASI spec.
61+
#[derive(Clone)]
62+
pub struct WasiProvider {
63+
shared: SharedPodState,
64+
}
65+
66+
#[derive(Clone)]
67+
struct SharedPodState {
68+
handles: Arc<RwLock<HashMap<String, Handle<Runtime, wasi_runtime::HandleFactory>>>>,
69+
store: Arc<dyn Store + Sync + Send>,
70+
log_path: PathBuf,
71+
kubeconfig: kube::Config,
72+
volume_path: PathBuf,
73+
}
74+
75+
impl WasiProvider {
76+
/// Create a new wasi provider from a module store and a kubelet config
77+
pub async fn new(
78+
store: Arc<dyn Store + Sync + Send>,
79+
config: &kubelet::config::Config,
80+
kubeconfig: kube::Config,
81+
) -> anyhow::Result<Self> {
82+
let log_path = config.data_dir.join(LOG_DIR_NAME);
83+
let volume_path = config.data_dir.join(VOLUME_DIR);
84+
tokio::fs::create_dir_all(&log_path).await?;
85+
tokio::fs::create_dir_all(&volume_path).await?;
86+
Ok(Self {
87+
shared: SharedPodState {
88+
handles: Default::default(),
89+
store,
90+
log_path,
91+
volume_path,
92+
kubeconfig,
93+
},
94+
})
95+
}
96+
}
97+
98+
struct ModuleRunContext {
99+
modules: HashMap<String, Vec<u8>>,
100+
volumes: HashMap<String, Ref>,
101+
status_sender: Sender<(String, kubelet::container::Status)>,
102+
status_recv: Receiver<(String, kubelet::container::Status)>,
103+
}
104+
105+
/// State that is shared between pod state handlers.
106+
pub struct PodState {
107+
key: String,
108+
run_context: ModuleRunContext,
109+
errors: usize,
110+
shared: SharedPodState,
111+
}
112+
113+
// No cleanup state needed, we clean up when dropping PodState.
114+
#[async_trait]
115+
impl kubelet::state::AsyncDrop for PodState {
116+
async fn async_drop(self) {
117+
{
118+
let mut handles = self.shared.handles.write().await;
119+
handles.remove(&self.key);
120+
}
121+
}
122+
}
123+
124+
#[async_trait::async_trait]
125+
impl Provider for WasiProvider {
126+
type InitialState = Registered;
127+
type TerminatedState = Terminated;
128+
type PodState = PodState;
129+
130+
const ARCH: &'static str = TARGET_WASM32_WASI;
131+
132+
async fn node(&self, builder: &mut Builder) -> anyhow::Result<()> {
133+
builder.set_architecture("wasm-wasi");
134+
builder.add_taint("NoExecute", "kubernetes.io/arch", Self::ARCH);
135+
Ok(())
136+
}
137+
138+
async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result<Self::PodState> {
139+
let (tx, rx) = mpsc::channel(pod.all_containers().len());
140+
let run_context = ModuleRunContext {
141+
modules: Default::default(),
142+
volumes: Default::default(),
143+
status_sender: tx,
144+
status_recv: rx,
145+
};
146+
let key = key_from_pod(pod);
147+
Ok(PodState {
148+
key,
149+
run_context,
150+
errors: 0,
151+
shared: self.shared.clone(),
152+
})
153+
}
154+
155+
async fn logs(
156+
&self,
157+
namespace: String,
158+
pod_name: String,
159+
container_name: String,
160+
sender: kubelet::log::Sender,
161+
) -> anyhow::Result<()> {
162+
let mut handles = self.shared.handles.write().await;
163+
let handle = handles
164+
.get_mut(&pod_key(&namespace, &pod_name))
165+
.ok_or_else(|| ProviderError::PodNotFound {
166+
pod_name: pod_name.clone(),
167+
})?;
168+
handle.output(&container_name, sender).await
169+
}
170+
}

src/main.rs

Lines changed: 0 additions & 23 deletions
This file was deleted.

0 commit comments

Comments
 (0)