chalkydri/subsystems/
mod.rs1use std::fmt::Debug;
2
3use gstreamer::{Element, Pipeline};
4use minint::NtConn;
5use tokio::sync::watch;
6
7use crate::config;
8
9#[cfg(feature = "apriltags")]
10pub mod apriltags;
11#[cfg(feature = "capriltags")]
12pub mod capriltags;
13mod manager;
14#[cfg(feature = "ml")]
15pub mod ml;
16#[cfg(feature = "python")]
17pub mod python;
18
19pub use manager::SubsysManager;
20
21pub trait Subsystem: Sized {
32 const NAME: &'static str;
33
34 type Config: Debug + Send + Sync + Clone + 'static;
35 type Output: Send + 'static;
36 type Error: Debug + Send + 'static;
37
38 async fn init() -> Result<Self, Self::Error>;
40
41 fn preproc(
43 cam_config: config::Camera,
44 pipeline: &Pipeline,
45 ) -> Result<(Element, Element), Self::Error>;
46
47 async fn process(
49 &self,
50 manager: SubsysManager,
51 nt: NtConn,
52 cam_config: config::Camera,
53 rx: watch::Receiver<Option<Vec<u8>>>,
54 ) -> Result<Self::Output, Self::Error>;
55}
56
57pub async fn frame_proc_loop<F: AsyncFnMut(Vec<u8>) + Sync + Send + 'static>(
59 mut rx: watch::Receiver<Option<Vec<u8>>>,
60 mut func: F,
61) {
62 loop {
63 'inner: loop {
64 match rx.changed().await {
65 Ok(()) => match rx.borrow_and_update().clone() {
66 Some(frame) => {
67 futures_executor::block_on(async { func(frame).await });
68 }
69 None => {
70 warn!("waiting on first frame...");
71 }
72 },
73 Err(err) => {
74 error!("error waiting for new frame: {err:?}");
75 break 'inner;
76 }
77 }
78 }
79 tokio::task::yield_now().await;
80 }
81}