Skip to content

Commit dac48d9

Browse files
authored
replace futures crate api with standard library (#1461)
* replace futures crate api with standard library Signed-off-by: tottoto <[email protected]> * Move pin value in examples to independent sentence Signed-off-by: tottoto <[email protected]> --------- Signed-off-by: tottoto <[email protected]>
1 parent cb77247 commit dac48d9

File tree

22 files changed

+117
-127
lines changed

22 files changed

+117
-127
lines changed

clippy.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
11
disallowed-names = []
2+
disallowed-macros = [
3+
"futures::ready", # use instead `std::task::ready`
4+
"futures::pin_mut", # use instead `std::pin::pin`
5+
]
6+
disallowed-methods = [
7+
"futures::future::ready", # use instead `std::future::ready`
8+
]

examples/event_watcher.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use futures::{pin_mut, TryStreamExt};
1+
use std::pin::pin;
2+
3+
use futures::TryStreamExt;
24
use k8s_openapi::{
35
api::{core::v1::ObjectReference, events::v1::Event},
46
apimachinery::pkg::apis::meta::v1::Time,
@@ -37,7 +39,7 @@ async fn main() -> anyhow::Result<()> {
3739
}
3840
}
3941
let event_stream = watcher(events, conf).default_backoff().applied_objects();
40-
pin_mut!(event_stream);
42+
let mut event_stream = pin!(event_stream);
4143

4244
println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE");
4345
while let Some(ev) = event_stream.try_next().await? {

examples/node_reflector.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use futures::{pin_mut, TryStreamExt};
1+
use std::pin::pin;
2+
3+
use futures::TryStreamExt;
24
use k8s_openapi::api::core::v1::Node;
35
use kube::{
46
api::{Api, ResourceExt},
@@ -23,6 +25,7 @@ async fn main() -> anyhow::Result<()> {
2325
.reflect(writer)
2426
.applied_objects()
2527
.predicate_filter(predicates::labels.combine(predicates::annotations)); // NB: requires an unstable feature
28+
let mut stream = pin!(stream);
2629

2730
// Periodically read our state in the background
2831
tokio::spawn(async move {
@@ -35,7 +38,6 @@ async fn main() -> anyhow::Result<()> {
3538
});
3639

3740
// Log applied events with changes from the reflector
38-
pin_mut!(stream);
3941
while let Some(node) = stream.try_next().await? {
4042
info!("saw node {} with new labels/annots", node.name_any());
4143
}

examples/node_watcher.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use futures::{pin_mut, TryStreamExt};
1+
use std::pin::pin;
2+
3+
use futures::TryStreamExt;
24
use k8s_openapi::api::core::v1::{Event, Node};
35
use kube::{
46
api::{Api, ListParams, ResourceExt},
@@ -21,8 +23,8 @@ async fn main() -> anyhow::Result<()> {
2123
watcher::Config::default()
2224
};
2325
let obs = watcher(nodes, wc).default_backoff().applied_objects();
26+
let mut obs = pin!(obs);
2427

25-
pin_mut!(obs);
2628
while let Some(n) = obs.try_next().await? {
2729
check_for_node_failures(&client, n).await?;
2830
}

examples/pod_reflector.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::pin::pin;
2+
13
use futures::TryStreamExt;
24
use k8s_openapi::api::core::v1::Pod;
35
use kube::{
@@ -40,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
4042
.reflect(writer)
4143
.applied_objects()
4244
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature
43-
futures::pin_mut!(stream);
45+
let mut stream = pin!(stream);
4446

4547
while let Some(pod) = stream.try_next().await? {
4648
info!("saw {}", pod.name_any());

kube-client/src/client/middleware/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ impl<S> Layer<S> for AuthLayer {
2727
mod tests {
2828
use super::*;
2929

30-
use std::{matches, sync::Arc};
30+
use std::{matches, pin::pin, sync::Arc};
3131

3232
use chrono::{Duration, Utc};
33-
use futures::pin_mut;
3433
use http::{header::AUTHORIZATION, HeaderValue, Request, Response};
3534
use secrecy::SecretString;
3635
use tokio::sync::Mutex;
@@ -52,7 +51,7 @@ mod tests {
5251

5352
let spawned = tokio::spawn(async move {
5453
// Receive the requests and respond
55-
pin_mut!(handle);
54+
let mut handle = pin!(handle);
5655
let (request, send) = handle.next_request().await.expect("service not called");
5756
assert_eq!(
5857
request.headers().get(AUTHORIZATION).unwrap(),

kube-client/src/client/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
99
//! retrieve the resources served by the kubernetes API.
1010
use either::{Either, Left, Right};
11-
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
11+
use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt};
1212
use http::{self, Request, Response};
1313
use http_body_util::BodyExt;
1414
#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
@@ -492,9 +492,10 @@ impl TryFrom<Config> for Client {
492492

493493
#[cfg(test)]
494494
mod tests {
495+
use std::pin::pin;
496+
495497
use crate::{client::Body, Api, Client};
496498

497-
use futures::pin_mut;
498499
use http::{Request, Response};
499500
use k8s_openapi::api::core::v1::Pod;
500501
use tower_test::mock;
@@ -511,7 +512,7 @@ mod tests {
511512
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
512513
let spawned = tokio::spawn(async move {
513514
// Receive a request for pod and respond with some data
514-
pin_mut!(handle);
515+
let mut handle = pin!(handle);
515516
let (request, send) = handle.next_request().await.expect("service not called");
516517
assert_eq!(request.method(), http::Method::GET);
517518
assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

kube-runtime/src/controller/future_hash_map.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ where
7777

7878
#[cfg(test)]
7979
mod tests {
80-
use std::task::Poll;
80+
use std::{future, task::Poll};
8181

8282
use super::FutureHashMap;
83-
use futures::{channel::mpsc, future, poll, StreamExt};
83+
use futures::{channel::mpsc, poll, StreamExt};
8484

8585
#[tokio::test]
8686
async fn fhm_should_forward_all_values_and_shut_down() {

kube-runtime/src/controller/mod.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@ use derivative::Derivative;
1616
use futures::{
1717
channel,
1818
future::{self, BoxFuture},
19-
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
19+
stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
2020
};
2121
use kube_client::api::{Api, DynamicObject, Resource};
2222
use pin_project::pin_project;
2323
use serde::de::DeserializeOwned;
2424
use std::{
2525
fmt::{Debug, Display},
26+
future::Future,
2627
hash::Hash,
2728
sync::Arc,
28-
task::Poll,
29+
task::{ready, Poll},
2930
time::Duration,
3031
};
3132
use stream::BoxStream;
@@ -326,7 +327,8 @@ where
326327
.instrument(reconciler_span)
327328
.left_future()
328329
}
329-
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
330+
None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
331+
.right_future(),
330332
}
331333
},
332334
)
@@ -1155,7 +1157,7 @@ where
11551157
/// use kube::{Api, Client, ResourceExt};
11561158
/// use kube_runtime::{
11571159
/// controller::{Controller, Action},
1158-
/// watcher,
1160+
/// watcher,
11591161
/// };
11601162
/// use std::{convert::Infallible, sync::Arc};
11611163
/// Controller::new(
@@ -1274,7 +1276,7 @@ where
12741276

12751277
#[cfg(test)]
12761278
mod tests {
1277-
use std::{convert::Infallible, sync::Arc, time::Duration};
1279+
use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
12781280

12791281
use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
12801282
use crate::{
@@ -1283,7 +1285,7 @@ mod tests {
12831285
watcher::{self, metadata_watcher, watcher, Event},
12841286
Config, Controller,
12851287
};
1286-
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
1288+
use futures::{Stream, StreamExt, TryStreamExt};
12871289
use k8s_openapi::api::core::v1::ConfigMap;
12881290
use kube_client::{core::ObjectMeta, Api, Resource};
12891291
use serde::de::DeserializeOwned;
@@ -1348,7 +1350,7 @@ mod tests {
13481350

13491351
let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
13501352
let (store_rx, mut store_tx) = reflector::store();
1351-
let applier = applier(
1353+
let mut applier = pin!(applier(
13521354
|obj, _| {
13531355
Box::pin(async move {
13541356
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
@@ -1361,8 +1363,7 @@ mod tests {
13611363
store_rx,
13621364
queue_rx.map(Result::<_, Infallible>::Ok),
13631365
Config::default(),
1364-
);
1365-
pin_mut!(applier);
1366+
));
13661367
for i in 0..items {
13671368
let obj = ConfigMap {
13681369
metadata: ObjectMeta {

kube-runtime/src/controller/runner.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use super::future_hash_map::FutureHashMap;
22
use crate::scheduler::{ScheduleRequest, Scheduler};
3-
use futures::{future, Future, FutureExt, Stream, StreamExt};
3+
use futures::{FutureExt, Stream, StreamExt};
44
use pin_project::pin_project;
55
use std::{
66
convert::Infallible,
7+
future::{self, Future},
78
hash::Hash,
89
pin::Pin,
910
task::{Context, Poll},
@@ -29,7 +30,7 @@ pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
2930
run_msg: MkF,
3031
slots: FutureHashMap<T, F>,
3132
#[pin]
32-
ready_to_execute_after: future::Fuse<Ready>,
33+
ready_to_execute_after: futures::future::Fuse<Ready>,
3334
is_ready_to_execute: bool,
3435
stopped: bool,
3536
max_concurrent_executions: u16,
@@ -163,8 +164,7 @@ mod tests {
163164
};
164165
use futures::{
165166
channel::{mpsc, oneshot},
166-
future::{self},
167-
poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
167+
future, poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
168168
};
169169
use std::{
170170
cell::RefCell,

0 commit comments

Comments
 (0)