zephyr/
kio.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//! Async IO for Zephyr
//!
//! This implements the basics of using Zephyr's work queues to implement async code on Zephyr.
//!
//! Most of the work happens in [`work`] and in [`futures`].
//!
//! [`work`]: crate::work
//! [`futures`]: crate::work::futures

use core::ffi::CStr;
use core::task::{Context, Poll};
use core::{future::Future, pin::Pin};

use crate::sys::queue::Queue;
use crate::sys::sync::Semaphore;
use crate::time::{NoWait, Timeout};
use crate::work::futures::WakeInfo;
use crate::work::Signal;
use crate::work::{futures::JoinHandle, futures::WorkBuilder, WorkQueue};

pub mod sync;

pub use crate::work::futures::sleep;

/// Run an async future on the given worker thread.
///
/// Arranges to have `future` run on the work queue `worker`.  The `name` is handed to the
/// [`WorkBuilder`] via `set_name` before the work is started.
///
/// The resulting `JoinHandle` has `join` and `join_async` methods that can be used to wait for the
/// spawned work.
///
/// Because the future may run on a different thread than the caller, both the future and its
/// output are required to be `Send + 'static`.
pub fn spawn<F>(future: F, worker: &WorkQueue, name: &'static CStr) -> JoinHandle<F>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // Configure the builder with the target queue and name, then hand the future over to it.
    WorkBuilder::new()
        .set_worker(worker)
        .set_name(name)
        .start(future)
}

/// Run an async future on the current worker thread.
///
/// Arranges to have the given future run on the current worker thread.  The resulting
/// `JoinHandle` has `join` and `join_async` methods that can be used to wait for the spawned work.
///
/// By constraining the spawn to the current worker, this function is able to remove the `Send`
/// constraint from the future itself, allowing tasks to share data using lighter-weight
/// mechanisms, such as `Rc` and `Rc<RefCell<T>>`, or `&'static RefCell<T>`.  Note that the
/// future's output type is still required to be `Send`.
///
/// To be able to use tasks running on different workers, sharing must be done with types such as
/// `Arc`, and `Arc<Mutex<T>>`, or `&'static Mutex<T>`.
///
/// It is important, when using `RefCell`, that a borrow from the cell not be carried across an
/// await boundary, or `RefCell`'s runtime multi-borrow check can cause a panic.
///
/// # Panics
/// If this is called other than from a worker task running on a work thread, it will panic.
pub fn spawn_local<F>(future: F, name: &'static CStr) -> JoinHandle<F>
where
    F: Future + 'static,
    F::Output: Send + 'static,
{
    // No worker is set on the builder: `start_local` is used instead of `start`.
    WorkBuilder::new().set_name(name).start_local(future)
}

/// Yield the current thread, returning it to the work queue to be run after other work on that
/// queue.  (This has to be called `yield_now` in Rust, because `yield` is a keyword.)
///
/// The returned future completes the second time it is polled.  It may also complete on the first
/// poll, when the work queue running it appears to have no other pending work.
pub fn yield_now() -> impl Future<Output = ()> {
    // Start in the "has not yielded yet" state.
    YieldNow { waited: false }
}

/// Future backing [`yield_now`].
struct YieldNow {
    /// Set once this future has asked to be rescheduled; the next poll then reports `Ready`.
    waited: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output> {
        if self.waited {
            Poll::Ready(())
        } else {
            // Enqueue outselves with no wait and no events.
            let info = unsafe { WakeInfo::from_context(cx) };

            // Unsafely check if the work queue running us is empty.  We only check explicitly
            // specified workers (TODO access the system work queue).  The check is racy, but should
            // always fail indicating that the queue is not empty when it could be.  Checking this
            // avoids re-scheduling the only worker back into the queue.
            // SAFETY: The check is racy, but will fail with us yielding when we didn't need to.
            if let Some(wq) = info.queue {
                let wq = unsafe { wq.as_ref() };
                if wq.pending.head == wq.pending.tail {
                    return Poll::Ready(());
                }
            }

            info.timeout = NoWait.into();
            self.waited = true;

            Poll::Pending
        }
    }
}

/// Extensions on [`Context`] to support scheduling via Zephyr's workqueue system.
///
/// All of these are called from within the context of running work, and indicate what _next_
/// should cause this work to be run again.  If none of these methods are called before the work
/// exits, the work will be scheduled to run after `Forever`, which is not useful.  There may be
/// later support for having a `Waker` that can schedule work from another context.
///
/// Note that when multiple threads can wait on the same event source (such as a Semaphore or a
/// channel), this work may be woken even though the resource has already been claimed by someone
/// else and is no longer available.  As such, to maintain the non-blocking requirements of Work,
/// [`Semaphore::take`], and the blocking `send` and `recv` operations on channels should not be
/// used, even after being woken.
///
/// For the timeout, [`Forever`] is useful to indicate there is no timeout.  If called with
/// [`NoWait`], the work will be immediately scheduled.  In general, it is better to query the
/// underlying object directly rather than have the overhead of being rescheduled.
///
/// # Safety
///
/// The lifetime bounds on the items waited for ensure that these items live at least as long as the
/// work queue.  Practically, this can only be satisfied by using something with `'static` lifetime,
/// or embedding the value in the Future itself.
///
/// With the Zephyr executor, the `Context` is embedded within a `WakeInfo` struct, which this makes
/// use of.  If a different executor were to be used, these calls would result in undefined
/// behavior.
///
/// This could be checked at runtime, but it would have runtime cost.
///
/// [`Forever`]: crate::time::Forever
pub trait ContextExt {
    /// Indicate the work should next be scheduled based on a semaphore being available for "take".
    ///
    /// The work will be scheduled either when the given semaphore becomes available to 'take', or
    /// after the timeout.
    fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into<Timeout>);

    /// Indicate that the work should be scheduled after receiving the given [`Signal`], or when
    /// the timeout occurs.
    fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into<Timeout>);

    /// Indicate that the work should be scheduled when the given [`Queue`] has data available to
    /// recv, or when the timeout occurs.
    fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into<Timeout>);

    /// Indicate that the work should just be scheduled after the given timeout.
    ///
    /// Note that this only works if none of the other wake methods are called afterwards, as
    /// those also set the timeout.
    fn add_timeout(&mut self, timeout: impl Into<Timeout>);
}

