zephyr/work.rs
//! Zephyr Work Queues
//!
//! # Zephyr Work Queues and Work
//!
//! Zephyr has a mechanism called a
//! [Workqueue](https://docs.zephyrproject.org/latest/kernel/services/threads/workqueue.html).
//!
//! Each workqueue is backed by a single Zephyr thread, and has its own stack. The work queue
//! consists of a FIFO queue of work items that will be run consecutively on that thread. The
//! underlying types are `k_work_q` for the work queue itself, and `k_work` for the worker.
//!
//! In addition to the simple schedulable work, Zephyr has two other types of work:
//! `k_work_delayable`, which can be scheduled to run in the future, and `k_work_poll`, described as
//! triggered work in the docs, which can be scheduled to run when various items within Zephyr
//! become available. Triggered work also has a timeout; in this sense, triggered work is a
//! superset of the other types of work. Both delayable and triggered work are implemented by
//! embedding the `k_work` in their structure, and Zephyr schedules the work when the given event
//! happens.
//!
//! Zephyr's work queues can be used in different ways:
//!
//! - Work can be scheduled as needed. For example, an IRQ handler can queue a work item to process
//! data it has received from a device.
//! - Work can be scheduled periodically.
//!
//! As most C code using Zephyr statically allocates things like work items, these are typically
//! rescheduled when the work completes. The work queue scheduling functions are designed, and
//! intended, for a given work item to be able to reschedule itself, and such usage is common.
//!
//! ## Waitable events
//!
//! Triggered work items can be set to wake on any combination of the following:
//!
//! - A signal. `k_poll_signal` is a type used just for waking work items. This works similarly to
//! a binary semaphore, but is lighter weight for use just by this mechanism.
//! - A semaphore. Work can be scheduled to run when a `k_sem` is available. Since
//! [`sys::sync::Semaphore`] is built on top of `k_sem`, the "take" operation for these semaphores
//! can be a trigger source.
//! - A queue/FIFO/LIFO. The queue is used to implement [`sync::channel`] and thus any blocking
//! operation on queues can be a trigger source.
//! - Message Queues, and Pipes. Although not yet provided in Rust, these can also be a source of
//! triggering.
//!
//! It is important to note that the trigger source may not necessarily still be available by the
//! time the work item is actually run. This depends on the design of the system. If there is only
//! a single waiter, then it will still be available (unlike a condition variable, this mechanism
//! does not have spurious wakeups).
//!
//! Also, note, specifically, that Zephyr Mutexes cannot be used as a trigger source. That means
//! that locking a [`sync::Mutex`] shouldn't be used within work items. There is another
//! [`kio::sync::Mutex`], which is a simplified Mutex that is implemented with a Semaphore that can
//! be used from work-queue based code.
//!
//! # Rust `Future`
//!
//! The Rust language also has built-in support for something rather similar to Zephyr work queues.
//! The main user-visible type behind this is [`Future`]. The Rust compiler allows functions, as
//! well as code blocks, to be declared `async`. Such code, instead of directly returning its data,
//! returns a `Future` that has that data as its output type. What this does is essentially capture
//! what would be stored on the stack to maintain the state of that code into the data of the
//! `Future` itself. For Rust code running on a typical OS, a crate such as
//! [Tokio](https://tokio.rs/) provides what is known as an executor, which implements the
//! scheduling of these `Futures`, and also provides equivalent primitives for Mutexes, Semaphores,
//! and channels for this code to use for synchronization.
//!
//! It is notable that the Zephyr implementation of `Future` operates under a fairly simple
//! assumption of how this scheduling will work. Each future is invoked with a Context, which
//! contains a dynamic `Waker` that can be invoked to schedule this Future to run again. This means
//! that the primitives are typically implemented above OS primitives, where each manages wake
//! queues to determine the work that needs to be woken.
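//!
//! As a minimal sketch of that model (using only `core` types, nothing from this crate), a
//! hand-written `Future` receives a `Context` from the executor and uses its `Waker` to request
//! to be polled again:
//!
//! ```rust
//! use core::{future::Future, pin::Pin, task::{Context, Poll}};
//!
//! struct YieldOnce(bool);
//!
//! impl Future for YieldOnce {
//!     type Output = ();
//!     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
//!         if self.0 {
//!             Poll::Ready(())
//!         } else {
//!             self.0 = true;
//!             // A real future would store the waker and invoke it when the awaited
//!             // event occurs; here we immediately ask to be polled again.
//!             cx.waker().wake_by_ref();
//!             Poll::Pending
//!         }
//!     }
//! }
//! ```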
//!
//! # Bringing it together.
//!
//! There are a couple of issues that need to be addressed to bring work-queue support to Rust.
//! First is the question of how they will be used. On the one hand, there are users that will
//! definitely want to make use of `async` in Rust, and it is important to implement an executor,
//! similar to Tokio, that will schedule this `async` code. On the other hand, it will likely be
//! common for others to want to make more direct use of the work queues themselves. As such, these
//! users will want more direct access to scheduling and triggering of work.
//!
//! ## Future erasure
//!
//! One challenge with using `Future` for work is that the `Future` type intentionally erases the
//! details of scheduling work, reducing it down to a single `Waker`, which, similar to a trait, has
//! a `wake` method to cause the executor to schedule this work. Unfortunately, this simple
//! mechanism makes it challenging to take advantage of Zephyr's existing mechanisms to be able to
//! automatically trigger work based on primitives.
//!
//! As such, what we do is have a structure `Work` that contains both a `k_work_poll` and a
//! `Context` from Rust. Our handler can use a mechanism similar to C's `CONTAINER_OF` macro to
//! recover this outer structure.
//!
//! There is some extra complexity to this process, as the `Future` we are storing associated with
//! the work is `?Sized`, since each particular Future will have a different size. As such, it is
//! not possible to recover the full work type. To work around this, we have a Sized struct at the
//! beginning of this structure, which, along with judicious use of `#[repr(C)]`, allows us to recover
//! this fixed data. This structure contains the information needed to re-schedule the work, based
//! on what is needed.
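//!
//! A rough sketch of this layout trick (all names here are illustrative stand-ins, not the actual
//! definitions used by this module):
//!
//! ```rust
//! use core::mem::offset_of;
//!
//! // Stand-in for the Zephyr work type.
//! struct KWork([u8; 0]);
//!
//! #[repr(C)]
//! struct Header {
//!     work: KWork,
//!     // ... rescheduling information lives here ...
//! }
//!
//! #[repr(C)]
//! struct WorkItem<F: ?Sized> {
//!     header: Header, // Sized, always at offset 0 because of `repr(C)`.
//!     future: F,      // The unsized Future forms the tail.
//! }
//!
//! // The handler is given a raw `KWork` pointer; a CONTAINER_OF-style adjustment
//! // recovers the sized header, even though `WorkItem<F>` itself cannot be named
//! // without knowing the concrete `F`.
//! unsafe fn header_from_work(ptr: *mut KWork) -> *mut Header {
//!     ptr.cast::<u8>().sub(offset_of!(Header, work)).cast::<Header>()
//! }
//! ```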
//!
//! ## Ownership
//!
//! The remaining challenge with implementing `k_work` for Rust is that of ownership. The model
//! taken here is that the work items are held in a `Box` that is effectively owned by the work
//! itself. When the work item is scheduled to Zephyr, ownership of that box is effectively handed
//! off to C, and then when the work item is called, the Box is reconstructed. This repeats until the
//! work is no longer needed (e.g. when a [`Future::poll`] returns `Ready`), at which point the work
//! will be dropped.
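//!
//! A small sketch of that ownership round trip (the callback signature here is a stand-in, not
//! the actual Zephyr one):
//!
//! ```rust
//! struct Item {
//!     // Whatever state the work needs.
//! }
//!
//! extern "C" fn handler(ptr: *mut core::ffi::c_void) {
//!     // Take ownership back from C by reconstructing the Box.
//!     let item = unsafe { Box::from_raw(ptr as *mut Item) };
//!     // ... do the work; to run again, hand the Box back out with `Box::into_raw` ...
//!     drop(item); // Otherwise, dropping here frees the work.
//! }
//!
//! fn submit(item: Box<Item>) {
//!     // Hand ownership to C; the handler above reconstructs it.
//!     let _raw = Box::into_raw(item) as *mut core::ffi::c_void;
//!     // ... pass `_raw` to the C scheduling call, which later invokes `handler` with it ...
//! }
//! ```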
//!
//! There are two common ways the lifecycle of work can be managed in an embedded system:
//!
//! - A set of `Future`s is allocated once at the start, and these never return a value. Futures
//! nested inside of these (which correspond to `.await` in async code) can have shorter lives and
//! return values, but the main loops will not return values or be dropped. Embedded Futures will
//! typically not be boxed.
//! - Work will be dynamically created based on system need, with threads using [`kio::spawn`] to
//! create additional work (or creating the `Work` items directly). These can use [`join`] or
//! [`join_async`] to wait for the results.
//!
//! One consequence of the ownership being passed through to C code is that if the work cancellation
//! mechanism is used on a work queue, the work items themselves will be leaked.
//!
//! The Future mechanism in Rust relies on the use of [`Pin`] to ensure that work items are not
//! moved. We have the same requirements here, although currently, the pin is only applied while
//! the future is run, and we do not expose the `Box` that we use, thus preventing moves of the work
//! items.
//!
//! ## The work queues themselves
//!
//! Workqueues themselves are built using [`WorkQueueBuilder`]. This needs a statically defined
//! stack. Typical usage will be along the lines of:
//! ```rust
//! kobj_define! {
//!     WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(
//!     WorkQueueBuilder::new()
//!         .set_priority(2)
//!         .set_name(c"mainloop")
//!         .set_no_yield(true)
//!         .start(WORKER_STACK.init_once(()).unwrap()),
//! );
//!
//! let _ = zephyr::kio::spawn(
//!     mainloop(), // Async or function returning Future.
//!     &main_worker,
//!     c"w:mainloop",
//! );
//!
//! // ...
//!
//! // Leak the Box so that the worker is never freed.
//! let _ = Box::leak(main_worker);
//! ```
//!
//! It is important that a [`WorkQueue`] never be dropped: it has a `Drop` implementation that
//! panics. Zephyr provides no mechanism to stop work queue threads, so dropping one would result
//! in undefined behavior.
//!
//! # Current Status
//!
//! Although Zephyr has three types of work items, `k_work_poll` is sufficient to implement all of
//! the needed behavior, and this implementation uses only that type. Non-`Future` work could be
//! built around the other work types.
//!
//! As such, this means that manually constructed work is still built using `Future`. The `_async`
//! primitives throughout this crate can be used just as readily by hand-written Futures as by async
//! code. Notably, the use of [`Signal`] will likely be common, along with possible timeouts.
//!
//! [`sys::sync::Semaphore`]: crate::sys::sync::Semaphore
//! [`sync::channel`]: crate::sync::channel
//! [`sync::Mutex`]: crate::sync::Mutex
//! [`kio::sync::Mutex`]: crate::kio::sync::Mutex
//! [`kio::spawn`]: crate::kio::spawn
//! [`join`]: futures::JoinHandle::join
//! [`join_async`]: futures::JoinHandle::join_async
extern crate alloc;
use core::{
cell::UnsafeCell,
ffi::{c_int, c_uint, CStr},
future::Future,
mem,
pin::Pin,
ptr,
task::Poll,
};
use zephyr_sys::{
k_poll_signal, k_poll_signal_check, k_poll_signal_init, k_poll_signal_raise,
k_poll_signal_reset, k_work, k_work_init, k_work_q, k_work_queue_config, k_work_queue_init,
k_work_queue_start, k_work_submit, k_work_submit_to_queue, ETIMEDOUT,
};
use crate::{
error::to_result_void,
kio::ContextExt,
object::Fixed,
simpletls::SimpleTls,
sync::{Arc, Mutex},
sys::thread::ThreadStack,
time::Timeout,
};
pub mod futures;
/// A builder for work queues themselves.
///
/// A work queue is a Zephyr thread that instead of directly running a piece of code, manages a work
/// queue. Various types of `Work` can be submitted to these queues, along with various types of
/// triggering conditions.
pub struct WorkQueueBuilder {
/// The "config" value passed in.
config: k_work_queue_config,
/// Priority for the work queue thread.
priority: c_int,
}
impl WorkQueueBuilder {
/// Construct a new WorkQueueBuilder with default values.
pub fn new() -> Self {
Self {
config: k_work_queue_config {
name: ptr::null(),
no_yield: false,
essential: false,
},
priority: 0,
}
}
/// Set the name for the WorkQueue thread.
///
/// This name shows up in debuggers and some analysis tools.
pub fn set_name(&mut self, name: &'static CStr) -> &mut Self {
self.config.name = name.as_ptr();
self
}
/// Set the "no yield" flag for the created worker.
///
/// If this is not set, the work queue will call `k_yield` between each enqueued work item. For
/// non-preemptible threads, this will allow other threads to run. For preemptible threads,
/// this will allow other threads at the same priority to run.
///
/// This method has a negative in the name, which goes against typical conventions. This is
/// done to match the field in the Zephyr config.
pub fn set_no_yield(&mut self, value: bool) -> &mut Self {
self.config.no_yield = value;
self
}
/// Set the "essential" flag for the created worker.
///
/// This sets the essential flag on the running thread. The system considers the termination of
/// an essential thread to be a fatal error.
pub fn set_essential(&mut self, value: bool) -> &mut Self {
self.config.essential = value;
self
}
/// Set the priority for the worker thread.
///
/// See the Zephyr docs for the meaning of priority.
pub fn set_priority(&mut self, value: c_int) -> &mut Self {
self.priority = value;
self
}
/// Start the given work queue thread.
///
/// TODO: Implement a 'start' that works from a static work queue.
pub fn start(&self, stack: ThreadStack) -> WorkQueue {
let item: Fixed<k_work_q> = Fixed::new(unsafe { mem::zeroed() });
unsafe {
// SAFETY: Initialize zeroed memory.
k_work_queue_init(item.get());
// SAFETY: This associates the workqueue with the thread ID that runs it. The thread is
// a pointer into this work item, which will not move, because of the Fixed.
let this = &mut *item.get();
WORK_QUEUES
.lock()
.unwrap()
.insert(&this.thread, WorkQueueRef(item.get()));
// SAFETY: Start work queue thread. The main issue here is that the work queue cannot
// be deallocated once the thread has started. We enforce this by making Drop panic.
k_work_queue_start(
item.get(),
stack.base,
stack.size,
self.priority,
&self.config,
);
}
WorkQueue { item }
}
}
/// A running work queue thread.
///
/// # Panic
///
/// Allowing a work queue to drop will result in a panic. There are two ways to handle this,
/// depending on whether the WorkQueue is in a Box, or an Arc:
/// ```
/// // Leak a work queue in an Arc.
/// let wq = Arc::new(WorkQueueBuilder::new().start(...));
/// // If the Arc is used after this:
/// let _ = Arc::into_raw(wq.clone());
/// // If the Arc is no longer needed:
/// let _ = Arc::into_raw(wq);
///
/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
/// ```
pub struct WorkQueue {
#[allow(dead_code)]
item: Fixed<k_work_q>,
}
/// Work queues can be referenced from multiple threads, and thus are Send and Sync.
unsafe impl Send for WorkQueue {}
unsafe impl Sync for WorkQueue {}
impl Drop for WorkQueue {
fn drop(&mut self) {
panic!("WorkQueues must not be dropped");
}
}
/// A simple mapping to get the current work_queue from the currently running thread.
///
/// This assumes that Zephyr's work queues have a 1:1 mapping between the work queue and the
/// thread.
///
/// # Safety
///
/// The work queue is protected with a sync Mutex (which uses an underlying Zephyr mutex). It is,
/// in general, not a good idea to use a mutex in a work queue, as deadlock can happen. So it is
/// important to both never .await while holding the lock, as well as to make sure operations within
/// it are relatively fast. In this case, `insert` and `get` on the SimpleTls are reasonably fast.
/// `insert` is usually done just at startup as well.
///
/// This is a little bit messy as we don't have a lazy mechanism, so we have to handle this a bit
/// manually right now.
static WORK_QUEUES: Mutex<SimpleTls<WorkQueueRef>> = Mutex::new(SimpleTls::new());
/// For the queue mapping, we need a simple wrapper around the underlying pointer, one that doesn't
/// implement the panicking `Drop`.
#[derive(Copy, Clone)]
struct WorkQueueRef(*mut k_work_q);
// SAFETY: The work queue reference is also safe for both Send and Sync per Zephyr semantics.
unsafe impl Send for WorkQueueRef {}
unsafe impl Sync for WorkQueueRef {}
/// Retrieve the current work queue, if we are running within one.
pub fn get_current_workq() -> Option<*mut k_work_q> {
WORK_QUEUES.lock().unwrap().get().map(|wq| wq.0)
}
/// A Rust wrapper for `k_poll_signal`.
///
/// A signal in Zephyr is an event mechanism that can be used to trigger actions in event queues to
/// run. Signals work somewhat like a kind of half boolean semaphore. The signaling is robust in
/// the direction of the event happening: a blocked task will definitely wake when the signal is
/// raised. However, the clearing of the signal is racy. Generally, there are two ways to handle
/// this:
///
/// - A work action can clear the signal as soon as it wakes up, before it starts processing any
/// data the signal was meant to indicate. If the race happens, the processing will handle the
/// extra data.
/// - A work action can clear the signal after it finishes its processing. This is useful for
/// things like periodic timers: if the work is still processing when an additional timer tick
/// comes in, that tick will be ignored. This suits periodic events where it is better to just
/// skip a tick rather than have them "stack up" and get behind.
///
/// Notably, as long as the `reset` method is only ever called by the worker that is waiting upon
/// it, there shouldn't ever be a race in the `wait_async` itself.
///
/// Signals can pass a `c_int` from the signaling task to the task that is waiting for the signal.
/// It is not specified in the Zephyr documentation what value will be passed if `raise` is called
/// multiple times before a task waits upon a signal. The current implementation will return the
/// most recent raised `result` value.
///
/// For most other use cases, channels or semaphores are likely to be better solutions.
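///
/// A short sketch of raising and consuming a signal, using the "clear first" strategy described
/// above:
/// ```
/// let signal = Signal::new();
///
/// // Elsewhere (another thread or an IRQ), the event is indicated:
/// signal.raise(1).unwrap();
///
/// // In the worker, when it runs:
/// if let Some(result) = signal.check() {
///     signal.reset(); // Clear before processing; a racing raise simply re-triggers.
///     // ... process the event indicated by `result` ...
/// }
/// ```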
pub struct Signal {
/// The raw Zephyr `k_poll_signal`.
pub(crate) item: Fixed<k_poll_signal>,
}
// SAFETY: Zephyr's API maintains thread safety.
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}
impl Signal {
/// Create a new `Signal`.
///
/// The Signal will be in the non-signaled state.
pub fn new() -> Signal {
// SAFETY: The memory is zero initialized, and Fixed ensure that it never changes address.
let item: Fixed<k_poll_signal> = Fixed::new(unsafe { mem::zeroed() });
unsafe {
k_poll_signal_init(item.get());
}
Signal { item }
}
/// Reset the Signal
///
/// This resets the signal state to unsignaled.
///
/// Please see the [`Signal`] documentation on how to handle the races that this implies.
pub fn reset(&self) {
// SAFETY: This is safe with a non-mut reference, as the purpose of the Zephyr API is to
// coordinate this information between threads.
unsafe {
k_poll_signal_reset(self.item.get());
}
}
/// Check the status of a signal.
///
/// This reads the status of the signal. If the state is "signaled", this will return
/// `Some(result)` where the `result` is the result value given to [`raise`].
///
/// [`raise`]: Self::raise
pub fn check(&self) -> Option<c_int> {
let mut signaled: c_uint = 0;
let mut result: c_int = 0;
unsafe {
// SAFETY: Zephyr's signal API coordinates access across threads.
k_poll_signal_check(self.item.get(), &mut signaled, &mut result);
}
if signaled != 0 {
Some(result)
} else {
None
}
}
/// Signal a signal object.
///
/// This will signal to any worker that is waiting on this object that the event has happened.
/// The `result` will be returned from the worker's `wait` call.
///
/// As per the Zephyr docs, this could return an EAGAIN error if the polling thread is in the
/// process of expiring. The implication is that the signal will not be raised in this case.
/// ...
pub fn raise(&self, result: c_int) -> crate::Result<()> {
to_result_void(unsafe { k_poll_signal_raise(self.item.get(), result) })
}
/// Asynchronously wait for a signal to be signaled.
///
/// If the signal has not been raised, this will wait until it has been. If the signal has been
/// raised, the Future will immediately return that value without waiting.
///
/// **Note**: there is no sync wait, as Zephyr does not provide a convenient mechanism for
/// this. It could be implemented with `k_poll` if needed.
pub fn wait_async<'a>(
&'a self,
timeout: impl Into<Timeout>,
) -> impl Future<Output = crate::Result<c_int>> + 'a {
SignalWait {
signal: self,
timeout: timeout.into(),
ran: false,
}
}
}
impl Default for Signal {
fn default() -> Self {
Signal::new()
}
}
/// The Future for Signal::wait_async.
struct SignalWait<'a> {
/// The signal we are waiting on.
signal: &'a Signal,
/// The timeout to use.
timeout: Timeout,
/// Set after we've waited once.
ran: bool,
}
impl<'a> Future for SignalWait<'a> {
type Output = crate::Result<c_int>;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
// We can check if the event happened immediately, and avoid blocking if we were already
// signaled.
if let Some(result) = self.signal.check() {
return Poll::Ready(Ok(result));
}
if self.ran {
// If it is not ready, assume a timeout. Note that if a thread other than this work
// thread resets the signal, it is possible to see a timeout even if `Forever` was given
// as the timeout.
return Poll::Ready(Err(crate::Error(ETIMEDOUT)));
}
cx.add_signal(self.signal, self.timeout);
self.ran = true;
Poll::Pending
}
}
/// Possible returns from work queue submission.
#[derive(Debug, Clone, Copy)]
pub enum SubmitResult {
/// This work was already in a queue.
AlreadySubmitted,
/// The work has been added to the specified queue.
Enqueued,
/// The work was submitted from the worker itself, and has been queued to the queue that was
/// running it.
WasRunning,
}
impl SubmitResult {
/// Does this result indicate that the work was enqueued?
pub fn enqueued(self) -> bool {
matches!(self, Self::Enqueued | Self::WasRunning)
}
/// Convert an int result from a work submit function.
fn to_result(value: c_int) -> crate::Result<Self> {
crate::error::to_result(value).map(|code| match code {
0 => Self::AlreadySubmitted,
1 => Self::Enqueued,
2 => Self::WasRunning,
_ => panic!("Unexpected result {} from Zephyr work submission", code),
})
}
}
/// A simple action that just does something with its data.
///
/// This is similar to a Future, except there is no concept of it completing. It manages its
/// associated data however it wishes, and is responsible for re-queuing as needed.
///
/// Note, specifically, that `act` does not take a mutable reference. This is because the `Work`
/// below uses an `Arc`, so this data can be shared.
pub trait SimpleAction {
/// Perform the action.
fn act(self: Pin<&Self>);
}
/// A basic Zephyr work item.
///
/// Holds a `k_work`, along with the data associated with that work. When the work is queued, the
/// `act` method will be called on the provided `SimpleAction`.
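///
/// A minimal sketch of defining and submitting a work item (the work queue setup is elided; see
/// the module docs):
/// ```
/// struct Hello;
///
/// impl SimpleAction for Hello {
///     fn act(self: Pin<&Self>) {
///         // Runs on the work queue thread; re-submit from here to run again.
///     }
/// }
///
/// let work = Work::new(Hello);
/// let _ = Work::submit(work);
/// ```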
pub struct Work<T> {
work: UnsafeCell<k_work>,
action: T,
}
/// SAFETY: Work items can be sent as long as the action itself can be.
unsafe impl<F> Send for Work<F>
where
F: SimpleAction,
F: Send,
{
}
/// SAFETY: Work items are Sync when the action is.
unsafe impl<F> Sync for Work<F>
where
F: SimpleAction,
F: Sync,
{
}
impl<T: SimpleAction + Send> Work<T> {
/// Construct a new Work from the given action.
///
/// Note that the data will be moved into the pinned Work. The data is internal, and only
/// accessible to the work thread (the `act` method). If shared data is needed, normal
/// inter-thread sharing mechanisms are needed.
///
/// TODO: Can we come up with a way to allow sharing on the same worker using Rc instead of Arc?
pub fn new(action: T) -> Pin<Arc<Self>> {
let this = Arc::pin(Self {
// SAFETY: will be initialized below, after this is pinned.
work: unsafe { mem::zeroed() },
action,
});
// SAFETY: Initializes above zero-initialized struct.
unsafe {
k_work_init(this.work.get(), Some(Self::handler));
}
this
}
/// Submit this work to the system work queue.
///
/// This can return several possible `Ok` results. See the docs on [`SubmitResult`] for an
/// explanation of them.
pub fn submit(this: Pin<Arc<Self>>) -> crate::Result<SubmitResult> {
// We "leak" the arc, so that when the handler runs, it can be safely turned back into an
// Arc, and the drop on the arc will then run.
let work = this.work.get();
// SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
// back into a Pin when it reconstructs the Arc.
let this = unsafe { Pin::into_inner_unchecked(this) };
let _ = Arc::into_raw(this);
// SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the
// work item is no longer queued when the data is dropped.
SubmitResult::to_result(unsafe { k_work_submit(work) })
}
/// Submit this work to a specified work queue.
///
/// TODO: Change when we have better wrappers for work queues.
pub fn submit_to_queue(this: Pin<Arc<Self>>, queue: &WorkQueue) -> crate::Result<SubmitResult> {
let work = this.work.get();
// "leak" the arc to give to C. We'll reconstruct it in the handler.
// SAFETY: The C code does not perform moves on the data, and the `from_raw` below puts it
// back into a Pin when it reconstructs the Arc.
let this = unsafe { Pin::into_inner_unchecked(this) };
let _ = Arc::into_raw(this);
// SAFETY: The Pin ensures this will not move. Our implementation of drop ensures that the
// work item is no longer queued when the data is dropped.
SubmitResult::to_result(unsafe { k_work_submit_to_queue(queue.item.get(), work) })
}
/// Callback, through C, but bound by a specific type.
extern "C" fn handler(work: *mut k_work) {
// We want to avoid needing a `repr(C)` on our struct, so the `k_work` pointer is not
// necessarily at the beginning of the struct.
// SAFETY: Converts raw pointer to work back into the box.
let this = unsafe { Self::from_raw(work) };
// Access the action within, still pinned.
// SAFETY: It is safe to keep the pin on the interior.
let action = unsafe { this.as_ref().map_unchecked(|p| &p.action) };
action.act();
}
/*
/// Consume this Arc, returning the internal pointer. Needs to have a complementary `from_raw`
/// called to avoid leaking the item.
fn into_raw(this: Pin<Arc<Self>>) -> *const Self {
// SAFETY: This removes the Pin guarantee, but is given as a raw pointer to C, which doesn't
// generally use move.
let this = unsafe { Pin::into_inner_unchecked(this) };
Arc::into_raw(this)
}
*/
/// Given a pointer to the `k_work` buried within, recover the pinned Arc containing our data.
unsafe fn from_raw(ptr: *const k_work) -> Pin<Arc<Self>> {
// SAFETY: This fixes the pointer back to the beginning of Self. This also assumes the
// pointer is valid.
let ptr = ptr
.cast::<u8>()
.sub(mem::offset_of!(Self, work))
.cast::<Self>();
let this = Arc::from_raw(ptr);
Pin::new_unchecked(this)
}
/// Access the inner action.
pub fn action(&self) -> &T {
&self.action
}
}