zephyr/sync/channel.rs
1//! Close-to-Zephyr channels
2//!
3//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we
4//! can get, directly using Zephyr primitives.
5//!
6//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types,
7//! these are typically statically allocated. Similar to the other close-to-zephyr primitives,
8//! this means that there is a constructor that can directly take one of these primitives.
9//!
10//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr.
11//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an
12//! internal `Arc` inside them, but without the overhead of an actual Arc.
13//!
14//! ## IRQ safety
15//!
16//! These channels are usable from IRQ context on Zephyr in very limited situations. Notably, all
17//! of the following must be true:
18//! - The channel has been created with `bounded()`, which pre-allocates all of the messages.
19//! - If the type `T` has a Drop implementation, this implementation can be called from IRQ context.
20//! - Only `try_send` or `try_recv` are used on the channel.
21//!
22//! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops
23//! received message. If the message is *always* sent over another channel or otherwise not
24//! dropped, it *might* be safe to use these messages.
25//!
26//! ## Dropping of Sender/Receiver
27//!
28//! Crossbeam channels support detecting when all senders or all receivers have been dropped on a
29//! channel, which will cause the handles on the other end to error, including waking up current
30//! threads waiting on those channels.
31//!
32//! At this time, this isn't implementable in Zephyr, as there is no API to wake up all threads
33//! blocked on a given `k_queue`. As such, this scenario is not supported. What actually happens
34//! is that when all senders or receivers on a channel are dropped, operations on the other end of
35//! the channel may just block (or queue forever with unbounded queues). If all handles (both
36//! sender and receiver) are dropped, the last drop will cause a panic. It maybe be better to just
37//! leak the entire channel, as any data associated with the channels would be leaked at this point,
38//! including the underlying Zephyr `k_queue`. Until APIs are added to Zephyr to allow the channel
39//! information to be safely freed, these can't actually be freed.
40
41extern crate alloc;
42
43use alloc::boxed::Box;
44
45use core::cell::UnsafeCell;
46use core::ffi::c_void;
47use core::fmt;
48use core::marker::PhantomData;
49use core::mem::MaybeUninit;
50use core::pin::Pin;
51
52use crate::sys::queue::Queue;
53use crate::time::{Forever, NoWait, Timeout};
54
55mod counter;
56
57// The zephyr queue does not allocate or manage the data of the messages, so we need to handle
58// allocation as such as well. However, we don't need to manage anything, so it is sufficient to
59// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap
60// it back into a Box, and give it to the recipient.
61
62/// Create a multi-producer multi-consumer channel of unbounded capacity, using an existing Queue
63/// object.
64///
65/// The messages are allocated individually as "Box", and the queue is managed by the underlying
66/// Zephyr queue.
67pub fn unbounded_from<T>(queue: Queue) -> (Sender<T>, Receiver<T>) {
68 let (s, r) = counter::new(queue);
69 let s = Sender {
70 flavor: SenderFlavor::Unbounded {
71 queue: s,
72 _phantom: PhantomData,
73 },
74 };
75 let r = Receiver {
76 flavor: ReceiverFlavor::Unbounded {
77 queue: r,
78 _phantom: PhantomData,
79 },
80 };
81 (s, r)
82}
83
84/// Create a multi-producer multi-consumer channel of unbounded capacity.
85///
86/// The messages are allocated individually as "Box". The underlying Zephyr queue will be
87/// dynamically allocated.
88///
89/// **Note**: Currently Drop is not propertly supported on Zephyr. If all senders are dropped, any
90/// receivers will likely be blocked forever. Any data that has been queued and not received will
91/// be leaked when all receivers have been droped.
92pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
93 unbounded_from(Queue::new())
94}
95
96/// Create a multi-producer multi-consumer channel with bounded capacity.
97///
98/// The messages are allocated at channel creation time. If there are no messages at `send` time,
99/// send will block (possibly waiting for a timeout).
100///
101/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called
102/// a rendezvous, where both threads wait until in the same region. `bounded` will panic if called
103/// with a capacity of zero.
104pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
105 if cap == 0 {
106 panic!("Zero capacity queues are not supported on Zephyr");
107 }
108
109 let (s, r) = counter::new(Bounded::new(cap));
110 let s = Sender {
111 flavor: SenderFlavor::Bounded(s),
112 };
113 let r = Receiver {
114 flavor: ReceiverFlavor::Bounded(r),
115 };
116 (s, r)
117}
118
119/// The underlying type for Messages through Zephyr's [`Queue`].
120///
121/// This wrapper is used internally to wrap user messages through the queue. It is not useful in
122/// safe code, but may be useful for implementing other types of message queues.
123#[repr(C)]
124pub struct Message<T> {
125 /// The private data used by the kernel to enqueue messages and such.
126 _private: usize,
127 /// The actual data being transported.
128 data: T,
129}
130
131impl<T> Message<T> {
132 fn new(data: T) -> Message<T> {
133 Message { _private: 0, data }
134 }
135}
136
137/// The sending side of a channel.
138pub struct Sender<T> {
139 flavor: SenderFlavor<T>,
140}
141
142// SAFETY: We implement Send and Sync for the Sender itself, as long as the underlying data can be
143// sent. The underlying zephyr primitives used for the channel provide the Sync safety.
144unsafe impl<T: Send> Send for Sender<T> {}
145unsafe impl<T: Send> Sync for Sender<T> {}
146
147impl<T> Sender<T> {
148 /// Waits for a message to be sent into the channel, but only for a limited time.
149 ///
150 /// This call will block until the send operation can proceed or the operation times out.
151 ///
152 /// For unbounded channels, this will perform an allocation (and always send immediately). For
153 /// bounded channels, no allocation will be performed.
154 pub fn send_timeout<D>(&self, msg: T, timeout: D) -> Result<(), SendError<T>>
155 where
156 D: Into<Timeout>,
157 {
158 match &self.flavor {
159 SenderFlavor::Unbounded { queue, .. } => {
160 let msg = Box::new(Message::new(msg));
161 let msg = Box::into_raw(msg);
162 // SAFETY: Zephyr requires, for as long as the message remains in the queue, that
163 // the first `usize` of the message be available for its use, and that the message
164 // not be moved. The `into_raw` of the box consumes the box, so this is entirely a
165 // raw pointer with no references from the Rust code. The item is not used until it
166 // has been removed from the queue.
167 unsafe {
168 queue.send(msg as *mut c_void);
169 }
170 }
171 SenderFlavor::Bounded(chan) => {
172 // Retrieve a message buffer from the free list.
173 // SAFETY: Please see the safety discussion on `Bounded` on what makes this safe.
174 let buf = unsafe { chan.free.recv(timeout) };
175 if buf.is_null() {
176 return Err(SendError(msg));
177 }
178 let buf = buf as *mut Message<T>;
179 unsafe {
180 buf.write(Message::new(msg));
181 chan.chan.send(buf as *mut c_void);
182 }
183 }
184 }
185 Ok(())
186 }
187
188 /// Sends a message over the given channel. Waiting if necessary.
189 ///
190 /// For unbounded channels, this will allocate space for a message, and immediately send it.
191 /// For bounded channels, this will block until a message slot is available, and then send the
192 /// message.
193 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
194 self.send_timeout(msg, Forever)
195 }
196
197 /// Attempts to send a message into the channel without blocking.
198 ///
199 /// This message will either send a message into the channel immediately or return an error if
200 /// the channel is full. The returned error contains the original message.
201 pub fn try_send(&self, msg: T) -> Result<(), SendError<T>> {
202 self.send_timeout(msg, NoWait)
203 }
204}
205
206impl<T> Drop for Sender<T> {
207 fn drop(&mut self) {
208 match &self.flavor {
209 SenderFlavor::Unbounded { queue, .. } => {
210 // SAFETY: It is not possible to free from Zephyr queues. This means drop has to
211 // either leak or panic. We will panic for now.
212 unsafe {
213 queue.release(|_| {
214 panic!("Unbounded queues cannot currently be dropped");
215 })
216 }
217 }
218 SenderFlavor::Bounded(chan) => {
219 // SAFETY: It is not possible to free from Zephyr queues. This means drop has to
220 // either leak or panic. We will panic for now.
221 unsafe {
222 chan.release(|_| {
223 panic!("Bounded queues cannot currently be dropped");
224 })
225 }
226 }
227 }
228 }
229}
230
231impl<T> Clone for Sender<T> {
232 fn clone(&self) -> Self {
233 let flavor = match &self.flavor {
234 SenderFlavor::Unbounded { queue, .. } => SenderFlavor::Unbounded {
235 queue: queue.acquire(),
236 _phantom: PhantomData,
237 },
238 SenderFlavor::Bounded(chan) => SenderFlavor::Bounded(chan.acquire()),
239 };
240
241 Sender { flavor }
242 }
243}
244
245/// The "flavor" of a sender. This maps to the type of channel.
246enum SenderFlavor<T> {
247 /// An unbounded queue. Messages are allocated with Box, and sent directly.
248 Unbounded {
249 queue: counter::Sender<Queue>,
250 _phantom: PhantomData<T>,
251 },
252 Bounded(counter::Sender<Bounded<T>>),
253}
254
255impl<T: fmt::Debug> fmt::Debug for Sender<T> {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 write!(f, "Sender")
258 }
259}
260
261/// The receiving side of a channel.
262pub struct Receiver<T> {
263 flavor: ReceiverFlavor<T>,
264}
265
266// SAFETY: We implement Send and Sync for the Receiver itself, as long as the underlying data can be
267// sent. The underlying zephyr primitives used for the channel provide the Sync safety.
268unsafe impl<T: Send> Send for Receiver<T> {}
269unsafe impl<T: Send> Sync for Receiver<T> {}
270
271impl<T> Receiver<T> {
272 /// Waits for a message to be received from the channel, but only for a limited time.
273 ///
274 /// If the channel is empty and not disconnected, this call will block until the receive
275 /// operation can proceed or the operation times out.
276 /// wake up and return an error.
277 pub fn recv_timeout<D>(&self, timeout: D) -> Result<T, RecvError>
278 where
279 D: Into<Timeout>,
280 {
281 match &self.flavor {
282 ReceiverFlavor::Unbounded { queue, .. } => {
283 // SAFETY: Messages were sent by converting a Box through `into_raw()`.
284 let msg = unsafe {
285 let msg = queue.recv(timeout);
286 if msg.is_null() {
287 return Err(RecvError);
288 }
289 msg
290 };
291 let msg = msg as *mut Message<T>;
292 // SAFETY: After receiving the message from the queue's `recv` method, Zephyr will
293 // no longer use the `usize` at the beginning, and it is safe for us to convert the
294 // message back into a box, copy the field out of it, an allow the Box itself to be
295 // freed.
296 let msg = unsafe { Box::from_raw(msg) };
297 Ok(msg.data)
298 }
299 ReceiverFlavor::Bounded(chan) => {
300 // SAFETY: Please see the safety discussion on Bounded.
301 let rawbuf = unsafe {
302 let buf = chan.chan.recv(timeout);
303 if buf.is_null() {
304 return Err(RecvError);
305 }
306 buf
307 };
308 let buf = rawbuf as *mut Message<T>;
309 let msg: Message<T> = unsafe { buf.read() };
310 unsafe {
311 chan.free.send(buf as *mut c_void);
312 }
313 Ok(msg.data)
314 }
315 }
316 }
317
318 /// Blocks the current thread until a message is received or the channel is empty and
319 /// disconnected.
320 ///
321 /// If the channel is empty and not disconnected, this call will block until the receive
322 /// operation can proceed.
323 pub fn recv(&self) -> Result<T, RecvError> {
324 self.recv_timeout(Forever)
325 }
326
327 /// Attempts to receive a message from the channel without blocking.
328 ///
329 /// This method will either receive a message from the channel immediately, or return an error
330 /// if the channel is empty.
331 ///
332 /// This method is safe to use from IRQ context, if and only if the channel was created as a
333 /// bounded channel.
334 pub fn try_recv(&self) -> Result<T, RecvError> {
335 self.recv_timeout(NoWait)
336 }
337}
338
339impl<T> Drop for Receiver<T> {
340 fn drop(&mut self) {
341 match &self.flavor {
342 ReceiverFlavor::Unbounded { queue, .. } => {
343 // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
344 // Chose panic for now.
345 unsafe {
346 queue.release(|_| {
347 panic!("Unnbounded channel cannot be dropped");
348 })
349 }
350 }
351 ReceiverFlavor::Bounded(chan) => {
352 // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic.
353 // Chose panic for now.
354 unsafe {
355 chan.release(|_| {
356 panic!("Bounded channels cannot be dropped");
357 })
358 }
359 }
360 }
361 }
362}
363
364impl<T> Clone for Receiver<T> {
365 fn clone(&self) -> Self {
366 let flavor = match &self.flavor {
367 ReceiverFlavor::Unbounded { queue, .. } => ReceiverFlavor::Unbounded {
368 queue: queue.acquire(),
369 _phantom: PhantomData,
370 },
371 ReceiverFlavor::Bounded(chan) => ReceiverFlavor::Bounded(chan.acquire()),
372 };
373
374 Receiver { flavor }
375 }
376}
377
378impl<T> fmt::Debug for Receiver<T> {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 write!(f, "Sender")
381 }
382}
383
384/// The "flavor" of a receiver. This maps to the type of the channel.
385enum ReceiverFlavor<T> {
386 /// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt.
387 Unbounded {
388 queue: counter::Receiver<Queue>,
389 _phantom: PhantomData<T>,
390 },
391 Bounded(counter::Receiver<Bounded<T>>),
392}
393
394type Slot<T> = UnsafeCell<MaybeUninit<Message<T>>>;
395
396// SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box.
397// This Box is held for an eventual future implementation that is able to free the messages, once
398// they have all been taken from Zephyr's knowledge. For now, they could also be leaked.
399// It is a `Pin<Box<...>>` because it is important that the data never be moved, as we maintain
400// pointers to the items in Zephyr queues.
401//
402// There are two `Queue`s used here: `free` is the free list of messages that are not being sent,
403// and `chan` for messages that have been sent but not received. Initially, all slots are placed on
404// the `free` queue. At any time, outside of the calls in this module, each slot must live inside
405// of one of the two queues. This means that the messages cannot be moved or accessed, except
406// inside of the individual send/receive operations. Zephyr makes use of the initial `usize` field
407// at the beginning of each Slot.
408//
409// We use MaybeUninit for the messages to avoid needing to initialize the messages. The individual
410// messages are accessed through pointers when they are retrieved from the Zephyr `Queue`, so these
411// values are never marked as initialized.
412/// Bounded channel implementation.
413struct Bounded<T> {
414 /// The messages themselves. This Box owns the allocation of the messages, although it is
415 /// unsafe to drop this with any messages stored in either of the Zephyr queues.
416 ///
417 /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware
418 /// of. MaybeUninit allows us to create this without allocation.
419 _slots: Pin<Box<[Slot<T>]>>,
420 /// The free queue, holds messages that aren't be used. The free queue has to be in a box,
421 /// because it cannot move after the constructor runs. The chan is fine to wait until first
422 /// use, when the object has settled. As we are also boxing the messages, this isn't really
423 /// that costly.
424 free: Box<Queue>,
425 /// The channel queue. These are messages that have been sent and are waiting to be received.
426 chan: Queue,
427}
428
429impl<T> Bounded<T> {
430 fn new(cap: usize) -> Self {
431 let slots: Box<[Slot<T>]> = (0..cap)
432 .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
433 .collect();
434 let slots = Box::into_pin(slots);
435
436 let free = Box::new(Queue::new());
437 let chan = Queue::new();
438
439 // Add each of the boxes to the free list.
440 for slot in slots.as_ref().iter() {
441 // SAFETY: See safety discussion on `Bounded`.
442 unsafe {
443 free.send(slot.get() as *mut c_void);
444 }
445 }
446
447 Bounded {
448 _slots: slots,
449 free,
450 chan,
451 }
452 }
453}
454
455// TODO: Move to err
456
457/// An error returned from the [`send`] method.
458///
459/// The message could not be sent because the channel is disconnected.
460///
461/// The error contains the message so it can be recovered.
462///
463/// [`send`]: Sender::send
464#[derive(PartialEq, Eq, Clone, Copy)]
465pub struct SendError<T>(pub T);
466
467impl<T> fmt::Debug for SendError<T> {
468 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
469 "SendError(..)".fmt(f)
470 }
471}
472
473/// An error returned from the [`recv`] method.
474///
475/// A message could not be received because the channel is empty and disconnected.
476///
477/// [`recv`]: Receiver::recv
478#[derive(PartialEq, Eq, Clone, Copy, Debug)]
479pub struct RecvError;