chalkydri/subsystems/
mod.rs

1use std::fmt::Debug;
2
3use gstreamer::{Element, Pipeline};
4use minint::NtConn;
5use tokio::sync::watch;
6
7use crate::config;
8
9#[cfg(feature = "apriltags")]
10pub mod apriltags;
11#[cfg(feature = "capriltags")]
12pub mod capriltags;
13mod manager;
14#[cfg(feature = "ml")]
15pub mod ml;
16#[cfg(feature = "python")]
17pub mod python;
18
19pub use manager::SubsysManager;
20
/// A processing subsystem
///
/// Subsystems implement different computer vision tasks, such as AprilTags or object detection.
///
/// A subsystem should be generic, not something that is only used for some specific aspect of a
/// game.
/// For example, note detection for the 2024 game, Crescendo, would go under the object detection
/// subsystem, rather than a brand new subsystem.
///
/// Make sure to pay attention to and respect each subsystem's documentation and structure.
pub trait Subsystem: Sized {
    /// Static name of the subsystem
    ///
    /// NOTE(review): presumably used to identify the subsystem in logs and/or configuration;
    /// confirm against `SubsysManager`.
    const NAME: &'static str;

    /// Subsystem-specific configuration type
    type Config: Debug + Send + Sync + Clone + 'static;
    /// Value returned by a successful call to [`Subsystem::process`]
    type Output: Send + 'static;
    /// Error type shared by [`Subsystem::init`], [`Subsystem::preproc`] and
    /// [`Subsystem::process`]
    type Error: Debug + Send + 'static;

    /// Initialize the subsystem
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the subsystem could not be constructed.
    async fn init() -> Result<Self, Self::Error>;

    /// Initialize the subsystem's preprocessing pipeline chunk
    ///
    /// Takes the camera's configuration and the GStreamer pipeline, and returns a pair of
    /// elements.
    ///
    /// NOTE(review): the pair is presumably the first and last element of the chunk, so the
    /// caller can link it into the rest of the pipeline; confirm against the caller.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if the chunk could not be built.
    fn preproc(
        cam_config: config::Camera,
        pipeline: &Pipeline,
    ) -> Result<(Element, Element), Self::Error>;

    /// Process a frame
    ///
    /// Frames are delivered through the `rx` watch channel, whose value is `None` or the raw
    /// bytes of a frame.
    ///
    /// NOTE(review): since this receives a channel rather than a single frame, implementations
    /// presumably loop over incoming frames (e.g. via [`frame_proc_loop`]); confirm.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` if processing fails.
    async fn process(
        &self,
        manager: SubsysManager,
        nt: NtConn,
        cam_config: config::Camera,
        rx: watch::Receiver<Option<Vec<u8>>>,
    ) -> Result<Self::Output, Self::Error>;
}
56
57/// Run frame processing loop
58pub async fn frame_proc_loop<F: AsyncFnMut(Vec<u8>) + Sync + Send + 'static>(
59    mut rx: watch::Receiver<Option<Vec<u8>>>,
60    mut func: F,
61) {
62    loop {
63        'inner: loop {
64            match rx.changed().await {
65                Ok(()) => match rx.borrow_and_update().clone() {
66                    Some(frame) => {
67                        futures_executor::block_on(async { func(frame).await });
68                    }
69                    None => {
70                        warn!("waiting on first frame...");
71                    }
72                },
73                Err(err) => {
74                    error!("error waiting for new frame: {err:?}");
75                    break 'inner;
76                }
77            }
78        }
79        tokio::task::yield_now().await;
80    }
81}