//! Zephyr Work Queues
//!
//! # Zephyr Work Queues and Work
//!
//! Zephyr has a mechanism called a
//! [Workqueue](https://docs.zephyrproject.org/latest/kernel/services/threads/workqueue.html).
//!
//! Each workqueue is backed by a single Zephyr thread, and has its own stack.  The work queue
//! consists of a FIFO queue of work items that will be run consecutively on that thread.  The
//! underlying types are `k_work_q` for the work queue itself, and `k_work` for each work item.
//!
//! In addition to the simple schedulable work, Zephyr also has two additional types of work:
//! `k_work_delayable`, which can be scheduled to run in the future, and `k_work_poll`, described as
//! triggered work in the docs.  Triggered work can be scheduled to run when various items within
//! Zephyr become available, and it also has a timeout.  In this sense, triggered work is a superset
//! of the other types of work.  Both delayable and triggered work are implemented by embedding a
//! `k_work` in their structure, and Zephyr schedules the work when the given condition occurs.
//!
//! At this time, only the basic work type is supported.
//!
//! Zephyr's work queues can be used in different ways:
//!
//! - Work can be scheduled as needed.  For example, an IRQ handler can queue a work item to process
//!   data it has received from a device.
//! - Work can be scheduled periodically.
//!
//! As most C code on Zephyr statically allocates things like work items, these are typically
//! rescheduled when the work is complete.  The work queue scheduling functions are designed, and
//! intended, for a given work item to be able to reschedule itself, and such usage is common.
//!
//! ## Ownership
//!
//! The remaining challenge with implementing `k_work` for Rust is that of ownership.  The model
//! taken here is that the work items are held in an `Arc` that is effectively owned by the work
//! itself.  When the work item is submitted to Zephyr, one reference to that `Arc` is handed off to
//! C, and when the work item's handler is called, the `Arc` is reconstructed.  This repeats until
//! the work is no longer needed, at which point the work will be dropped.
//!
//! There are two common ways the lifecycle of work can be managed in an embedded system:
//!
//! - A set of `Future`s are allocated once at the start, and these never return a value.  Work
//!   Futures inside of this (which correspond to `.await` in async code) can have finite lifetimes
//!   and return values, but the main loops will not return values or be dropped.  Embedded Futures
//!   will typically not be boxed.
//!
//! One consequence of the ownership being passed through to C code is that if the work cancellation
//! mechanism is used on a work queue, the work items themselves will be leaked.
//!
//! These work items are also held in a `Pin`, to ensure that the work actions are not moved.
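//!
//! The pointer round trip behind this model can be sketched on the host with `std` alone.  This is
//! an illustration rather than code from this crate: `Header` and `Item` are hypothetical stand-ins
//! for `k_work` and `Work<T>`, and the raw pointer plays the role of the one handed to
//! `k_work_submit`.  As in `Work::from_raw`, the header is not the first field, so the start of the
//! allocation is recovered with `offset_of!`.
//!
//! ```rust
//! use std::mem::offset_of;
//! use std::ptr::addr_of;
//! use std::sync::Arc;
//!
//! // Hypothetical stand-ins: `Header` plays the role of `k_work`, `Item` the role of `Work<T>`.
//! struct Header {
//!     _id: u32,
//! }
//!
//! struct Item {
//!     value: u64,
//!     header: Header, // deliberately not the first field
//! }
//!
//! /// Leak one `Arc` reference and hand out a pointer to the embedded header.
//! fn into_header_ptr(item: Arc<Item>) -> *const Header {
//!     let raw = Arc::into_raw(item);
//!     // SAFETY: `raw` points to a live `Item`; this only takes the address of a field.
//!     unsafe { addr_of!((*raw).header) }
//! }
//!
//! /// Recover the leaked `Arc` from the header pointer, as a work handler would.
//! ///
//! /// # Safety
//! /// `ptr` must come from `into_header_ptr`, and each such pointer may be recovered only once.
//! unsafe fn from_header_ptr(ptr: *const Header) -> Arc<Item> {
//!     unsafe {
//!         let base = ptr.cast::<u8>().sub(offset_of!(Item, header)).cast::<Item>();
//!         Arc::from_raw(base)
//!     }
//! }
//!
//! fn main() {
//!     let item = Arc::new(Item { value: 42, header: Header { _id: 7 } });
//!     let keep = Arc::clone(&item);
//!
//!     let ptr = into_header_ptr(item);
//!     // The leaked reference still counts, so the allocation stays alive while "C" holds it.
//!     assert_eq!(Arc::strong_count(&keep), 2);
//!
//!     let back = unsafe { from_header_ptr(ptr) };
//!     assert!(Arc::ptr_eq(&back, &keep));
//!     assert_eq!(back.value, 42);
//!
//!     // Dropping the recovered `Arc` releases the reference that was handed to "C".
//!     drop(back);
//!     assert_eq!(Arc::strong_count(&keep), 1);
//! }
//! ```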
//!
//! ## The work queues themselves
//!
//! Workqueues themselves are built using [`WorkQueueBuilder`].  This needs a statically defined
//! stack.  Typical usage will be along the lines of:
//! ```rust
//! kobj_define! {
//!   static WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(
//!     WorkQueueBuilder::new()
//!         .set_priority(2)
//!         .set_name(c"mainloop")
//!         .set_no_yield(true)
//!         .start(WORKER_STACK.init_once(()).unwrap())
//!     );
//!
//! let _ = zephyr::kio::spawn(
//!     mainloop(), // Async or function returning Future.
//!     &main_worker,
//!     c"w:mainloop",
//! );
//!
//! // ...
//!
//! // Leak the Box so that the worker is never freed.
//! let _ = Box::leak(main_worker);
//! ```
//!
//! It is important that a `WorkQueue` never be dropped.  Its `Drop` implementation panics, because
//! Zephyr provides no mechanism to stop work queue threads, so freeing the queue while its thread
//! is still running would result in undefined behavior.
//!
//! # Current Status
//!
//! Although Zephyr has three types of work, `k_work_poll` is sufficient to implement all of the
//! behavior, and this implementation only implements this type.  Non-`Future` work could be built
//! around the other work types.
//!
//! As such, manually constructed work is still built using `Future`.  The `_async` primitives
//! throughout this crate can be used just as readily by hand-written Futures as by async code.
//! Notably, the use of [`Signal`] will likely be common, along with possible timeouts.

extern crate alloc;

use core::{
    cell::UnsafeCell,
    ffi::{c_int, c_uint, CStr},
    mem,
    pin::Pin,
};

use zephyr_sys::{
    k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
    k_work_queue_start, k_work_submit, k_work_submit_to_queue,
};

use crate::{
    error::to_result_void,
    object::Fixed,
    simpletls::SimpleTls,
    sync::{Arc, Mutex},
    sys::thread::ThreadStack,
};

/// A builder for work queues themselves.
///
/// A work queue is a Zephyr thread that, instead of directly running a piece of code, manages a
/// queue of work.  Various types of `Work` can be submitted to these queues, along with various
/// types of triggering conditions.
pub struct WorkQueueBuilder {
    /// The "config" value passed in.
    config: k_work_queue_config,
    /// Priority for the work queue thread.
    priority: c_int,
}

impl WorkQueueBuilder {
    /// Construct a new WorkQueueBuilder with default values.
    pub fn new() -> Self {
        Self {
            config: Default::default(),
            priority: 0,
        }
    }

    /// Set the name for the WorkQueue thread.
    ///
    /// This name shows up in debuggers and some analysis tools.
    pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
        self.config.name = name.as_ptr();
        self
    }

    /// Set the "no yield" flag for the created worker.
    ///
    /// If this is not set, the work queue will call `k_yield` between each enqueued work item.  For
    /// non-preemptible threads, this will allow other threads to run.  For preemptible threads,
    /// this will allow other threads at the same priority to run.
    ///
    /// This method has a negative in the name, which goes against typical conventions.  This is
    /// done to match the field in the Zephyr config.
    pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
        self.config.no_yield = value;
        self
    }

    /// Set the "essential" flag for the created worker.
    ///
    /// This sets the essential flag on the running thread.  The system considers the termination of
    /// an essential thread to be a fatal error.
    pub fn set_essential(&mut self, value: bool) -> &mut Self {
        self.config.essential = value;
        self
    }

    /// Set the priority for the worker thread.
    ///
    /// See the Zephyr docs for the meaning of priority.
    pub fn set_priority(&mut self, value: c_int) -> &mut Self {
        self.priority = value;
        self
    }

    /// Start the given work queue thread.
    ///
    /// TODO: Implement a 'start' that works from a static work queue.
    pub fn start(&self, stack: ThreadStack) -> WorkQueue {
        let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            // SAFETY: Initialize zeroed memory.
            k_work_queue_init(item.get());

            // SAFETY: This associates the workqueue with the thread ID that runs it.  The thread is
            // a pointer into this work item, which will not move, because of the Fixed.
            let this = &mut *item.get();
            WORK_QUEUES
                .lock()
                .unwrap()
                .insert(&this.thread, WorkQueueRef(item.get()));

            // SAFETY: Start work queue thread.  The main issue here is that the work queue cannot
            // be deallocated once the thread has started.  We enforce this by making Drop panic.
            k_work_queue_start(
                item.get(),
                stack.base,
                stack.size,
                self.priority,
                &self.config,
            );
        }

        WorkQueue { item }
    }
}

/// A running work queue thread.
///
/// # Panics
///
/// Allowing a work queue to drop will result in a panic.  There are two ways to handle this,
/// depending on whether the WorkQueue is in a Box, or an Arc:
/// ```
/// // Leak a work queue in an Arc.
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
/// // If the Arc is used after this:
/// let _ = Arc::into_raw(wq.clone());
/// // If the Arc is no longer needed:
/// let _ = Arc::into_raw(wq);
///
/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
/// ```
pub struct WorkQueue {
    #[allow(dead_code)]
    item: Fixed<k_work_q>,
}

// SAFETY: Work queues can be referenced from multiple threads, and thus are Send and Sync.
unsafe impl Send for WorkQueue {}
unsafe impl Sync for WorkQueue {}

impl Drop for WorkQueue {
    fn drop(&mut self) {
        panic!("WorkQueues must not be dropped");
    }
}

/// A simple mapping to get the current work queue from the currently running thread.
///
/// This assumes that Zephyr's work queues have a 1:1 mapping between the work queue and the
/// thread.
///
/// # Safety
///
/// The work queue is protected with a sync Mutex (which uses an underlying Zephyr mutex).  In
/// general, it is not a good idea to use a mutex in a work queue, as deadlock can happen.  It is
/// therefore important never to `.await` while holding the lock, and to keep the operations
/// performed under it fast.  In this case, `insert` and `get` on the `SimpleTls` are reasonably
/// fast, and `insert` is usually only done at startup.
///
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
/// manually right now.
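///
/// As a rough host-side analogy only (this is not how `SimpleTls` is implemented, and every name
/// below is hypothetical), a thread-keyed registry behind a briefly held lock can be sketched with
/// `std`, using `ThreadId` in place of the Zephyr thread pointer:
///
/// ```rust
/// use std::sync::Mutex;
/// use std::thread::{self, ThreadId};
///
/// // Hypothetical registry mapping a thread ID to a queue number.  It is not `SimpleTls`.
/// static REGISTRY: Mutex<Vec<(ThreadId, u32)>> = Mutex::new(Vec::new());
///
/// /// Record that the calling thread runs queue `queue_id` (done once, at thread startup).
/// fn register_current(queue_id: u32) {
///     REGISTRY.lock().unwrap().push((thread::current().id(), queue_id));
/// }
///
/// /// Look up the calling thread's queue.  The lock is only held for a short scan.
/// fn current_queue() -> Option<u32> {
///     let me = thread::current().id();
///     let registry = REGISTRY.lock().unwrap();
///     registry.iter().find(|(id, _)| *id == me).map(|&(_, q)| q)
/// }
///
/// fn main() {
///     let worker = thread::spawn(|| {
///         register_current(1);
///         current_queue()
///     });
///     assert_eq!(worker.join().unwrap(), Some(1));
///     // The main thread never registered, so it is not "within" a queue.
///     assert_eq!(current_queue(), None);
/// }
/// ```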
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());

/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
/// implement `Drop`.
#[derive(Copy, Clone)]
struct WorkQueueRef(*mut k_work_q);

// SAFETY: The work queue reference is also safe for both Send and Sync per Zephyr semantics.
unsafe impl Send for WorkQueueRef {}
unsafe impl Sync for WorkQueueRef {}

/// Retrieve the current work queue, if we are running within one.
pub fn get_current_workq() -> Option<*mut k_work_q> {
    WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
}

/// A Rust wrapper for `k_poll_signal`.
///
/// A signal in Zephyr is an event mechanism that can be used to trigger actions to run in work
/// queues.  It works somewhat like one half of a boolean semaphore.  The signaling is robust in the
/// direction of the event happening: a blocked task will definitely wake when the signal is raised.
/// However, clearing the signal is racy.  Generally, there are two ways to handle this:
///
/// - A work action can clear the signal as soon as it wakes up, before it starts processing any
///   data the signal was meant to indicate.  If the race happens, the processing will handle the
///   extra data.
/// - A work action can clear the signal after it does its processing.  This is useful for things
///   like periodic timers: if it is still processing when an additional timer tick comes in, that
///   timer tick will be ignored.  This suits periodic events where it is better to just skip a tick
///   than for ticks to "stack up" and fall behind.
///
/// Notably, as long as the `reset` method is only ever called by the worker that is waiting upon
/// it, there shouldn't ever be a race in the `wait_async` itself.
///
/// Signals can pass a `c_int` from the signaling task to the task that is waiting for the signal.
/// It is not specified in the Zephyr documentation what value will be passed if `raise` is called
/// multiple times before a task waits upon a signal.  The current implementation will return the
/// most recent raised `result` value.
///
/// For most other use cases, channels or semaphores are likely to be better solutions.
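///
/// The two reset strategies above can be illustrated with a hypothetical host-side model.
/// `ModelSignal` is not Zephyr's `k_poll_signal`; it is an assumption that mimics the behavior
/// described here by keeping a flag plus the most recently raised value:
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
///
/// /// Hypothetical host-side model of a signal: a flag plus the most recent result.
/// struct ModelSignal {
///     signaled: AtomicBool,
///     result: AtomicI32,
/// }
///
/// impl ModelSignal {
///     fn new() -> Self {
///         Self { signaled: AtomicBool::new(false), result: AtomicI32::new(0) }
///     }
///
///     fn raise(&self, result: i32) {
///         // Publish the value before the flag, so a reader that sees the flag also sees a value.
///         self.result.store(result, Ordering::Relaxed);
///         self.signaled.store(true, Ordering::Release);
///     }
///
///     fn reset(&self) {
///         self.signaled.store(false, Ordering::Release);
///     }
///
///     fn check(&self) -> Option<i32> {
///         if self.signaled.load(Ordering::Acquire) {
///             Some(self.result.load(Ordering::Relaxed))
///         } else {
///             None
///         }
///     }
/// }
///
/// fn main() {
///     let sig = ModelSignal::new();
///
///     // Two raises before the worker runs: this model keeps only the most recent value.
///     sig.raise(1);
///     sig.raise(2);
///     assert_eq!(sig.check(), Some(2));
///
///     // Clear first: the worker resets before processing, so a raise that lands while it is
///     // still processing stays visible for the next wakeup.
///     sig.reset();
///     sig.raise(3);
///     assert_eq!(sig.check(), Some(3));
///
///     // Clear after: a tick that lands during processing is discarded by the later reset.
///     sig.raise(4);
///     sig.reset();
///     assert_eq!(sig.check(), None);
/// }
/// ```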
pub struct Signal {
    /// The raw Zephyr `k_poll_signal`.
    pub(crate) item: Fixed<k_poll_signal>,
}

// SAFETY: Zephyr's API maintains thread safety.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}

impl Signal {
    /// Create a new `Signal`.
    ///
    /// The Signal will be in the non-signaled state.
    pub fn new() -> Signal {
        // SAFETY: The memory is zero initialized, and Fixed ensures that it never changes address.
        let item: Fixed<k_poll_signal> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            k_poll_signal_init(item.get());
        }
        Signal { item }
    }

    /// Reset the Signal.
    ///
    /// This resets the signal state to unsignaled.
    ///
    /// Please see the [`Signal`] documentation on how to handle the races that this implies.
    pub fn reset(&self) {
        // SAFETY: This is safe with a non-mut reference, as the purpose of the Zephyr API is to
        // coordinate this information between threads.
        unsafe {
            k_poll_signal_reset(self.item.get());
        }
    }

    /// Check the status of a signal.
    ///
    /// This reads the status of the signal.  If the state is "signaled", this will return
    /// `Some(result)` where the `result` is the result value given to [`raise`].
    ///
    /// [`raise`]: Self::raise
    pub fn check(&self) -> Option<c_int> {
        let mut signaled: c_uint = 0;
        let mut result: c_int = 0;
        unsafe {
            // SAFETY: Zephyr's signal API coordinates access across threads.
            k_poll_signal_check(self.item.get(), &mut signaled, &mut result);
        }

        if signaled != 0 {
            Some(result)
        } else {
            None
        }
    }

    /// Raise the signal.
    ///
    /// This will signal to any worker that is waiting on this object that the event has happened.
    /// The `result` will be returned from the worker's `wait` call.
    ///
    /// As per the Zephyr docs, this could return an `EAGAIN` error if the polling thread's timeout
    /// is in the process of expiring.  The implication is that the signal will not be raised in this
    /// case.
    /// ...
    pub fn raise(&self, result: c_int) -> crate::Result<()> {
        to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) })
    }
}

impl Default for Signal {
    fn default() -> Self {
        Signal::new()
    }
}

/// Possible returns from work queue submission.
#[derive(Debug, Clone, Copy)]
pub enum SubmitResult {
    /// This work was already in a queue.
    AlreadySubmitted,
    /// The work has been added to the specified queue.
    Enqueued,
    /// The work was running (for example, it resubmitted itself from its own handler), and has been
    /// queued to the queue that was running it.
    WasRunning,
}

impl SubmitResult {
    /// Does this result indicate that the work was enqueued?
    pub fn enqueued(self) -> bool {
        matches!(self, Self::Enqueued | Self::WasRunning)
    }

    /// Convert an int result from a work submit function.
    fn to_result(value: c_int) -> crate::Result<Self> {
        crate::error::to_result(value).map(|code| match code {
            0 => Self::AlreadySubmitted,
            1 => Self::Enqueued,
            2 => Self::WasRunning,
            _ => panic!("Unexpected result {} from Zephyr work submission", code),
        })
    }
}

/// A simple action that just does something with its data.
///
/// This is similar to a Future, except there is no concept of it completing.  It manages its
/// associated data however it wishes, and is responsible for re-queuing as needed.
///
/// Note, specifically, that `act` does not take a mutable reference.  This is because the `Work`
/// below uses an `Arc`, so this data can be shared.
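///
/// Because `act` only receives `Pin<&Self>`, any state an action changes needs interior
/// mutability.  The following host-side sketch assumes a local copy of the trait (so it builds
/// without Zephyr) and a hypothetical `Counter` action that keeps its state in an atomic:
///
/// ```rust
/// use std::pin::Pin;
/// use std::sync::atomic::{AtomicU32, Ordering};
/// use std::sync::Arc;
///
/// // Hypothetical local mirror of `SimpleAction`, so the sketch builds without Zephyr.
/// trait SimpleAction {
///     fn act(self: Pin<&Self>);
/// }
///
/// /// Counts its runs.  `act` only receives a shared reference, so the count is atomic.
/// struct Counter {
///     runs: AtomicU32,
/// }
///
/// impl SimpleAction for Counter {
///     fn act(self: Pin<&Self>) {
///         self.runs.fetch_add(1, Ordering::Relaxed);
///     }
/// }
///
/// fn main() {
///     // `Work::new` pins a whole `Work<T>` in an `Arc`; here the action is pinned directly.
///     let action = Arc::pin(Counter { runs: AtomicU32::new(0) });
///     // Another owner, standing in for the reference a queue holds while work is submitted.
///     let shared = action.clone();
///
///     // A handler calls `act` through a pinned shared reference.
///     shared.as_ref().act();
///     action.as_ref().act();
///     assert_eq!(action.runs.load(Ordering::Relaxed), 2);
/// }
/// ```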
pub trait SimpleAction {
    /// Perform the action.
    fn act(self: Pin<&Self>);
}

/// A basic Zephyr work item.
///
/// Holds a `k_work`, along with the data associated with that work.  When the queued work runs,
/// the `act` method will be called on the provided `SimpleAction`.
pub struct Work<T> {
    work: UnsafeCell<k_work>,
    action: T,
}

// SAFETY: `Work` can be sent between threads as long as the action itself can be.
unsafe impl<F> Send for Work<F>
where
    F: SimpleAction,
    F: Send,
{
}

// SAFETY: `Work` is `Sync` when the action is.
unsafe impl<F> Sync for Work<F>
where
    F: SimpleAction,
    F: Sync,
{
}

impl<T: SimpleAction + Send> Work<T> {
    /// Construct a new Work from the given action.
    ///
    /// Note that the data will be moved into the pinned Work.  The data is internal, and only
    /// accessible to the work thread (the `act` method).  If shared data is needed, the normal
    /// inter-thread sharing mechanisms should be used.
    ///
    /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
    pub fn new(action: T) -> Pin<Arc<Self>> {
        let this = Arc::pin(Self {
            // SAFETY: will be initialized below, after this is pinned.
            work: unsafe { mem::zeroed() },
            action,
        });

        // SAFETY: Initializes above zero-initialized struct.
        unsafe {
            k_work_init(this.work.get(), Some(Self::handler));
        }

        this
    }

    /// Submit this work to the system work queue.
    ///
    /// This can return several possible `Ok` results.  See the docs on [`SubmitResult`] for an
    /// explanation of them.
    pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
        // We "leak" the arc, so that when the handler runs, it can be safely turned back into an
        // Arc, and the drop on the arc will then run.
        let work = this.work.get();

        // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // allocation alive until the handler reconstructs and drops it.
        let result = SubmitResult::to_result(unsafe { k_work_submit(work) });
        if !matches!(result, Ok(r) if r.enqueued()) {
            // The work was already pending, or was not queued at all, so no handler run will
            // consume the reference leaked above.  Reclaim it so it is not leaked.
            // SAFETY: `work` came from the `Arc` leaked above, and is recovered exactly once.
            drop(unsafe { Self::from_raw(work) });
        }
        result
    }

    /// Submit this work to a specified work queue.
    ///
    /// TODO: Change when we have better wrappers for work queues.
    pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
        let work = this.work.get();

        // "leak" the arc to give to C.  We'll reconstruct it in the handler.
        // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // allocation alive until the handler reconstructs and drops it.
        let result =
            SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) });
        if !matches!(result, Ok(r) if r.enqueued()) {
            // No handler run will consume the reference leaked above, so reclaim it here.
            // SAFETY: `work` came from the `Arc` leaked above, and is recovered exactly once.
            drop(unsafe { Self::from_raw(work) });
        }
        result
    }

    /// Callback, through C, but bound by a specific type.
    extern "C" fn handler(work: *mut k_work) {
        // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
        // necessarily at the beginning of the struct.
        // SAFETY: Converts the raw pointer to the work back into the Arc.
        let this = unsafe { Self::from_raw(work) };

        // Access the action within, still pinned.
        // SAFETY: It is safe to keep the pin on the interior.
        let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };

        action.act();
    }

    /*
    /// Consume this Arc, returning the internal pointer.  Needs to have a complementary `from_raw`
    /// called to avoid leaking the item.
    fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
        // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
        // generally use move.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        Arc::into_raw(this)
    }
    */

    /// Given a pointer to the `k_work` buried within, recover the pinned `Arc` containing our data.
    ///
    /// # Safety
    ///
    /// `ptr` must point to the `work` field of a `Work<T>` whose `Arc` reference was leaked with
    /// `Arc::into_raw`, and each leaked reference must be recovered only once.
    unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
        // SAFETY: This fixes the pointer back to the beginning of Self.  This also assumes the
        // pointer is valid.
        let ptr = ptr
            .cast::<u8>()
            .sub(mem::offset_of!(Self, work))
            .cast::<Self>();
        let this = Arc::from_raw(ptr);
        Pin::new_unchecked(this)
    }

    /// Access the inner action.
    pub fn action(&self) -> &T {
        &self.action
    }
}