chalkydri/cameras/
pipeline.rs1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use gstreamer::{
5 Caps, Device, Element, ElementFactory, FlowSuccess, Pipeline, State, Structure, prelude::*,
6};
7use gstreamer_app::{AppSink, AppSinkCallbacks};
8use tokio::sync::watch;
9
10use crate::{Cfg, config, error::Error, subsystems::Subsystem};
11
12use super::mjpeg::MjpegProc;
13
14pub struct CamPipeline {
15 dev: Device,
16 pipeline: Pipeline,
17
18 input: Element,
19 filter: Element,
20 jpegdec: Element,
21 videoflip: Element,
22 tee: Element,
23
24 pub mjpeg_preproc: PreprocWrap<MjpegProc>,
25}
26impl CamPipeline {
27 pub async fn new(dev: Device, cam_config: config::Camera) -> Self {
28 let pipeline = Pipeline::new();
29
30 let input = dev.create_element(Some("camera")).unwrap();
31
32 let settings = cam_config.settings.clone().unwrap_or_default();
33 let is_mjpeg = settings.format == Some(String::new());
34
35 let filter = ElementFactory::make("capsfilter")
36 .name("capsfilter")
37 .property(
38 "caps",
39 &Caps::builder(if is_mjpeg {
40 "image/jpeg"
41 } else {
42 "video/x-raw"
43 })
44 .field("width", settings.width as i32)
45 .field("height", settings.height as i32)
46 .build(),
54 )
55 .build()
56 .unwrap();
57
58 let jpegdec = ElementFactory::make_with_name("jpegdec", None).unwrap();
60
61 let videoflip = ElementFactory::make("videoflip")
64 .name("videoflip")
65 .property_from_str(
66 "method",
67 &serde_json::to_string(&cam_config.orientation)
68 .unwrap()
69 .trim_matches('"'),
70 )
71 .build()
72 .unwrap();
73
74 let tee = ElementFactory::make("tee").build().unwrap();
80
81 pipeline
82 .add_many([&input, &filter, &jpegdec, &videoflip, &tee])
83 .unwrap();
84
85 let mjpeg_preproc = PreprocWrap::new(MjpegProc::new(&pipeline));
86
87 Self {
88 dev,
89 pipeline,
90
91 input,
92 filter,
93 jpegdec,
94 videoflip,
95 tee,
96
97 mjpeg_preproc,
98 }
99 }
100 fn link_preprocs(&self, cam_config: config::Camera) {
101 if cam_config.subsystems.mjpeg.is_some() {
102 self.mjpeg_preproc.link(self.tee.clone());
103 }
104 }
105 fn unlink_preprocs(&self, cam_config: config::Camera) {
106 if cam_config.subsystems.mjpeg.is_some() {
107 self.mjpeg_preproc.unlink(self.tee.clone());
108 }
109 }
110
111 pub fn start(&self) {
112 self.pipeline.set_state(State::Playing).unwrap();
113 }
114 pub fn pause(&self) {
115 self.pipeline.set_state(State::Paused).unwrap();
116 }
117
118 pub async fn update(&self, cam_config: config::Camera) {
119 self.pause();
120
121 if let Some(settings) = &cam_config.settings {
122 let capsfilter = self.pipeline.by_name("capsfilter").unwrap();
123 let mut old_caps = self
124 .pipeline
125 .by_name("capsfilter")
126 .unwrap()
127 .property::<Caps>("caps")
128 .to_owned();
129 let caps = old_caps.make_mut();
130 caps.set_value("width", (&(settings.width as i32)).into());
131 caps.set_value("height", (&(settings.height as i32)).into());
132 capsfilter.set_property("caps", caps.to_owned());
140
141 self.pipeline.foreach_sink_pad(|_elem, pad| {
143 pad.mark_reconfigure();
144 true
145 });
146
147 let camera = self.pipeline.by_name("camera").unwrap();
148
149 let mut extra_controls = camera.property::<Structure>("extra-controls");
150 extra_controls.set(
151 "auto_exposure",
152 if cam_config.auto_exposure { 3 } else { 1 },
153 );
154 if let Some(manual_exposure) = cam_config.manual_exposure {
155 extra_controls.set("exposure_time_absolute", &manual_exposure);
156 }
157 camera.set_property("extra-controls", extra_controls);
158
159 self.pipeline
160 .by_name("videoflip")
161 .unwrap()
162 .set_property_from_str(
163 "method",
164 &serde_json::to_string(&cam_config.orientation)
165 .unwrap()
166 .trim_matches('"'),
167 );
168
169 if let Some(capriltags_valve) = self.pipeline.by_name("capriltags_valve") {
170 capriltags_valve.set_property("drop", cam_config.subsystems.capriltags.is_none());
171 }
172 }
173
174 self.start();
175 }
176}
177
178pub trait Preprocessor {
180 type Subsys: Subsystem;
181 type Frame: Clone + Send + Sync + 'static;
182
183 fn new(pipeline: &Pipeline) -> Self;
184 fn link(&self, src: Element, sink: Element);
185 fn unlink(&self, src: Element, sink: Element);
186 fn sampler(
187 appsink: &AppSink,
188 tx: watch::Sender<Option<Arc<Self::Frame>>>,
189 ) -> Result<Option<()>, Error>;
190}
191
192pub struct NoopPreproc<S: Subsystem>(PhantomData<S>);
194impl<S: Subsystem> NoopPreproc<S> {
195 #[inline(always)]
196 pub const fn new() -> Self {
197 Self(PhantomData)
198 }
199}
200impl<S: Subsystem> Preprocessor for NoopPreproc<S> {
201 type Subsys = S;
202 type Frame = ();
203
204 fn new(_pipeline: &Pipeline) -> Self {
205 Self::new()
206 }
207 fn link(&self, _src: Element, _dst: Element) {}
208 fn unlink(&self, _src: Element, _dst: Element) {}
209 fn sampler(
210 appsink: &AppSink,
211 tx: watch::Sender<Option<Arc<Self::Frame>>>,
212 ) -> Result<Option<()>, Error> {
213 Ok(None)
214 }
215}
216
217pub struct PreprocWrap<P: Preprocessor> {
219 inner: P,
220 appsink: Element,
221}
222impl<P: Preprocessor> PreprocWrap<P> {
223 pub fn new(inner: P) -> Self {
224 let appsink = ElementFactory::make("appsink")
225 .name("mjpeg_appsink")
226 .build()
227 .unwrap();
228
229 Self { inner, appsink }
230 }
231 pub fn link(&self, src: Element) {
232 let appsink = self.appsink.clone();
233 self.inner.link(src, appsink);
234 }
235 pub fn unlink(&self, src: Element) {
236 let appsink = self.appsink.clone();
237 self.inner.unlink(src, appsink);
238 }
239 pub fn setup_sampler(
240 &self,
241 tx: watch::Sender<Option<Arc<P::Frame>>>,
242 ) -> Result<Option<()>, Error> {
243 let appsink = self.appsink.clone().dynamic_cast::<AppSink>().unwrap();
244 appsink.set_drop(true);
245
246 let tx = tx.clone();
247
248 appsink.set_callbacks(
249 AppSinkCallbacks::builder()
250 .new_sample(move |appsink| {
251 P::sampler(appsink, tx.clone()).unwrap();
252 Ok(FlowSuccess::Ok)
253 })
254 .build(),
255 );
256
257 Ok(None)
258 }
259 pub fn inner(&self) -> &P {
260 &self.inner
261 }
262}