chalkydri/cameras/
mod.rs

1/***
2 * THIS FILE IS CURSED
3 * PLES SEND HELP
4 */
5
6pub(crate) mod mjpeg;
7pub(crate) mod pipeline;
8mod providers;
9mod publisher;
10
11use gstreamer::{
12    Bin, Bus, BusSyncReply, Caps, Device, DeviceProvider, DeviceProviderFactory, Element,
13    ElementFactory, FlowError, FlowSuccess, Fraction, Message, MessageView, PadDirection, Pipeline,
14    State, Structure, glib::WeakRef, prelude::*,
15};
16
17use gstreamer_app::{AppSink, AppSinkCallbacks};
18use minint::NtConn;
19use pipeline::CamPipeline;
20use providers::{CamProvider, ProviderEvent, V4l2Provider};
21use publisher::CamPublisher;
22#[cfg(feature = "rerun")]
23use re_types::archetypes::EncodedImage;
24use std::{collections::HashMap, mem::ManuallyDrop, sync::Arc};
25use tokio::{
26    sync::{Mutex, MutexGuard, RwLock, mpsc, watch},
27    task::JoinHandle,
28};
29use tracing::Level;
30
31#[cfg(feature = "rerun")]
32use crate::Rerun;
33use crate::{
34    Cfg,
35    calibration::Calibrator,
36    config::{self, CameraSettings, CfgFraction},
37    error::Error,
38    subsystems::Subsystem,
39};
40
41#[derive(Clone)]
42pub struct CameraCtx {
43    cfgg: config::Camera,
44    tee: WeakRef<Element>,
45}
46
47#[derive(Clone)]
48pub struct CamManager {
49    v4l2_prov: Arc<Mutex<V4l2Provider>>,
50    pub pipelines: Arc<RwLock<HashMap<String, CamPipeline>>>,
51
52    restart_tx: mpsc::Sender<()>,
53    dev_msg_tx: mpsc::Sender<ProviderEvent>,
54    pub new_dev_rx: Arc<Mutex<mpsc::Receiver<config::Camera>>>,
55}
56impl CamManager {
57    pub async fn new(nt: NtConn, restart_tx: mpsc::Sender<()>) -> (Self, impl Future<Output = ()>) {
58        let v4l2_prov = Arc::new(Mutex::new(V4l2Provider::init()));
59
60        let pipelines = Arc::new(RwLock::new(HashMap::new()));
61
62        let (dev_msg_tx, dev_msg_rx) = mpsc::channel(20);
63        let (new_dev_tx, new_dev_rx) = mpsc::channel(20);
64
65        let pipelines_ = pipelines.clone();
66        let runner = Self::spawn_dev_msg_handler(pipelines_, dev_msg_rx, new_dev_tx);
67
68        (
69            Self {
70                v4l2_prov,
71                pipelines,
72
73                restart_tx,
74                dev_msg_tx,
75                new_dev_rx: Arc::new(Mutex::new(new_dev_rx)),
76            },
77            runner,
78        )
79    }
80
81    pub async fn start_dev_providers(&self) {
82        self.v4l2_prov
83            .lock()
84            .await
85            .register_handler(self.dev_msg_tx.clone());
86        self.v4l2_prov.lock().await.start();
87    }
88    pub async fn stop_dev_providers(&self) {
89        self.v4l2_prov.lock().await.stop();
90        self.v4l2_prov.lock().await.unregister_handler();
91    }
92
93    async fn spawn_dev_msg_handler(
94        pipelines: Arc<RwLock<HashMap<String, CamPipeline>>>,
95        mut rx: mpsc::Receiver<ProviderEvent>,
96        tx: mpsc::Sender<config::Camera>,
97    ) {
98        debug!("starting dev msg handler...");
99        'outer: while let Some(event) = rx.recv().await {
100            match event {
101                ProviderEvent::Connected(id, dev) => {
102                    let id = V4l2Provider::get_id(&dev);
103                    println!("idfk: {id}");
104
105                    if let Some(cameras) = Cfg.read().await.cameras.clone() {
106                        for cam in cameras {
107                            if id == cam.id {
108                                let pipeline = CamPipeline::new(dev.clone(), cam).await;
109                                let _ = pipelines.write().await.insert(id.clone(), pipeline);
110                                println!("existing cam: {id}");
111                                continue 'outer;
112                            }
113                        }
114                    }
115
116                    println!("new cam: {id}");
117                    tx.send(config::Camera {
118                        id,
119                        possible_settings: Some(
120                            dev.caps()
121                                .unwrap()
122                                .iter()
123                                //.filter(|cap| cap.name().as_str() == "video/x-raw")
124                                .filter_map(|cap| {
125                                    let width = cap.get::<i32>("width").ok().map(|v| v as u32);
126                                    let height = cap.get::<i32>("height").ok().map(|v| v as u32);
127                                    let frame_rate =
128                                        cap.get::<Fraction>("framerate").ok().map(|v| {
129                                            CfgFraction {
130                                                num: v.numer() as u32,
131                                                den: v.denom() as u32,
132                                            }
133                                        });
134                                    if width.is_none() || height.is_none() {
135                                        //panic!("Either width or height doesn't exist. Need to look into that...");
136                                        None
137                                    } else {
138                                        Some(CameraSettings {
139                                            width: width.unwrap(),
140                                            height: height.unwrap(),
141                                            frame_rate,
142                                            format: Some(
143                                                cap.get::<String>("format").unwrap_or_default(),
144                                            ),
145                                        })
146                                    }
147                                })
148                                .collect(),
149                        ),
150                        ..Default::default()
151                    })
152                    .await
153                    .unwrap();
154                }
155                ProviderEvent::Disconnected(id, dev) => {
156                    let mut pipelines = pipelines.write().await;
157                    if pipelines.contains_key(&id) {
158                        pipelines.remove(&id);
159                    }
160                }
161            }
162        }
163
164        panic!("dev msg handler died");
165    }
166
167    pub async fn refresh_devices(&self) {
168        let mut cfgg = Cfg.write().await;
169
170        let mut cameras = cfgg.cameras.clone();
171
172        if let Some(ref mut cams) = cameras {
173            for mut cam in cams {
174                cam.online = self.pipelines.read().await.contains_key(&cam.id);
175            }
176        }
177
178        (*cfgg).cameras = cameras;
179    }
180
181    pub async fn update_pipeline(&self, cam_id: String) {
182        let cfgg = Cfg.read().await.clone();
183        let cam_config = cfgg
184            .cameras
185            .unwrap()
186            .iter()
187            .filter(|cam| cam.id == cam_id)
188            .next()
189            .unwrap()
190            .clone();
191        self.pipelines
192            .read()
193            .await
194            .get(&cam_id)
195            .unwrap()
196            .update(cam_config)
197            .await;
198    }
199}
200
201/*
202/// The camera manager
203///
204/// This manages all of the GStreamer pipelines.
205/// It also handles device events.
206#[derive(Clone)]
207pub struct CameraManager {
208    dev_prov: DeviceProvider,
209    pipelines: Arc<RwLock<HashMap<String, Pipeline>>>,
210    calibrators: Arc<Mutex<HashMap<String, Calibrator>>>,
211    mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>>,
212    restart_tx: mpsc::Sender<()>,
213    subsys_man: SubsysManager,
214    publisher: Arc<Mutex<CamPublisher>>,
215}
216impl CameraManager {
217    /// Initialize a camera manager
218    ///
219    /// **You MUST call [gstreamer::init] first.**
220    pub async fn new(nt: NtConn, restart_tx: mpsc::Sender<()>) -> Self {
221        // Make sure gstreamer is initialized
222        gstreamer::assert_initialized();
223
224        // Get a copy of the global configuration
225        let config = {
226            let cfgg = Cfg.read().await;
227            let ret = (*cfgg).clone();
228            drop(cfgg);
229            ret
230        };
231
232        let pipelines = Arc::new(RwLock::new(HashMap::new()));
233        let calibrators: Arc<Mutex<HashMap<String, Calibrator>>> =
234            Arc::new(Mutex::new(HashMap::new()));
235        let mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>> =
236            Arc::new(Mutex::new(HashMap::new()));
237        let publisher = Arc::new(Mutex::new(CamPublisher::new()));
238
239        let subsys_man = SubsysManager::new().await.unwrap();
240
241        // Create a device monitor to watch for new devices
242        let dev_prov = DeviceProviderFactory::find("v4l2deviceprovider")
243            .unwrap()
244            .load()
245            .unwrap()
246            .get()
247            .unwrap();
248
249        let bus = dev_prov.bus();
250
251        let calibrators_ = calibrators.clone();
252        let mjpeg_streams_ = mjpeg_streams.clone();
253        let pipelines_ = pipelines.clone();
254        let publisher_ = publisher.clone();
255        let manager = subsys_man.clone();
256        let rt_handle = tokio::runtime::Handle::current();
257        bus.set_sync_handler(move |_, msg| {
258            let calibrators = &calibrators_;
259            let mjpeg_streams = &mjpeg_streams_;
260            let pipelines = &pipelines_;
261            let publisher = &publisher_;
262            match msg.view() {
263                MessageView::DeviceAdded(msg) => {
264                    let dev = msg.device();
265                    debug!("got a new device");
266
267                    // Create a new pipeline
268                    let pipeline = Pipeline::new();
269
270                    if let Some(cam_configs) = &config.cameras {
271                        let id = Self::get_id(&dev);
272
273                        if let Some(cam_config) =
274                            cam_configs.clone().iter().filter(|cam| cam.id == id).next()
275                        {
276                            let span = span!(Level::INFO, "camera", id = cam_config.id);
277                            let _enter = span.enter();
278
279                            debug!("found a config");
280
281                            // Create the camera source
282                            let cam = dev.create_element(Some("camera")).unwrap();
283
284                            let mut extra_controls = Structure::new_empty("extra-controls");
285                            extra_controls.set(
286                                "auto_exposure",
287                                if cam_config.auto_exposure { 3 } else { 1 },
288                            );
289                            if let Some(manual_exposure) = cam_config.manual_exposure {
290                                extra_controls.set("exposure_time_absolute", &manual_exposure);
291                            }
292                            cam.set_property("extra-controls", extra_controls);
293
294                            // The camera preprocessing part:
295                            //   [src]> capsfilter -> queue -> tee -> ...
296
297                            // Create the elements
298                            let settings = cam_config.settings.clone().unwrap_or_default();
299
300                            let is_mjpeg = settings.format == Some(String::new());
301
302                            let filter = ElementFactory::make("capsfilter")
303                                .name("capsfilter")
304                                .property(
305                                    "caps",
306                                    &Caps::builder(if is_mjpeg {
307                                        "image/jpeg"
308                                    } else {
309                                        "video/x-raw"
310                                    })
311                                    .field("width", settings.width as i32)
312                                    .field("height", settings.height as i32)
313                                    //.field(
314                                    //    "framerate",
315                                    //    &Fraction::new(
316                                    //        settings.frame_rate.num as i32,
317                                    //        settings.frame_rate.den as i32,
318                                    //    ),
319                                    //)
320                                    .build(),
321                                )
322                                .build()
323                                .unwrap();
324
325                            // This element rotates/flips the video to deal with weird
326                            // mounting configurations
327                            let videoflip = ElementFactory::make("videoflip")
328                                .name("videoflip")
329                                .property_from_str(
330                                    "method",
331                                    &serde_json::to_string(&cam_config.orientation)
332                                        .unwrap()
333                                        .trim_matches('"'),
334                                )
335                                .build()
336                                .unwrap();
337
338                            // This element splits the stream off into multiple branches of the
339                            // pipeline:
340                            //  - MJPEG stream
341                            //  - Calibration
342                            //  - Subsystems
343                            let tee = ElementFactory::make("tee").build().unwrap();
344
345                            if is_mjpeg {
346                                let jpegdec =
347                                    ElementFactory::make_with_name("jpegdec", Some("jpegdec"))
348                                        .unwrap();
349                                // Add them to the pipeline
350                                pipeline
351                                    .add_many([&cam, &filter, &jpegdec, &videoflip, &tee])
352                                    .unwrap();
353
354                                // Link them
355                                Element::link_many([&cam, &filter, &jpegdec, &videoflip, &tee])
356                                    .unwrap();
357                            } else {
358                                // Add them to the pipeline
359                                pipeline
360                                    .add_many([&cam, &filter, &videoflip, &tee])
361                                    .unwrap();
362
363                                // Link them
364                                Element::link_many([&cam, &filter, &videoflip, &tee]).unwrap();
365                            }
366
367                            debug!("initializing calibrator");
368                            let calibrator = Self::add_calib(&pipeline, &tee, cam_config.clone());
369
370                            {
371                                debug!("adding calibrator");
372                                let mut calibrators =
373                                    tokio::task::block_in_place(|| calibrators.blocking_lock());
374                                (*calibrators).insert(id.clone(), calibrator);
375                                drop(calibrators);
376                                debug!("dropped lock");
377                            }
378
379                            debug!("initializing mjpeg stream");
380                            let mjpeg_stream = Self::add_mjpeg(&pipeline, &tee, cam_config.clone());
381
382                            {
383                                debug!("adding mjpeg stream");
384                                let mut mjpeg_streams =
385                                    tokio::task::block_in_place(|| mjpeg_streams.blocking_lock());
386                                (*mjpeg_streams).insert(id.clone(), mjpeg_stream);
387                                drop(mjpeg_streams);
388                                debug!("dropped lock");
389                            }
390
391                            rt_handle.block_on(async {
392                                manager.spawn(cam_config.clone(), &pipeline, &tee).await;
393                            });
394
395                            tokio::task::block_in_place(|| pipelines.blocking_write())
396                                .insert(id, pipeline);
397
398                            publisher_.lock().await.publish(cam_config).await;
399                        }
400                    }
401                }
402                _ => {}
403            }
404
405            BusSyncReply::Pass
406        });
407
408        // Start the device provider
409        dev_prov.start().unwrap();
410
411        for (cam_name, pipeline) in pipelines.read().await.clone() {
412            debug!("starting pipeline for {cam_name}");
413
414            // Start the pipeline
415            pipeline.set_state(State::Playing).unwrap();
416
417            // Get the pipeline's bus
418            let bus = pipeline.bus().unwrap();
419            // Hook up event handler for the pipeline
420            bus.set_sync_handler(|_, _| BusSyncReply::Pass);
421        }
422
423        Self {
424            dev_prov,
425            pipelines,
426            calibrators,
427            mjpeg_streams,
428            restart_tx,
429            subsys_man,
430            publisher,
431        }
432    }
433
434    /// Trigger a restart of Chalkydri
435    pub async fn restart(&self) {
436        self.restart_tx.send(()).await.unwrap();
437    }
438
439    /// Get unique identifier for the given device
440    fn get_id(dev: &Device) -> String {
441        dev.property::<Structure>("properties")
442            .get::<String>("device.serial")
443            .unwrap()
444    }
445
446    /// Update the given pipeline
447    pub async fn update_pipeline(&self, dev_id: String) {
448        self.pause(dev_id.clone()).await;
449        {
450            let mut pipelines = self.pipelines.write().await;
451            if let Some(pipeline) = pipelines.get_mut(&dev_id) {
452                // Get a copy of the global configuration
453                let config = {
454                    let cfgg = Cfg.read().await;
455                    let ret = (*cfgg).clone();
456                    drop(cfgg);
457                    ret
458                };
459
460                if let Some(cam_configs) = &config.cameras {
461                    if let Some(cam_config) = cam_configs
462                        .clone()
463                        .iter()
464                        .filter(|cam| cam.id == dev_id)
465                        .next()
466                    {
467                        if let Some(settings) = &cam_config.settings {
468                            let capsfilter = pipeline.by_name("capsfilter").unwrap();
469                            let mut old_caps = pipeline
470                                .by_name("capsfilter")
471                                .unwrap()
472                                .property::<Caps>("caps")
473                                .to_owned();
474                            let caps = old_caps.make_mut();
475                            caps.set_value("width", (&(settings.width as i32)).into());
476                            caps.set_value("height", (&(settings.height as i32)).into());
477                            //caps.set_value(
478                            //            "framerate",
479                            //            (&Fraction::new(
480                            //                settings.frame_rate.num as i32,
481                            //                settings.frame_rate.den as i32,
482                            //            )).into(),
483                            //);
484                            capsfilter.set_property("caps", caps.to_owned());
485
486                            // Reconfigure [Caps]
487                            pipeline.foreach_sink_pad(|_elem, pad| {
488                                pad.mark_reconfigure();
489                                true
490                            });
491
492                            let camera = pipeline.by_name("camera").unwrap();
493
494                            let mut extra_controls = camera.property::<Structure>("extra-controls");
495                            extra_controls.set(
496                                "auto_exposure",
497                                if cam_config.auto_exposure { 3 } else { 1 },
498                            );
499                            if let Some(manual_exposure) = cam_config.manual_exposure {
500                                extra_controls.set("exposure_time_absolute", &manual_exposure);
501                            }
502                            camera.set_property("extra-controls", extra_controls);
503
504                            pipeline
505                                .by_name("videoflip")
506                                .unwrap()
507                                .set_property_from_str(
508                                    "method",
509                                    &serde_json::to_string(&cam_config.orientation)
510                                        .unwrap()
511                                        .trim_matches('"'),
512                                );
513
514                            if let Some(capriltags_valve) = pipeline.by_name("capriltags_valve") {
515                                capriltags_valve.set_property(
516                                    "drop",
517                                    cam_config.subsystems.capriltags.is_none(),
518                                );
519                            }
520                        }
521                    }
522                }
523            }
524        }
525        self.start(dev_id).await;
526    }
527
528    /// List connected cameras
529    pub fn devices(&self) -> Vec<config::Camera> {
530        let mut devices = Vec::new();
531
532        for dev in self.dev_prov.devices().iter() {
533            let id = Self::get_id(&dev);
534            devices.push(config::Camera {
535                id,
536                possible_settings: Some(
537                    dev.caps()
538                        .unwrap()
539                        .iter()
540                        //.filter(|cap| cap.name().as_str() == "video/x-raw")
541                        .filter_map(|cap| {
542                            let width = cap.get::<i32>("width").ok().map(|v| v as u32);
543                            let height = cap.get::<i32>("height").ok().map(|v| v as u32);
544                            let frame_rate = cap
545                                .get::<Fraction>("framerate").ok().map(|v| CfgFraction {
546                                    num: v.numer() as u32,
547                                    den: v.denom() as u32,
548                                });
549                            if width.is_none() || height.is_none() {
550                                error!("Either width or height doesn't exist. Need to look into that...");
551                                None
552                            } else {
553                                Some(CameraSettings {
554                                    width: width.unwrap(),
555                                    height: height.unwrap(),
556                                    frame_rate,
557                                    format: Some(cap.get::<String>("format").unwrap_or_default()),
558                                })
559                            }
560                        })
561                        .collect(),
562                ),
563                ..Default::default()
564            });
565        }
566
567        devices
568    }
569
570    /// Run a calibration step
571    pub async fn calib_step(&self, name: String) -> usize {
572        self.calibrators().await.get_mut(&name).unwrap().step()
573    }
574
575    /// Get an [`MJPEG stream`](MjpegStream)
576    pub async fn mjpeg_stream(&self, name: String) -> MjpegStream {
577        self.mjpeg_streams().await.get(&name).unwrap().clone()
578    }
579
580    /// Add [`subsystem`](Subsystem) to pipeline
581    pub(crate) fn add_subsys<S: Subsystem>(
582        pipeline: &Pipeline,
583        cam: &Element,
584        cam_config: config::Camera,
585        enabled: bool,
586    ) -> watch::Receiver<Option<Vec<u8>>> {
587        let span = span!(Level::INFO, "subsys", subsystem = S::NAME);
588        let _enter = span.enter();
589
590        debug!("initializing preproc pipeline chunk subsystem...");
591        let (input, output) = S::preproc(cam_config.clone(), pipeline).unwrap();
592
593        let valve = ElementFactory::make("valve")
594            .name(&format!("{}_valve", S::NAME))
595            .property("drop", !enabled)
596            .build()
597            .unwrap();
598        let videorate = ElementFactory::make("videorate")
599            .property("max-rate", 40)
600            .property("drop-only", true)
601            .build()
602            .unwrap();
603        let appsink = ElementFactory::make("appsink")
604            .name(&format!("{}_appsink", S::NAME))
605            .build()
606            .unwrap();
607        pipeline.add_many([&valve, &videorate, &appsink]).unwrap();
608
609        debug!("linking preproc pipeline chunk...");
610        Element::link_many([&cam, &valve, &videorate, &input]).unwrap();
611        output.link(&appsink).unwrap();
612
613        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
614        appsink.set_drop(true);
615
616        let (tx, rx) = watch::channel(None);
617
618        let appsink_ = appsink.clone();
619
620        debug!("setting appsink callbacks...");
621        appsink.set_callbacks(
622            AppSinkCallbacks::builder()
623                .new_sample(move |_| match appsink_.pull_sample() {
624                    Ok(sample) => {
625                        let buf = sample.buffer().unwrap();
626                        let buf = buf
627                            .to_owned()
628                            .into_mapped_buffer_readable()
629                            .unwrap()
630                            .to_vec();
631                        if let Err(err) = tx.send(Some(buf)) {
632                            error!("failed to send frame to subsys appsink: {err:?}");
633                        }
634
635                        Ok(FlowSuccess::Ok)
636                    }
637                    Err(err) => {
638                        error!("failed to pull sample: {err:?}");
639                        Err(FlowError::Error)
640                    }
641                })
642                .build(),
643        );
644        appsink.set_async(false);
645
646        debug!("linked subsys junk");
647
648        //let cam_config = cam_config.clone();
649        //std::thread::spawn(move || {
650        //    debug!("capriltags worker thread started");
651        //    futures_executor::block_on(async move {
652        //        debug!("initializing subsystem...");
653        //        let mut subsys = S::init(cam_config).await.unwrap();
654
655        //        debug!("starting subsystem...");
656        //        subsys.process(Nt.clone(), rx).await.unwrap();
657        //    });
658        //});
659
660        rx
661    }
662
663    /// Add [Calibrator] to pipeline
664    pub(crate) fn add_calib(
665        pipeline: &Pipeline,
666        cam: &Element,
667        cam_config: config::Camera,
668    ) -> Calibrator {
669        let span = span!(Level::INFO, "calib");
670        let _enter = span.enter();
671
672        let bin = Bin::builder().name("calib").build();
673
674        let valve = ElementFactory::make("valve")
675            .property("drop", false)
676            .build()
677            .unwrap();
678        let queue = ElementFactory::make("queue").build().unwrap();
679        let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
680        let filter = ElementFactory::make("capsfilter")
681            .property(
682                "caps",
683                &Caps::builder("video/x-raw")
684                    .field("width", &1280)
685                    .field("height", &720)
686                    .field("format", "RGB")
687                    .build(),
688            )
689            .build()
690            .unwrap();
691        let appsink = ElementFactory::make("appsink")
692            .name("calib_appsink")
693            .build()
694            .unwrap();
695
696        bin.add_many([&valve, &queue, &videoconvertscale, &filter, &appsink])
697            .unwrap();
698        Element::link_many([&cam, &valve, &queue, &videoconvertscale, &filter, &appsink]).unwrap();
699
700        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
701        appsink.set_drop(true);
702
703        let (tx, rx) = watch::channel(None);
704
705        debug!("setting appsink callbacks...");
706        appsink.set_callbacks(
707            AppSinkCallbacks::builder()
708                .new_sample(move |appsink| {
709                    let sample = appsink.pull_sample().unwrap();
710                    let buf = sample.buffer().unwrap();
711                    while let Err(err) = tx.send(Some(buf.to_owned())) {
712                        error!("error sending frame: {err:?}");
713                    }
714
715                    Ok(FlowSuccess::Ok)
716                })
717                .build(),
718        );
719
720        debug!("linked subsys junk");
721
722        Calibrator::new(valve.downgrade(), rx)
723    }
724
725    // gamma gamma=2.0 ! fpsdisplaysink ! videorate drop-only=true ! omxh264enc ! mpegtsenc !
726
727    /// Add [MjpegStream] to pipeline
728    pub(crate) fn add_mjpeg(
729        pipeline: &Pipeline,
730        cam: &Element,
731        cam_config: config::Camera,
732    ) -> MjpegStream {
733        let span = span!(Level::INFO, "mjpeg");
734        let _enter = span.enter();
735
736        let valve = ElementFactory::make("valve")
737            .property("drop", false)
738            .build()
739            .unwrap();
740        let videorate = ElementFactory::make("videorate")
741            .property("max-rate", 20)
742            .property("drop-only", true)
743            .build()
744            .unwrap();
745        //let queue = ElementFactory::make("queue").build().unwrap();
746        let videoconvertscale = ElementFactory::make("videoconvertscale")
747            .property_from_str("method", "nearest-neighbour")
748            .build()
749            .unwrap();
750        let filter = ElementFactory::make("capsfilter")
751            .property(
752                "caps",
753                &Caps::builder("video/x-raw")
754                    .field("width", &640)
755                    .field("height", &480)
756                    .field("format", "RGB")
757                    .build(),
758            )
759            .build()
760            .unwrap();
761        //let jpegenc = ElementFactory::make("jpegenc")
762        //    .property("quality", &85)
763        //    .build()
764        //    .unwrap();
765        let appsink = ElementFactory::make("appsink")
766            .name("mjpeg_appsink")
767            .build()
768            .unwrap();
769
770        pipeline
771            .add_many([
772                &valve,
773                &videorate,
774                &videoconvertscale,
775                &filter,
776                //&jpegenc,
777                &appsink,
778            ])
779            .unwrap();
780        Element::link_many([
781            &cam,
782            &valve,
783            &videorate,
784            &videoconvertscale,
785            &filter,
786            //&jpegenc,
787            &appsink,
788        ])
789        .unwrap();
790
791        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
792        appsink.set_drop(true);
793
794        let (tx, rx) = watch::channel(None);
795
796        debug!("setting appsink callbacks...");
797        appsink.set_callbacks(
798            AppSinkCallbacks::builder()
799                .new_sample(move |appsink| {
800                    let sample = appsink
801                        .pull_sample()
802                        .map_err(|_| Error::FailedToPullSample)
803                        .unwrap();
804                    match sample.buffer() {
805                        Some(buf) => {
806                            let jpeg = turbojpeg::compress(
807                                turbojpeg::Image {
808                                    width: 640,
809                                    height: 480,
810                                    pitch: 640 * 3,
811                                    format: turbojpeg::PixelFormat::RGB,
812                                    pixels: buf
813                                        .to_owned()
814                                        .into_mapped_buffer_readable()
815                                        .unwrap()
816                                        .to_vec()
817                                        .as_slice(),
818                                },
819                                50,
820                                turbojpeg::Subsamp::None,
821                            )
822                            .unwrap();
823                            while let Err(err) = tx.send(Some(jpeg.to_vec())) {
824                                error!("error sending frame: {err:?}");
825                            }
826                        }
827                        None => {
828                            error!("failed to get buffer");
829                        }
830                    }
831
832                    Ok(FlowSuccess::Ok)
833                })
834                .build(),
835        );
836
837        debug!("linked subsys junk");
838
839        MjpegStream { rx }
840    }
841
842    pub async fn run(&self, name: String) -> Result<(), Box<dyn std::error::Error>> {
843        // Define the event loop or something?
844        self.pipelines
845            .read()
846            .await
847            .get(&name)
848            .unwrap()
849            .bus()
850            .unwrap()
851            .connect_message(Some("error"), move |_, msg| match msg.view() {
852                MessageView::Error(err) => {
853                    error!(
854                        "error received from element {:?}: {}",
855                        err.src().map(|s| s.path_string()),
856                        err.error()
857                    );
858                    debug!("{:?}", err.debug());
859                }
860                _ => unimplemented!(),
861            });
862
863        Ok(())
864    }
865
866    /// Start the given camera's pipeline
867    pub async fn start(&self, name: String) {
868        // Start the pipeline
869        if let Some(pipeline) = self.pipelines.read().await.get(&name) {
870            pipeline.set_state(State::Playing).unwrap();
871        }
872        //.expect("Unable to set the pipeline to the `Playing` state.");
873    }
874
875    /// Pause the given camera's pipeline
876    pub async fn pause(&self, name: String) {
877        if let Some(pipeline) = self.pipelines.read().await.get(&name) {
878            pipeline
879                .set_state(State::Paused)
880                .expect("Unable to set the pipeline to the `Null` state.");
881        }
882    }
883
884    pub async fn calibrators(&self) -> MutexGuard<HashMap<String, Calibrator>> {
885        self.calibrators.lock().await
886    }
887    pub async fn mjpeg_streams(&self) -> MutexGuard<HashMap<String, MjpegStream>> {
888        self.mjpeg_streams.lock().await
889    }
890}
891*/