/// [`ContextExt`] for the Rust [`Context`] type.
///
/// Every method recovers the `WakeInfo` that the Zephyr executor wraps around the `Context`,
/// registers the requested wake source (if any) on it, and then overwrites its timeout.
impl<'b> ContextExt for Context<'b> {
    fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore, timeout: impl Into<Timeout>) {
        // SAFETY: Relies on this `Context` being the one embedded in a `WakeInfo`, which holds
        // when running under the Zephyr work queue executor.
        let wake = unsafe { WakeInfo::from_context(self) };
        wake.add_semaphore(sem);
        let deadline: Timeout = timeout.into();
        wake.timeout = deadline;
    }

    fn add_signal<'a>(&'a mut self, signal: &'a Signal, timeout: impl Into<Timeout>) {
        // SAFETY: Relies on this `Context` being the one embedded in a `WakeInfo`, which holds
        // when running under the Zephyr work queue executor.
        let wake = unsafe { WakeInfo::from_context(self) };
        wake.add_signal(signal);
        let deadline: Timeout = timeout.into();
        wake.timeout = deadline;
    }

    fn add_queue<'a>(&'a mut self, queue: &'a Queue, timeout: impl Into<Timeout>) {
        // SAFETY: Relies on this `Context` being the one embedded in a `WakeInfo`, which holds
        // when running under the Zephyr work queue executor.
        let wake = unsafe { WakeInfo::from_context(self) };
        wake.add_queue(queue);
        let deadline: Timeout = timeout.into();
        wake.timeout = deadline;
    }

    fn add_timeout(&mut self, timeout: impl Into<Timeout>) {
        // SAFETY: Relies on this `Context` being the one embedded in a `WakeInfo`, which holds
        // when running under the Zephyr work queue executor.
        let wake = unsafe { WakeInfo::from_context(self) };
        // No wake source is registered here; only the timeout is replaced.
        let deadline: Timeout = timeout.into();
        wake.timeout = deadline;
    }
}