Skip to content

Commit ffaca74

Browse files
committed
revert to mutex
1 parent 2f314dd commit ffaca74

File tree

4 files changed

+27
-46
lines changed

4 files changed

+27
-46
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/target
22
Cargo.lock
3+
.DS_Store

Cargo.toml

+4-6
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,25 @@ homepage = "https://github.com/al8n/wg"
66
repository = "https://github.com/al8n/wg.git"
77
documentation = "https://docs.rs/wg/"
88
readme = "README.md"
9-
version = "0.4.2"
9+
version = "0.5.0"
1010
license = "MIT OR Apache-2.0"
1111
keywords = ["waitgroup", "async", "sync", "notify", "wake"]
1212
categories = ["asynchronous", "concurrency", "data-structures"]
1313
edition = "2021"
1414

1515
[features]
1616
default = []
17-
full = ["async", "triomphe", "parking_lot"]
18-
async = ["atomic-waker"]
17+
full = ["triomphe", "parking_lot"]
1918
triomphe = ["dep:triomphe"]
2019
parking_lot = ["dep:parking_lot"]
2120

2221
[dependencies]
23-
atomic-waker = { version = "1", optional = true }
2422
parking_lot = {version = "0.12", optional = true }
2523
triomphe = { version = "0.1", optional = true }
2624

2725
[dev-dependencies]
28-
tokio = { version = "1.32", features = ["full"] }
29-
async-std = { version = "1.12.0", features = ["attributes"] }
26+
tokio = { version = "1", features = ["full"] }
27+
async-std = { version = "1.12", features = ["attributes"] }
3028

3129
[package.metadata.docs.rs]
3230
all-features = true

README.md

+3-9
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ Golang like WaitGroup implementation for sync/async Rust.
1919
## Installation
2020
```toml
2121
[dependencies]
22-
wg = "0.4"
22+
wg = "0.5"
2323
```
2424

2525
## Example
2626

2727
### Sync
28+
2829
```rust
2930
use wg::WaitGroup;
3031
use std::sync::Arc;
@@ -86,8 +87,8 @@ async fn main() {
8687
```
8788

8889
## Acknowledgements
89-
- Inspired by Golang sync.WaitGroup, [ibraheemdev's `AwaitGroup`] and [`crossbeam_utils::WaitGroup`].
9090

91+
- Inspired by Golang sync.WaitGroup, [ibraheemdev's `AwaitGroup`] and [`crossbeam_utils::WaitGroup`].
9192

9293
## License
9394

@@ -96,8 +97,6 @@ Licensed under either of <a href="https://opensource.org/licenses/Apache-2.0">Ap
9697
2.0</a> or <a href="https://opensource.org/licenses/MIT">MIT license</a> at your option.
9798
</sup>
9899

99-
<br>
100-
101100
<sub>
102101
Unless you explicitly state otherwise, any contribution intentionally submitted
103102
for inclusion in this project by you, as defined in the Apache-2.0 license,
@@ -111,8 +110,3 @@ shall be dual licensed as above, without any additional terms or conditions.
111110
[doc-url]: https://docs.rs/wg
112111
[crates-url]: https://crates.io/crates/wg
113112
[codecov-url]: https://app.codecov.io/gh/al8n/wg/
114-
[license-url]: https://opensource.org/licenses/Apache-2.0
115-
[rustc-url]: https://github.com/rust-lang/rust/blob/master/RELEASES.md
116-
[license-apache-url]: https://opensource.org/licenses/Apache-2.0
117-
[license-mit-url]: https://opensource.org/licenses/MIT
118-
[rustc-image]: https://img.shields.io/badge/rustc-1.56.0%2B-orange.svg?style=for-the-badge&logo=Rust

src/lib.rs

+19-31
Original file line numberDiff line numberDiff line change
@@ -371,23 +371,20 @@ impl WaitGroup {
371371
}
372372
}
373373

374-
#[cfg(feature = "atomic-waker")]
375374
pub use r#async::*;
376375

377-
#[cfg(feature = "atomic-waker")]
378376
mod r#async {
379377
use super::*;
380-
use atomic_waker::AtomicWaker;
381378
use std::sync::atomic::{AtomicUsize, Ordering};
382379

383380
use std::{
384381
future::Future,
385382
pin::Pin,
386-
task::{Context, Poll},
383+
task::{Context, Poll, Waker},
387384
};
388385

389386
struct AsyncInner {
390-
waker: AtomicWaker,
387+
waker: Mutex<Option<Waker>>,
391388
count: AtomicUsize,
392389
}
393390

@@ -442,7 +439,7 @@ mod r#async {
442439
Self {
443440
inner: Arc::new(AsyncInner {
444441
count: AtomicUsize::new(0),
445-
waker: AtomicWaker::new(),
442+
waker: Mutex::new(None),
446443
}),
447444
}
448445
}
@@ -453,7 +450,7 @@ mod r#async {
453450
Self {
454451
inner: Arc::new(AsyncInner {
455452
count: AtomicUsize::new(count),
456-
waker: AtomicWaker::new(),
453+
waker: Mutex::new(None),
457454
}),
458455
}
459456
}
@@ -542,22 +539,11 @@ mod r#async {
542539
/// }
543540
/// ```
544541
pub fn done(&self) {
545-
let res = self
546-
.inner
547-
.count
548-
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |val| {
549-
// We are the last worker
550-
if val == 1 {
551-
Some(0)
552-
} else if val == 0 {
553-
None
554-
} else {
555-
Some(val - 1)
556-
}
557-
});
558-
if let Ok(count) = res {
559-
if count == 1 {
560-
self.inner.waker.wake();
542+
let count = self.inner.count.fetch_sub(1, Ordering::Relaxed);
543+
// We are the last worker
544+
if count == 1 {
545+
if let Some(waker) = self.inner.waker.lock_me().take() {
546+
waker.wake();
561547
}
562548
}
563549
}
@@ -643,12 +629,16 @@ mod r#async {
643629
type Output = ();
644630

645631
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
646-
match self.inner.count.load(Ordering::Acquire) {
632+
if self.inner.count.load(Ordering::Relaxed) == 0 {
633+
return Poll::Ready(());
634+
}
635+
636+
let waker = cx.waker().clone();
637+
*self.inner.waker.lock_me() = Some(waker);
638+
639+
match self.inner.count.load(Ordering::Relaxed) {
647640
0 => Poll::Ready(()),
648-
_ => {
649-
self.inner.waker.register(cx.waker());
650-
Poll::Pending
651-
}
641+
_ => Poll::Pending,
652642
}
653643
}
654644
}
@@ -772,9 +762,7 @@ mod r#async {
772762
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
773763
async fn test_async_block_wait() {
774764
let wg = AsyncWaitGroup::new();
775-
wg.add(1);
776-
let t_wg = wg.clone();
777-
765+
let t_wg = wg.add(1);
778766
tokio::spawn(async move {
779767
// do some time consuming task
780768
t_wg.done()

0 commit comments

Comments
 (0)