// zephyr/work.rs
//! Zephyr Work Queues
//!
//! # Zephyr Work Queues and Work
//!
//! Zephyr has a mechanism called a
//! [Workqueue](https://docs.zephyrproject.org/latest/kernel/services/threads/workqueue.html).
//!
//! Each workqueue is backed by a single Zephyr thread and has its own stack. The work queue
//! consists of a FIFO queue of work items that are run consecutively on that thread. The
//! underlying types are `k_work_q` for the work queue itself, and `k_work` for each work item.
//!
//! In addition to simple schedulable work, Zephyr has two further types of work:
//! `k_work_delayable`, which can be scheduled to run in the future, and `k_work_poll`, described as
//! triggered work in the docs. Triggered work is scheduled to run when one of a set of Zephyr
//! kernel objects becomes available, and it also has a timeout. In this sense triggered work is a
//! superset of the other types of work. Both delayable and triggered work are implemented by
//! embedding a `k_work` in their structure; Zephyr submits that `k_work` when the triggering
//! condition occurs.
//!
//! At this time, only the basic work type (`k_work`) is supported by this module.
//!
//! Zephyr's work queues can be used in different ways:
//!
//! - Work can be scheduled as needed. For example, an IRQ handler can queue a work item to process
//!   data it has received from a device.
//! - Work can be scheduled periodically.
//!
//! Because most C code on Zephyr allocates work items statically, they are typically rescheduled
//! when the work is complete. The work queue scheduling functions are designed, and intended, for
//! a given work item to be able to reschedule itself, and such usage is common.
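//!
//! The self-rescheduling pattern can be modeled on the host with a plain FIFO standing in for
//! the work queue. This is a std-only sketch, not this module's API: `ToyQueue` and `Ticker` are
//! hypothetical names, and items run on the calling thread rather than on a Zephyr work queue
//! thread.
//!
//! ```rust
//! use std::cell::{Cell, RefCell};
//! use std::collections::VecDeque;
//! use std::rc::Rc;
//!
//! /// Hypothetical stand-in for a work queue: a FIFO of pending items, run in order.
//! struct ToyQueue {
//!     pending: RefCell<VecDeque<Rc<Ticker>>>,
//! }
//!
//! /// Hypothetical work item that resubmits itself until it has run `limit` times.
//! struct Ticker {
//!     runs: Cell<u32>,
//!     limit: u32,
//! }
//!
//! impl ToyQueue {
//!     fn submit(&self, item: Rc<Ticker>) {
//!         self.pending.borrow_mut().push_back(item);
//!     }
//!
//!     /// Run items until the queue is empty; items may submit more work while running.
//!     fn run(&self) {
//!         loop {
//!             // Pop in its own statement so the RefCell borrow ends before the item runs.
//!             let next = self.pending.borrow_mut().pop_front();
//!             match next {
//!                 Some(item) => item.act(self),
//!                 None => break,
//!             }
//!         }
//!     }
//! }
//!
//! impl Ticker {
//!     fn act(self: Rc<Self>, queue: &ToyQueue) {
//!         self.runs.set(self.runs.get() + 1);
//!         // Reschedule ourselves, as a periodic work item would.
//!         if self.runs.get() < self.limit {
//!             queue.submit(self);
//!         }
//!     }
//! }
//!
//! fn demo() -> u32 {
//!     let queue = ToyQueue { pending: RefCell::new(VecDeque::new()) };
//!     let ticker = Rc::new(Ticker { runs: Cell::new(0), limit: 3 });
//!     queue.submit(ticker.clone());
//!     queue.run();
//!     ticker.runs.get()
//! }
//! ```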
//!
//! ## Ownership
//!
//! The remaining challenge with implementing `k_work` for Rust is that of ownership. The model
//! taken here is that each work item is held in a pinned `Arc`. When the work item is submitted to
//! Zephyr, the reference passed to the submit function is handed off to C as a raw pointer, and
//! when the work item runs, the `Arc` is reconstructed from that pointer. This repeats until the
//! work is no longer needed, at which point the last reference is released and the work is
//! dropped.
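//!
//! The handoff can be sketched with `std` types alone. In this host-side model (an assumption,
//! not the code in this module), `submit` leaks one reference as a raw pointer, standing in for
//! the `k_work *` that Zephyr keeps, and `handler` rebuilds the pinned `Arc` from it. `Payload`,
//! `submit`, and `handler` are hypothetical names.
//!
//! ```rust
//! use std::pin::Pin;
//! use std::sync::Arc;
//!
//! /// Hypothetical work payload.
//! struct Payload {
//!     value: u32,
//! }
//!
//! /// "Submit": give up one reference as a raw pointer, as if handing it to C.
//! fn submit(work: Pin<Arc<Payload>>) -> *const Payload {
//!     // SAFETY: the payload is never moved while the raw pointer is outstanding.
//!     let work = unsafe { Pin::into_inner_unchecked(work) };
//!     Arc::into_raw(work)
//! }
//!
//! /// "Handler": rebuild the pinned Arc; its reference is released when it goes out of scope.
//! ///
//! /// SAFETY (caller): `raw` must come from `submit` and be passed here exactly once.
//! unsafe fn handler(raw: *const Payload) -> u32 {
//!     let work = unsafe { Pin::new_unchecked(Arc::from_raw(raw)) };
//!     work.value
//! }
//!
//! fn demo() -> (usize, u32, usize) {
//!     let mine = Arc::new(Payload { value: 7 });
//!     // `Payload` is `Unpin`, so `Pin::new` is enough to build the pinned handle here.
//!     let raw = submit(Pin::new(mine.clone()));
//!     // While the work is "queued", the leaked reference keeps the count at 2.
//!     let queued = Arc::strong_count(&mine);
//!     // SAFETY: `raw` came from `submit` and is reclaimed exactly once.
//!     let seen = unsafe { handler(raw) };
//!     (queued, seen, Arc::strong_count(&mine))
//! }
//! ```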
//!
//! There are two common ways the lifecycle of work can be managed in an embedded system:
//!
//! - A set of `Future`s is allocated once at the start, and these never return a value. Futures
//!   nested inside them (which correspond to `.await` in async code) can have limited lifetimes
//!   and return values, but the main loops never return a value and are never dropped. Embedded
//!   Futures will typically not be boxed.
//!
//! One consequence of passing ownership through to C code is that if the work cancellation
//! mechanism is used on a work queue, the cancelled work items are leaked.
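//!
//! The cancellation leak shows up in the same kind of host-side model: if the pointer handed to
//! the queue is never turned back into an `Arc`, because the handler never runs, the reference it
//! carries is never released. `cancelled_submit` is a hypothetical name used only for this sketch.
//!
//! ```rust
//! use std::sync::Arc;
//!
//! /// Hypothetical: leak a reference as if submitting it, then "cancel" so it is never reclaimed.
//! fn cancelled_submit(work: &Arc<u32>) {
//!     let raw: *const u32 = Arc::into_raw(Arc::clone(work));
//!     // A cancelled item's handler never runs, so nothing calls `Arc::from_raw(raw)`.
//!     let _ = raw;
//! }
//!
//! fn demo() -> usize {
//!     let work = Arc::new(5u32);
//!     cancelled_submit(&work);
//!     // The leaked reference is never released, so the count can never return to 1.
//!     Arc::strong_count(&work)
//! }
//! ```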
//!
//! These work items are also pinned (`Pin`), so that neither the embedded `k_work` nor the work
//! action moves while Zephyr holds a pointer to it.
//!
//! ## The work queues themselves
//!
//! Work queues themselves are built using [`WorkQueueBuilder`]. This needs a statically defined
//! stack. Typical usage will be along the lines of:
//! ```rust
//! kobj_define! {
//!     WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(
//!     WorkQueueBuilder::new()
//!         .set_priority(2)
//!         .set_name(c"mainloop")
//!         .set_no_yield(true)
//!         .start(WORKER_STACK.init_once(()).unwrap())
//! );
//!
//! let _ = zephyr::kio::spawn(
//!     mainloop(), // Async fn, or a function returning a Future.
//!     &main_worker,
//!     c"w:mainloop",
//! );
//!
//! // ...
//!
//! // Leak the Box so that the worker is never freed.
//! let _ = Box::leak(main_worker);
//! ```
//!
//! It is important that work queues are never dropped: [`WorkQueue`] has a `Drop` implementation
//! that panics. Zephyr provides no mechanism to stop work queue threads, so dropping one would
//! result in undefined behavior.
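//!
//! The "panic on drop, so leak instead" rule can be modeled on the host. `NeverDrop` is a
//! hypothetical stand-in for `WorkQueue`; the point is only that `Box::leak` yields a `'static`
//! reference without ever running `Drop`.
//!
//! ```rust
//! /// Hypothetical stand-in for a resource whose backing thread can never be stopped.
//! struct NeverDrop {
//!     id: u32,
//! }
//!
//! impl Drop for NeverDrop {
//!     fn drop(&mut self) {
//!         panic!("NeverDrop must not be dropped");
//!     }
//! }
//!
//! fn demo() -> &'static NeverDrop {
//!     let queue = Box::new(NeverDrop { id: 1 });
//!     // `Box::leak` returns a `&'static mut`, and the destructor never runs.
//!     Box::leak(queue)
//! }
//! ```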
//!
//! # Current Status
//!
//! Although Zephyr has three types of work, `k_work_poll` is sufficient to implement all of the
//! behavior, and this implementation only implements this type. Non-Future work could be built
//! around the other work types.
//!
//! As such, manually constructed work is still built using `Future`. The `_async` primitives
//! throughout this crate can be used just as readily by hand-written Futures as by async code.
//! Notably, the use of [`Signal`] will likely be common, along with possible timeouts.

extern crate alloc;

use core::{
    cell::UnsafeCell,
    ffi::{c_int, c_uint, CStr},
    mem,
    pin::Pin,
    ptr,
};

use zephyr_sys::{
    k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
    k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
    k_work_queue_start, k_work_submit, k_work_submit_to_queue,
};

use crate::{
    error::to_result_void,
    object::Fixed,
    simpletls::SimpleTls,
    sync::{Arc, Mutex},
    sys::thread::ThreadStack,
};

/// A builder for work queues themselves.
///
/// A work queue is a Zephyr thread that, instead of directly running a piece of code, manages a
/// queue of work. Various types of `Work` can be submitted to these queues, along with various
/// types of triggering conditions.
pub struct WorkQueueBuilder {
    /// The "config" value passed in.
    config: k_work_queue_config,
    /// Priority for the work queue thread.
    priority: c_int,
}

impl WorkQueueBuilder {
    /// Construct a new WorkQueueBuilder with default values.
    pub fn new() -> Self {
        Self {
            config: k_work_queue_config {
                name: ptr::null(),
                no_yield: false,
                essential: false,
            },
            priority: 0,
        }
    }

    /// Set the name for the WorkQueue thread.
    ///
    /// This name shows up in debuggers and some analysis tools.
    pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
        self.config.name = name.as_ptr();
        self
    }

    /// Set the "no yield" flag for the created worker.
    ///
    /// If this is not set, the work queue will call `k_yield` between each enqueued work item. For
    /// non-preemptible threads, this will allow other threads to run. For preemptible threads,
    /// this will allow other threads at the same priority to run.
    ///
    /// This method has a negative in the name, which goes against typical conventions. This is
    /// done to match the field in the Zephyr config.
    pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
        self.config.no_yield = value;
        self
    }

    /// Set the "essential" flag for the created worker.
    ///
    /// This sets the essential flag on the running thread. The system considers the termination of
    /// an essential thread to be a fatal error.
    pub fn set_essential(&mut self, value: bool) -> &mut Self {
        self.config.essential = value;
        self
    }

    /// Set the priority for the worker thread.
    ///
    /// See the Zephyr docs for the meaning of priority.
    pub fn set_priority(&mut self, value: c_int) -> &mut Self {
        self.priority = value;
        self
    }

    /// Start the work queue thread, running on the given stack.
    ///
    /// TODO: Implement a 'start' that works from a static work queue.
    pub fn start(&self, stack: ThreadStack) -> WorkQueue {
        let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            // SAFETY: Initialize zeroed memory.
            k_work_queue_init(item.get());

            // SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
            // a pointer into this work item, which will not move, because of the Fixed.
            let this = &mut *item.get();
            WORK_QUEUES
                .lock()
                .unwrap()
                .insert(&this.thread, WorkQueueRef(item.get()));

            // SAFETY: Start work queue thread. The main issue here is that the work queue cannot
            // be deallocated once the thread has started. We enforce this by making Drop panic.
            k_work_queue_start(
                item.get(),
                stack.base,
                stack.size,
                self.priority,
                &self.config,
            );
        }

        WorkQueue { item }
    }
}

/// A running work queue thread.
///
/// # Panics
///
/// Allowing a work queue to be dropped results in a panic. There are two ways to avoid this,
/// depending on whether the `WorkQueue` is in a `Box` or an `Arc`:
/// ```
/// // Leak a work queue in an Arc.
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
/// // If the Arc is used after this:
/// let _ = Arc::into_raw(wq.clone());
/// // If the Arc is no longer needed:
/// let _ = Arc::into_raw(wq);
///
/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
/// ```
pub struct WorkQueue {
    #[allow(dead_code)]
    item: Fixed<k_work_q>,
}

// SAFETY: Work queues can be referenced from multiple threads, and thus are Send and Sync.
unsafe impl Send for WorkQueue {}
unsafe impl Sync for WorkQueue {}

impl Drop for WorkQueue {
    fn drop(&mut self) {
        panic!("WorkQueues must not be dropped");
    }
}

/// A simple mapping to get the current work queue from the currently running thread.
///
/// This assumes that Zephyr's work queues have a 1:1 mapping between the work queue and the
/// thread.
///
/// # Safety
///
/// This mapping is protected with a sync Mutex (which uses an underlying Zephyr mutex). It is,
/// in general, not a good idea to use a mutex in a work queue, as deadlock can happen. So it is
/// important both to never `.await` while holding the lock, and to make sure operations within
/// it are relatively fast. In this case, `insert` and `get` on the `SimpleTls` are reasonably
/// fast, and `insert` is usually done just at startup.
///
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
/// manually right now.
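///
/// A std-only model of this lookup, assuming nothing about `SimpleTls` beyond what is described
/// above: a mutex-protected list keyed by thread. `CURRENT`, `register`, and `current` are
/// hypothetical names, and `std::thread::ThreadId` stands in for the Zephyr thread pointer.
///
/// ```rust
/// use std::sync::Mutex;
/// use std::thread::{self, ThreadId};
///
/// /// Hypothetical registry pairing each thread with a value (here, a queue number).
/// static CURRENT: Mutex<Vec<(ThreadId, u32)>> = Mutex::new(Vec::new());
///
/// /// Record the value for `thread`, as `start` does for each new work queue thread.
/// fn register(thread: ThreadId, value: u32) {
///     CURRENT.lock().unwrap().push((thread, value));
/// }
///
/// /// Look up the calling thread's value; the lock is held only for a short scan.
/// fn current() -> Option<u32> {
///     let me = thread::current().id();
///     // Bind the result so the guard is released at the end of this statement.
///     let found = CURRENT
///         .lock()
///         .unwrap()
///         .iter()
///         .find(|(id, _)| *id == me)
///         .map(|(_, value)| *value);
///     found
/// }
/// ```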
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());

/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
/// implement `Drop`.
#[derive(Copy, Clone)]
struct WorkQueueRef(*mut k_work_q);

// SAFETY: The work queue reference is also safe for both Send and Sync per Zephyr semantics.
unsafe impl Send for WorkQueueRef {}
unsafe impl Sync for WorkQueueRef {}

/// Retrieve the current work queue, if we are running within one.
pub fn get_current_workq() -> Option<*mut k_work_q> {
    WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
}

/// A Rust wrapper for `k_poll_signal`.
///
/// A signal in Zephyr is an event mechanism that can be used to trigger actions in event queues.
/// It works somewhat like half of a boolean semaphore. The signaling is robust in the direction of
/// the event happening, in that a blocked task will definitely wake when the signal is raised.
/// However, clearing the signal is racy. Generally, there are two ways to handle this:
///
/// - A work action can clear the signal as soon as it wakes up, before it starts processing any
///   data the signal was meant to indicate. If the race happens, the processing will handle the
///   extra data.
/// - A work action can clear the signal after it does its processing. This is useful for things
///   like periodic timers: if it is still processing when an additional timer tick comes in,
///   that tick will be ignored. This suits periodic events where it is better to skip a tick
///   than for ticks to "stack up" and fall behind.
///
/// Notably, as long as the `reset` method is only ever called by the worker that is waiting upon
/// it, there shouldn't ever be a race in the `wait_async` itself.
///
/// Signals can pass a `c_int` from the signaling task to the task that is waiting for the signal.
/// The Zephyr documentation does not specify what value will be passed if `raise` is called
/// multiple times before a task waits upon the signal. The current implementation returns the
/// most recently raised `result` value.
///
/// For most other use cases, channels or semaphores are likely to be better solutions.
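///
/// The semantics described above (raise sets the flag and stores the latest result, check reads
/// both, reset clears the flag) can be modeled on the host. This sketch assumes "last raise wins",
/// matching the note above about the current implementation; `SignalModel` is a hypothetical type,
/// not a wrapper around `k_poll_signal`, and it does no waking.
///
/// ```rust
/// use std::sync::Mutex;
///
/// /// Hypothetical host-side model: `None` when unsignaled, `Some(result)` when signaled.
/// struct SignalModel {
///     state: Mutex<Option<i32>>,
/// }
///
/// impl SignalModel {
///     fn new() -> Self {
///         SignalModel { state: Mutex::new(None) }
///     }
///
///     /// Raising again before a reset overwrites the stored result.
///     fn raise(&self, result: i32) {
///         *self.state.lock().unwrap() = Some(result);
///     }
///
///     fn check(&self) -> Option<i32> {
///         *self.state.lock().unwrap()
///     }
///
///     fn reset(&self) {
///         *self.state.lock().unwrap() = None;
///     }
/// }
/// ```
///
/// In this model, two raises before the worker checks are seen as a single event carrying the
/// second result, which is how a "clear after processing" worker ends up skipping a tick.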
pub struct Signal {
    /// The raw Zephyr `k_poll_signal`.
    pub(crate) item: Fixed<k_poll_signal>,
}

// SAFETY: Zephyr's API maintains thread safety.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}

impl Signal {
    /// Create a new `Signal`.
    ///
    /// The Signal will be in the non-signaled state.
    pub fn new() -> Signal {
        // SAFETY: The memory is zero initialized, and Fixed ensures that it never changes address.
        let item: Fixed<k_poll_signal> = Fixed::new(unsafe { mem::zeroed() });
        unsafe {
            k_poll_signal_init(item.get());
        }
        Signal { item }
    }

    /// Reset the Signal.
    ///
    /// This resets the signal state to unsignaled.
    ///
    /// Please see the [`Signal`] documentation on how to handle the races that this implies.
    pub fn reset(&self) {
        // SAFETY: This is safe with a non-mut reference, as the purpose of the Zephyr API is to
        // coordinate this information between threads.
        unsafe {
            k_poll_signal_reset(self.item.get());
        }
    }

    /// Check the status of a signal.
    ///
    /// This reads the status of the signal. If the state is "signaled", this will return
    /// `Some(result)` where the `result` is the result value given to [`raise`].
    ///
    /// [`raise`]: Self::raise
    pub fn check(&self) -> Option<c_int> {
        let mut signaled: c_uint = 0;
        let mut result: c_int = 0;
        unsafe {
            // SAFETY: Zephyr's signal API coordinates access across threads.
            k_poll_signal_check(self.item.get(), &mut signaled, &mut result);
        }

        if signaled != 0 {
            Some(result)
        } else {
            None
        }
    }

    /// Raise the signal.
    ///
    /// This will signal to any worker that is waiting on this object that the event has happened.
    /// The `result` will be returned from the worker's `wait` call.
    ///
    /// As per the Zephyr docs, this could return an EAGAIN error if the polling thread is in the
    /// process of expiring. The implication is that the signal will not be raised in this case.
    /// ...
    pub fn raise(&self, result: c_int) -> crate::Result<()> {
        to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) })
    }
}

impl Default for Signal {
    fn default() -> Self {
        Signal::new()
    }
}

/// Possible successful results from work queue submission.
#[derive(Debug, Clone, Copy)]
pub enum SubmitResult {
    /// The work was already queued, and has not been queued again.
    AlreadySubmitted,
    /// The work has been added to the specified queue.
    Enqueued,
    /// The work was running, and has been queued again to the queue that was running it (for
    /// example, when a work item resubmits itself from its own handler).
    WasRunning,
}

impl SubmitResult {
    /// Does this result indicate that the work was enqueued?
    pub fn enqueued(self) -> bool {
        matches!(self, Self::Enqueued | Self::WasRunning)
    }

    /// Convert an int result from a work submit function.
    fn to_result(value: c_int) -> crate::Result<Self> {
        crate::error::to_result(value).map(|code| match code {
            0 => Self::AlreadySubmitted,
            1 => Self::Enqueued,
            2 => Self::WasRunning,
            _ => panic!("Unexpected result {} from Zephyr work submission", code),
        })
    }
}

/// A simple action that just does something with its data.
///
/// This is similar to a Future, except there is no concept of it completing. It manages its
/// associated data however it wishes, and is responsible for re-queuing as needed.
///
/// Note, specifically, that `act` does not take a mutable reference. This is because the `Work`
/// below uses an `Arc`, so this data can be shared.
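///
/// Because `act` only gets a shared, pinned reference, any state an action updates needs interior
/// mutability. A std-only sketch: `Action` below is a hypothetical local mirror of the shape of
/// `SimpleAction`, and `Counter` is an illustrative implementation using an atomic.
///
/// ```rust
/// use std::pin::Pin;
/// use std::sync::atomic::{AtomicU32, Ordering};
/// use std::sync::Arc;
///
/// /// Hypothetical local mirror of `SimpleAction`.
/// trait Action {
///     fn act(self: Pin<&Self>);
/// }
///
/// /// Counts how many times it has run; an atomic is used because `act` cannot take `&mut self`.
/// struct Counter {
///     runs: AtomicU32,
/// }
///
/// impl Action for Counter {
///     fn act(self: Pin<&Self>) {
///         self.runs.fetch_add(1, Ordering::Relaxed);
///     }
/// }
///
/// fn demo() -> u32 {
///     let shared = Arc::pin(Counter { runs: AtomicU32::new(0) });
///     // `as_ref` turns `Pin<Arc<_>>` into the `Pin<&_>` that `act` expects.
///     shared.as_ref().act();
///     shared.as_ref().act();
///     shared.runs.load(Ordering::Relaxed)
/// }
/// ```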
pub trait SimpleAction {
    /// Perform the action.
    fn act(self: Pin<&Self>);
}

/// A basic Zephyr work item.
///
/// Holds a `k_work`, along with the data associated with that work. When the work runs, the
/// `act` method will be called on the provided `SimpleAction`.
pub struct Work<T> {
    work: UnsafeCell<k_work>,
    action: T,
}

// SAFETY: Work items can be sent as long as the action itself can be.
unsafe impl<F> Send for Work<F>
where
    F: SimpleAction,
    F: Send,
{
}

// SAFETY: Work items are Sync when the action is.
unsafe impl<F> Sync for Work<F>
where
    F: SimpleAction,
    F: Sync,
{
}

impl<T: SimpleAction + Send> Work<T> {
    /// Construct a new Work from the given action.
    ///
    /// Note that the data will be moved into the pinned Work. The data is internal, and only
    /// accessible to the work thread (the `act` method). If shared data is needed, normal
    /// inter-thread sharing mechanisms are needed.
    ///
    /// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
    pub fn new(action: T) -> Pin<Arc<Self>> {
        let this = Arc::pin(Self {
            // SAFETY: will be initialized below, after this is pinned.
            work: unsafe { mem::zeroed() },
            action,
        });

        // SAFETY: Initializes above zero-initialized struct.
        unsafe {
            k_work_init(this.work.get(), Some(Self::handler));
        }

        this
    }

    /// Submit this work to the system work queue.
    ///
    /// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an
    /// explanation of them.
    pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
        // We "leak" the Arc, so that when the handler runs, it can be safely turned back into an
        // Arc, and the drop on the Arc will then run.
        let work = this.work.get();

        // SAFETY: The C code does not move the data, and `from_raw` puts it back into a Pin when
        // it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let raw = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // allocation alive until the handler reclaims it.
        let result = SubmitResult::to_result(unsafe { k_work_submit(work) });

        // If no new run was queued (the work was already pending, or submission failed), no
        // handler call will consume the reference leaked above, so take it back.
        if !matches!(&result, Ok(r) if r.enqueued()) {
            // SAFETY: `raw` came from `Arc::into_raw` above, and Zephyr did not take it.
            drop(unsafe { Arc::from_raw(raw) });
        }
        result
    }

    /// Submit this work to a specified work queue.
    ///
    /// TODO: Change when we have better wrappers for work queues.
    pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
        let work = this.work.get();

        // "Leak" the Arc to give to C. We'll reconstruct it in the handler.
        // SAFETY: The C code does not move the data, and `from_raw` puts it back into a Pin when
        // it reconstructs the Arc.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        let raw = Arc::into_raw(this);

        // SAFETY: The Pin ensures this will not move, and the reference leaked above keeps the
        // allocation alive until the handler reclaims it.
        let result =
            SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) });

        // As in `submit`, reclaim the reference if no new run was queued.
        if !matches!(&result, Ok(r) if r.enqueued()) {
            // SAFETY: `raw` came from `Arc::into_raw` above, and Zephyr did not take it.
            drop(unsafe { Arc::from_raw(raw) });
        }
        result
    }

    /// Callback, through C, but bound by a specific type.
    extern "C" fn handler(work: *mut k_work) {
        // We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
        // necessarily at the beginning of the struct.
        // SAFETY: Converts the raw pointer to the work back into the Arc that was leaked when the
        // work was submitted.
        let this = unsafe { Self::from_raw(work) };

        // Access the action within, still pinned.
        // SAFETY: It is safe to keep the pin on the interior.
        let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };

        action.act();
    }

    /*
    /// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw`
    /// called to avoid leaking the item.
    fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
        // SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
        // generally use move.
        let this = unsafe { Pin::into_inner_unchecked(this) };
        Arc::into_raw(this)
    }
    */

    /// Given a pointer to the `k_work` buried within, recover the pinned `Arc` containing our data.
    ///
    /// # Safety
    ///
    /// `ptr` must point at the `work` field of a `Work<T>` whose `Arc` reference was leaked with
    /// `Arc::into_raw`, and each leaked reference must be recovered at most once.
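    ///
    /// The pointer arithmetic in this function is the usual "container of" idiom. A std-only
    /// sketch, assuming Rust 1.77 or later for `core::mem::offset_of!`; `Outer` and
    /// `header_to_outer` are hypothetical names, with `header` playing the role of the embedded
    /// `k_work`.
    ///
    /// ```rust
    /// use std::mem;
    /// use std::ptr;
    ///
    /// /// Hypothetical container without `repr(C)`, so `header` may sit at any offset.
    /// struct Outer {
    ///     payload: u64,
    ///     header: u32,
    /// }
    ///
    /// /// Recover the containing `Outer` from a pointer to its `header` field.
    /// ///
    /// /// SAFETY (caller): `field` must point at the `header` field of a live `Outer`.
    /// unsafe fn header_to_outer(field: *const u32) -> *const Outer {
    ///     unsafe {
    ///         field
    ///             .cast::<u8>()
    ///             .sub(mem::offset_of!(Outer, header))
    ///             .cast::<Outer>()
    ///     }
    /// }
    ///
    /// fn demo() -> u64 {
    ///     let outer = Outer { payload: 42, header: 0 };
    ///     let base: *const Outer = &outer;
    ///     // Derive the field pointer from the whole-struct pointer, much as C code would take
    ///     // `&item->header`.
    ///     let field = unsafe { ptr::addr_of!((*base).header) };
    ///     // SAFETY: `field` points at `outer.header`, and `outer` is still alive.
    ///     unsafe { (*header_to_outer(field)).payload }
    /// }
    /// ```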
    unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
        // SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the
        // pointer is valid.
        let ptr = ptr
            .cast::<u8>()
            .sub(mem::offset_of!(Self, work))
            .cast::<Self>();
        let this = Arc::from_raw(ptr);
        Pin::new_unchecked(this)
    }

    /// Access the inner action.
    pub fn action(&self) -> &T {
        &self.action
    }
}