diff --git a/Cargo.toml b/Cargo.toml index 8d0f53f0..ac9cc80c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ license = "Apache-2.0/MIT" repository = "https://github.com/stjepang/async-std" homepage = "https://github.com/stjepang/async-std" documentation = "https://docs.rs/async-std" -description = "Asynchronous standard library" -keywords = [] -categories = ["asynchronous", "concurrency"] +description = "Async version of the Rust standard library" +keywords = ["async", "await", "future", "std", "task"] +categories = ["asynchronous", "concurrency", "network-programming"] [package.metadata.docs.rs] features = ["docs"] @@ -19,7 +19,7 @@ rustdoc-args = ["--features docs"] docs = [] [dependencies] -async-task = { path = "async-task" } +async-task = { path = "../async-task" } cfg-if = "0.1.9" crossbeam = "0.7.1" futures-preview = "0.3.0-alpha.17" @@ -37,9 +37,3 @@ slab = "0.4.2" femme = "1.1.0" # surf = { git = "ssh://github.com/yoshuawuyts/surf" } tempdir = "0.3.7" - -[workspace] -members = [ - ".", - "async-task", -] diff --git a/async-task/Cargo.toml b/async-task/Cargo.toml deleted file mode 100644 index 2286f1ee..00000000 --- a/async-task/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "async-task" -version = "0.1.0" -authors = ["Stjepan Glavina "] -edition = "2018" -license = "Apache-2.0/MIT" -repository = "https://github.com/stjepang/async-task" -homepage = "https://github.com/stjepang/async-task" -documentation = "https://docs.rs/async-task" -description = "Task abstraction for building executors" -keywords = ["future", "task", "executor", "spawn"] -categories = ["asynchronous", "concurrency"] - -[dependencies] -crossbeam-utils = "0.6.5" - -[dev-dependencies] -crossbeam = "0.7.1" -futures-preview = "0.3.0-alpha.17" -lazy_static = "1.3.0" diff --git a/async-task/LICENSE-APACHE b/async-task/LICENSE-APACHE deleted file mode 100644 index 16fe87b0..00000000 --- a/async-task/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/async-task/LICENSE-MIT b/async-task/LICENSE-MIT deleted file mode 100644 index 31aa7938..00000000 --- a/async-task/LICENSE-MIT +++ /dev/null @@ -1,23 +0,0 @@ -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/async-task/README.md b/async-task/README.md deleted file mode 100644 index 22fe9f23..00000000 --- a/async-task/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# async-task - -A task abstraction for building executors. - -This crate makes it possible to build an efficient and extendable executor in few lines of -code. - -## License - -Licensed under either of - - * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) - -at your option. - -#### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in the work by you, as defined in the Apache-2.0 license, shall be -dual licensed as above, without any additional terms or conditions. diff --git a/async-task/benches/bench.rs b/async-task/benches/bench.rs deleted file mode 100644 index 6fd79353..00000000 --- a/async-task/benches/bench.rs +++ /dev/null @@ -1,43 +0,0 @@ -#![feature(async_await, test)] - -extern crate test; - -use futures::channel::oneshot; -use futures::executor; -use futures::future::TryFutureExt; -use test::Bencher; - -#[bench] -fn task_create(b: &mut Bencher) { - b.iter(|| { - async_task::spawn(async {}, drop, ()); - }); -} - -#[bench] -fn task_run(b: &mut Bencher) { - b.iter(|| { - let (task, handle) = async_task::spawn(async {}, drop, ()); - task.run(); - executor::block_on(handle).unwrap(); - }); -} - -#[bench] -fn oneshot_create(b: &mut Bencher) { - b.iter(|| { - let (tx, _rx) = oneshot::channel::<()>(); - let _task = Box::new(async move { tx.send(()).map_err(|_| ()) }); - }); -} - -#[bench] -fn oneshot_run(b: &mut Bencher) { - b.iter(|| { - let (tx, rx) = oneshot::channel::<()>(); - let task = Box::new(async move { tx.send(()).map_err(|_| ()) }); - - let future = task.and_then(|_| rx.map_err(|_| ())); - executor::block_on(future).unwrap(); - }); -} diff --git a/async-task/examples/panic-propagation.rs b/async-task/examples/panic-propagation.rs deleted file mode 100644 index 9c4f081a..00000000 --- a/async-task/examples/panic-propagation.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! A single-threaded executor where join handles propagate panics from tasks. - -#![feature(async_await)] - -use std::future::Future; -use std::panic::{resume_unwind, AssertUnwindSafe}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::thread; - -use crossbeam::channel::{unbounded, Sender}; -use futures::executor; -use futures::future::FutureExt; -use lazy_static::lazy_static; - -/// Spawns a future on the executor. -fn spawn(future: F) -> JoinHandle -where - F: Future + Send + 'static, - R: Send + 'static, -{ - lazy_static! { - // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); - - // Start the executor thread. - thread::spawn(|| { - for task in receiver { - // No need for `catch_unwind()` here because panics are already caught. - task.run(); - } - }); - - sender - }; - } - - // Create a future that catches panics within itself. - let future = AssertUnwindSafe(future).catch_unwind(); - - // Create a task that is scheduled by sending itself into the channel. - let schedule = |t| QUEUE.send(t).unwrap(); - let (task, handle) = async_task::spawn(future, schedule, ()); - - // Schedule the task by sending it into the channel. - task.schedule(); - - // Wrap the handle into one that propagates panics. - JoinHandle(handle) -} - -/// A join handle that propagates panics inside the task. -struct JoinHandle(async_task::JoinHandle, ()>); - -impl Future for JoinHandle { - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.0).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)), - Poll::Ready(Some(Err(err))) => resume_unwind(err), - } - } -} - -fn main() { - // Spawn a future that panics and block on it. - let handle = spawn(async { - panic!("Ooops!"); - }); - executor::block_on(handle); -} diff --git a/async-task/examples/panic-result.rs b/async-task/examples/panic-result.rs deleted file mode 100644 index b1200a38..00000000 --- a/async-task/examples/panic-result.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! A single-threaded executor where join handles catch panics inside tasks. - -#![feature(async_await)] - -use std::future::Future; -use std::panic::AssertUnwindSafe; -use std::thread; - -use crossbeam::channel::{unbounded, Sender}; -use futures::executor; -use futures::future::FutureExt; -use lazy_static::lazy_static; - -/// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle, ()> -where - F: Future + Send + 'static, - R: Send + 'static, -{ - lazy_static! { - // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); - - // Start the executor thread. - thread::spawn(|| { - for task in receiver { - // No need for `catch_unwind()` here because panics are already caught. - task.run(); - } - }); - - sender - }; - } - - // Create a future that catches panics within itself. - let future = AssertUnwindSafe(future).catch_unwind(); - - // Create a task that is scheduled by sending itself into the channel. - let schedule = |t| QUEUE.send(t).unwrap(); - let (task, handle) = async_task::spawn(future, schedule, ()); - - // Schedule the task by sending it into the channel. - task.schedule(); - - handle -} - -fn main() { - // Spawn a future that completes succesfully. - let handle = spawn(async { - println!("Hello, world!"); - }); - - // Block on the future and report its result. - match executor::block_on(handle) { - None => println!("The task was cancelled."), - Some(Ok(val)) => println!("The task completed with {:?}", val), - Some(Err(_)) => println!("The task has panicked"), - } - - // Spawn a future that panics. - let handle = spawn(async { - panic!("Ooops!"); - }); - - // Block on the future and report its result. - match executor::block_on(handle) { - None => println!("The task was cancelled."), - Some(Ok(val)) => println!("The task completed with {:?}", val), - Some(Err(_)) => println!("The task has panicked"), - } -} diff --git a/async-task/examples/spawn-on-thread.rs b/async-task/examples/spawn-on-thread.rs deleted file mode 100644 index 6d5b9a20..00000000 --- a/async-task/examples/spawn-on-thread.rs +++ /dev/null @@ -1,55 +0,0 @@ -//! A function that runs a future to completion on a dedicated thread. - -#![feature(async_await)] - -use std::future::Future; -use std::sync::Arc; -use std::thread; - -use crossbeam::channel; -use futures::executor; - -/// Spawns a future on a new dedicated thread. -/// -/// The returned handle can be used to await the output of the future. -fn spawn_on_thread(future: F) -> async_task::JoinHandle -where - F: Future + Send + 'static, - R: Send + 'static, -{ - // Create a channel that holds the task when it is scheduled for running. - let (sender, receiver) = channel::unbounded(); - let sender = Arc::new(sender); - let s = Arc::downgrade(&sender); - - // Wrap the future into one that disconnects the channel on completion. - let future = async move { - // When the inner future completes, the sender gets dropped and disconnects the channel. - let _sender = sender; - future.await - }; - - // Create a task that is scheduled by sending itself into the channel. - let schedule = move |t| s.upgrade().unwrap().send(t).unwrap(); - let (task, handle) = async_task::spawn(future, schedule, ()); - - // Schedule the task by sending it into the channel. - task.schedule(); - - // Spawn a thread running the task to completion. - thread::spawn(move || { - // Keep taking the task from the channel and running it until completion. - for task in receiver { - task.run(); - } - }); - - handle -} - -fn main() { - // Spawn a future on a dedicated thread. - executor::block_on(spawn_on_thread(async { - println!("Hello, world!"); - })); -} diff --git a/async-task/examples/spawn.rs b/async-task/examples/spawn.rs deleted file mode 100644 index 6e798c0b..00000000 --- a/async-task/examples/spawn.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! A simple single-threaded executor. - -#![feature(async_await)] - -use std::future::Future; -use std::panic::catch_unwind; -use std::thread; - -use crossbeam::channel::{unbounded, Sender}; -use futures::executor; -use lazy_static::lazy_static; - -/// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle -where - F: Future + Send + 'static, - R: Send + 'static, -{ - lazy_static! { - // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); - - // Start the executor thread. - thread::spawn(|| { - for task in receiver { - // Ignore panics for simplicity. - let _ignore_panic = catch_unwind(|| task.run()); - } - }); - - sender - }; - } - - // Create a task that is scheduled by sending itself into the channel. - let schedule = |t| QUEUE.send(t).unwrap(); - let (task, handle) = async_task::spawn(future, schedule, ()); - - // Schedule the task by sending it into the channel. - task.schedule(); - - handle -} - -fn main() { - // Spawn a future and await its result. - let handle = spawn(async { - println!("Hello, world!"); - }); - executor::block_on(handle); -} diff --git a/async-task/examples/task-id.rs b/async-task/examples/task-id.rs deleted file mode 100644 index b3832d07..00000000 --- a/async-task/examples/task-id.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! An executor that assigns an ID to every spawned task. - -#![feature(async_await)] - -use std::cell::Cell; -use std::future::Future; -use std::panic::catch_unwind; -use std::thread; - -use crossbeam::atomic::AtomicCell; -use crossbeam::channel::{unbounded, Sender}; -use futures::executor; -use lazy_static::lazy_static; - -#[derive(Clone, Copy, Debug)] -struct TaskId(usize); - -thread_local! { - /// The ID of the current task. - static TASK_ID: Cell> = Cell::new(None); -} - -/// Returns the ID of the currently executing task. -/// -/// Returns `None` if called outside the runtime. -fn task_id() -> Option { - TASK_ID.with(|id| id.get()) -} - -/// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle -where - F: Future + Send + 'static, - R: Send + 'static, -{ - lazy_static! { - // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); - - // Start the executor thread. - thread::spawn(|| { - TASK_ID.with(|id| { - for task in receiver { - // Store the task ID into the thread-local before running. - id.set(Some(*task.tag())); - - // Ignore panics for simplicity. - let _ignore_panic = catch_unwind(|| task.run()); - } - }) - }); - - sender - }; - - // A counter that assigns IDs to spawned tasks. - static ref COUNTER: AtomicCell = AtomicCell::new(0); - } - - // Reserve an ID for the new task. - let id = TaskId(COUNTER.fetch_add(1)); - - // Create a task that is scheduled by sending itself into the channel. - let schedule = |task| QUEUE.send(task).unwrap(); - let (task, handle) = async_task::spawn(future, schedule, id); - - // Schedule the task by sending it into the channel. - task.schedule(); - - handle -} - -fn main() { - let mut handles = vec![]; - - // Spawn a bunch of tasks. - for _ in 0..10 { - handles.push(spawn(async move { - println!("Hello from task with {:?}", task_id()); - })); - } - - // Wait for the tasks to finish. - for handle in handles { - executor::block_on(handle); - } -} diff --git a/async-task/src/header.rs b/async-task/src/header.rs deleted file mode 100644 index 0ce51645..00000000 --- a/async-task/src/header.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::alloc::Layout; -use std::cell::Cell; -use std::fmt; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::Waker; - -use crossbeam_utils::Backoff; - -use crate::raw::TaskVTable; -use crate::state::*; -use crate::utils::{abort_on_panic, extend}; - -/// The header of a task. -/// -/// This header is stored right at the beginning of every heap-allocated task. -pub(crate) struct Header { - /// Current state of the task. - /// - /// Contains flags representing the current state and the reference count. - pub(crate) state: AtomicUsize, - - /// The task that is blocked on the `JoinHandle`. - /// - /// This waker needs to be woken once the task completes or is closed. - pub(crate) awaiter: Cell>, - - /// The virtual table. - /// - /// In addition to the actual waker virtual table, it also contains pointers to several other - /// methods necessary for bookkeeping the heap-allocated task. - pub(crate) vtable: &'static TaskVTable, -} - -impl Header { - /// Cancels the task. - /// - /// This method will only mark the task as closed and will notify the awaiter, but it won't - /// reschedule the task if it's not completed. - pub(crate) fn cancel(&self) { - let mut state = self.state.load(Ordering::Acquire); - - loop { - // If the task has been completed or closed, it can't be cancelled. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // Mark the task as closed. - match self.state.compare_exchange_weak( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - self.notify(); - } - - break; - } - Err(s) => state = s, - } - } - } - - /// Notifies the task blocked on the task. - /// - /// If there is a registered waker, it will be removed from the header and woken. - #[inline] - pub(crate) fn notify(&self) { - if let Some(waker) = self.swap_awaiter(None) { - // We need a safeguard against panics because waking can panic. - abort_on_panic(|| { - waker.wake(); - }); - } - } - - /// Notifies the task blocked on the task unless its waker matches `current`. - /// - /// If there is a registered waker, it will be removed from the header. - #[inline] - pub(crate) fn notify_unless(&self, current: &Waker) { - if let Some(waker) = self.swap_awaiter(None) { - if !waker.will_wake(current) { - // We need a safeguard against panics because waking can panic. - abort_on_panic(|| { - waker.wake(); - }); - } - } - } - - /// Swaps the awaiter and returns the previous value. - #[inline] - pub(crate) fn swap_awaiter(&self, new: Option) -> Option { - let new_is_none = new.is_none(); - - // We're about to try acquiring the lock in a loop. If it's already being held by another - // thread, we'll have to spin for a while so it's best to employ a backoff strategy. - let backoff = Backoff::new(); - loop { - // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag. - let state = if new_is_none { - self.state.fetch_or(LOCKED, Ordering::Acquire) - } else { - self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire) - }; - - // If the lock was acquired, break from the loop. - if state & LOCKED == 0 { - break; - } - - // Snooze for a little while because the lock is held by another thread. - backoff.snooze(); - } - - // Replace the awaiter. - let old = self.awaiter.replace(new); - - // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag. - if new_is_none { - self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release); - } else { - self.state.fetch_and(!LOCKED, Ordering::Release); - } - - old - } - - /// Returns the offset at which the tag of type `T` is stored. - #[inline] - pub(crate) fn offset_tag() -> usize { - let layout_header = Layout::new::
(); - let layout_t = Layout::new::(); - let (_, offset_t) = extend(layout_header, layout_t); - offset_t - } -} - -impl fmt::Debug for Header { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let state = self.state.load(Ordering::SeqCst); - - f.debug_struct("Header") - .field("scheduled", &(state & SCHEDULED != 0)) - .field("running", &(state & RUNNING != 0)) - .field("completed", &(state & COMPLETED != 0)) - .field("closed", &(state & CLOSED != 0)) - .field("awaiter", &(state & AWAITER != 0)) - .field("handle", &(state & HANDLE != 0)) - .field("ref_count", &(state / REFERENCE)) - .finish() - } -} diff --git a/async-task/src/join_handle.rs b/async-task/src/join_handle.rs deleted file mode 100644 index fb5c275e..00000000 --- a/async-task/src/join_handle.rs +++ /dev/null @@ -1,333 +0,0 @@ -use std::fmt; -use std::future::Future; -use std::marker::{PhantomData, Unpin}; -use std::pin::Pin; -use std::ptr::NonNull; -use std::sync::atomic::Ordering; -use std::task::{Context, Poll}; - -use crate::header::Header; -use crate::state::*; -use crate::utils::abort_on_panic; - -/// A handle that awaits the result of a task. -/// -/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was -/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until -/// the task completes, panics, or gets cancelled. -/// -/// # Examples -/// -/// ``` -/// #![feature(async_await)] -/// -/// use crossbeam::channel; -/// use futures::executor; -/// -/// // The future inside the task. -/// let future = async { 1 + 2 }; -/// -/// // If the task gets woken, it will be sent into this channel. -/// let (s, r) = channel::unbounded(); -/// let schedule = move |task| s.send(task).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (task, handle) = async_task::spawn(future, schedule, ()); -/// -/// // Run the task. In this example, it will complete after a single run. -/// task.run(); -/// assert!(r.is_empty()); -/// -/// // Await the result of the task. -/// let result = executor::block_on(handle); -/// assert_eq!(result, Some(3)); -/// ``` -pub struct JoinHandle { - /// A raw task pointer. - pub(crate) raw_task: NonNull<()>, - - /// A marker capturing the generic type `R`. - pub(crate) _marker: PhantomData<(R, T)>, -} - -unsafe impl Send for JoinHandle {} -unsafe impl Sync for JoinHandle {} - -impl Unpin for JoinHandle {} - -impl JoinHandle { - /// Cancels the task. - /// - /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt - /// to run it won't do anything. And if it's completed, awaiting its result evaluates to - /// `None`. - /// - /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// use futures::executor; - /// - /// // The future inside the task. - /// let future = async { 1 + 2 }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, ()); - /// - /// // Cancel the task. - /// handle.cancel(); - /// - /// // Running a cancelled task does nothing. - /// task.run(); - /// - /// // Await the result of the task. - /// let result = executor::block_on(handle); - /// assert_eq!(result, None); - /// ``` - pub fn cancel(&self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - unsafe { - let mut state = (*header).state.load(Ordering::Acquire); - - loop { - // If the task has been completed or closed, it can't be cancelled. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // If the task is not scheduled nor running, we'll need to schedule it. - let new = if state & (SCHEDULED | RUNNING) == 0 { - (state | SCHEDULED | CLOSED) + REFERENCE - } else { - state | CLOSED - }; - - // Mark the task as closed. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not scheduled nor running, schedule it so that its future - // gets dropped by the executor. - if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr); - } - - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - (*header).notify(); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Returns a reference to the tag stored inside the task. - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// - /// // The future inside the task. - /// let future = async { 1 + 2 }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, "a simple task"); - /// - /// // Access the tag. - /// assert_eq!(*handle.tag(), "a simple task"); - /// ``` - pub fn tag(&self) -> &T { - let offset = Header::offset_tag::(); - let ptr = self.raw_task.as_ptr(); - - unsafe { - let raw = (ptr as *mut u8).add(offset) as *const T; - &*raw - } - } -} - -impl Drop for JoinHandle { - fn drop(&mut self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - // A place where the output will be stored in case it needs to be dropped. - let mut output = None; - - unsafe { - // Optimistically assume the `JoinHandle` is being dropped just after creating the - // task. This is a common case so if the handle is not used, the overhead of it is only - // one compare-exchange operation. - if let Err(mut state) = (*header).state.compare_exchange_weak( - SCHEDULED | HANDLE | REFERENCE, - SCHEDULED | REFERENCE, - Ordering::AcqRel, - Ordering::Acquire, - ) { - loop { - // If the task has been completed but not yet closed, that means its output - // must be dropped. - if state & COMPLETED != 0 && state & CLOSED == 0 { - // Mark the task as closed in order to grab its output. - match (*header).state.compare_exchange_weak( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Read the output. - output = - Some((((*header).vtable.get_output)(ptr) as *mut R).read()); - - // Update the state variable because we're continuing the loop. - state |= CLOSED; - } - Err(s) => state = s, - } - } else { - // If this is the last reference to task and it's not closed, then close - // it and schedule one more time so that its future gets dropped by the - // executor. - let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { - SCHEDULED | CLOSED | REFERENCE - } else { - state & !HANDLE - }; - - // Unset the handle flag. - match (*header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If this is the last reference to the task, we need to either - // schedule dropping its future or destroy it. - if state & !(REFERENCE - 1) == 0 { - if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr); - } else { - ((*header).vtable.destroy)(ptr); - } - } - - break; - } - Err(s) => state = s, - } - } - } - } - } - - // Drop the output if it was taken out of the task. - drop(output); - } -} - -impl Future for JoinHandle { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - unsafe { - let mut state = (*header).state.load(Ordering::Acquire); - - loop { - // If the task has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // Even though the awaiter is most likely the current task, it could also be - // another task. - (*header).notify_unless(cx.waker()); - return Poll::Ready(None); - } - - // If the task is not completed, register the current task. - if state & COMPLETED == 0 { - // Replace the waker with one associated with the current task. We need a - // safeguard against panics because dropping the previous waker can panic. - abort_on_panic(|| { - (*header).swap_awaiter(Some(cx.waker().clone())); - }); - - // Reload the state after registering. It is possible that the task became - // completed or closed just before registration so we need to check for that. - state = (*header).state.load(Ordering::Acquire); - - // If the task has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // Even though the awaiter is most likely the current task, it could also - // be another task. - (*header).notify_unless(cx.waker()); - return Poll::Ready(None); - } - - // If the task is still not completed, we're blocked on it. - if state & COMPLETED == 0 { - return Poll::Pending; - } - } - - // Since the task is now completed, mark it as closed in order to grab its output. - match (*header).state.compare_exchange( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Notify the awaiter. Even though the awaiter is most likely the current - // task, it could also be another task. - if state & AWAITER != 0 { - (*header).notify_unless(cx.waker()); - } - - // Take the output from the task. - let output = ((*header).vtable.get_output)(ptr) as *mut R; - return Poll::Ready(Some(output.read())); - } - Err(s) => state = s, - } - } - } - } -} - -impl fmt::Debug for JoinHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - f.debug_struct("JoinHandle") - .field("header", unsafe { &(*header) }) - .finish() - } -} diff --git a/async-task/src/lib.rs b/async-task/src/lib.rs deleted file mode 100644 index 55185154..00000000 --- a/async-task/src/lib.rs +++ /dev/null @@ -1,149 +0,0 @@ -//! Task abstraction for building executors. -//! -//! # What is an executor? -//! -//! An async block creates a future and an async function returns one. But futures don't do -//! anything unless they are awaited inside other async blocks or async functions. So the question -//! arises: who or what awaits the main future that awaits others? -//! -//! One solution is to call [`block_on()`] on the main future, which will block -//! the current thread and keep polling the future until it completes. But sometimes we don't want -//! to block the current thread and would prefer to *spawn* the future to let a background thread -//! block on it instead. -//! -//! This is where executors step in - they create a number of threads (typically equal to the -//! number of CPU cores on the system) that are dedicated to polling spawned futures. Each executor -//! thread keeps polling spawned futures in a loop and only blocks when all spawned futures are -//! either sleeping or running. -//! -//! # What is a task? -//! -//! In order to spawn a future on an executor, one needs to allocate the future on the heap and -//! keep some state alongside it, like whether the future is ready for polling, waiting to be woken -//! up, or completed. This allocation is usually called a *task*. -//! -//! The executor then runs the spawned task by polling its future. If the future is pending on a -//! resource, a [`Waker`] associated with the task will be registered somewhere so that the task -//! can be woken up and run again at a later time. -//! -//! For example, if the future wants to read something from a TCP socket that is not ready yet, the -//! networking system will clone the task's waker and wake it up once the socket becomes ready. -//! -//! # Task construction -//! -//! A task is constructed with [`Task::create()`]: -//! -//! ``` -//! # #![feature(async_await)] -//! let future = async { 1 + 2 }; -//! let schedule = |task| unimplemented!(); -//! -//! let (task, handle) = async_task::spawn(future, schedule, ()); -//! ``` -//! -//! The first argument to the constructor, `()` in this example, is an arbitrary piece of data -//! called a *tag*. This can be a task identifier, a task name, task-local storage, or something -//! of similar nature. -//! -//! The second argument is the future that gets polled when the task is run. -//! -//! The third argument is the schedule function, which is called every time when the task gets -//! woken up. This function should push the received task into some kind of queue of runnable -//! tasks. -//! -//! The constructor returns a runnable [`Task`] and a [`JoinHandle`] that can await the result of -//! the future. -//! -//! # Task scheduling -//! -//! TODO -//! -//! # Join handles -//! -//! TODO -//! -//! # Cancellation -//! -//! TODO -//! -//! # Performance -//! -//! TODO: explain single allocation, etc. -//! -//! Task [construction] incurs a single allocation only. The [`Task`] can then be run and its -//! result awaited through the [`JoinHandle`]. When woken, the task gets automatically rescheduled. -//! It's also possible to cancel the task so that it stops running and can't be awaited anymore. -//! -//! [construction]: struct.Task.html#method.create -//! [`JoinHandle`]: struct.JoinHandle.html -//! [`Task`]: struct.Task.html -//! [`Future`]: https://doc.rust-lang.org/nightly/std/future/trait.Future.html -//! [`Waker`]: https://doc.rust-lang.org/nightly/std/task/struct.Waker.html -//! [`block_on()`]: https://docs.rs/futures-preview/*/futures/executor/fn.block_on.html -//! -//! # Examples -//! -//! A simple single-threaded executor: -//! -//! ``` -//! # #![feature(async_await)] -//! use std::future::Future; -//! use std::panic::catch_unwind; -//! use std::thread; -//! -//! use async_task::{JoinHandle, Task}; -//! use crossbeam::channel::{unbounded, Sender}; -//! use futures::executor; -//! use lazy_static::lazy_static; -//! -//! /// Spawns a future on the executor. -//! fn spawn(future: F) -> JoinHandle -//! where -//! F: Future + Send + 'static, -//! R: Send + 'static, -//! { -//! lazy_static! { -//! // A channel that holds scheduled tasks. -//! static ref QUEUE: Sender> = { -//! let (sender, receiver) = unbounded::>(); -//! -//! // Start the executor thread. -//! thread::spawn(|| { -//! for task in receiver { -//! // Ignore panics for simplicity. -//! let _ignore_panic = catch_unwind(|| task.run()); -//! } -//! }); -//! -//! sender -//! }; -//! } -//! -//! // Create a task that is scheduled by sending itself into the channel. -//! let schedule = |t| QUEUE.send(t).unwrap(); -//! let (task, handle) = async_task::spawn(future, schedule, ()); -//! -//! // Schedule the task by sending it into the channel. -//! task.schedule(); -//! -//! handle -//! } -//! -//! // Spawn a future and await its result. -//! let handle = spawn(async { -//! println!("Hello, world!"); -//! }); -//! executor::block_on(handle); -//! ``` - -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] - -mod header; -mod join_handle; -mod raw; -mod state; -mod task; -mod utils; - -pub use crate::join_handle::JoinHandle; -pub use crate::task::{spawn, Task}; diff --git a/async-task/src/raw.rs b/async-task/src/raw.rs deleted file mode 100644 index 69284275..00000000 --- a/async-task/src/raw.rs +++ /dev/null @@ -1,629 +0,0 @@ -use std::alloc::{self, Layout}; -use std::cell::Cell; -use std::future::Future; -use std::marker::PhantomData; -use std::mem::{self, ManuallyDrop}; -use std::pin::Pin; -use std::ptr::NonNull; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; - -use crate::header::Header; -use crate::state::*; -use crate::utils::{abort_on_panic, extend}; -use crate::Task; - -/// The vtable for a task. -pub(crate) struct TaskVTable { - /// The raw waker vtable. - pub(crate) raw_waker: RawWakerVTable, - - /// Schedules the task. - pub(crate) schedule: unsafe fn(*const ()), - - /// Drops the future inside the task. - pub(crate) drop_future: unsafe fn(*const ()), - - /// Returns a pointer to the output stored after completion. - pub(crate) get_output: unsafe fn(*const ()) -> *const (), - - /// Drops a waker or a task. - pub(crate) decrement: unsafe fn(ptr: *const ()), - - /// Destroys the task. - pub(crate) destroy: unsafe fn(*const ()), - - /// Runs the task. - pub(crate) run: unsafe fn(*const ()), -} - -/// Memory layout of a task. -/// -/// This struct contains the information on: -/// -/// 1. How to allocate and deallocate the task. -/// 2. How to access the fields inside the task. -#[derive(Clone, Copy)] -pub(crate) struct TaskLayout { - /// Memory layout of the whole task. - pub(crate) layout: Layout, - - /// Offset into the task at which the tag is stored. - pub(crate) offset_t: usize, - - /// Offset into the task at which the schedule function is stored. - pub(crate) offset_s: usize, - - /// Offset into the task at which the future is stored. - pub(crate) offset_f: usize, - - /// Offset into the task at which the output is stored. - pub(crate) offset_r: usize, -} - -/// Raw pointers to the fields of a task. -pub(crate) struct RawTask { - /// The task header. - pub(crate) header: *const Header, - - /// The schedule function. - pub(crate) schedule: *const S, - - /// The tag inside the task. - pub(crate) tag: *mut T, - - /// The future. - pub(crate) future: *mut F, - - /// The output of the future. - pub(crate) output: *mut R, -} - -impl Copy for RawTask {} - -impl Clone for RawTask { - fn clone(&self) -> Self { - Self { - header: self.header, - schedule: self.schedule, - tag: self.tag, - future: self.future, - output: self.output, - } - } -} - -impl RawTask -where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static, -{ - /// Allocates a task with the given `future` and `schedule` function. - /// - /// It is assumed there are initially only the `Task` reference and the `JoinHandle`. - pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> { - // Compute the layout of the task for allocation. Abort if the computation fails. - let task_layout = abort_on_panic(|| Self::task_layout()); - - unsafe { - // Allocate enough space for the entire task. - let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) { - None => std::process::abort(), - Some(p) => p, - }; - - let raw = Self::from_ptr(raw_task.as_ptr()); - - // Write the header as the first field of the task. - (raw.header as *mut Header).write(Header { - state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE), - awaiter: Cell::new(None), - vtable: &TaskVTable { - raw_waker: RawWakerVTable::new( - Self::clone_waker, - Self::wake, - Self::wake_by_ref, - Self::decrement, - ), - schedule: Self::schedule, - drop_future: Self::drop_future, - get_output: Self::get_output, - decrement: Self::decrement, - destroy: Self::destroy, - run: Self::run, - }, - }); - - // Write the tag as the second field of the task. - (raw.tag as *mut T).write(tag); - - // Write the schedule function as the third field of the task. - (raw.schedule as *mut S).write(schedule); - - // Write the future as the fourth field of the task. - raw.future.write(future); - - raw_task - } - } - - /// Creates a `RawTask` from a raw task pointer. - #[inline] - pub(crate) fn from_ptr(ptr: *const ()) -> Self { - let task_layout = Self::task_layout(); - let p = ptr as *const u8; - - unsafe { - Self { - header: p as *const Header, - tag: p.add(task_layout.offset_t) as *mut T, - schedule: p.add(task_layout.offset_s) as *const S, - future: p.add(task_layout.offset_f) as *mut F, - output: p.add(task_layout.offset_r) as *mut R, - } - } - } - - /// Returns the memory layout for a task. - #[inline] - fn task_layout() -> TaskLayout { - // Compute the layouts for `Header`, `T`, `S`, `F`, and `R`. - let layout_header = Layout::new::
(); - let layout_t = Layout::new::(); - let layout_s = Layout::new::(); - let layout_f = Layout::new::(); - let layout_r = Layout::new::(); - - // Compute the layout for `union { F, R }`. - let size_union = layout_f.size().max(layout_r.size()); - let align_union = layout_f.align().max(layout_r.align()); - let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; - - // Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`. - let layout = layout_header; - let (layout, offset_t) = extend(layout, layout_t); - let (layout, offset_s) = extend(layout, layout_s); - let (layout, offset_union) = extend(layout, layout_union); - let offset_f = offset_union; - let offset_r = offset_union; - - TaskLayout { - layout, - offset_t, - offset_s, - offset_f, - offset_r, - } - } - - /// Wakes a waker. - unsafe fn wake(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken. - if state & (COMPLETED | CLOSED) != 0 { - // Drop the waker. - Self::decrement(ptr); - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Drop the waker. - Self::decrement(ptr); - break; - } - Err(s) => state = s, - } - } else { - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - state | SCHEDULED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not yet scheduled and isn't currently running, now is the - // time to schedule it. - if state & (SCHEDULED | RUNNING) == 0 { - // Schedule the task. - let task = Task { - raw_task: NonNull::new_unchecked(ptr as *mut ()), - _marker: PhantomData, - }; - (*raw.schedule)(task); - } else { - // Drop the waker. - Self::decrement(ptr); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Wakes a waker by reference. - unsafe fn wake_by_ref(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(s) => state = s, - } - } else { - // If the task is not scheduled nor running, we'll need to schedule after waking. - let new = if state & (SCHEDULED | RUNNING) == 0 { - (state | SCHEDULED) + REFERENCE - } else { - state | SCHEDULED - }; - - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not scheduled nor running, now is the time to schedule. - if state & (SCHEDULED | RUNNING) == 0 { - // If the reference count overflowed, abort. - if state > isize::max_value() as usize { - std::process::abort(); - } - - // Schedule the task. - let task = Task { - raw_task: NonNull::new_unchecked(ptr as *mut ()), - _marker: PhantomData, - }; - (*raw.schedule)(task); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Clones a waker. - unsafe fn clone_waker(ptr: *const ()) -> RawWaker { - let raw = Self::from_ptr(ptr); - let raw_waker = &(*raw.header).vtable.raw_waker; - - // Increment the reference count. With any kind of reference-counted data structure, - // relaxed ordering is fine when the reference is being cloned. - let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); - - // If the reference count overflowed, abort. - if state > isize::max_value() as usize { - std::process::abort(); - } - - RawWaker::new(ptr, raw_waker) - } - - /// Drops a waker or a task. - /// - /// This function will decrement the reference count. If it drops down to zero and the - /// associated join handle has been dropped too, then the task gets destroyed. - #[inline] - unsafe fn decrement(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Decrement the reference count. - let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - - // If this was the last reference to the task and the `JoinHandle` has been dropped as - // well, then destroy task. - if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { - Self::destroy(ptr); - } - } - - /// Schedules a task for running. - /// - /// This function doesn't modify the state of the task. It only passes the task reference to - /// its schedule function. - unsafe fn schedule(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - (*raw.schedule)(Task { - raw_task: NonNull::new_unchecked(ptr as *mut ()), - _marker: PhantomData, - }); - } - - /// Drops the future inside a task. - #[inline] - unsafe fn drop_future(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // We need a safeguard against panics because the destructor can panic. - abort_on_panic(|| { - raw.future.drop_in_place(); - }) - } - - /// Returns a pointer to the output inside a task. - unsafe fn get_output(ptr: *const ()) -> *const () { - let raw = Self::from_ptr(ptr); - raw.output as *const () - } - - /// Cleans up task's resources and deallocates it. - /// - /// If the task has not been closed, then its future or the output will be dropped. The - /// schedule function and the tag get dropped too. - #[inline] - unsafe fn destroy(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - let task_layout = Self::task_layout(); - - // We need a safeguard against panics because destructors can panic. - abort_on_panic(|| { - // Drop the schedule function. - (raw.schedule as *mut S).drop_in_place(); - - // Drop the tag. - (raw.tag as *mut T).drop_in_place(); - }); - - // Finally, deallocate the memory reserved by the task. - alloc::dealloc(ptr as *mut u8, task_layout.layout); - } - - /// Runs a task. - /// - /// If polling its future panics, the task will be closed and the panic propagated into the - /// caller. - unsafe fn run(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Create a context from the raw task pointer and the vtable inside the its header. - let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new( - ptr, - &(*raw.header).vtable.raw_waker, - ))); - let cx = &mut Context::from_waker(&waker); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - // Update the task's state before polling its future. - loop { - // If the task has been closed, drop the task reference and return. - if state & CLOSED != 0 { - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - (*raw.header).notify(); - } - - // Drop the future. - Self::drop_future(ptr); - - // Drop the task reference. - Self::decrement(ptr); - return; - } - - // Mark the task as unscheduled and running. - match (*raw.header).state.compare_exchange_weak( - state, - (state & !SCHEDULED) | RUNNING, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Update the state because we're continuing with polling the future. - state = (state & !SCHEDULED) | RUNNING; - break; - } - Err(s) => state = s, - } - } - - // Poll the inner future, but surround it with a guard that closes the task in case polling - // panics. - let guard = Guard(raw); - let poll = ::poll(Pin::new_unchecked(&mut *raw.future), cx); - mem::forget(guard); - - match poll { - Poll::Ready(out) => { - // Replace the future with its output. - Self::drop_future(ptr); - raw.output.write(out); - - // A place where the output will be stored in case it needs to be dropped. - let mut output = None; - - // The task is now completed. - loop { - // If the handle is dropped, we'll need to close it and drop the output. - let new = if state & HANDLE == 0 { - (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED - } else { - (state & !RUNNING & !SCHEDULED) | COMPLETED - }; - - // Mark the task as not running and completed. - match (*raw.header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the handle is dropped or if the task was closed while running, - // now it's time to drop the output. - if state & HANDLE == 0 || state & CLOSED != 0 { - // Read the output. - output = Some(raw.output.read()); - } - - // Notify the awaiter that the task has been completed. - if state & AWAITER != 0 { - (*raw.header).notify(); - } - - // Drop the task reference. - Self::decrement(ptr); - break; - } - Err(s) => state = s, - } - } - - // Drop the output if it was taken out of the task. - drop(output); - } - Poll::Pending => { - // The task is still not completed. - loop { - // If the task was closed while running, we'll need to unschedule in case it - // was woken and then clean up its resources. - let new = if state & CLOSED != 0 { - state & !RUNNING & !SCHEDULED - } else { - state & !RUNNING - }; - - // Mark the task as not running. - match (*raw.header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(state) => { - // If the task was closed while running, we need to drop its future. - // If the task was woken while running, we need to schedule it. - // Otherwise, we just drop the task reference. - if state & CLOSED != 0 { - // The thread that closed the task didn't drop the future because - // it was running so now it's our responsibility to do so. - Self::drop_future(ptr); - - // Drop the task reference. - Self::decrement(ptr); - } else if state & SCHEDULED != 0 { - // The thread that has woken the task didn't reschedule it because - // it was running so now it's our responsibility to do so. - Self::schedule(ptr); - } else { - // Drop the task reference. - Self::decrement(ptr); - } - break; - } - Err(s) => state = s, - } - } - } - } - - /// A guard that closes the task if polling its future panics. - struct Guard(RawTask) - where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static; - - impl Drop for Guard - where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static, - { - fn drop(&mut self) { - let raw = self.0; - let ptr = raw.header as *const (); - - unsafe { - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task was closed while running, then unschedule it, drop its - // future, and drop the task reference. - if state & CLOSED != 0 { - // We still need to unschedule the task because it is possible it was - // woken while running. - (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); - - // The thread that closed the task didn't drop the future because it - // was running so now it's our responsibility to do so. - RawTask::::drop_future(ptr); - - // Drop the task reference. - RawTask::::decrement(ptr); - break; - } - - // Mark the task as not running, not scheduled, and closed. - match (*raw.header).state.compare_exchange_weak( - state, - (state & !RUNNING & !SCHEDULED) | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(state) => { - // Drop the future because the task is now closed. - RawTask::::drop_future(ptr); - - // Notify the awaiter that the task has been closed. - if state & AWAITER != 0 { - (*raw.header).notify(); - } - - // Drop the task reference. - RawTask::::decrement(ptr); - break; - } - Err(s) => state = s, - } - } - } - } - } - } -} diff --git a/async-task/src/state.rs b/async-task/src/state.rs deleted file mode 100644 index d6ce34fd..00000000 --- a/async-task/src/state.rs +++ /dev/null @@ -1,65 +0,0 @@ -/// Set if the task is scheduled for running. -/// -/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled -/// state at the moment of creation and when it gets unapused either by its `JoinHandle` or woken -/// by a `Waker`. -/// -/// This flag can't be set when the task is completed. However, it can be set while the task is -/// running, in which case it will be rescheduled as soon as polling finishes. -pub(crate) const SCHEDULED: usize = 1 << 0; - -/// Set if the task is running. -/// -/// A task is running state while its future is being polled. -/// -/// This flag can't be set when the task is completed. However, it can be in scheduled state while -/// it is running, in which case it will be rescheduled when it stops being polled. -pub(crate) const RUNNING: usize = 1 << 1; - -/// Set if the task has been completed. -/// -/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored -/// inside the task until it becomes stopped. In fact, `JoinHandle` picks the output up by marking -/// the task as stopped. -/// -/// This flag can't be set when the task is scheduled or completed. -pub(crate) const COMPLETED: usize = 1 << 2; - -/// Set if the task is closed. -/// -/// If a task is closed, that means its either cancelled or its output has been consumed by the -/// `JoinHandle`. A task becomes closed when: -/// -/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`. -/// 2. Its output is awaited by the `JoinHandle`. -/// 3. It panics while polling the future. -/// 4. It is completed and the `JoinHandle` is dropped. -pub(crate) const CLOSED: usize = 1 << 3; - -/// Set if the `JoinHandle` still exists. -/// -/// The `JoinHandle` is a special case in that it is only tracked by this flag, while all other -/// task references (`Task` and `Waker`s) are tracked by the reference count. -pub(crate) const HANDLE: usize = 1 << 4; - -/// Set if the `JoinHandle` is awaiting the output. -/// -/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the -/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast -/// check that tells us if we need to wake anyone without acquiring the lock inside the task. -pub(crate) const AWAITER: usize = 1 << 5; - -/// Set if the awaiter is locked. -/// -/// This lock is acquired before a new awaiter is registered or the existing one is woken. -pub(crate) const LOCKED: usize = 1 << 6; - -/// A single reference. -/// -/// The lower bits in the state contain various flags representing the task state, while the upper -/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the -/// total reference count. -/// -/// Note that the reference counter only tracks the `Task` and `Waker`s. The `JoinHandle` is -/// tracked separately by the `HANDLE` flag. -pub(crate) const REFERENCE: usize = 1 << 7; diff --git a/async-task/src/task.rs b/async-task/src/task.rs deleted file mode 100644 index 8bfc1643..00000000 --- a/async-task/src/task.rs +++ /dev/null @@ -1,390 +0,0 @@ -use std::fmt; -use std::future::Future; -use std::marker::PhantomData; -use std::mem; -use std::ptr::NonNull; - -use crate::header::Header; -use crate::raw::RawTask; -use crate::JoinHandle; - -/// Creates a new task. -/// -/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that -/// awaits its result. -/// -/// The `tag` is stored inside the allocated task. -/// -/// When run, the task polls `future`. When woken, it gets scheduled for running by the -/// `schedule` function. -/// -/// # Examples -/// -/// ``` -/// # #![feature(async_await)] -/// use crossbeam::channel; -/// -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // If the task gets woken, it will be sent into this channel. -/// let (s, r) = channel::unbounded(); -/// let schedule = move |task| s.send(task).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (task, handle) = async_task::spawn(future, schedule, ()); -/// ``` -/// -/// [`JoinHandle`]: struct.JoinHandle.html -pub fn spawn(future: F, schedule: S, tag: T) -> (Task, JoinHandle) -where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(Task) + Send + Sync + 'static, - T: Send + Sync + 'static, -{ - let raw_task = RawTask::::allocate(tag, future, schedule); - let task = Task { - raw_task, - _marker: PhantomData, - }; - let handle = JoinHandle { - raw_task, - _marker: PhantomData, - }; - (task, handle) -} - -/// A task that runs a future. -/// -/// # Construction -/// -/// A task is a heap-allocated structure containing: -/// -/// * A reference counter. -/// * The state of the task. -/// * Arbitrary piece of data called a *tag*. -/// * A function that schedules the task when woken. -/// * A future or its result if polling has completed. -/// -/// Constructor [`Task::create()`] returns a [`Task`] and a [`JoinHandle`]. Those two references -/// are like two sides of the task: one runs the future and the other awaits its result. -/// -/// # Behavior -/// -/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the -/// [`Task`] reference and polls its internal future. If the future is still pending after being -/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future -/// completes, its result becomes available to the [`JoinHandle`]. -/// -/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task. -/// -/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function -/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a -/// queue of runnable tasks. -/// -/// If the [`Task`] reference is dropped without being run, the task is cancelled. -/// -/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task -/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An -/// attempt to run a cancelled task won't do anything. And if the cancelled task has already -/// completed, awaiting its result through [`JoinHandle`] will return `None`. -/// -/// If polling the task's future panics, it gets cancelled automatically. -/// -/// # Task states -/// -/// A task can be in the following states: -/// -/// * Sleeping: The [`Task`] reference doesn't exist and is waiting to be scheduled by a [`Waker`]. -/// * Scheduled: The [`Task`] reference exists and is waiting to be [run]. -/// * Completed: The [`Task`] reference doesn't exist anymore and can't be rescheduled, but its -/// result is available to the [`JoinHandle`]. -/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and -/// awaiting the [`JoinHandle`] returns `None`. -/// -/// When constructed, the task is initially in the scheduled state. -/// -/// # Destruction -/// -/// The future inside the task gets dropped in the following cases: -/// -/// * When [`Task`] is dropped. -/// * When [`Task`] is run to completion. -/// -/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if -/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its -/// future gets dropped by the executor. In other words, the task's future can be dropped only by -/// [`Task`]. -/// -/// When the task completes, the result of its future is stored inside the allocation. This result -/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the -/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too. -/// -/// The task gets deallocated when all references to it are dropped, which includes the [`Task`], -/// the [`JoinHandle`], and any associated [`Waker`]s. -/// -/// The tag inside the task and the schedule function get dropped at the time of deallocation. -/// -/// # Panics -/// -/// If polling the inner future inside [`run()`] panics, the panic will be propagated into -/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other -/// panics result in the process being aborted. -/// -/// More precisely, the process is aborted if a panic occurs: -/// -/// * Inside the schedule function. -/// * While dropping the tag. -/// * While dropping the future. -/// * While dropping the schedule function. -/// * While waking the task awaiting the [`JoinHandle`]. -/// -/// [`run()`]: struct.Task.html#method.run -/// [run]: struct.Task.html#method.run -/// [`JoinHandle`]: struct.JoinHandle.html -/// [`Task`]: struct.Task.html -/// [`Task::create()`]: struct.Task.html#method.create -/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html -/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html -/// -/// # Examples -/// -/// ``` -/// # #![feature(async_await)] -/// use async_task::Task; -/// use crossbeam::channel; -/// use futures::executor; -/// -/// // The future inside the task. -/// let future = async { -/// println!("Hello, world!"); -/// }; -/// -/// // If the task gets woken, it will be sent into this channel. -/// let (s, r) = channel::unbounded(); -/// let schedule = move |task| s.send(task).unwrap(); -/// -/// // Create a task with the future and the schedule function. -/// let (task, handle) = async_task::spawn(future, schedule, ()); -/// -/// // Run the task. In this example, it will complete after a single run. -/// task.run(); -/// assert!(r.is_empty()); -/// -/// // Await its result. -/// executor::block_on(handle); -/// ``` -pub struct Task { - /// A pointer to the heap-allocated task. - pub(crate) raw_task: NonNull<()>, - - /// A marker capturing the generic type `T`. - pub(crate) _marker: PhantomData, -} - -unsafe impl Send for Task {} -unsafe impl Sync for Task {} - -impl Task { - /// Schedules the task. - /// - /// This is a convenience method that simply reschedules the task by passing it to its schedule - /// function. - /// - /// If the task is cancelled, this method won't do anything. - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// - /// // The future inside the task. - /// let future = async { - /// println!("Hello, world!"); - /// }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, ()); - /// - /// // Send the task into the channel. - /// task.schedule(); - /// - /// // Retrieve the task back from the channel. - /// let task = r.recv().unwrap(); - /// ``` - pub fn schedule(self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - mem::forget(self); - - unsafe { - ((*header).vtable.schedule)(ptr); - } - } - - /// Runs the task. - /// - /// This method polls the task's future. If the future completes, its result will become - /// available to the [`JoinHandle`]. And if the future is still pending, the task will have to - /// be woken in order to be rescheduled and then run again. - /// - /// If the task is cancelled, running it won't do anything. - /// - /// # Panics - /// - /// It is possible that polling the future panics, in which case the panic will be propagated - /// into the caller. It is advised that invocations of this method are wrapped inside - /// [`catch_unwind`]. - /// - /// If a panic occurs, the task is automatically cancelled. - /// - /// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// use futures::executor; - /// - /// // The future inside the task. - /// let future = async { 1 + 2 }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, ()); - /// - /// // Run the task. In this example, it will complete after a single run. - /// task.run(); - /// assert!(r.is_empty()); - /// - /// // Await the result of the task. - /// let result = executor::block_on(handle); - /// assert_eq!(result, Some(3)); - /// ``` - pub fn run(self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - mem::forget(self); - - unsafe { - ((*header).vtable.run)(ptr); - } - } - - /// Cancels the task. - /// - /// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt - /// to run it won't do anything. And if it's completed, awaiting its result evaluates to - /// `None`. - /// - /// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// use futures::executor; - /// - /// // The future inside the task. - /// let future = async { 1 + 2 }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, ()); - /// - /// // Cancel the task. - /// task.cancel(); - /// - /// // Running a cancelled task does nothing. - /// task.run(); - /// - /// // Await the result of the task. - /// let result = executor::block_on(handle); - /// assert_eq!(result, None); - /// ``` - pub fn cancel(&self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - unsafe { - (*header).cancel(); - } - } - - /// Returns a reference to the tag stored inside the task. - /// - /// # Examples - /// - /// ``` - /// # #![feature(async_await)] - /// use crossbeam::channel; - /// - /// // The future inside the task. - /// let future = async { 1 + 2 }; - /// - /// // If the task gets woken, it will be sent into this channel. - /// let (s, r) = channel::unbounded(); - /// let schedule = move |task| s.send(task).unwrap(); - /// - /// // Create a task with the future and the schedule function. - /// let (task, handle) = async_task::spawn(future, schedule, "a simple task"); - /// - /// // Access the tag. - /// assert_eq!(*task.tag(), "a simple task"); - /// ``` - pub fn tag(&self) -> &T { - let offset = Header::offset_tag::(); - let ptr = self.raw_task.as_ptr(); - - unsafe { - let raw = (ptr as *mut u8).add(offset) as *const T; - &*raw - } - } -} - -impl Drop for Task { - fn drop(&mut self) { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - unsafe { - // Cancel the task. - (*header).cancel(); - - // Drop the future. - ((*header).vtable.drop_future)(ptr); - - // Drop the task reference. - ((*header).vtable.decrement)(ptr); - } - } -} - -impl fmt::Debug for Task { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_task.as_ptr(); - let header = ptr as *const Header; - - f.debug_struct("Task") - .field("header", unsafe { &(*header) }) - .field("tag", self.tag()) - .finish() - } -} diff --git a/async-task/src/utils.rs b/async-task/src/utils.rs deleted file mode 100644 index 441ead1e..00000000 --- a/async-task/src/utils.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::alloc::Layout; -use std::mem; - -/// Calls a function and aborts if it panics. -/// -/// This is useful in unsafe code where we can't recover from panics. -#[inline] -pub(crate) fn abort_on_panic(f: impl FnOnce() -> T) -> T { - struct Bomb; - - impl Drop for Bomb { - fn drop(&mut self) { - std::process::abort(); - } - } - - let bomb = Bomb; - let t = f(); - mem::forget(bomb); - t -} - -/// Returns the layout for `a` followed by `b` and the offset of `b`. -/// -/// This function was adapted from the currently unstable `Layout::extend()`: -/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend -#[inline] -pub(crate) fn extend(a: Layout, b: Layout) -> (Layout, usize) { - let new_align = a.align().max(b.align()); - let pad = padding_needed_for(a, b.align()); - - let offset = a.size().checked_add(pad).unwrap(); - let new_size = offset.checked_add(b.size()).unwrap(); - - let layout = Layout::from_size_align(new_size, new_align).unwrap(); - (layout, offset) -} - -/// Returns the padding after `layout` that aligns the following address to `align`. -/// -/// This function was adapted from the currently unstable `Layout::padding_needed_for()`: -/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for -#[inline] -pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize { - let len = layout.size(); - let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1); - len_rounded_up.wrapping_sub(len) -} diff --git a/async-task/tests/basic.rs b/async-task/tests/basic.rs deleted file mode 100644 index b9e181b1..00000000 --- a/async-task/tests/basic.rs +++ /dev/null @@ -1,314 +0,0 @@ -#![feature(async_await)] - -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll}; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use crossbeam::channel; -use futures::future; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, POLL, DROP)` -// -// The future `f` always returns `Poll::Ready`. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! future { - ($name:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Fut(Box); - - impl Future for Fut { - type Output = Box; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - $poll.fetch_add(1); - Poll::Ready(Box::new(0)) - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - Fut(Box::new(0)) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, SCHED, DROP)` -// -// The schedule function `s` does nothing. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! schedule { - ($name:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - move |_task| { - &guard; - $sched.fetch_add(1); - } - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -#[test] -fn cancel_and_drop_handle() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - task.cancel(); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - drop(task); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn run_and_drop_handle() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - drop(handle); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn drop_handle_and_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - drop(handle); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn cancel_and_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - handle.cancel(); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - task.run(); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn run_and_cancel() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - handle.cancel(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn schedule() { - let (s, r) = channel::unbounded(); - let schedule = move |t| s.send(t).unwrap(); - let (task, _handle) = async_task::spawn( - future::poll_fn(|_| Poll::<()>::Pending), - schedule, - Box::new(0), - ); - - assert!(r.is_empty()); - task.schedule(); - - let task = r.recv().unwrap(); - assert!(r.is_empty()); - task.schedule(); - - let task = r.recv().unwrap(); - assert!(r.is_empty()); - task.schedule(); - - r.recv().unwrap(); -} - -#[test] -fn tag() { - let (s, r) = channel::unbounded(); - let schedule = move |t| s.send(t).unwrap(); - let (task, handle) = async_task::spawn( - future::poll_fn(|_| Poll::<()>::Pending), - schedule, - AtomicUsize::new(7), - ); - - assert!(r.is_empty()); - task.schedule(); - - let task = r.recv().unwrap(); - assert!(r.is_empty()); - handle.tag().fetch_add(1, Ordering::SeqCst); - task.schedule(); - - let task = r.recv().unwrap(); - assert_eq!(task.tag().load(Ordering::SeqCst), 8); - assert!(r.is_empty()); - task.schedule(); - - r.recv().unwrap(); -} - -#[test] -fn schedule_counter() { - let (s, r) = channel::unbounded(); - let schedule = move |t: Task| { - t.tag().fetch_add(1, Ordering::SeqCst); - s.send(t).unwrap(); - }; - let (task, handle) = async_task::spawn( - future::poll_fn(|_| Poll::<()>::Pending), - schedule, - AtomicUsize::new(0), - ); - task.schedule(); - - assert_eq!(handle.tag().load(Ordering::SeqCst), 1); - r.recv().unwrap().schedule(); - - assert_eq!(handle.tag().load(Ordering::SeqCst), 2); - r.recv().unwrap().schedule(); - - assert_eq!(handle.tag().load(Ordering::SeqCst), 3); - r.recv().unwrap(); -} diff --git a/async-task/tests/join.rs b/async-task/tests/join.rs deleted file mode 100644 index e0829394..00000000 --- a/async-task/tests/join.rs +++ /dev/null @@ -1,454 +0,0 @@ -#![feature(async_await)] - -use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use futures::executor::block_on; -use futures::future; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, POLL, DROP_F, DROP_O)` -// -// The future `f` outputs `Poll::Ready`. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP_F` is incremented. -// When the output gets dropped, `DROP_O` is incremented. -macro_rules! future { - ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop_f: AtomicCell = AtomicCell::new(0); - static ref $drop_o: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Fut(Box); - - impl Future for Fut { - type Output = Out; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - $poll.fetch_add(1); - Poll::Ready(Out(Box::new(0))) - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop_f.fetch_add(1); - } - } - - struct Out(Box); - - impl Drop for Out { - fn drop(&mut self) { - $drop_o.fetch_add(1); - } - } - - Fut(Box::new(0)) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, SCHED, DROP)` -// -// The schedule function `s` does nothing. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! schedule { - ($name:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - move |task: Task<_>| { - &guard; - task.schedule(); - $sched.fetch_add(1); - } - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn cancel_and_join() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert_eq!(DROP_O.load(), 0); - - task.cancel(); - drop(task); - assert_eq!(DROP_O.load(), 0); - - assert!(block_on(handle).is_none()); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 0); -} - -#[test] -fn run_and_join() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert_eq!(DROP_O.load(), 0); - - task.run(); - assert_eq!(DROP_O.load(), 0); - - assert!(block_on(handle).is_some()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); -} - -#[test] -fn drop_handle_and_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert_eq!(DROP_O.load(), 0); - - drop(handle); - assert_eq!(DROP_O.load(), 0); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); -} - -#[test] -fn join_twice() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - assert_eq!(DROP_O.load(), 0); - - task.run(); - assert_eq!(DROP_O.load(), 0); - - assert!(block_on(&mut handle).is_some()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 1); - - assert!(block_on(&mut handle).is_none()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 1); - - drop(handle); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn join_and_cancel() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - thread::sleep(ms(100)); - - task.cancel(); - drop(task); - - thread::sleep(ms(200)); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_O.load(), 0); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - assert!(block_on(handle).is_none()); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - - thread::sleep(ms(100)); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_O.load(), 0); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }) - .unwrap(); -} - -#[test] -fn join_and_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - thread::sleep(ms(200)); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - assert!(block_on(handle).is_some()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_O.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }) - .unwrap(); -} - -#[test] -fn try_join_and_run_and_join() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - thread::sleep(ms(200)); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - assert!(block_on(handle).is_some()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_O.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }) - .unwrap(); -} - -#[test] -fn try_join_and_cancel_and_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - thread::sleep(ms(200)); - - task.run(); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - handle.cancel(); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - }) - .unwrap(); -} - -#[test] -fn try_join_and_run_and_cancel() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - thread::sleep(ms(200)); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - }); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - thread::sleep(ms(400)); - - handle.cancel(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); - }) - .unwrap(); -} - -#[test] -fn await_output() { - struct Fut(Cell>); - - impl Fut { - fn new(t: T) -> Fut { - Fut(Cell::new(Some(t))) - } - } - - impl Future for Fut { - type Output = T; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - Poll::Ready(self.0.take().unwrap()) - } - } - - for i in 0..10 { - let (task, handle) = async_task::spawn(Fut::new(i), drop, Box::new(0)); - task.run(); - assert_eq!(block_on(handle), Some(i)); - } - - for i in 0..10 { - let (task, handle) = async_task::spawn(Fut::new(vec![7; i]), drop, Box::new(0)); - task.run(); - assert_eq!(block_on(handle), Some(vec![7; i])); - } - - let (task, handle) = async_task::spawn(Fut::new("foo".to_string()), drop, Box::new(0)); - task.run(); - assert_eq!(block_on(handle), Some("foo".to_string())); -} diff --git a/async-task/tests/panic.rs b/async-task/tests/panic.rs deleted file mode 100644 index 68058a22..00000000 --- a/async-task/tests/panic.rs +++ /dev/null @@ -1,288 +0,0 @@ -#![feature(async_await)] - -use std::future::Future; -use std::panic::catch_unwind; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use futures::executor::block_on; -use futures::future; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, POLL, DROP)` -// -// The future `f` sleeps for 200 ms and then panics. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! future { - ($name:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Fut(Box); - - impl Future for Fut { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - $poll.fetch_add(1); - thread::sleep(ms(200)); - panic!() - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - Fut(Box::new(0)) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, SCHED, DROP)` -// -// The schedule function `s` does nothing. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! schedule { - ($name:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - move |_task: Task<_>| { - &guard; - $sched.fetch_add(1); - } - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn cancel_during_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - }) - .unwrap(); -} - -#[test] -fn run_and_join() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - assert!(block_on(handle).is_none()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn try_join_and_run_and_join() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 0); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - - assert!(block_on(handle).is_none()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn join_during_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - thread::sleep(ms(100)); - - assert!(block_on(handle).is_none()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }) - .unwrap(); -} - -#[test] -fn try_join_during_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - thread::sleep(ms(100)); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - drop(handle); - }) - .unwrap(); -} - -#[test] -fn drop_handle_during_run() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - thread::sleep(ms(100)); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - }) - .unwrap(); -} diff --git a/async-task/tests/ready.rs b/async-task/tests/ready.rs deleted file mode 100644 index ecca328b..00000000 --- a/async-task/tests/ready.rs +++ /dev/null @@ -1,265 +0,0 @@ -#![feature(async_await)] - -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use futures::executor::block_on; -use futures::future; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, POLL, DROP_F, DROP_O)` -// -// The future `f` sleeps for 200 ms and outputs `Poll::Ready`. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP_F` is incremented. -// When the output gets dropped, `DROP_O` is incremented. -macro_rules! future { - ($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop_f: AtomicCell = AtomicCell::new(0); - static ref $drop_o: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Fut(Box); - - impl Future for Fut { - type Output = Out; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - $poll.fetch_add(1); - thread::sleep(ms(200)); - Poll::Ready(Out(Box::new(0))) - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop_f.fetch_add(1); - } - } - - struct Out(Box); - - impl Drop for Out { - fn drop(&mut self) { - $drop_o.fetch_add(1); - } - } - - Fut(Box::new(0)) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, SCHED, DROP)` -// -// The schedule function `s` does nothing. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! schedule { - ($name:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - move |_task: Task<_>| { - &guard; - $sched.fetch_add(1); - } - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn cancel_during_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 1); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 1); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); - }) - .unwrap(); -} - -#[test] -fn join_during_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }); - - thread::sleep(ms(100)); - - assert!(block_on(handle).is_some()); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_O.load(), 1); - - thread::sleep(ms(100)); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - }) - .unwrap(); -} - -#[test] -fn try_join_during_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, mut handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); - }); - - thread::sleep(ms(100)); - - block_on(future::select(&mut handle, future::ready(()))); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - drop(handle); - }) - .unwrap(); -} - -#[test] -fn drop_handle_during_run() { - future!(f, POLL, DROP_F, DROP_O); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(DROP_O.load(), 1); - }); - - thread::sleep(ms(100)); - - drop(handle); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(DROP_O.load(), 0); - }) - .unwrap(); -} diff --git a/async-task/tests/waker_panic.rs b/async-task/tests/waker_panic.rs deleted file mode 100644 index a683f26f..00000000 --- a/async-task/tests/waker_panic.rs +++ /dev/null @@ -1,357 +0,0 @@ -#![feature(async_await)] - -use std::cell::Cell; -use std::future::Future; -use std::panic::catch_unwind; -use std::pin::Pin; -use std::task::Waker; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use crossbeam::channel; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, waker, POLL, DROP)` -// -// The future `f` always sleeps for 200 ms, and panics the second time it is polled. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Every time the future is run, it stores the waker into a global variable. -// This waker can be extracted using the `waker` function. -macro_rules! future { - ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - static ref WAKER: AtomicCell> = AtomicCell::new(None); - } - - let ($name, $waker) = { - struct Fut(Cell, Box); - - impl Future for Fut { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - WAKER.store(Some(cx.waker().clone())); - $poll.fetch_add(1); - thread::sleep(ms(200)); - - if self.0.get() { - panic!() - } else { - self.0.set(true); - Poll::Pending - } - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - (Fut(Cell::new(false), Box::new(0)), || { - WAKER.swap(None).unwrap() - }) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, chan, SCHED, DROP)` -// -// The schedule function `s` pushes the task into `chan`. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Receiver `chan` extracts the task when it is scheduled. -macro_rules! schedule { - ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($name, $chan) = { - let (s, r) = channel::unbounded(); - - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - let sched = move |task: Task<_>| { - &guard; - $sched.fetch_add(1); - s.send(task).unwrap(); - }; - - (sched, r) - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn wake_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - w.wake(); - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} - -#[test] -fn cancel_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} - -#[test] -fn wake_and_cancel_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - w.wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} - -#[test] -fn cancel_and_wake_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - assert!(catch_unwind(|| task.run()).is_err()); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - w.wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} diff --git a/async-task/tests/waker_pending.rs b/async-task/tests/waker_pending.rs deleted file mode 100644 index 547ff7a3..00000000 --- a/async-task/tests/waker_pending.rs +++ /dev/null @@ -1,348 +0,0 @@ -#![feature(async_await)] - -use std::future::Future; -use std::pin::Pin; -use std::task::Waker; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use crossbeam::channel; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, waker, POLL, DROP)` -// -// The future `f` always sleeps for 200 ms and returns `Poll::Pending`. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Every time the future is run, it stores the waker into a global variable. -// This waker can be extracted using the `waker` function. -macro_rules! future { - ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - static ref WAKER: AtomicCell> = AtomicCell::new(None); - } - - let ($name, $waker) = { - struct Fut(Box); - - impl Future for Fut { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - WAKER.store(Some(cx.waker().clone())); - $poll.fetch_add(1); - thread::sleep(ms(200)); - Poll::Pending - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - (Fut(Box::new(0)), || WAKER.swap(None).unwrap()) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, chan, SCHED, DROP)` -// -// The schedule function `s` pushes the task into `chan`. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Receiver `chan` extracts the task when it is scheduled. -macro_rules! schedule { - ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($name, $chan) = { - let (s, r) = channel::unbounded(); - - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - let sched = move |task: Task<_>| { - &guard; - $sched.fetch_add(1); - s.send(task).unwrap(); - }; - - (sched, r) - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn wake_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, _handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 2); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 1); - }); - - thread::sleep(ms(100)); - - w.wake_by_ref(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 2); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 1); - }) - .unwrap(); - - chan.recv().unwrap(); - drop(waker()); -} - -#[test] -fn cancel_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} - -#[test] -fn wake_and_cancel_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - w.wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} - -#[test] -fn cancel_and_wake_during_run() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - task.run(); - let w = waker(); - w.wake_by_ref(); - let task = chan.recv().unwrap(); - - crossbeam::scope(|scope| { - scope.spawn(|_| { - task.run(); - drop(waker()); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }); - - thread::sleep(ms(100)); - - handle.cancel(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(handle); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - w.wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - thread::sleep(ms(200)); - - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); - }) - .unwrap(); -} diff --git a/async-task/tests/waker_ready.rs b/async-task/tests/waker_ready.rs deleted file mode 100644 index e64cc554..00000000 --- a/async-task/tests/waker_ready.rs +++ /dev/null @@ -1,328 +0,0 @@ -#![feature(async_await)] - -use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::task::Waker; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use crossbeam::channel; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, waker, POLL, DROP)` -// -// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Every time the future is run, it stores the waker into a global variable. -// This waker can be extracted using the `waker` function. -macro_rules! future { - ($name:pat, $waker:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - static ref WAKER: AtomicCell> = AtomicCell::new(None); - } - - let ($name, $waker) = { - struct Fut(Cell, Box); - - impl Future for Fut { - type Output = Box; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - WAKER.store(Some(cx.waker().clone())); - $poll.fetch_add(1); - thread::sleep(ms(200)); - - if self.0.get() { - Poll::Ready(Box::new(0)) - } else { - self.0.set(true); - Poll::Pending - } - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - (Fut(Cell::new(false), Box::new(0)), || { - WAKER.swap(None).unwrap() - }) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, chan, SCHED, DROP)` -// -// The schedule function `s` pushes the task into `chan`. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Receiver `chan` extracts the task when it is scheduled. -macro_rules! schedule { - ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($name, $chan) = { - let (s, r) = channel::unbounded(); - - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - let sched = move |task: Task<_>| { - &guard; - $sched.fetch_add(1); - s.send(task).unwrap(); - }; - - (sched, r) - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn wake() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(mut task, _, f, s, DROP_D); - - assert!(chan.is_empty()); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - waker().wake(); - task = chan.recv().unwrap(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - task.run(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - waker().wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); -} - -#[test] -fn wake_by_ref() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(mut task, _, f, s, DROP_D); - - assert!(chan.is_empty()); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - waker().wake_by_ref(); - task = chan.recv().unwrap(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - task.run(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - waker().wake_by_ref(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); -} - -#[test] -fn clone() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(mut task, _, f, s, DROP_D); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - let w2 = waker().clone(); - let w3 = w2.clone(); - let w4 = w3.clone(); - w4.wake(); - - task = chan.recv().unwrap(); - task.run(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - w3.wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - drop(w2); - drop(waker()); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); -} - -#[test] -fn wake_cancelled() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, _, f, s, DROP_D); - - task.run(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - let w = waker(); - - w.wake_by_ref(); - chan.recv().unwrap().cancel(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - w.wake(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); -} - -#[test] -fn wake_completed() { - future!(f, waker, POLL, DROP_F); - schedule!(s, chan, SCHEDULE, DROP_S); - task!(task, _, f, s, DROP_D); - - task.run(); - let w = waker(); - assert_eq!(POLL.load(), 1); - assert_eq!(SCHEDULE.load(), 0); - assert_eq!(DROP_F.load(), 0); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - w.wake(); - chan.recv().unwrap().run(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 0); - assert_eq!(DROP_D.load(), 0); - assert_eq!(chan.len(), 0); - - waker().wake(); - assert_eq!(POLL.load(), 2); - assert_eq!(SCHEDULE.load(), 1); - assert_eq!(DROP_F.load(), 1); - assert_eq!(DROP_S.load(), 1); - assert_eq!(DROP_D.load(), 1); - assert_eq!(chan.len(), 0); -}