chalkydri/
subsystem.rs

1use std::fmt::Debug;
2
3use gstreamer::{Buffer, Element, Pipeline, glib::WeakRef};
4use gstreamer_app::AppSink;
5use minint::NtConn;
6use tokio::sync::watch;
7
8use crate::config;
9
10//pub type Buffer = Arc<Vec<u8>>;
11
12/// A processing subsystem
13///
14/// Subsystems implement different computer vision tasks, such as AprilTags or object detection.
15///
16/// A subsystem should be generic, not something that is only used for some specific aspect of a
17/// game.
18/// For example, note detection for the 2024 game, Crescendo, would go under the object detection
19/// subsystem, rather than a brand new subsystem.
20///
21/// Make sure to pay attention to and respect each subsystem's documentation and structure.
22pub trait Subsystem: Sized {
23    const NAME: &'static str;
24
25    type Config: Debug + Send + Sync + Clone + 'static;
26    type Output: Send + 'static;
27    type Error: Debug + Send + 'static;
28
29    /// Initialize the subsystem
30    async fn init(cam_config: config::Camera) -> Result<Self, Self::Error>;
31
32    /// Initialize the subsystem's preprocessing pipeline chunk
33    fn preproc(
34        config: config::Camera,
35        pipeline: &Pipeline,
36    ) -> Result<(Element, Element), Self::Error>;
37
38    /// Process a frame
39    async fn process(
40        &mut self,
41        nt: NtConn,
42        rx: watch::Receiver<Option<Buffer>>,
43    ) -> Result<Self::Output, Self::Error>;
44}
45
46pub struct SubsysCtx {
47    pub appsink: WeakRef<AppSink>,
48}
49
50pub async fn frame_proc_loop(
51    mut rx: watch::Receiver<Option<Buffer>>,
52    mut func: impl AsyncFnMut(Buffer),
53) {
54    loop {
55        'inner: loop {
56            match rx.changed().await {
57                Ok(()) => match rx.borrow_and_update().clone() {
58                    Some(frame) => {
59                        func(frame).await;
60                    }
61                    None => {
62                        warn!("waiting on first frame...");
63                    }
64                },
65                Err(err) => {
66                    error!("error waiting for new frame: {err:?}");
67                    break 'inner;
68                }
69            }
70        }
71        tokio::task::yield_now().await;
72    }
73}