Skip to content

Commit 3fd30cb

Browse files
committed
feat: build client
1 parent 22c247d commit 3fd30cb

File tree

6 files changed

+151
-56
lines changed

6 files changed

+151
-56
lines changed

src/client.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use std::{
2+
io::{BufReader, BufWriter, Write},
3+
net::{SocketAddr, TcpStream},
4+
};
5+
6+
use serde_json::{de::IoRead, StreamDeserializer};
7+
8+
use crate::{
9+
error::{Error, ErrorKind, Result},
10+
net::{Request, Response},
11+
};
12+
13+
pub struct Client<'a> {
14+
writer: BufWriter<TcpStream>,
15+
reader: StreamDeserializer<'a, IoRead<BufReader<TcpStream>>, Response>,
16+
}
17+
18+
impl<'a> Client<'a> {
19+
pub fn connect(addr: SocketAddr) -> Result<Client<'a>> {
20+
let stream = TcpStream::connect(addr)?;
21+
let writer = BufWriter::new(stream.try_clone()?);
22+
let reader = BufReader::new(stream);
23+
let reader = serde_json::Deserializer::from_reader(reader).into_iter::<Response>();
24+
Ok(Client { reader, writer })
25+
}
26+
27+
pub fn get(&mut self, key: String) -> Result<Option<String>> {
28+
let request = Request::Get {
29+
key: key.to_owned(),
30+
};
31+
32+
self.send_request(&request)?;
33+
34+
if let Some(response) = self.reader.next() {
35+
if let Response::Get(Ok(result)) = response? {
36+
return Ok(result);
37+
}
38+
}
39+
40+
Err(Error::from(ErrorKind::KeyNotFound(format!(
41+
"cannot get value of key:{}",
42+
key
43+
))))
44+
}
45+
46+
pub fn set(&mut self, key: String, value: String) -> Result<()> {
47+
let request = Request::Set { key, value };
48+
self.send_request(&request)?;
49+
50+
if let Some(response) = self.reader.next() {
51+
if let Response::Set(Ok(())) = response? {
52+
return Ok(());
53+
}
54+
}
55+
Err(Error::from(ErrorKind::Error(
56+
"cannot get response from server".to_string(),
57+
)))
58+
}
59+
60+
pub fn remove(&mut self, key: String) -> Result<()> {
61+
let request = Request::Remove { key };
62+
self.send_request(&request)?;
63+
64+
if let Some(response) = self.reader.next() {
65+
if let Response::Remove(Ok(())) = response? {
66+
return Ok(());
67+
}
68+
}
69+
70+
Err(Error::from(ErrorKind::Error(
71+
"cannot get response from server".to_string(),
72+
)))
73+
}
74+
75+
fn send_request(&mut self, request: &Request) -> Result<()> {
76+
let buf = serde_json::to_vec(request)?;
77+
78+
self.writer.write_all(&buf[..])?;
79+
self.writer.flush()?;
80+
81+
Ok(())
82+
}
83+
}

