chalkydri/cameras/mod.rs
1/***
2 * THIS FILE IS CURSED
3 * PLES SEND HELP
4 */
5
6pub(crate) mod mjpeg;
7pub(crate) mod pipeline;
8mod providers;
9mod publisher;
10
11use gstreamer::{
12 Bin, Bus, BusSyncReply, Caps, Device, DeviceProvider, DeviceProviderFactory, Element,
13 ElementFactory, FlowError, FlowSuccess, Fraction, Message, MessageView, PadDirection, Pipeline,
14 State, Structure, glib::WeakRef, prelude::*,
15};
16
17use gstreamer_app::{AppSink, AppSinkCallbacks};
18use minint::NtConn;
19use pipeline::CamPipeline;
20use providers::{CamProvider, ProviderEvent, V4l2Provider};
21use publisher::CamPublisher;
22#[cfg(feature = "rerun")]
23use re_types::archetypes::EncodedImage;
24use std::{collections::HashMap, mem::ManuallyDrop, sync::Arc};
25use tokio::{
26 sync::{Mutex, MutexGuard, RwLock, mpsc, watch},
27 task::JoinHandle,
28};
29use tracing::Level;
30
31#[cfg(feature = "rerun")]
32use crate::Rerun;
33use crate::{
34 Cfg,
35 calibration::Calibrator,
36 config::{self, CameraSettings, CfgFraction},
37 error::Error,
38 subsystems::Subsystem,
39};
40
41#[derive(Clone)]
42pub struct CameraCtx {
43 cfgg: config::Camera,
44 tee: WeakRef<Element>,
45}
46
47#[derive(Clone)]
48pub struct CamManager {
49 v4l2_prov: Arc<Mutex<V4l2Provider>>,
50 pub pipelines: Arc<RwLock<HashMap<String, CamPipeline>>>,
51
52 restart_tx: mpsc::Sender<()>,
53 dev_msg_tx: mpsc::Sender<ProviderEvent>,
54 pub new_dev_rx: Arc<Mutex<mpsc::Receiver<config::Camera>>>,
55}
56impl CamManager {
57 pub async fn new(nt: NtConn, restart_tx: mpsc::Sender<()>) -> (Self, impl Future<Output = ()>) {
58 let v4l2_prov = Arc::new(Mutex::new(V4l2Provider::init()));
59
60 let pipelines = Arc::new(RwLock::new(HashMap::new()));
61
62 let (dev_msg_tx, dev_msg_rx) = mpsc::channel(20);
63 let (new_dev_tx, new_dev_rx) = mpsc::channel(20);
64
65 let pipelines_ = pipelines.clone();
66 let runner = Self::spawn_dev_msg_handler(pipelines_, dev_msg_rx, new_dev_tx);
67
68 (
69 Self {
70 v4l2_prov,
71 pipelines,
72
73 restart_tx,
74 dev_msg_tx,
75 new_dev_rx: Arc::new(Mutex::new(new_dev_rx)),
76 },
77 runner,
78 )
79 }
80
81 pub async fn start_dev_providers(&self) {
82 self.v4l2_prov
83 .lock()
84 .await
85 .register_handler(self.dev_msg_tx.clone());
86 self.v4l2_prov.lock().await.start();
87 }
88 pub async fn stop_dev_providers(&self) {
89 self.v4l2_prov.lock().await.stop();
90 self.v4l2_prov.lock().await.unregister_handler();
91 }
92
93 async fn spawn_dev_msg_handler(
94 pipelines: Arc<RwLock<HashMap<String, CamPipeline>>>,
95 mut rx: mpsc::Receiver<ProviderEvent>,
96 tx: mpsc::Sender<config::Camera>,
97 ) {
98 debug!("starting dev msg handler...");
99 'outer: while let Some(event) = rx.recv().await {
100 match event {
101 ProviderEvent::Connected(id, dev) => {
102 let id = V4l2Provider::get_id(&dev);
103 println!("idfk: {id}");
104
105 if let Some(cameras) = Cfg.read().await.cameras.clone() {
106 for cam in cameras {
107 if id == cam.id {
108 let pipeline = CamPipeline::new(dev.clone(), cam).await;
109 let _ = pipelines.write().await.insert(id.clone(), pipeline);
110 println!("existing cam: {id}");
111 continue 'outer;
112 }
113 }
114 }
115
116 println!("new cam: {id}");
117 tx.send(config::Camera {
118 id,
119 possible_settings: Some(
120 dev.caps()
121 .unwrap()
122 .iter()
123 //.filter(|cap| cap.name().as_str() == "video/x-raw")
124 .filter_map(|cap| {
125 let width = cap.get::<i32>("width").ok().map(|v| v as u32);
126 let height = cap.get::<i32>("height").ok().map(|v| v as u32);
127 let frame_rate =
128 cap.get::<Fraction>("framerate").ok().map(|v| {
129 CfgFraction {
130 num: v.numer() as u32,
131 den: v.denom() as u32,
132 }
133 });
134 if width.is_none() || height.is_none() {
135 //panic!("Either width or height doesn't exist. Need to look into that...");
136 None
137 } else {
138 Some(CameraSettings {
139 width: width.unwrap(),
140 height: height.unwrap(),
141 frame_rate,
142 format: Some(
143 cap.get::<String>("format").unwrap_or_default(),
144 ),
145 })
146 }
147 })
148 .collect(),
149 ),
150 ..Default::default()
151 })
152 .await
153 .unwrap();
154 }
155 ProviderEvent::Disconnected(id, dev) => {
156 let mut pipelines = pipelines.write().await;
157 if pipelines.contains_key(&id) {
158 pipelines.remove(&id);
159 }
160 }
161 }
162 }
163
164 panic!("dev msg handler died");
165 }
166
167 pub async fn refresh_devices(&self) {
168 let mut cfgg = Cfg.write().await;
169
170 let mut cameras = cfgg.cameras.clone();
171
172 if let Some(ref mut cams) = cameras {
173 for mut cam in cams {
174 cam.online = self.pipelines.read().await.contains_key(&cam.id);
175 }
176 }
177
178 (*cfgg).cameras = cameras;
179 }
180
181 pub async fn update_pipeline(&self, cam_id: String) {
182 let cfgg = Cfg.read().await.clone();
183 let cam_config = cfgg
184 .cameras
185 .unwrap()
186 .iter()
187 .filter(|cam| cam.id == cam_id)
188 .next()
189 .unwrap()
190 .clone();
191 self.pipelines
192 .read()
193 .await
194 .get(&cam_id)
195 .unwrap()
196 .update(cam_config)
197 .await;
198 }
199}
200
201/*
202/// The camera manager
203///
204/// This manages all of the GStreamer pipelines.
205/// It also handles device events.
206#[derive(Clone)]
207pub struct CameraManager {
208 dev_prov: DeviceProvider,
209 pipelines: Arc<RwLock<HashMap<String, Pipeline>>>,
210 calibrators: Arc<Mutex<HashMap<String, Calibrator>>>,
211 mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>>,
212 restart_tx: mpsc::Sender<()>,
213 subsys_man: SubsysManager,
214 publisher: Arc<Mutex<CamPublisher>>,
215}
216impl CameraManager {
217 /// Initialize a camera manager
218 ///
219 /// **You MUST call [gstreamer::init] first.**
220 pub async fn new(nt: NtConn, restart_tx: mpsc::Sender<()>) -> Self {
221 // Make sure gstreamer is initialized
222 gstreamer::assert_initialized();
223
224 // Get a copy of the global configuration
225 let config = {
226 let cfgg = Cfg.read().await;
227 let ret = (*cfgg).clone();
228 drop(cfgg);
229 ret
230 };
231
232 let pipelines = Arc::new(RwLock::new(HashMap::new()));
233 let calibrators: Arc<Mutex<HashMap<String, Calibrator>>> =
234 Arc::new(Mutex::new(HashMap::new()));
235 let mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>> =
236 Arc::new(Mutex::new(HashMap::new()));
237 let publisher = Arc::new(Mutex::new(CamPublisher::new()));
238
239 let subsys_man = SubsysManager::new().await.unwrap();
240
241 // Create a device monitor to watch for new devices
242 let dev_prov = DeviceProviderFactory::find("v4l2deviceprovider")
243 .unwrap()
244 .load()
245 .unwrap()
246 .get()
247 .unwrap();
248
249 let bus = dev_prov.bus();
250
251 let calibrators_ = calibrators.clone();
252 let mjpeg_streams_ = mjpeg_streams.clone();
253 let pipelines_ = pipelines.clone();
254 let publisher_ = publisher.clone();
255 let manager = subsys_man.clone();
256 let rt_handle = tokio::runtime::Handle::current();
257 bus.set_sync_handler(move |_, msg| {
258 let calibrators = &calibrators_;
259 let mjpeg_streams = &mjpeg_streams_;
260 let pipelines = &pipelines_;
261 let publisher = &publisher_;
262 match msg.view() {
263 MessageView::DeviceAdded(msg) => {
264 let dev = msg.device();
265 debug!("got a new device");
266
267 // Create a new pipeline
268 let pipeline = Pipeline::new();
269
270 if let Some(cam_configs) = &config.cameras {
271 let id = Self::get_id(&dev);
272
273 if let Some(cam_config) =
274 cam_configs.clone().iter().filter(|cam| cam.id == id).next()
275 {
276 let span = span!(Level::INFO, "camera", id = cam_config.id);
277 let _enter = span.enter();
278
279 debug!("found a config");
280
281 // Create the camera source
282 let cam = dev.create_element(Some("camera")).unwrap();
283
284 let mut extra_controls = Structure::new_empty("extra-controls");
285 extra_controls.set(
286 "auto_exposure",
287 if cam_config.auto_exposure { 3 } else { 1 },
288 );
289 if let Some(manual_exposure) = cam_config.manual_exposure {
290 extra_controls.set("exposure_time_absolute", &manual_exposure);
291 }
292 cam.set_property("extra-controls", extra_controls);
293
294 // The camera preprocessing part:
295 // [src]> capsfilter -> queue -> tee -> ...
296
297 // Create the elements
298 let settings = cam_config.settings.clone().unwrap_or_default();
299
300 let is_mjpeg = settings.format == Some(String::new());
301
302 let filter = ElementFactory::make("capsfilter")
303 .name("capsfilter")
304 .property(
305 "caps",
306 &Caps::builder(if is_mjpeg {
307 "image/jpeg"
308 } else {
309 "video/x-raw"
310 })
311 .field("width", settings.width as i32)
312 .field("height", settings.height as i32)
313 //.field(
314 // "framerate",
315 // &Fraction::new(
316 // settings.frame_rate.num as i32,
317 // settings.frame_rate.den as i32,
318 // ),
319 //)
320 .build(),
321 )
322 .build()
323 .unwrap();
324
325 // This element rotates/flips the video to deal with weird
326 // mounting configurations
327 let videoflip = ElementFactory::make("videoflip")
328 .name("videoflip")
329 .property_from_str(
330 "method",
331 &serde_json::to_string(&cam_config.orientation)
332 .unwrap()
333 .trim_matches('"'),
334 )
335 .build()
336 .unwrap();
337
338 // This element splits the stream off into multiple branches of the
339 // pipeline:
340 // - MJPEG stream
341 // - Calibration
342 // - Subsystems
343 let tee = ElementFactory::make("tee").build().unwrap();
344
345 if is_mjpeg {
346 let jpegdec =
347 ElementFactory::make_with_name("jpegdec", Some("jpegdec"))
348 .unwrap();
349 // Add them to the pipeline
350 pipeline
351 .add_many([&cam, &filter, &jpegdec, &videoflip, &tee])
352 .unwrap();
353
354 // Link them
355 Element::link_many([&cam, &filter, &jpegdec, &videoflip, &tee])
356 .unwrap();
357 } else {
358 // Add them to the pipeline
359 pipeline
360 .add_many([&cam, &filter, &videoflip, &tee])
361 .unwrap();
362
363 // Link them
364 Element::link_many([&cam, &filter, &videoflip, &tee]).unwrap();
365 }
366
367 debug!("initializing calibrator");
368 let calibrator = Self::add_calib(&pipeline, &tee, cam_config.clone());
369
370 {
371 debug!("adding calibrator");
372 let mut calibrators =
373 tokio::task::block_in_place(|| calibrators.blocking_lock());
374 (*calibrators).insert(id.clone(), calibrator);
375 drop(calibrators);
376 debug!("dropped lock");
377 }
378
379 debug!("initializing mjpeg stream");
380 let mjpeg_stream = Self::add_mjpeg(&pipeline, &tee, cam_config.clone());
381
382 {
383 debug!("adding mjpeg stream");
384 let mut mjpeg_streams =
385 tokio::task::block_in_place(|| mjpeg_streams.blocking_lock());
386 (*mjpeg_streams).insert(id.clone(), mjpeg_stream);
387 drop(mjpeg_streams);
388 debug!("dropped lock");
389 }
390
391 rt_handle.block_on(async {
392 manager.spawn(cam_config.clone(), &pipeline, &tee).await;
393 });
394
395 tokio::task::block_in_place(|| pipelines.blocking_write())
396 .insert(id, pipeline);
397
398 publisher_.lock().await.publish(cam_config).await;
399 }
400 }
401 }
402 _ => {}
403 }
404
405 BusSyncReply::Pass
406 });
407
408 // Start the device provider
409 dev_prov.start().unwrap();
410
411 for (cam_name, pipeline) in pipelines.read().await.clone() {
412 debug!("starting pipeline for {cam_name}");
413
414 // Start the pipeline
415 pipeline.set_state(State::Playing).unwrap();
416
417 // Get the pipeline's bus
418 let bus = pipeline.bus().unwrap();
419 // Hook up event handler for the pipeline
420 bus.set_sync_handler(|_, _| BusSyncReply::Pass);
421 }
422
423 Self {
424 dev_prov,
425 pipelines,
426 calibrators,
427 mjpeg_streams,
428 restart_tx,
429 subsys_man,
430 publisher,
431 }
432 }
433
434 /// Trigger a restart of Chalkydri
435 pub async fn restart(&self) {
436 self.restart_tx.send(()).await.unwrap();
437 }
438
439 /// Get unique identifier for the given device
440 fn get_id(dev: &Device) -> String {
441 dev.property::<Structure>("properties")
442 .get::<String>("device.serial")
443 .unwrap()
444 }
445
446 /// Update the given pipeline
447 pub async fn update_pipeline(&self, dev_id: String) {
448 self.pause(dev_id.clone()).await;
449 {
450 let mut pipelines = self.pipelines.write().await;
451 if let Some(pipeline) = pipelines.get_mut(&dev_id) {
452 // Get a copy of the global configuration
453 let config = {
454 let cfgg = Cfg.read().await;
455 let ret = (*cfgg).clone();
456 drop(cfgg);
457 ret
458 };
459
460 if let Some(cam_configs) = &config.cameras {
461 if let Some(cam_config) = cam_configs
462 .clone()
463 .iter()
464 .filter(|cam| cam.id == dev_id)
465 .next()
466 {
467 if let Some(settings) = &cam_config.settings {
468 let capsfilter = pipeline.by_name("capsfilter").unwrap();
469 let mut old_caps = pipeline
470 .by_name("capsfilter")
471 .unwrap()
472 .property::<Caps>("caps")
473 .to_owned();
474 let caps = old_caps.make_mut();
475 caps.set_value("width", (&(settings.width as i32)).into());
476 caps.set_value("height", (&(settings.height as i32)).into());
477 //caps.set_value(
478 // "framerate",
479 // (&Fraction::new(
480 // settings.frame_rate.num as i32,
481 // settings.frame_rate.den as i32,
482 // )).into(),
483 //);
484 capsfilter.set_property("caps", caps.to_owned());
485
486 // Reconfigure [Caps]
487 pipeline.foreach_sink_pad(|_elem, pad| {
488 pad.mark_reconfigure();
489 true
490 });
491
492 let camera = pipeline.by_name("camera").unwrap();
493
494 let mut extra_controls = camera.property::<Structure>("extra-controls");
495 extra_controls.set(
496 "auto_exposure",
497 if cam_config.auto_exposure { 3 } else { 1 },
498 );
499 if let Some(manual_exposure) = cam_config.manual_exposure {
500 extra_controls.set("exposure_time_absolute", &manual_exposure);
501 }
502 camera.set_property("extra-controls", extra_controls);
503
504 pipeline
505 .by_name("videoflip")
506 .unwrap()
507 .set_property_from_str(
508 "method",
509 &serde_json::to_string(&cam_config.orientation)
510 .unwrap()
511 .trim_matches('"'),
512 );
513
514 if let Some(capriltags_valve) = pipeline.by_name("capriltags_valve") {
515 capriltags_valve.set_property(
516 "drop",
517 cam_config.subsystems.capriltags.is_none(),
518 );
519 }
520 }
521 }
522 }
523 }
524 }
525 self.start(dev_id).await;
526 }
527
528 /// List connected cameras
529 pub fn devices(&self) -> Vec<config::Camera> {
530 let mut devices = Vec::new();
531
532 for dev in self.dev_prov.devices().iter() {
533 let id = Self::get_id(&dev);
534 devices.push(config::Camera {
535 id,
536 possible_settings: Some(
537 dev.caps()
538 .unwrap()
539 .iter()
540 //.filter(|cap| cap.name().as_str() == "video/x-raw")
541 .filter_map(|cap| {
542 let width = cap.get::<i32>("width").ok().map(|v| v as u32);
543 let height = cap.get::<i32>("height").ok().map(|v| v as u32);
544 let frame_rate = cap
545 .get::<Fraction>("framerate").ok().map(|v| CfgFraction {
546 num: v.numer() as u32,
547 den: v.denom() as u32,
548 });
549 if width.is_none() || height.is_none() {
550 error!("Either width or height doesn't exist. Need to look into that...");
551 None
552 } else {
553 Some(CameraSettings {
554 width: width.unwrap(),
555 height: height.unwrap(),
556 frame_rate,
557 format: Some(cap.get::<String>("format").unwrap_or_default()),
558 })
559 }
560 })
561 .collect(),
562 ),
563 ..Default::default()
564 });
565 }
566
567 devices
568 }
569
570 /// Run a calibration step
571 pub async fn calib_step(&self, name: String) -> usize {
572 self.calibrators().await.get_mut(&name).unwrap().step()
573 }
574
575 /// Get an [`MJPEG stream`](MjpegStream)
576 pub async fn mjpeg_stream(&self, name: String) -> MjpegStream {
577 self.mjpeg_streams().await.get(&name).unwrap().clone()
578 }
579
580 /// Add [`subsystem`](Subsystem) to pipeline
581 pub(crate) fn add_subsys<S: Subsystem>(
582 pipeline: &Pipeline,
583 cam: &Element,
584 cam_config: config::Camera,
585 enabled: bool,
586 ) -> watch::Receiver<Option<Vec<u8>>> {
587 let span = span!(Level::INFO, "subsys", subsystem = S::NAME);
588 let _enter = span.enter();
589
590 debug!("initializing preproc pipeline chunk subsystem...");
591 let (input, output) = S::preproc(cam_config.clone(), pipeline).unwrap();
592
593 let valve = ElementFactory::make("valve")
594 .name(&format!("{}_valve", S::NAME))
595 .property("drop", !enabled)
596 .build()
597 .unwrap();
598 let videorate = ElementFactory::make("videorate")
599 .property("max-rate", 40)
600 .property("drop-only", true)
601 .build()
602 .unwrap();
603 let appsink = ElementFactory::make("appsink")
604 .name(&format!("{}_appsink", S::NAME))
605 .build()
606 .unwrap();
607 pipeline.add_many([&valve, &videorate, &appsink]).unwrap();
608
609 debug!("linking preproc pipeline chunk...");
610 Element::link_many([&cam, &valve, &videorate, &input]).unwrap();
611 output.link(&appsink).unwrap();
612
613 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
614 appsink.set_drop(true);
615
616 let (tx, rx) = watch::channel(None);
617
618 let appsink_ = appsink.clone();
619
620 debug!("setting appsink callbacks...");
621 appsink.set_callbacks(
622 AppSinkCallbacks::builder()
623 .new_sample(move |_| match appsink_.pull_sample() {
624 Ok(sample) => {
625 let buf = sample.buffer().unwrap();
626 let buf = buf
627 .to_owned()
628 .into_mapped_buffer_readable()
629 .unwrap()
630 .to_vec();
631 if let Err(err) = tx.send(Some(buf)) {
632 error!("failed to send frame to subsys appsink: {err:?}");
633 }
634
635 Ok(FlowSuccess::Ok)
636 }
637 Err(err) => {
638 error!("failed to pull sample: {err:?}");
639 Err(FlowError::Error)
640 }
641 })
642 .build(),
643 );
644 appsink.set_async(false);
645
646 debug!("linked subsys junk");
647
648 //let cam_config = cam_config.clone();
649 //std::thread::spawn(move || {
650 // debug!("capriltags worker thread started");
651 // futures_executor::block_on(async move {
652 // debug!("initializing subsystem...");
653 // let mut subsys = S::init(cam_config).await.unwrap();
654
655 // debug!("starting subsystem...");
656 // subsys.process(Nt.clone(), rx).await.unwrap();
657 // });
658 //});
659
660 rx
661 }
662
663 /// Add [Calibrator] to pipeline
664 pub(crate) fn add_calib(
665 pipeline: &Pipeline,
666 cam: &Element,
667 cam_config: config::Camera,
668 ) -> Calibrator {
669 let span = span!(Level::INFO, "calib");
670 let _enter = span.enter();
671
672 let bin = Bin::builder().name("calib").build();
673
674 let valve = ElementFactory::make("valve")
675 .property("drop", false)
676 .build()
677 .unwrap();
678 let queue = ElementFactory::make("queue").build().unwrap();
679 let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
680 let filter = ElementFactory::make("capsfilter")
681 .property(
682 "caps",
683 &Caps::builder("video/x-raw")
684 .field("width", &1280)
685 .field("height", &720)
686 .field("format", "RGB")
687 .build(),
688 )
689 .build()
690 .unwrap();
691 let appsink = ElementFactory::make("appsink")
692 .name("calib_appsink")
693 .build()
694 .unwrap();
695
696 bin.add_many([&valve, &queue, &videoconvertscale, &filter, &appsink])
697 .unwrap();
698 Element::link_many([&cam, &valve, &queue, &videoconvertscale, &filter, &appsink]).unwrap();
699
700 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
701 appsink.set_drop(true);
702
703 let (tx, rx) = watch::channel(None);
704
705 debug!("setting appsink callbacks...");
706 appsink.set_callbacks(
707 AppSinkCallbacks::builder()
708 .new_sample(move |appsink| {
709 let sample = appsink.pull_sample().unwrap();
710 let buf = sample.buffer().unwrap();
711 while let Err(err) = tx.send(Some(buf.to_owned())) {
712 error!("error sending frame: {err:?}");
713 }
714
715 Ok(FlowSuccess::Ok)
716 })
717 .build(),
718 );
719
720 debug!("linked subsys junk");
721
722 Calibrator::new(valve.downgrade(), rx)
723 }
724
725 // gamma gamma=2.0 ! fpsdisplaysink ! videorate drop-only=true ! omxh264enc ! mpegtsenc !
726
727 /// Add [MjpegStream] to pipeline
728 pub(crate) fn add_mjpeg(
729 pipeline: &Pipeline,
730 cam: &Element,
731 cam_config: config::Camera,
732 ) -> MjpegStream {
733 let span = span!(Level::INFO, "mjpeg");
734 let _enter = span.enter();
735
736 let valve = ElementFactory::make("valve")
737 .property("drop", false)
738 .build()
739 .unwrap();
740 let videorate = ElementFactory::make("videorate")
741 .property("max-rate", 20)
742 .property("drop-only", true)
743 .build()
744 .unwrap();
745 //let queue = ElementFactory::make("queue").build().unwrap();
746 let videoconvertscale = ElementFactory::make("videoconvertscale")
747 .property_from_str("method", "nearest-neighbour")
748 .build()
749 .unwrap();
750 let filter = ElementFactory::make("capsfilter")
751 .property(
752 "caps",
753 &Caps::builder("video/x-raw")
754 .field("width", &640)
755 .field("height", &480)
756 .field("format", "RGB")
757 .build(),
758 )
759 .build()
760 .unwrap();
761 //let jpegenc = ElementFactory::make("jpegenc")
762 // .property("quality", &85)
763 // .build()
764 // .unwrap();
765 let appsink = ElementFactory::make("appsink")
766 .name("mjpeg_appsink")
767 .build()
768 .unwrap();
769
770 pipeline
771 .add_many([
772 &valve,
773 &videorate,
774 &videoconvertscale,
775 &filter,
776 //&jpegenc,
777 &appsink,
778 ])
779 .unwrap();
780 Element::link_many([
781 &cam,
782 &valve,
783 &videorate,
784 &videoconvertscale,
785 &filter,
786 //&jpegenc,
787 &appsink,
788 ])
789 .unwrap();
790
791 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
792 appsink.set_drop(true);
793
794 let (tx, rx) = watch::channel(None);
795
796 debug!("setting appsink callbacks...");
797 appsink.set_callbacks(
798 AppSinkCallbacks::builder()
799 .new_sample(move |appsink| {
800 let sample = appsink
801 .pull_sample()
802 .map_err(|_| Error::FailedToPullSample)
803 .unwrap();
804 match sample.buffer() {
805 Some(buf) => {
806 let jpeg = turbojpeg::compress(
807 turbojpeg::Image {
808 width: 640,
809 height: 480,
810 pitch: 640 * 3,
811 format: turbojpeg::PixelFormat::RGB,
812 pixels: buf
813 .to_owned()
814 .into_mapped_buffer_readable()
815 .unwrap()
816 .to_vec()
817 .as_slice(),
818 },
819 50,
820 turbojpeg::Subsamp::None,
821 )
822 .unwrap();
823 while let Err(err) = tx.send(Some(jpeg.to_vec())) {
824 error!("error sending frame: {err:?}");
825 }
826 }
827 None => {
828 error!("failed to get buffer");
829 }
830 }
831
832 Ok(FlowSuccess::Ok)
833 })
834 .build(),
835 );
836
837 debug!("linked subsys junk");
838
839 MjpegStream { rx }
840 }
841
842 pub async fn run(&self, name: String) -> Result<(), Box<dyn std::error::Error>> {
843 // Define the event loop or something?
844 self.pipelines
845 .read()
846 .await
847 .get(&name)
848 .unwrap()
849 .bus()
850 .unwrap()
851 .connect_message(Some("error"), move |_, msg| match msg.view() {
852 MessageView::Error(err) => {
853 error!(
854 "error received from element {:?}: {}",
855 err.src().map(|s| s.path_string()),
856 err.error()
857 );
858 debug!("{:?}", err.debug());
859 }
860 _ => unimplemented!(),
861 });
862
863 Ok(())
864 }
865
866 /// Start the given camera's pipeline
867 pub async fn start(&self, name: String) {
868 // Start the pipeline
869 if let Some(pipeline) = self.pipelines.read().await.get(&name) {
870 pipeline.set_state(State::Playing).unwrap();
871 }
872 //.expect("Unable to set the pipeline to the `Playing` state.");
873 }
874
875 /// Pause the given camera's pipeline
876 pub async fn pause(&self, name: String) {
877 if let Some(pipeline) = self.pipelines.read().await.get(&name) {
878 pipeline
879 .set_state(State::Paused)
880 .expect("Unable to set the pipeline to the `Null` state.");
881 }
882 }
883
884 pub async fn calibrators(&self) -> MutexGuard<HashMap<String, Calibrator>> {
885 self.calibrators.lock().await
886 }
887 pub async fn mjpeg_streams(&self) -> MutexGuard<HashMap<String, MjpegStream>> {
888 self.mjpeg_streams.lock().await
889 }
890}
891*/