//! Zephyr Work Queues
//!
//! # Zephyr Work Queues and Work
//!
//! Zephyr has a mechanism called a
//! [Workqueue](https://docs.zephyrproject.org/latest/kernel/services/threads/workqueue.html).
//!
//! Each workqueue is backed by a single Zephyr thread, and has its own stack.  The work queue
//! consists of a FIFO queue of work items that will be run consecutively on that thread.  The
//! underlying types are `k_work_q` for the work queue itself, and `k_work` for the work item.
//!
//! In addition to the simple schedulable work, Zephyr also has two additional types of work:
//! `k_work_delayable`, which can be scheduled to run in the future, and `k_work_poll`, described as
//! triggered work in the docs.  Triggered work can be scheduled to run when various items within
//! Zephyr become available, and it also has a timeout.  In this sense the triggered work is a
//! superset of the other types of work.  Both delayable and triggered work are implemented by
//! having the `k_work` embedded in their structure, and Zephyr schedules the work when the given
//! reason happens.
//!
//! At this time, only the basic work queue type is supported.
//!
//! Zephyr's work queues can be used in different ways:
//!
//! - Work can be scheduled as needed.  For example, an IRQ handler can queue a work item to process
//!   data it has received from a device.
//! - Work can be scheduled periodically.
//!
//! As most C use of Zephyr statically allocates things like work, these are typically rescheduled
//! when the work is complete.  The work queue scheduling functions are designed, and intended, for
//! a given work item to be able to reschedule itself, and such usage is common.
//!
//! ## Ownership
//!
//! The remaining challenge with implementing `k_work` for Rust is that of ownership.  The model
//! taken here is that the work items are held in an `Arc`, one reference of which is effectively
//! owned by the work itself.  When the work item is submitted to Zephyr, ownership of that
//! reference is handed off to C, and when the work item's handler is called, the `Arc` is
//! reconstructed.  This repeats until the work is no longer needed, at which point the work will
//! be dropped.
//!
//! There are two common ways the lifecycle of work can be managed in an embedded system:
//!
//! - A set of `Future`s is allocated once at the start, and these never return a value.  Work
//!   Futures inside of this (which correspond to `.await` in async code) can have lives and return
//!   values, but the main loops will not return values, or be dropped.  Embedded Futures will
//!   typically not be boxed.
//!
//! One consequence of the ownership being passed through to C code is that if the work cancellation
//! mechanism is used on a work queue, the work items themselves will be leaked.
//!
//! These work items are also pinned (`Pin`), to ensure that the work actions are not moved.
//!
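//! As a minimal sketch of this hand-off, using only the standard library: the `hand_off` and
//! `reclaim` helpers are hypothetical names, `std::sync::Arc` is assumed in place of this crate's
//! `Arc`, and a plain `u32` stands in for the work item.  One reference is leaked as a raw
//! pointer, as would be given to C, and is then reconstructed exactly once, as a handler would do.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! /// Hand one strong reference to "C" as a raw pointer (hypothetical helper).
//! fn hand_off(item: Arc<u32>) -> *const u32 {
//!     Arc::into_raw(item)
//! }
//!
//! /// Reclaim the reference inside the "handler" (hypothetical helper).
//! ///
//! /// # Safety
//! /// `ptr` must come from `hand_off`, and must be reclaimed exactly once.
//! unsafe fn reclaim(ptr: *const u32) -> Arc<u32> {
//!     unsafe { Arc::from_raw(ptr) }
//! }
//!
//! fn main() {
//!     let item = Arc::new(42u32);
//!     // Keep one handle locally and give a second one away.
//!     let raw = hand_off(Arc::clone(&item));
//!     // The leaked reference is still counted, so the data stays alive.
//!     assert_eq!(Arc::strong_count(&item), 2);
//!
//!     // Later, the "handler" turns the pointer back into an owning handle.
//!     let back = unsafe { reclaim(raw) };
//!     assert_eq!(*back, 42);
//!     drop(back);
//!     assert_eq!(Arc::strong_count(&item), 1);
//! }
//! ```
//!
//! If the raw pointer were never reclaimed, the count would never return to 1, which is the leak
//! described above for cancelled work.
//!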
//! ## The work queues themselves
//!
//! Workqueues themselves are built using [`WorkQueueBuilder`].  This needs a statically defined
//! stack.  Typical usage will be along the lines of:
//! ```rust
//! kobj_define! {
//!   WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(
//!     WorkQueueBuilder::new()
//!         .set_priority(2)
//!         .set_name(c"mainloop")
//!         .set_no_yield(true)
//!         .start(WORKER_STACK.init_once(()).unwrap())
//! );
//!
//! let _ = zephyr::kio::spawn(
//!     mainloop(), // Async or function returning Future.
//!     &main_worker,
//!     c"w:mainloop",
//! );
//!
//! // ...
//!
//! // Leak the Box so that the worker is never freed.
//! let _ = Box::leak(main_worker);
//! ```
//!
//! It is important that WorkQueues never be dropped.  [`WorkQueue`] has a `Drop` implementation
//! that panics.  Zephyr provides no mechanism to stop work queue threads, so dropping would result
//! in undefined behavior.
//!
//! # Current Status
//!
//! Although Zephyr has three types of work, the `k_work_poll` is sufficient to implement all of
//! the behavior, and this implementation only implements this type.  Non-Future work could be
//! built around the other work types.
//!
//! As such, this means that manually constructed work is still built using `Future`.  The `_async`
//! primitives throughout this crate can be used just as readily by hand-written Futures as by async
//! code.  Notably, the use of [`Signal`] will likely be common, along with possible timeouts.
//!
//! [`sys::sync::Semaphore`]: crate::sys::sync::Semaphore
//! [`sync::channel`]: crate::sync::channel
//! [`sync::Mutex`]: crate::sync::Mutex
//! [`join`]: futures::JoinHandle::join
//! [`join_async`]: futures::JoinHandle::join_async

extern crate alloc;

use core::{
    cell::UnsafeCell,
    ffi::{c_int, c_uint, CStr},
    mem,
    pin::Pin,
    ptr,
};

use zephyr_sys::{
    k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
    k_work_queue_start, k_work_submit, k_work_submit_to_queue,
};

use crate::{
    error::to_result_void,
    object::Fixed,
    simpletls::SimpleTls,
    sync::{Arc, Mutex},
    sys::thread::ThreadStack,
};

/// A builder for work queues themselves.
///
/// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work
/// queue.  Various types of `Work` can be submitted to these queues, along with various types of
/// triggering conditions.
pub struct WorkQueueBuilder {
    /// The "config" value passed in.
    config: k_work_queue_config,
    /// Priority for the work queue thread.
    priority: c_int,
}

impl WorkQueueBuilder {
    /// Construct a new WorkQueueBuilder with default values.
    pub fn new() -> Self {
        Self {
            config: k_work_queue_config {
                name: ptr::null(),
                no_yield: false,
                essential: false,
            },
            priority: 0,
        }
    }

    /// Set the name for the WorkQueue thread.
    ///
    /// This name shows up in debuggers and some analysis tools.
    pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
        self.config.name = name.as_ptr();
        self
    }

    /// Set the "no yield" flag for the created worker.
    ///
    /// If this is not set, the work queue will call `k_yield` between each enqueued work item.  For
    /// non-preemptible threads, this will allow other threads to run.  For preemptible threads,
    /// this will allow other threads at the same priority to run.
    ///
    /// This method has a negative in the name, which goes against typical conventions.  This is
    /// done to match the field in the Zephyr config.
    pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
        self.config.no_yield = value;
        self
    }

    /// Set the "essential" flag for the created worker.
    ///
    /// This sets the essential flag on the running thread.  The system considers the termination of
    /// an essential thread to be a fatal error.
    pub fn set_essential(&mut self, value: bool) -> &mut Self {
        self.config.essential = value;
        self
    }

    /// Set the priority for the worker thread.
    ///
    /// See the Zephyr docs for the meaning of priority.
    pub fn set_priority(&mut self, value: c_int) -> &mut Self {
        self.priority = value;
        self
    }

    /// Start the given work queue thread.
    ///
    /// TODO: Implement a 'start' that works from a static work queue.
    pub fn start(&self, stack: ThreadStack) -> WorkQueue {
        let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            // SAFETY: Initialize zeroed memory.
            k_work_queue_init(item.get());

            // SAFETY: This associates the workqueue with the thread ID that runs it.  The thread is
            // a pointer into this work item, which will not move, because of the Fixed.
            let this = &mut *item.get();
            WORK_QUEUES
                .lock()
                .unwrap()
                .insert(&this.thread, WorkQueueRef(item.get()));

            // SAFETY: Start work queue thread.  The main issue here is that the work queue cannot
            // be deallocated once the thread has started.  We enforce this by making Drop panic.
            k_work_queue_start(
                item.get(),
                stack.base,
                stack.size,
                self.priority,
                &self.config,
            );
        }

        WorkQueue { item }
    }
}

/// A running work queue thread.
///
/// # Panics
///
/// Allowing a work queue to drop will result in a panic.  There are two ways to handle this,
/// depending on whether the WorkQueue is in a Box, or an Arc:
/// ```
/// // Leak a work queue in an Arc.
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
/// // If the Arc is used after this:
/// let _ = Arc::into_raw(wq.clone());
/// // If the Arc is no longer needed:
/// let _ = Arc::into_raw(wq);
///
/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
/// ```
pub struct WorkQueue {
    #[allow(dead_code)]
    item: Fixed<k_work_q>,
}

/// Work queues can be referenced from multiple threads, and thus are Send and Sync.
unsafe impl Send for WorkQueue {}
unsafe impl Sync for WorkQueue {}

impl Drop for WorkQueue {
    fn drop(&mut self) {
        panic!("WorkQueues must not be dropped");
    }
}

/// A simple mapping to get the current work_queue from the currently running thread.
///
/// This assumes that Zephyr's work queues have a 1:1 mapping between the work queue and the
/// thread.
///
/// # Safety
///
/// The work queue is protected with a sync Mutex (which uses an underlying Zephyr mutex).  It is,
/// in general, not a good idea to use a mutex in a work queue, as deadlock can happen.  So it is
/// important to both never .await while holding the lock, as well as to make sure operations within
/// it are relatively fast.  In this case, `insert` and `get` on the SimpleTls are reasonably fast.
/// `insert` is usually done just at startup as well.
///
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
/// manually right now.
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());

/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
/// implement `Drop`.
#[derive(Copy, Clone)]
struct WorkQueueRef(*mut k_work_q);

// SAFETY: The work queue reference is also safe for both Send and Sync per Zephyr semantics.
unsafe impl Send for WorkQueueRef {}
unsafe impl Sync for WorkQueueRef {}

/// Retrieve the current work queue, if we are running within one.
pub fn get_current_workq() -> Option<*mut k_work_q> {
    WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
}

/// A Rust wrapper for `k_poll_signal`.
///
/// A signal in Zephyr is an event mechanism that can be used to trigger actions in event queues to
/// run.  They work somewhat like a kind of half-boolean semaphore.  The signaling is robust in the
/// direction of the event happening, as in a blocked task will definitely wake when the signal
/// happens.  However, the clearing of the signal is racy.  Generally, there are two ways to do
/// this:
///
/// - A work action can clear the signal as soon as it wakes up, before it starts processing any
///   data the signal was meant to indicate.  If the race happens, the processing will handle the
///   extra data.
/// - A work action can clear the signal after it does its processing.  This is useful for things
///   like periodic timers, where if it is still processing when an additional timer tick comes in,
///   that timer tick will be ignored.  This is useful for periodic events where it is better to
///   just skip a tick rather than for them to "stack up" and get behind.
///
/// Notably, as long as the `reset` method is only ever called by the worker that is waiting upon
/// it, there shouldn't ever be a race in the `wait_async` itself.
///
/// Signals can pass a `c_int` from the signalling task to the task that is waiting for the signal.
/// It is not specified in the Zephyr documentation what value will be passed if `raise` is called
/// multiple times before a task waits upon a signal.  The current implementation will return the
/// most recently raised `result` value.
///
/// For most other use cases, channels or semaphores are likely to be better solutions.
pub struct Signal {
    /// The raw Zephyr `k_poll_signal`.
    pub(crate) item: Fixed<k_poll_signal>,
}

// SAFETY: Zephyr's API maintains thread safety.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}

impl Signal {
    /// Create a new `Signal`.
    ///
    /// The Signal will be in the non-signaled state.
    pub fn new() -> Signal {
        // SAFETY: The memory is zero initialized, and Fixed ensures that it never changes address.
        let item: Fixed<k_poll_signal> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            k_poll_signal_init(item.get());
        }
        Signal { item }
    }

    /// Reset the Signal.
    ///
    /// This resets the signal state to unsignaled.
    ///
    /// Please see the [`Signal`] documentation on how to handle the races that this implies.
    pub fn reset(&self) {
        // SAFETY: This is safe with a non-mut reference, as the purpose of the Zephyr API is to
        // coordinate this information between threads.
        unsafe {
            k_poll_signal_reset(self.item.get());
        }
    }

    /// Check the status of a signal.
    ///
    /// This reads the status of the signal.  If the state is "signalled", this will return
    /// `Some(result)` where the `result` is the result value given to [`raise`].
    ///
    /// [`raise`]: Self::raise
    pub fn check(&self) -> Option<c_int> {
        let mut signaled: c_uint = 0;
        let mut result: c_int = 0;
        unsafe {
            // SAFETY: Zephyr's signal API coordinates access across threads.
            k_poll_signal_check(self.item.get(), &mut signaled, &mut result);
        }

        if signaled != 0 {
            Some(result)
        } else {
            None
        }
    }

    /// Signal a signal object.
    ///
    /// This will signal to any worker that is waiting on this object that the event has happened.
    /// The `result` will be returned from the worker's `wait` call.
    ///
    /// As per the Zephyr docs, this could return an EAGAIN error if the polling thread is in the
    /// process of expiring.  The implication is that the signal will not be raised in this case.
    /// ...
    pub fn raise(&self, result: c_int) -> crate::Result<()> {
        to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) })
    }
}

impl Default for Signal {
    fn default() -> Self {
        Signal::new()
    }
}

/// Possible returns from work queue submission.
#[derive(Debug, Clone, Copy)]
pub enum SubmitResult {
    /// This work was already in a queue.
    AlreadySubmitted,
    /// The work has been added to the specified queue.
    Enqueued,
    /// The work was running when it was submitted, and has been queued to the queue that was
    /// running it.
    WasRunning,
}

impl SubmitResult {
    /// Does this result indicate that the work was enqueued?
    pub fn enqueued(self) -> bool {
        matches!(self, Self::Enqueued | Self::WasRunning)
    }

    /// Convert an int result from a work submit function.
    fn to_result(value: c_int) -> crate::Result<Self> {
        crate::error::to_result(value).map(|code| match code {
            0 => Self::AlreadySubmitted,
            1 => Self::Enqueued,
            2 => Self::WasRunning,
            _ => panic!("Unexpected result {} from Zephyr work submission", code),
        })
    }
}

/// A simple action that just does something with its data.
///
/// This is similar to a Future, except there is no concept of it completing.  It manages its
/// associated data however it wishes, and is responsible for re-queuing as needed.
///
/// Note, specifically, that `act` does not take a mutable reference.  This is because the Work
/// below uses an Arc, so this data can be shared.
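///
/// As a minimal, self-contained sketch of what this implies for implementors: the trait is
/// redeclared locally so the example stands alone, and `Counter` is a hypothetical action.
/// Because only a shared reference is available, any state the action changes has to use
/// interior mutability, here an atomic counter.
///
/// ```rust
/// use std::pin::Pin;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// // Local copy of the trait's shape, so the sketch compiles on its own.
/// trait SimpleAction {
///     fn act(self: Pin<&Self>);
/// }
///
/// // Hypothetical action that counts how often it has run.
/// struct Counter {
///     runs: AtomicUsize,
/// }
///
/// impl SimpleAction for Counter {
///     fn act(self: Pin<&Self>) {
///         // Only `&self` is reachable here, so mutate through the atomic.
///         self.runs.fetch_add(1, Ordering::Relaxed);
///     }
/// }
///
/// fn main() {
///     let counter = Counter { runs: AtomicUsize::new(0) };
///     // `Counter` is `Unpin`, so a pinned shared reference can be made safely.
///     let pinned = Pin::new(&counter);
///     pinned.act();
///     pinned.act();
///     assert_eq!(counter.runs.load(Ordering::Relaxed), 2);
/// }
/// ```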
pub trait SimpleAction {
    /// Perform the action.
    fn act(self: Pin<&Self>);
}

/// A basic Zephyr work item.
///
/// Holds a `k_work`, along with the data associated with that work.  When the work is queued, the
/// `act` method will be called on the provided `SimpleAction`.
pub struct Work<T> {
    work: UnsafeCell<k_work>,
    action: T,
}

/// SAFETY: Work items can be sent as long as the action itself can be.
unsafe impl<F> Send for Work<F>
where
    F: SimpleAction,
    F: Send,
{
}

/// SAFETY: Work items are Sync when the action is.
unsafe impl<F> Sync for Work<F>
where
    F: SimpleAction,
    F: Sync,
{
}

impl<T: SimpleAction + Send> Work<T> {
    /// Construct a new Work from the given action.
    ///
    /// Note that the data will be moved into the pinned Work.  The data is internal, and only
    /// accessible to the work thread (the `act` method).  If shared data is needed, normal
    /// inter-thread sharing mechanisms are needed.
    ///
    /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
    pub fn new(action: T) -> Pin<Arc<Self>> {
        let this = Arc::pin(Self {
            // SAFETY: will be initialized below, after this is pinned.
            work: unsafe { mem::zeroed() },
            action,
        });

        // SAFETY: Initializes above zero-initialized struct.
        unsafe {
            k_work_init(this.work.get(), Some(Self::handler));
        }

        this
    }

    /// Submit this work to the system work queue.
    ///
    /// This can return several possible `Ok` results.  See the docs on [`SubmitResult`] for an
    /// explanation of them.
    pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
        // We "leak" the arc, so that when the handler runs, it can be safely turned back into an
        // Arc, and the drop on the arc will then run.
        let work = this.work.get();

        // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move.  The reference leaked above keeps the work
        // item alive until the handler reconstructs the Arc.
        SubmitResult::to_result(unsafe { k_work_submit(work) })
    }

    /// Submit this work to a specified work queue.
    ///
    /// TODO: Change when we have better wrappers for work queues.
    pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
        let work = this.work.get();

        // "leak" the arc to give to C.  We'll reconstruct it in the handler.
        // SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
        // back into a Pin when it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let _ = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move.  The reference leaked above keeps the work
        // item alive until the handler reconstructs the Arc.
        SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) })
    }

    /// Callback, through C, but bound by a specific type.
    extern "C" fn handler(work: *mut k_work) {
        // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
        // necessarily at the beginning of the struct.
        // SAFETY: Converts raw pointer to work back into the Arc.
        let this = unsafe { Self::from_raw(work) };

        // Access the action within, still pinned.
        // SAFETY: It is safe to keep the pin on the interior.
        let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };

        action.act();
    }

    /*
    /// Consume this Arc, returning the internal pointer.  Needs to have a complementary `from_raw`
    /// called to avoid leaking the item.
    fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
        // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
        // generally use move.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        Arc::into_raw(this)
    }
    */

    /// Given a pointer to the `k_work` buried within, recover the pinned `Arc` containing our
    /// data.
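    ///
    /// The pointer adjustment can be sketched on its own with the standard library.  `Outer`,
    /// `Inner` and `outer_from_inner` are hypothetical stand-ins for `Work<T>`, its embedded
    /// `k_work` and this function; the sketch only shows the `offset_of!` arithmetic, not the
    /// `Arc` reconstruction.
    ///
    /// ```rust
    /// use std::mem::offset_of;
    ///
    /// // Hypothetical stand-ins for the containing struct and the embedded field.
    /// struct Inner(u32);
    /// struct Outer {
    ///     tag: u64,
    ///     inner: Inner,
    /// }
    ///
    /// /// Recover the containing `Outer` from a pointer to its `inner` field.
    /// ///
    /// /// # Safety
    /// /// `ptr` must point at the `inner` field of a live `Outer`, and must have been
    /// /// derived from a pointer to that whole `Outer`.
    /// unsafe fn outer_from_inner(ptr: *const Inner) -> *const Outer {
    ///     unsafe { ptr.cast::<u8>().sub(offset_of!(Outer, inner)).cast::<Outer>() }
    /// }
    ///
    /// fn main() {
    ///     let outer = Outer { tag: 7, inner: Inner(9) };
    ///     let outer_ptr: *const Outer = &outer;
    ///     // Take the field address through the whole-struct pointer, so that walking
    ///     // back to the start of `Outer` stays within the same allocation.
    ///     let inner_ptr = unsafe { std::ptr::addr_of!((*outer_ptr).inner) };
    ///
    ///     let recovered = unsafe { outer_from_inner(inner_ptr) };
    ///     assert!(std::ptr::eq(recovered, outer_ptr));
    ///     assert_eq!(unsafe { (*recovered).tag }, 7);
    ///     assert_eq!(unsafe { (*recovered).inner.0 }, 9);
    /// }
    /// ```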
    unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
        // SAFETY: This fixes the pointer back to the beginning of Self.  This also assumes the
        // pointer is valid.
        let ptr = ptr
            .cast::<u8>()
            .sub(mem::offset_of!(Self, work))
            .cast::<Self>();
        let this = Arc::from_raw(ptr);
        Pin::new_unchecked(this)
    }

    /// Access the inner action.
    pub fn action(&self) -> &T {
        &self.action
    }
}