Skip to content

Commit 261d4d6

Browse files
authored
feat: adds initial findAndModify command support (#51)
* feat: add findAndModify command (wip) * refact: extracted update parser login * feat: added update_from_operation * refact: uses update_from_operation on updates * feat: findAndModify initial implementation * feat: added initial upsert support to findAndModify * feat: supports sort param on findAndModify
1 parent bb5be98 commit 261d4d6

File tree

8 files changed

+616
-241
lines changed

8 files changed

+616
-241
lines changed

src/commands/find_and_modify.rs

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use crate::handler::{CommandExecutionError, Request};
2+
use crate::parser::parse_update;
3+
use crate::pg::UpdateResult;
4+
use crate::{commands::Handler, pg::SqlParam};
5+
use bson::{doc, Bson, Document};
6+
7+
pub struct FindAndModify {}
8+
9+
impl Handler for FindAndModify {
10+
fn new() -> Self {
11+
FindAndModify {}
12+
}
13+
14+
fn handle(
15+
&self,
16+
request: &Request,
17+
docs: &Vec<Document>,
18+
) -> Result<Document, CommandExecutionError> {
19+
let doc = &docs[0];
20+
let db = doc.get_str("$db").unwrap();
21+
let collection = doc.get_str("findAndModify").unwrap();
22+
let sp = SqlParam::new(db, collection);
23+
let query = doc.get_document("query").unwrap();
24+
let sort = match doc.get_document("sort") {
25+
Ok(sort_doc) => Some(sort_doc),
26+
_ => None,
27+
};
28+
let raw_update = doc.get_document("update").unwrap();
29+
let update_doc = parse_update(raw_update);
30+
let upsert = doc.get_bool("upsert").unwrap_or(false);
31+
32+
let mut client = request.get_client();
33+
client.create_table_if_not_exists(db, collection).unwrap();
34+
35+
let res = client
36+
.update(
37+
&sp,
38+
Some(query),
39+
sort,
40+
update_doc.unwrap(),
41+
false,
42+
false,
43+
true,
44+
)
45+
.unwrap();
46+
47+
match res {
48+
UpdateResult::Count(total) => {
49+
if total == 0 {
50+
if upsert {
51+
let mut obj = query.clone();
52+
obj.extend(extract_operator_values(&raw_update));
53+
54+
let res = client.insert_doc(sp, &obj).unwrap();
55+
return Ok(doc! {
56+
"value": null,
57+
"lastErrorObject": {
58+
"updatedExisting": false,
59+
"upserted": res.get_object_id("_id").unwrap().to_string(),
60+
"n": 1,
61+
},
62+
"ok": 1.0,
63+
});
64+
} else {
65+
return Ok(doc! {
66+
"value": null,
67+
"ok": Bson::Double(1.0),
68+
});
69+
}
70+
} else {
71+
unreachable!(
72+
"Unexpected numeric result for a findAndUpdate command: {:#?}",
73+
doc
74+
);
75+
}
76+
}
77+
UpdateResult::Document(value) => Ok(doc! {
78+
"n": Bson::Int64(1),
79+
"value": value,
80+
"ok": Bson::Double(1.0),
81+
}),
82+
}
83+
}
84+
}
85+
86+
fn extract_operator_values(doc: &Document) -> Document {
87+
let mut res = Document::new();
88+
for (key, value) in doc {
89+
if key.starts_with("$") {
90+
if let Some(value) = value.as_document() {
91+
res.extend(value.clone());
92+
}
93+
}
94+
}
95+
res
96+
}
97+
98+
#[cfg(test)]
99+
mod tests {
100+
use super::*;
101+
102+
#[test]
103+
fn test_extract_operator_values() {
104+
assert_eq!(
105+
extract_operator_values(&doc! { "$inc": { "score": 1 }, "$set": { "name": "abc" } }),
106+
doc! { "score": 1, "name": "abc" }
107+
);
108+
}
109+
}

src/commands/mod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod delete;
1212
mod drop;
1313
mod drop_database;
1414
mod find;
15+
mod find_and_modify;
1516
mod get_cmd_line_opts;
1617
mod get_parameter;
1718
mod hello;
@@ -36,6 +37,7 @@ pub use self::delete::Delete;
3637
pub use self::drop::Drop;
3738
pub use self::drop_database::DropDatabase;
3839
pub use self::find::Find;
40+
pub use self::find_and_modify::FindAndModify;
3941
pub use self::get_cmd_line_opts::GetCmdLineOpts;
4042
pub use self::get_parameter::GetParameter;
4143
pub use self::hello::Hello;
@@ -45,10 +47,7 @@ pub use self::list_collections::ListCollections;
4547
pub use self::list_databases::ListDatabases;
4648
pub use self::list_indexes::ListIndexes;
4749
pub use self::ping::Ping;
48-
pub use self::update::InvalidUpdateError;
4950
pub use self::update::Update;
50-
pub use self::update::UpdateDoc;
51-
pub use self::update::UpdateOper;
5251
pub use self::whats_my_uri::WhatsMyUri;
5352

5453
pub trait Handler {

src/commands/update.rs

+17-153
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,11 @@
11
#![allow(dead_code)]
22
use crate::handler::{CommandExecutionError, Request};
3-
use crate::utils::expand_fields;
4-
use crate::{commands::Handler, pg::SqlParam};
3+
use crate::parser::parse_update;
4+
use crate::{commands::Handler, pg::SqlParam, pg::UpdateResult};
55
use bson::{doc, Bson, Document};
66

77
pub struct Update {}
88

9-
#[derive(Debug, Clone, PartialEq)]
10-
pub enum UpdateOper {
11-
Update(Vec<UpdateDoc>),
12-
Replace(Document),
13-
}
14-
15-
#[derive(Debug, Clone, PartialEq)]
16-
pub enum UpdateDoc {
17-
Inc(Document),
18-
Set(Document),
19-
Unset(Document),
20-
}
21-
22-
#[derive(Debug, Clone, PartialEq)]
23-
pub struct InvalidUpdateError {
24-
reason: String,
25-
}
26-
27-
impl InvalidUpdateError {
28-
pub fn new(reason: String) -> Self {
29-
InvalidUpdateError { reason }
30-
}
31-
}
32-
33-
impl UpdateDoc {
34-
fn validate(&self) -> Result<UpdateDoc, InvalidUpdateError> {
35-
match self {
36-
UpdateDoc::Set(doc) => match expand_fields(doc) {
37-
Ok(u) => Ok(UpdateDoc::Set(u)),
38-
Err(e) => {
39-
return Err(InvalidUpdateError::new(format!(
40-
"Cannot update '{}' and '{}' at the same time",
41-
e.target, e.source
42-
)));
43-
}
44-
},
45-
UpdateDoc::Unset(doc) => Ok(UpdateDoc::Unset(doc.clone())),
46-
UpdateDoc::Inc(u) => Ok(UpdateDoc::Inc(u.clone())),
47-
// _ => {
48-
// return Err(InvalidUpdateError::new(format!(
49-
// "Unhandled update operation: {:?}",
50-
// self
51-
// )));
52-
// }
53-
}
54-
}
55-
}
56-
579
impl Handler for Update {
5810
fn new() -> Self {
5911
Update {}
@@ -85,9 +37,22 @@ impl Handler for Update {
8537
return Err(CommandExecutionError::new(format!("{:?}", update_doc)));
8638
}
8739

88-
n += client
89-
.update(&sp, Some(q), update_doc.unwrap(), upsert, multi)
40+
let result = client
41+
.update(
42+
&sp,
43+
Some(q),
44+
None,
45+
update_doc.unwrap(),
46+
upsert,
47+
multi,
48+
false,
49+
)
9050
.unwrap();
51+
52+
match result {
53+
UpdateResult::Count(total) => n += total,
54+
UpdateResult::Document(_) => n += 1,
55+
}
9156
}
9257

9358
Ok(doc! {
@@ -97,104 +62,3 @@ impl Handler for Update {
9762
})
9863
}
9964
}
100-
101-
fn parse_update(doc: &Document) -> Result<UpdateOper, InvalidUpdateError> {
102-
let mut res: Vec<UpdateDoc> = vec![];
103-
if !doc.keys().any(|k| k.starts_with("$")) {
104-
return Ok(UpdateOper::Replace(doc.clone()));
105-
}
106-
for (key, value) in doc.iter() {
107-
match key.as_str() {
108-
"$set" => {
109-
let expanded_doc = match expand_fields(value.as_document().unwrap()) {
110-
Ok(doc) => doc,
111-
Err(e) => {
112-
return Err(InvalidUpdateError::new(format!(
113-
"Cannot update '{}' and '{}' at the same time",
114-
e.target, e.source
115-
)));
116-
}
117-
};
118-
match UpdateDoc::Set(expanded_doc).validate() {
119-
Ok(update_doc) => res.push(update_doc),
120-
Err(e) => {
121-
return Err(InvalidUpdateError::new(format!("{:?}", e)));
122-
}
123-
}
124-
}
125-
"$unset" => {
126-
let expanded_doc = match expand_fields(value.as_document().unwrap()) {
127-
Ok(doc) => doc,
128-
Err(e) => {
129-
return Err(InvalidUpdateError::new(format!(
130-
"Cannot update '{}' and '{}' at the same time",
131-
e.target, e.source
132-
)));
133-
}
134-
};
135-
match UpdateDoc::Unset(expanded_doc).validate() {
136-
Ok(update_doc) => res.push(update_doc),
137-
Err(e) => {
138-
return Err(InvalidUpdateError::new(format!("{:?}", e)));
139-
}
140-
}
141-
}
142-
"$inc" => {
143-
let expanded_doc = match expand_fields(value.as_document().unwrap()) {
144-
Ok(doc) => doc,
145-
Err(e) => {
146-
return Err(InvalidUpdateError::new(format!(
147-
"Cannot update '{}' and '{}' at the same time",
148-
e.target, e.source
149-
)));
150-
}
151-
};
152-
match UpdateDoc::Inc(expanded_doc).validate() {
153-
Ok(update_doc) => res.push(update_doc),
154-
Err(e) => {
155-
return Err(InvalidUpdateError::new(format!("{:?}", e)));
156-
}
157-
}
158-
}
159-
_ => {
160-
if key.starts_with("$") || res.len() > 0 {
161-
return Err(InvalidUpdateError::new(format!(
162-
"Unknown modifier: {}",
163-
key
164-
)));
165-
}
166-
}
167-
}
168-
}
169-
Ok(UpdateOper::Update(res))
170-
}
171-
172-
#[cfg(test)]
173-
mod tests {
174-
use super::*;
175-
176-
#[test]
177-
fn test_parse_update() {
178-
let set_doc = doc! { "$set": { "a": 1 } };
179-
let repl_doc = doc! { "b": 2, "c": 8, "d": 9 };
180-
let unknown_doc = doc! { "$xyz": { "a": 1 } };
181-
let mixed_doc = doc! { "$set": { "x": 1 }, "b": 2 };
182-
183-
assert_eq!(
184-
parse_update(&set_doc).unwrap(),
185-
UpdateOper::Update(vec![UpdateDoc::Set(doc! { "a": 1 })])
186-
);
187-
assert_eq!(
188-
parse_update(&repl_doc).unwrap(),
189-
UpdateOper::Replace(repl_doc)
190-
);
191-
assert_eq!(
192-
parse_update(&unknown_doc).unwrap_err(),
193-
InvalidUpdateError::new("Unknown modifier: $xyz".to_string())
194-
);
195-
assert_eq!(
196-
parse_update(&mixed_doc).unwrap_err(),
197-
InvalidUpdateError::new("Unknown modifier: b".to_string())
198-
);
199-
}
200-
}

src/handler.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#![allow(dead_code)]
22
use crate::commands::{
33
Aggregate, BuildInfo, CollStats, ConnectionStatus, Create, CreateIndexes, DbStats, Delete,
4-
Drop, DropDatabase, Find, GetCmdLineOpts, GetParameter, Handler, Hello, Insert, IsMaster,
5-
ListCollections, ListDatabases, ListIndexes, Ping, Update, WhatsMyUri,
4+
Drop, DropDatabase, Find, FindAndModify, GetCmdLineOpts, GetParameter, Handler, Hello, Insert,
5+
IsMaster, ListCollections, ListDatabases, ListIndexes, Ping, Update, WhatsMyUri,
66
};
77
use crate::pg::PgDb;
88
use crate::wire::{OpCode, OpMsg};
@@ -120,6 +120,8 @@ fn run(request: &Request, docs: &Vec<Document>) -> Result<Document, CommandExecu
120120

121121
if command == "find" {
122122
Find::new().handle(request, docs)
123+
} else if command == "findAndModify" {
124+
FindAndModify::new().handle(request, docs)
123125
} else if command == "aggregate" {
124126
Aggregate::new().handle(request, docs)
125127
} else if command == "insert" {

src/parser.rs src/parser/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ use mongodb_language_model::{
1010
use serde_json::Map;
1111
use std::fmt;
1212

13+
pub use self::update_parser::parse_update;
14+
pub use self::update_parser::InvalidUpdateError;
15+
pub use self::update_parser::UpdateDoc;
16+
pub use self::update_parser::UpdateOper;
17+
18+
mod update_parser;
19+
1320
#[derive(Debug)]
1421
struct UnimplementedError {
1522
pub kind: String,

0 commit comments

Comments
 (0)