src/common.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,3 @@ impl OffSet {
5757
self.file_no
5858
}
5959
}
60-
#[derive(Debug, Serialize)]
61-
pub enum GetResponse {
62-
Ok(Option<String>),
63-
Err(String),
64-
}
65-
66-
#[derive(Debug, Serialize)]
67-
pub enum SetResponse {
68-
Ok(()),
69-
Err(String),
70-
}
71-
72-
#[derive(Debug, Serialize)]
73-
pub enum RemoveResponse {
74-
Ok(()),
75-
Err(String),
76-
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
mod client;
12
pub mod common;
23
mod database;
34
pub mod error;
45
pub mod kvs_store;
6+
mod net;
57
mod protocol;
68
mod reader;
79
pub mod server;

src/net.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use serde::{Deserialize, Serialize};
2+
use std::fs::OpenOptions;
3+
4+
#[derive(Serialize, Deserialize, Debug)]
5+
pub enum Request {
6+
Get { key: String },
7+
Set { key: String, value: String },
8+
Remove { key: String },
9+
}
10+
11+
#[derive(Serialize, Deserialize, Debug)]
12+
pub enum Response {
13+
Get(Result<Option<String>, String>),
14+
Set(Result<(), String>),
15+
Remove(Result<(), String>),
16+
}
17+
18+
impl Response {
19+
pub fn set(result: Result<(), String>) -> Self {
20+
Response::Set(result)
21+
}
22+
23+
pub fn get(result: Result<Option<String>, String>) -> Self {
24+
Response::Get(result)
25+
}
26+
27+
pub fn remove(result: Result<(), String>) -> Self {
28+
Response::Remove(result)
29+
}
30+
}

src/protocol/connection.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
use std::io::{BufReader, Cursor};
1+
use std::io::Cursor;
22

3-
use bytes::{buf, Buf, BytesMut};
3+
use bytes::{Buf, BytesMut};
44
use tokio::{
5-
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
5+
io::{AsyncWriteExt, BufWriter},
66
net::TcpStream,
77
};
88

9-
use crate::error::{Error, Result};
9+
use crate::error::Result;
1010

1111
use super::protocol::Frame;
1212

src/server.rs

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use crate::common::{Command, GetResponse, KvsEngine, RemoveResponse, SetResponse};
1+
use crate::common::KvsEngine;
22
use crate::error::Result;
3+
use crate::net::{Request, Response};
34
use serde_json::Deserializer;
45
use slog::{debug, error, info, Logger};
56
use std::io::{BufReader, BufWriter, Write};
@@ -20,55 +21,51 @@ impl<T: KvsEngine> Server<T> {
2021
}
2122
Ok(())
2223
}
23-
2424
fn handle_client(&mut self, stream: TcpStream, logger: &Logger) -> Result<()> {
2525
let peer_addr = stream.peer_addr()?;
2626
let reader = BufReader::new(&stream);
2727
let mut writer = BufWriter::new(&stream);
28-
let requests = Deserializer::from_reader(reader).into_iter::<Command>();
28+
let requests = Deserializer::from_reader(reader).into_iter::<Request>();
29+
30+
for request in requests {
31+
if let Ok(request) = request {
32+
info!(logger,"request:"; "request" => format!("{:?}", request));
33+
34+
let response = match request {
35+
Request::Get { key } => match self.engine.get(key) {
36+
Ok(v) => Response::Get(Ok(v)),
37+
Err(e) => Response::Get(Err(e.to_string())),
38+
},
39+
Request::Remove { key } => match self.engine.remove(key) {
40+
Ok(_) => Response::Remove(Ok(())),
41+
Err(e) => Response::Remove(Err(e.as_string())),
42+
},
43+
Request::Set { key, value } => match self.engine.set(key, value) {
44+
Ok(()) => Response::Set(Ok(())),
45+
Err(e) => Response::Set(Err(e.to_string())),
46+
},
47+
};
48+
49+
self.send_response(&mut writer, &response)?;
2950

30-
macro_rules! respond {
31-
($response: expr) => {{
32-
let response = $response;
33-
serde_json::to_writer(&mut writer, &response)?;
34-
writer.flush()?;
3551
debug!(
3652
logger,
3753
"Response sent to";
3854
"addr" => format!("{}",peer_addr),
3955
"response" => format!("{:?}",response)
4056
);
41-
};};
42-
}
43-
44-
for request in requests {
45-
if let Ok(request) = request {
46-
info!(logger,"request:"; "request" => format!("{:?}", request));
47-
match request {
48-
Command::Get { key } => {
49-
respond!(match self.engine.get(key) {
50-
Ok(v) => GetResponse::Ok(v),
51-
Err(e) => GetResponse::Err(e.as_string()),
52-
})
53-
}
54-
Command::Remove { key } => {
55-
respond!(match self.engine.remove(key) {
56-
Ok(_) => RemoveResponse::Ok(()),
57-
Err(e) => RemoveResponse::Err(e.as_string()),
58-
})
59-
}
60-
Command::Set { key, value } => {
61-
respond!(match self.engine.set(key, value) {
62-
Ok(()) => SetResponse::Ok(()),
63-
Err(e) => SetResponse::Err(e.as_string()),
64-
})
65-
}
66-
}
67-
writer.flush()?;
57+
// write response
6858
} else {
6959
error!(logger, "can not parse the request");
7060
}
7161
}
7262
Ok(())
7363
}
64+
65+
fn send_response(&self, writer: &mut BufWriter<&TcpStream>, response: &Response) -> Result<()> {
66+
let buf = serde_json::to_vec(response)?;
67+
writer.write_all(&buf[..])?;
68+
writer.flush()?;
69+
Ok(())
70+
}
7471
}

0 commit comments

Comments
 (0)