zephyr/sync/channel.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
//! Close-to-Zephyr channels
//!
//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
//! can get, directly using Zephyr primitives.
//!
//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types,
//! these are typically statically allocated. Similar to the other close-to-zephyr primitives,
//! this means that there is a constructor that can directly take one of these primitives.
//!
//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
//! internal `Arc` inside them, but without the overhead of an actual Arc.
//!
//! ## IRQ safety
//!
//! These channels are usable from IRQ context on Zephyr in very limited situations. Notably, all
//! of the following must be true:
//! - The channel has been created with `bounded()`, which pre-allocates all of the messages.
//! - If the type `T` has a Drop implementation, this implementation can be called from IRQ context.
//! - Only `try_send` or `try_recv` are used on the channel.
//!
//! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops
//! received message. If the message is *always* sent over another channel or otherwise not
//! dropped, it *might* be safe to use these messages.
//!
//! ## Dropping of Sender/Receiver
//!
//! Crossbeam channels support detecting when all senders or all receivers have been dropped on a
//! channel, which will cause the handles on the other end to error, including waking up current
//! threads waiting on those channels.
//!
//! At this time, this isn't implementable in Zephyr, as there is no API to wake up all threads
//! blocked on a given `k_queue`. As such, this scenario is not supported. What actually happens
//! is that when all senders or receivers on a channel are dropped, operations on the other end of
//! the channel may just block (or queue forever with unbounded queues). If all handles (both
//! sender and receiver) are dropped, the last drop will cause a panic. It maybe be better to just
//! leak the entire channel, as any data associated with the channels would be leaked at this point,
//! including the underlying Zephyr `k_queue`. Until APIs are added to Zephyr to allow the channel
//! information to be safely freed, these can't actually be freed.
extern crate alloc;
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::ffi::c_void;
use core::fmt;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::pin::Pin;
use core::task::Poll;
use crate::kio::ContextExt;
use crate::sys::queue::Queue;
use crate::time::{Duration, Forever, NoWait, Timeout};
mod counter;
// The zephyr queue does not allocate or manage the data of the messages, so we need to handle
// allocation as such as well. However, we don't need to manage anything, so it is sufficient to
// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
// it back into a Box, and give it to the recipient.
/// Create a multi-producer multi-consumer channel of unbounded capacity, using an existing Queue
/// object.
///
/// The messages are allocated individually as "Box", and the queue is managed by the underlying
/// Zephyr queue.
pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
let (s, r) = counter::new(queue);
let s = Sender {
flavor: SenderFlavor::Unbounded {
queue: s,
_phantom: PhantomData,
},
};
let r = Receiver {
flavor: ReceiverFlavor::Unbounded {
queue: r,
_phantom: PhantomData,
},
};
(s, r)
}
/// Create a multi-producer multi-consumer channel of unbounded capacity.
///
/// The messages are allocated individually as "Box". The underlying Zephyr queue will be
/// dynamically allocated.
///
/// **Note**: Currently Drop is not propertly supported on Zephyr. If all senders are dropped, any
/// receivers will likely be blocked forever. Any data that has been queued and not received will
/// be leaked when all receivers have been droped.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
unbounded_from(Queue::new())
}
/// Create a multi-producer multi-consumer channel with bounded capacity.
///
/// The messages are allocated at channel creation time. If there are no messages at `send` time,
/// send will block (possibly waiting for a timeout).
///
/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called
/// a rendezvous, where both threads wait until in the same region. `bounded` will panic if called
/// with a capacity of zero.
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
if cap == 0 {
panic!("Zero capacity queues are not supported on Zephyr");
}
let (s, r) = counter::new(Bounded::new(cap));
let s = Sender {
flavor: SenderFlavor::Bounded(s),
};
let r = Receiver {
flavor: ReceiverFlavor::Bounded(r),
};
(s, r)
}
/// The underlying type for Messages through Zephyr's [`Queue`].
///
/// This wrapper is used internally to wrap user messages through the queue. It is not useful in
/// safe code, but may be useful for implementing other types of message queues.
#[repr(C)]
pub struct Message<T> {
/// The private data used by the kernel to enqueue messages and such.
_private: usize,
/// The actual data being transported.
data: T,
}
impl<T> Message<T> {
fn new(data: T) -> Message<T> {
Message { _private: 0, data }
}
}
/// The sending side of a channel.
pub struct Sender<T> {
flavor: SenderFlavor<T>,
}
// SAFETY: We implement Send and Sync for the Sender itself, as long as the underlying data can be
// sent. The underlying zephyr primitives used for the channel provide the Sync safety.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
impl<T> Sender<T> {
/// Waits for a message to be sent into the channel, but only for a limited time.
///
/// This call will block until the send operation can proceed or the operation times out.
///
/// For unbounded channels, this will perform an allocation (and always send immediately). For
/// bounded channels, no allocation will be performed.
pub fn send_timeout<D>(&self, msg: T, timeout: D) -> Result<(), SendError<T>>
where
D: Into<Timeout>,
{
match &self.flavor {
SenderFlavor::Unbounded { queue, .. } => {
let msg = Box::new(Message::new(msg));
let msg = Box::into_raw(msg);
// SAFETY: Zephyr requires, for as long as the message remains in the queue, that
// the first `usize` of the message be available for its use, and that the message
// not be moved. The `into_raw` of the box consumes the box, so this is entirely a
// raw pointer with no references from the Rust code. The item is not used until it
// has been removed from the queue.
unsafe {
queue.send(msg as *mut c_void);
}
}
SenderFlavor::Bounded(chan) => {
// Retrieve a message buffer from the free list.
// SAFETY: Please see the safety discussion on `Bounded` on what makes this safe.
let buf = unsafe { chan.free.recv(timeout) };
if buf.is_null() {
return Err(SendError(msg));
}
let buf = buf as *mut Message<T>;
unsafe {
buf.write(Message::new(msg));
chan.chan.send(buf as *mut c_void);
}
}
}
Ok(())
}
/// Sends a message over the given channel. Waiting if necessary.
///
/// For unbounded channels, this will allocate space for a message, and immediately send it.
/// For bounded channels, this will block until a message slot is available, and then send the
/// message.
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.send_timeout(msg, Forever)
}
/// Attempts to send a message into the channel without blocking.
///
/// This message will either send a message into the channel immediately or return an error if
/// the channel is full. The returned error contains the original message.
pub fn try_send(&self, msg: T) -> Result<(), SendError<T>> {
self.send_timeout(msg, NoWait)
}
}
// A little note about the Unpin constraint here. Because Futures are pinned in Rust Async code,
// and the future stores the messages, we can only send and receive messages that aren't pinned.
impl<T: Unpin> Sender<T> {
/// Waits for a message to be sent into the channel, but only for a limited time. Async
/// version.
///
/// This has the same behavior as [`send_timeout`], but as an Async function.
///
/// [`send_timeout`]: Sender::send_timeout
pub fn send_timeout_async<'a>(
&'a self,
msg: T,
timeout: impl Into<Timeout>,
) -> impl Future<Output = Result<(), SendError<T>>> + 'a {
SendFuture {
sender: self,
msg: Some(msg),
timeout: timeout.into(),
waited: false,
}
}
/// Sends a message over the given channel, waiting if necessary. Async version.
pub async fn send_async(&self, msg: T) -> Result<(), SendError<T>> {
self.send_timeout_async(msg, Forever).await
}
// Note that there is no async version of `try_send`.
}
/// The implementation of Future for Sender::send_timeout_async.
struct SendFuture<'a, T: Unpin> {
sender: &'a Sender<T>,
msg: Option<T>,
timeout: Timeout,
waited: bool,
}
impl<'a, T: Unpin> Future for SendFuture<'a, T> {
type Output = Result<(), SendError<T>>;
fn poll(
self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
/*
let this = unsafe {
Pin::get_unchecked_mut(self)
};
*/
let this = Pin::get_mut(self);
// Take the message out in preparation to try sending it. It is a logic error if the unwrap
// fails.
let msg = this.msg.take().unwrap();
// Try sending the message, with no timeout.
let msg = match this.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(SendError(msg)) => msg,
};
if this.waited {
// We already waited, and no message, so give the messagre back, indiciating a timeout.
return Poll::Ready(Err(SendError(msg)));
}
// Send didn't happen, put the message back to have for the next call.
this.msg = Some(msg);
// Otherwise, schedule to wake up on receipt or timeout.
match &this.sender.flavor {
SenderFlavor::Unbounded { .. } => {
panic!("Implementation error: unbounded queues should never fail");
}
SenderFlavor::Bounded(chan) => {
cx.add_queue(&chan.free, this.timeout);
}
}
Poll::Pending
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
match &self.flavor {
SenderFlavor::Unbounded { queue, .. } => {
// SAFETY: It is not possible to free from Zephyr queues. This means drop has to
// either leak or panic. We will panic for now.
unsafe {
queue.release(|_| {
panic!("Unbounded queues cannot currently be dropped");
})
}
}
SenderFlavor::Bounded(chan) => {
// SAFETY: It is not possible to free from Zephyr queues. This means drop has to
// either leak or panic. We will panic for now.
unsafe {
chan.release(|_| {
panic!("Bounded queues cannot currently be dropped");
})
}
}
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
SenderFlavor::Unbounded { queue, .. } => SenderFlavor::Unbounded {
queue: queue.acquire(),
_phantom: PhantomData,
},
SenderFlavor::Bounded(chan) => SenderFlavor::Bounded(chan.acquire()),
};
Sender { flavor }
}
}
/// The "flavor" of a sender. This maps to the type of channel.
enum SenderFlavor<T> {
/// An unbounded queue. Messages are allocated with Box, and sent directly.
Unbounded {
queue: counter::Sender<Queue>,
_phantom: PhantomData<T>,
},
Bounded(counter::Sender<Bounded<T>>),
}
impl<T: fmt::Debug> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Sender")
}
}
/// The receiving side of a channel.
pub struct Receiver<T> {
flavor: ReceiverFlavor<T>,
}
// SAFETY: We implement Send and Sync for the Receiver itself, as long as the underlying data can be
// sent. The underlying zephyr primitives used for the channel provide the Sync safety.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> Receiver<T> {
/// Waits for a message to be received from the channel, but only for a limited time.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed or the operation times out.
/// wake up and return an error.
pub fn recv_timeout<D>(&self, timeout: D) -> Result<T, RecvError>
where
D: Into<Timeout>,
{
match &self.flavor {
ReceiverFlavor::Unbounded { queue, .. } => {
// SAFETY: Messages were sent by converting a Box through `into_raw()`.
let msg = unsafe {
let msg = queue.recv(timeout);
if msg.is_null() {
return Err(RecvError);
}
msg
};
let msg = msg as *mut Message<T>;
// SAFETY: After receiving the message from the queue's `recv` method, Zephyr will
// no longer use the `usize` at the beginning, and it is safe for us to convert the
// message back into a box, copy the field out of it, an allow the Box itself to be
// freed.
let msg = unsafe { Box::from_raw(msg) };
Ok(msg.data)
}
ReceiverFlavor::Bounded(chan) => {
// SAFETY: Please see the safety discussion on Bounded.
let rawbuf = unsafe {
let buf = chan.chan.recv(timeout);
if buf.is_null() {
return Err(RecvError);
}
buf
};
let buf = rawbuf as *mut Message<T>;
let msg: Message<T> = unsafe { buf.read() };
unsafe {
chan.free.send(buf as *mut c_void);
}
Ok(msg.data)
}
}
}
/// Blocks the current thread until a message is received or the channel is empty and
/// disconnected.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed.
pub fn recv(&self) -> Result<T, RecvError> {
self.recv_timeout(Forever)
}
/// Attempts to receive a message from the channel without blocking.
///
/// This method will either receive a message from the channel immediately, or return an error
/// if the channel is empty.
///
/// This method is safe to use from IRQ context, if and only if the channel was created as a
/// bounded channel.
pub fn try_recv(&self) -> Result<T, RecvError> {
self.recv_timeout(NoWait)
}
}
// Note that receive doesn't need the Unpin constraint, as we aren't storing any message.
impl<T> Receiver<T> {
/// Waits for a message to be received from the channel, but only for a limited time.
/// Async version.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed or the operation times out.
/// wake up and return an error.
pub fn recv_timeout_async<'a>(
&'a self,
timeout: impl Into<Timeout>,
) -> impl Future<Output = Result<T, RecvError>> + 'a {
RecvFuture {
receiver: self,
timeout: timeout.into(),
waited: false,
}
}
/// Blocks the current thread until a message is received or the channel is empty and
/// disconnected. Async version.
///
/// If the channel is empty and not disconnected, this call will block until the receive
/// operation can proceed.
pub async fn recv_async(&self) -> Result<T, RecvError> {
self.recv_timeout_async(Forever).await
}
/// Return a reference to the inner queue.
fn as_queue(&self) -> &Queue {
match &self.flavor {
ReceiverFlavor::Unbounded { queue, .. } => queue,
ReceiverFlavor::Bounded(chan) => &chan.chan,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
match &self.flavor {
ReceiverFlavor::Unbounded { queue, .. } => {
// SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
// Chose panic for now.
unsafe {
queue.release(|_| {
panic!("Unnbounded channel cannot be dropped");
})
}
}
ReceiverFlavor::Bounded(chan) => {
// SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
// Chose panic for now.
unsafe {
chan.release(|_| {
panic!("Bounded channels cannot be dropped");
})
}
}
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let flavor = match &self.flavor {
ReceiverFlavor::Unbounded { queue, .. } => ReceiverFlavor::Unbounded {
queue: queue.acquire(),
_phantom: PhantomData,
},
ReceiverFlavor::Bounded(chan) => ReceiverFlavor::Bounded(chan.acquire()),
};
Receiver { flavor }
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Sender")
}
}
struct RecvFuture<'a, T> {
receiver: &'a Receiver<T>,
timeout: Timeout,
waited: bool,
}
impl<'a, T> Future for RecvFuture<'a, T> {
type Output = Result<T, RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
// Try to receive a message.
if let Ok(msg) = self.receiver.try_recv() {
return Poll::Ready(Ok(msg));
}
if self.waited {
// Wait already happened, so this is a timeout.
return Poll::Ready(Err(RecvError));
}
// Otherwise, schedule to wakeup on receipt or timeout.
cx.add_queue(self.receiver.as_queue(), self.timeout);
self.waited = true;
Poll::Pending
}
}
/// The "flavor" of a receiver. This maps to the type of the channel.
enum ReceiverFlavor<T> {
/// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt.
Unbounded {
queue: counter::Receiver<Queue>,
_phantom: PhantomData<T>,
},
Bounded(counter::Receiver<Bounded<T>>),
}
type Slot<T> = UnsafeCell<MaybeUninit<Message<T>>>;
// SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box.
// This Box is held for an eventual future implementation that is able to free the messages, once
// they have all been taken from Zephyr's knowledge. For now, they could also be leaked.
// It is a `Pin<Box<...>>` because it is important that the data never be moved, as we maintain
// pointers to the items in Zephyr queues.
//
// There are two `Queue`s used here: `free` is the free list of messages that are not being sent,
// and `chan` for messages that have been sent but not received. Initially, all slots are placed on
// the `free` queue. At any time, outside of the calls in this module, each slot must live inside
// of one of the two queues. This means that the messages cannot be moved or accessed, except
// inside of the individual send/receive operations. Zephyr makes use of the initial `usize` field
// at the beginning of each Slot.
//
// We use MaybeUninit for the messages to avoid needing to initialize the messages. The individual
// messages are accessed through pointers when they are retrieved from the Zephyr `Queue`, so these
// values are never marked as initialized.
/// Bounded channel implementation.
struct Bounded<T> {
/// The messages themselves. This Box owns the allocation of the messages, although it is
/// unsafe to drop this with any messages stored in either of the Zephyr queues.
///
/// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware
/// of. MaybeUninit allows us to create this without allocation.
_slots: Pin<Box<[Slot<T>]>>,
/// The free queue, holds messages that aren't be used. The free queue has to be in a box,
/// because it cannot move after the constructor runs. The chan is fine to wait until first
/// use, when the object has settled. As we are also boxing the messages, this isn't really
/// that costly.
free: Box<Queue>,
/// The channel queue. These are messages that have been sent and are waiting to be received.
chan: Queue,
}
impl<T> Bounded<T> {
fn new(cap: usize) -> Self {
let slots: Box<[Slot<T>]> = (0..cap)
.map(|_| UnsafeCell::new(MaybeUninit::uninit()))
.collect();
let slots = Box::into_pin(slots);
let free = Box::new(Queue::new());
let chan = Queue::new();
// Add each of the boxes to the free list.
for slot in slots.as_ref().iter() {
// SAFETY: See safety discussion on `Bounded`.
unsafe {
free.send(slot.get() as *mut c_void);
}
}
Bounded {
_slots: slots,
free,
chan,
}
}
}
// TODO: Move to err
/// An error returned from the [`send`] method.
///
/// The message could not be sent because the channel is disconnected.
///
/// The error contains the message so it can be recovered.
///
/// [`send`]: Sender::send
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendError(..)".fmt(f)
}
}
/// An error returned from the [`recv`] method.
///
/// A message could not be received because the channel is empty and disconnected.
///
/// [`recv`]: Receiver::recv
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
/// Wait loop
///
/// A common scenario for async work tasks is to wait for, and process messages off of a queue, but
/// to also wake periodically to perform some task.
///
/// This performs this periodic loop. It has some support for handling the case where the
/// processing takes longer than the loop duration, but it merely re-schedules for the period past
/// the current time. This means the phase of the period will change upon dropped ticks.
///
/// Each time an event is received, 'handle' is called with `Some(ev)`. In addition, periodically
/// (based on `period`) `handle` will be called with None.
///
/// **Note**: It needs to be a single handler, because this closure will frequently be in a move
/// closure, and this would force shared data to be shared in Sync types of wrappers. The main
/// purpose of combining the event handling and the periodic is to avoid that.
///
/// Note that also, if the timer is just barely able to run, it will still be scheduled "shortly" in
/// the future.
///
/// T is the type of the messages expected to be received.
///
/// TODO: This function, in general, is completely worthless without Rust support for [async
/// closures](https://rust-lang.github.io/rfcs/3668-async-closures.html).
pub async fn event_loop_useless<T, EF, EFF>(
events: Receiver<T>,
period: Duration,
mut handle: EF,
) -> !
where
EF: FnMut(Option<T>) -> EFF,
EFF: Future<Output = ()>,
{
// Start with a deadline 'period' out in the future.
let mut next = crate::time::now() + period;
loop {
if let Ok(ev) = events.recv_timeout_async(next).await {
handle(Some(ev)).await;
continue;
}
// We either reached, or exceeded our timeout.
handle(None).await;
// Calculate the next time.
next += period;
// If this is passed, just reschedule after our Duration from "now".
let now = crate::time::now();
if next <= now {
next = now + period;
}
}
}
/// Wait loop, as a macro.
///
/// This is the `event loop` above, implemented as a macro, which becomes more useful as the async
/// closures aren't needed.
#[macro_export]
macro_rules! event_loop {
($events:expr, $period:expr,
Some($eventvar:ident) => $event_body:block,
None => $periodic_body: block $(,)?) =>
{
let events = $events;
let period = $period;
let mut next = $crate::time::now() + period;
loop {
if let Ok($eventvar) = events.recv_timeout_async(next).await {
$event_body
} else {
// Note that ':block' above requires the braces, so this body can't introduce
// bindings that shadow our local variables.
$periodic_body
next += period;
// If this is passed, just reschedule after our Duration from "now".
let now = $crate::time::now();
if next <= now {
::log::warn!("periodic overflow: {} ticks, {}:{}",
(now - next).ticks(),
core::file!(), core::line!());
next = now + period;
}
}
}
};
}