Skip to content

Commit

Permalink
plugin-v2024090102
Browse files Browse the repository at this point in the history
  • Loading branch information
MXWXZ committed Sep 2, 2024
1 parent ec9e1fc commit 4696cf4
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 43 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# plugin-v2024090102
## Components
- agent: 0.3.0 => 0.3.1
- skynet_api_agent: 0.3.0 => 0.3.1
- monitor: 0.2.4 => 0.2.5

## Changes
1. agent now use rust env to determine OS type.

## Bug fix
1. `agent`, `monitor`: Fix potential data corrupt.
2. `agent`: Fix potential dead lock.

# plugin-v2024090101
## Components
- monitor: 0.2.3 => 0.2.4
Expand Down
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions plugin/agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.3.0"
version = "0.3.1"
edition = "2021"
authors = ["MXWXZ <[email protected]>"]

Expand All @@ -19,9 +19,10 @@ miniz_oxide = "0.8.0"
portable-pty = "0.8.1"
thiserror = "1.0.63"
aes-gcm = "0.10.3"
bytes = "1.7.1"

skynet_api_monitor = { version = "0.2.0", path = "../monitor_api" }
skynet_api_agent = { version = "0.3.0", path = "../agent_api" }
skynet_api_agent = { version = "0.3.1", path = "../agent_api" }
skynet_api = { path = "../../skynet_api" }

