chalkydri/
subsystem.rs
1use std::fmt::Debug;
2
3use gstreamer::{Buffer, Element, Pipeline, glib::WeakRef};
4use gstreamer_app::AppSink;
5use minint::NtConn;
6use tokio::sync::watch;
7
8use crate::config;
9
10pub trait Subsystem: Sized {
23 const NAME: &'static str;
24
25 type Config: Debug + Send + Sync + Clone + 'static;
26 type Output: Send + 'static;
27 type Error: Debug + Send + 'static;
28
29 async fn init(cam_config: config::Camera) -> Result<Self, Self::Error>;
31
32 fn preproc(
34 config: config::Camera,
35 pipeline: &Pipeline,
36 ) -> Result<(Element, Element), Self::Error>;
37
38 async fn process(
40 &mut self,
41 nt: NtConn,
42 rx: watch::Receiver<Option<Buffer>>,
43 ) -> Result<Self::Output, Self::Error>;
44}
45
46pub struct SubsysCtx {
47 pub appsink: WeakRef<AppSink>,
48}
49
50pub async fn frame_proc_loop(
51 mut rx: watch::Receiver<Option<Buffer>>,
52 mut func: impl AsyncFnMut(Buffer),
53) {
54 loop {
55 'inner: loop {
56 match rx.changed().await {
57 Ok(()) => match rx.borrow_and_update().clone() {
58 Some(frame) => {
59 func(frame).await;
60 }
61 None => {
62 warn!("waiting on first frame...");
63 }
64 },
65 Err(err) => {
66 error!("error waiting for new frame: {err:?}");
67 break 'inner;
68 }
69 }
70 }
71 tokio::task::yield_now().await;
72 }
73}