// chalkydri/cameras/pipeline.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use gstreamer::{
5    Caps, Device, Element, ElementFactory, FlowSuccess, Pipeline, State, Structure, prelude::*,
6};
7use gstreamer_app::{AppSink, AppSinkCallbacks};
8use tokio::sync::watch;
9
10use crate::{Cfg, config, error::Error, subsystems::Subsystem};
11
12use super::mjpeg::MjpegProc;
13
14pub struct CamPipeline {
15    dev: Device,
16    pipeline: Pipeline,
17
18    input: Element,
19    filter: Element,
20    jpegdec: Element,
21    videoflip: Element,
22    tee: Element,
23
24    pub mjpeg_preproc: PreprocWrap<MjpegProc>,
25}
26impl CamPipeline {
27    pub async fn new(dev: Device, cam_config: config::Camera) -> Self {
28        let pipeline = Pipeline::new();
29
30        let input = dev.create_element(Some("camera")).unwrap();
31
32        let settings = cam_config.settings.clone().unwrap_or_default();
33        let is_mjpeg = settings.format == Some(String::new());
34
35        let filter = ElementFactory::make("capsfilter")
36            .name("capsfilter")
37            .property(
38                "caps",
39                &Caps::builder(if is_mjpeg {
40                    "image/jpeg"
41                } else {
42                    "video/x-raw"
43                })
44                .field("width", settings.width as i32)
45                .field("height", settings.height as i32)
46                //.field(
47                //    "framerate",
48                //    &Fraction::new(
49                //        settings.frame_rate.num as i32,
50                //        settings.frame_rate.den as i32,
51                //    ),
52                //)
53                .build(),
54            )
55            .build()
56            .unwrap();
57
58        // MJPEG video must be decoded into raw video before we can use it
59        let jpegdec = ElementFactory::make_with_name("jpegdec", None).unwrap();
60
61        // This element rotates/flips the video to deal with weird
62        // mounting configurations
63        let videoflip = ElementFactory::make("videoflip")
64            .name("videoflip")
65            .property_from_str(
66                "method",
67                &serde_json::to_string(&cam_config.orientation)
68                    .unwrap()
69                    .trim_matches('"'),
70            )
71            .build()
72            .unwrap();
73
74        // This element splits the stream off into multiple branches of the
75        // pipeline:
76        //  - MJPEG stream
77        //  - Calibration
78        //  - Subsystems
79        let tee = ElementFactory::make("tee").build().unwrap();
80
81        pipeline
82            .add_many([&input, &filter, &jpegdec, &videoflip, &tee])
83            .unwrap();
84
85        let mjpeg_preproc = PreprocWrap::new(MjpegProc::new(&pipeline));
86
87        Self {
88            dev,
89            pipeline,
90
91            input,
92            filter,
93            jpegdec,
94            videoflip,
95            tee,
96
97            mjpeg_preproc,
98        }
99    }
100    fn link_preprocs(&self, cam_config: config::Camera) {
101        if cam_config.subsystems.mjpeg.is_some() {
102            self.mjpeg_preproc.link(self.tee.clone());
103        }
104    }
105    fn unlink_preprocs(&self, cam_config: config::Camera) {
106        if cam_config.subsystems.mjpeg.is_some() {
107            self.mjpeg_preproc.unlink(self.tee.clone());
108        }
109    }
110
111    pub fn start(&self) {
112        self.pipeline.set_state(State::Playing).unwrap();
113    }
114    pub fn pause(&self) {
115        self.pipeline.set_state(State::Paused).unwrap();
116    }
117
118    pub async fn update(&self, cam_config: config::Camera) {
119        self.pause();
120
121        if let Some(settings) = &cam_config.settings {
122            let capsfilter = self.pipeline.by_name("capsfilter").unwrap();
123            let mut old_caps = self
124                .pipeline
125                .by_name("capsfilter")
126                .unwrap()
127                .property::<Caps>("caps")
128                .to_owned();
129            let caps = old_caps.make_mut();
130            caps.set_value("width", (&(settings.width as i32)).into());
131            caps.set_value("height", (&(settings.height as i32)).into());
132            //caps.set_value(
133            //            "framerate",
134            //            (&Fraction::new(
135            //                settings.frame_rate.num as i32,
136            //                settings.frame_rate.den as i32,
137            //            )).into(),
138            //);
139            capsfilter.set_property("caps", caps.to_owned());
140
141            // Reconfigure [Caps]
142            self.pipeline.foreach_sink_pad(|_elem, pad| {
143                pad.mark_reconfigure();
144                true
145            });
146
147            let camera = self.pipeline.by_name("camera").unwrap();
148
149            let mut extra_controls = camera.property::<Structure>("extra-controls");
150            extra_controls.set(
151                "auto_exposure",
152                if cam_config.auto_exposure { 3 } else { 1 },
153            );
154            if let Some(manual_exposure) = cam_config.manual_exposure {
155                extra_controls.set("exposure_time_absolute", &manual_exposure);
156            }
157            camera.set_property("extra-controls", extra_controls);
158
159            self.pipeline
160                .by_name("videoflip")
161                .unwrap()
162                .set_property_from_str(
163                    "method",
164                    &serde_json::to_string(&cam_config.orientation)
165                        .unwrap()
166                        .trim_matches('"'),
167                );
168
169            if let Some(capriltags_valve) = self.pipeline.by_name("capriltags_valve") {
170                capriltags_valve.set_property("drop", cam_config.subsystems.capriltags.is_none());
171            }
172        }
173
174        self.start();
175    }
176}
177
178/// A set of Gstreamer elements used to preprocess the stream for a [Subsystem]
179pub trait Preprocessor {
180    type Subsys: Subsystem;
181    type Frame: Clone + Send + Sync + 'static;
182
183    fn new(pipeline: &Pipeline) -> Self;
184    fn link(&self, src: Element, sink: Element);
185    fn unlink(&self, src: Element, sink: Element);
186    fn sampler(
187        appsink: &AppSink,
188        tx: watch::Sender<Option<Arc<Self::Frame>>>,
189    ) -> Result<Option<()>, Error>;
190}
191
192/// A no-op preprocessor for subsystems that don't require any preprocessing
193pub struct NoopPreproc<S: Subsystem>(PhantomData<S>);
194impl<S: Subsystem> NoopPreproc<S> {
195    #[inline(always)]
196    pub const fn new() -> Self {
197        Self(PhantomData)
198    }
199}
200impl<S: Subsystem> Preprocessor for NoopPreproc<S> {
201    type Subsys = S;
202    type Frame = ();
203
204    fn new(_pipeline: &Pipeline) -> Self {
205        Self::new()
206    }
207    fn link(&self, _src: Element, _dst: Element) {}
208    fn unlink(&self, _src: Element, _dst: Element) {}
209    fn sampler(
210        appsink: &AppSink,
211        tx: watch::Sender<Option<Arc<Self::Frame>>>,
212    ) -> Result<Option<()>, Error> {
213        Ok(None)
214    }
215}
216
217/// Wrapper around [Preprocessor] implementations that handles the [AppSink] junk
218pub struct PreprocWrap<P: Preprocessor> {
219    inner: P,
220    appsink: Element,
221}
222impl<P: Preprocessor> PreprocWrap<P> {
223    pub fn new(inner: P) -> Self {
224        let appsink = ElementFactory::make("appsink")
225            .name("mjpeg_appsink")
226            .build()
227            .unwrap();
228
229        Self { inner, appsink }
230    }
231    pub fn link(&self, src: Element) {
232        let appsink = self.appsink.clone();
233        self.inner.link(src, appsink);
234    }
235    pub fn unlink(&self, src: Element) {
236        let appsink = self.appsink.clone();
237        self.inner.unlink(src, appsink);
238    }
239    pub fn setup_sampler(
240        &self,
241        tx: watch::Sender<Option<Arc<P::Frame>>>,
242    ) -> Result<Option<()>, Error> {
243        let appsink = self.appsink.clone().dynamic_cast::<AppSink>().unwrap();
244        appsink.set_drop(true);
245
246        let tx = tx.clone();
247
248        appsink.set_callbacks(
249            AppSinkCallbacks::builder()
250                .new_sample(move |appsink| {
251                    P::sampler(appsink, tx.clone()).unwrap();
252                    Ok(FlowSuccess::Ok)
253                })
254                .build(),
255        );
256
257        Ok(None)
258    }
259    pub fn inner(&self) -> &P {
260        &self.inner
261    }
262}