[lib]
Expand Down
2 changes: 1 addition & 1 deletion plugin/agent/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
id: "ce96ae04-6801-4ca4-b09d-a087e05f3783"
name: "agent"
description: "Agent auto updater"
version: "0.3.0"
version: "0.3.1"
api_version: "~0.1.0"
priority: 50
2 changes: 1 addition & 1 deletion plugin/agent/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async fn connect(
.send_msg(&state.new_client_msg(Data::Info(InfoMessage {
endpoint: addr.to_string(),
version: env!("CARGO_PKG_VERSION").to_owned(),
os: System::name(),
os: Some(consts::OS.to_owned()),
system: System::long_os_version(),
arch: Some(consts::ARCH.to_owned()),
hostname: System::host_name(),
Expand Down
40 changes: 20 additions & 20 deletions plugin/agent/src/shell.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::io::{ErrorKind, Read, Write};
use std::{
io::{ErrorKind, Read, Write},
thread,
};

use portable_pty::{native_pty_system, Child, CommandBuilder, PtyPair, PtySize};
use skynet_api::actix_cloud::tokio::spawn;
use skynet_api::actix_cloud::tokio::sync::mpsc::UnboundedSender;
use skynet_api::Result;
use skynet_api_monitor::{message::Data, ShellOutputMessage};
Expand Down Expand Up @@ -33,25 +35,23 @@ impl ShellInstance {

// safe to detach, terminated when reader closed.
let token = token.to_owned();
spawn(async move {
loop {
let mut buffer = [0; 64];
match reader.read(&mut buffer) {
Ok(n) => {
if n == 0 {
break;
}
if let Some(x) = &sender {
let _ = x.send(Data::ShellOutput(ShellOutputMessage {
token: Some(token.clone()),
data: buffer[..n].to_vec(),
}));
}
thread::spawn(move || loop {
let mut buffer = [0; 64];
match reader.read(&mut buffer) {
Ok(n) => {
if n == 0 {
break;
}
Err(e) => {
if e.kind() != ErrorKind::Interrupted {
break;
}
if let Some(x) = &sender {
let _ = x.send(Data::ShellOutput(ShellOutputMessage {
token: Some(token.clone()),
data: buffer[..n].to_vec(),
}));
}
}
Err(e) => {
if e.kind() != ErrorKind::Interrupted {
break;
}
}
}
Expand Down
54 changes: 49 additions & 5 deletions plugin/agent/src/socket.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use std::io;

use aes_gcm::{
aead::{Aead, OsRng},
AeadCore, Aes256Gcm, KeyInit, Nonce,
};
use bytes::BytesMut;
use derivative::Derivative;
use skynet_api::{actix_cloud::bail, Result};
use skynet_api::{
actix_cloud::tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
},
anyhow,
};
use skynet_api::{
actix_cloud::{bail, tokio::io::AsyncRead},
Result,
};
use skynet_api_monitor::prost::Message as _;
use skynet_api_monitor::{
ecies::{encrypt, PublicKey},
Expand Down Expand Up @@ -41,11 +47,40 @@ pub enum SocketError {
ShellDisabled,
}

#[derive(Derivative)]
#[derivative(Default(new = "true"))]
struct FrameLen {
data: [u8; 4],
len: usize,
}

impl FrameLen {
async fn next<R>(&mut self, io: &mut R) -> Result<u32>
where
R: AsyncRead + Unpin,
{
while self.len < 4 {
let cnt = io.read(&mut self.data[self.len..]).await?;
if cnt == 0 {
self.len = 0;
return Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
}
self.len += cnt;
}
Ok(u32::from_be_bytes(self.data))
}

fn reset(&mut self) {
self.len = 0;
}
}

pub struct Frame {
pk: [u8; PUBLIC_KEY_SIZE],
key: [u8; AES256_KEY_SIZE],
stream: TcpStream,
cipher: Aes256Gcm,
len: FrameLen,
}

impl Frame {
Expand All @@ -56,6 +91,7 @@ impl Frame {
stream,
cipher: Aes256Gcm::new(&key),
key: key.into(),
len: FrameLen::new(),
}
}

Expand Down Expand Up @@ -92,12 +128,20 @@ impl Frame {
}

pub async fn read(&mut self) -> Result<Vec<u8>> {
let len = self.stream.read_u32().await?;
let mut buf = vec![0; len.try_into()?];
self.stream.read_exact(&mut buf).await?;
Ok(buf)
let len = self.len.next(&mut self.stream).await?;
let mut ret = BytesMut::with_capacity(len.try_into()?);
if self.stream.read_buf(&mut ret).await? == 0 {
self.len.reset();
return Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
}
self.len.reset();
Ok(ret.into())
}

/// Read message from frame.
///
/// # Cancel safety
/// This function is cancellation safe.
pub async fn read_msg(&mut self) -> Result<Message> {
let buf = self.read().await?;
let nonce = Nonce::from_slice(&buf[0..NONCE_SIZE]);
Expand Down
2 changes: 1 addition & 1 deletion plugin/agent_api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "skynet_api_agent"
version = "0.3.0"
version = "0.3.1"
edition = "2021"
authors = ["MXWXZ <[email protected]>"]
description = "API for Skynet agent plugin."
Expand Down
2 changes: 1 addition & 1 deletion plugin/agent_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl System {
Some(Self::Windows)
} else if str.contains("linux") {
Some(Self::Linux)
} else if str.contains("darwin") || str.contains("macos") {
} else if str.contains("macos") {
Some(Self::OSX)
} else {
None
Expand Down
5 changes: 3 additions & 2 deletions plugin/monitor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "monitor"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
authors = ["MXWXZ <[email protected]>"]

Expand All @@ -18,9 +18,10 @@ base64 = "0.22.1"
aes-gcm = "0.10.3"
crc32fast = "1.4.2"
miniz_oxide = "0.8.0"
bytes = "1.7.1"

skynet_api_monitor = { version = "0.2.0", path = "../monitor_api" }
skynet_api_agent = { version = "0.3.0", path = "../agent_api" }
skynet_api_agent = { version = "0.3.1", path = "../agent_api" }
skynet_api = { path = "../../skynet_api" }
skynet_macro = { path = "../../skynet_macro" }
sea-orm-migration = { version = "1.0.1", features = [
Expand Down
2 changes: 1 addition & 1 deletion plugin/monitor/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
id: "2eb2e1a5-66b4-45f9-ad24-3c4f05c858aa"
name: "monitor"
description: "Server monitor"
version: "0.2.4"
version: "0.2.5"
api_version: "~0.1.0"
priority: 100
52 changes: 46 additions & 6 deletions plugin/monitor/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::io;
use std::net::SocketAddr;
use std::time::Duration;

use actix::clock::{interval, Instant, Interval};
use aes_gcm::aead::{Aead, OsRng};
use aes_gcm::{AeadCore, Aes256Gcm, KeyInit, Nonce};
use bytes::BytesMut;
use derivative::Derivative;
use miniz_oxide::deflate::compress_to_vec;
use skynet_api::actix_cloud::bail;
use skynet_api::actix_cloud::chrono::{DateTime, Utc};
use skynet_api::actix_cloud::tokio::io::{AsyncReadExt, AsyncWriteExt};
use skynet_api::actix_cloud::tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use skynet_api::actix_cloud::tokio::net::{TcpListener, TcpStream};
use skynet_api::actix_cloud::tokio::sync::broadcast::{channel, Receiver, Sender};
use skynet_api::actix_cloud::tokio::sync::mpsc::{
Expand Down Expand Up @@ -39,10 +41,39 @@ const AES256_KEY_SIZE: usize = 32;
const SECRET_KEY_SIZE: usize = 32;
const MAGIC_NUMBER: &[u8] = b"SKNT";

#[derive(Derivative)]
#[derivative(Default(new = "true"))]
struct FrameLen {
data: [u8; 4],
len: usize,
}

impl FrameLen {
async fn next<R>(&mut self, io: &mut R) -> Result<u32>
where
R: AsyncRead + Unpin,
{
while self.len < 4 {
let cnt = io.read(&mut self.data[self.len..]).await?;
if cnt == 0 {
self.len = 0;
return Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
}
self.len += cnt;
}
Ok(u32::from_be_bytes(self.data))
}

fn reset(&mut self) {
self.len = 0;
}
}

struct Frame {
stream: TcpStream,
cipher: Option<Aes256Gcm>,
sk: [u8; SECRET_KEY_SIZE],
len: FrameLen,
}

impl Frame {
Expand All @@ -51,6 +82,7 @@ impl Frame {
stream,
cipher: None,
sk: sk.serialize(),
len: FrameLen::new(),
}
}

Expand Down Expand Up @@ -81,13 +113,21 @@ impl Frame {
self.send(&buf).await
}

async fn read(&mut self) -> Result<Vec<u8>> {
let len = self.stream.read_u32().await?;
let mut buf = vec![0; len.try_into()?];
self.stream.read_exact(&mut buf).await?;
Ok(buf)
pub async fn read(&mut self) -> Result<Vec<u8>> {
let len = self.len.next(&mut self.stream).await?;
let mut ret = BytesMut::with_capacity(len.try_into()?);
if self.stream.read_buf(&mut ret).await? == 0 {
self.len.reset();
return Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
}
self.len.reset();
Ok(ret.into())
}

/// Read message from frame.
///
/// # Cancel safety
/// This function is cancellation safe.
async fn read_msg(&mut self) -> Result<Message> {
let buf = self.read().await?;
if let Some(cipher) = &self.cipher {
Expand Down

0 comments on commit 4696cf4

Please sign in to comment.