chalkydri/
cameras.rs

1/***
2 * THIS FILE IS CURSED
3 * PLES SEND HELP
4 */
5
6use actix_web::web::Bytes;
7use futures_core::Stream;
8use gstreamer::{
9    Buffer, BusSyncReply, Caps, DeviceProvider, DeviceProviderFactory, Element, ElementFactory,
10    FlowSuccess, Fraction, MessageView, Pipeline, State, Structure, glib::WeakRef, prelude::*,
11};
12
13use gstreamer_app::{AppSink, AppSinkCallbacks};
14use minint::{NtConn, NtTopic};
15#[cfg(feature = "rerun")]
16use re_types::archetypes::EncodedImage;
17use std::{collections::HashMap, mem::ManuallyDrop, sync::Arc, task::Poll};
18use tokio::sync::{Mutex, MutexGuard, RwLock, watch};
19
20#[cfg(feature = "rerun")]
21use crate::Rerun;
22use crate::{
23    Cfg,
24    calibration::Calibrator,
25    config::{self, CameraSettings, CfgFraction},
26    error::Error,
27    subsys::capriltags::CApriltagsDetector,
28    subsystem::Subsystem,
29};
30
31#[derive(Clone)]
32pub struct CameraCtx {
33    cfgg: config::Camera,
34    tee: WeakRef<Element>,
35}
36
37#[derive(Clone)]
38pub struct MjpegStream {
39    rx: watch::Receiver<Option<Buffer>>,
40}
41impl Stream for MjpegStream {
42    type Item = Result<Bytes, Error>;
43    fn poll_next(
44        self: std::pin::Pin<&mut Self>,
45        cx: &mut std::task::Context<'_>,
46    ) -> std::task::Poll<Option<Self::Item>> {
47        loop {
48            match self.rx.has_changed() {
49                Ok(true) => {
50                    info!("working!!!");
51                    let mut bytes = Vec::new();
52                    bytes.clear();
53                    if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
54                        bytes.extend_from_slice(
55                            &[
56                                b"--frame\r\nContent-Length: ",
57                                frame.size().to_string().as_bytes(),
58                                b"\r\nContent-Type: image/jpeg\r\n\r\n",
59                            ]
60                            .concat(),
61                        );
62                        bytes.extend_from_slice(
63                            frame
64                                .map_readable()
65                                .map_err(|_| Error::FailedToMapBuffer)?
66                                .as_slice(),
67                        );
68                    }
69
70                    return Poll::Ready(Some(Ok(bytes.into())));
71                }
72                Ok(false) => {}
73                Err(err) => {
74                    error!("error getting frame: {err:?}");
75
76                    return Poll::Ready(None);
77                }
78            }
79        }
80    }
81}
82
83#[derive(Clone)]
84pub struct CameraManager {
85    //dev_mon: DeviceMonitor,
86    dev_prov: DeviceProvider,
87    pipelines: Arc<RwLock<HashMap<String, Pipeline>>>,
88    calibrators: Arc<Mutex<HashMap<String, Calibrator>>>,
89    mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>>,
90}
91impl CameraManager {
92    pub async fn new(nt: NtConn) -> Self {
93        // Make sure gstreamer is initialized
94        gstreamer::assert_initialized();
95
96        // Get a copy of the global configuration
97        let config = {
98            let cfgg = Cfg.read().await;
99            let ret = (*cfgg).clone();
100            drop(cfgg);
101            ret
102        };
103
104        // Create a device monitor to watch for new devices
105        let dev_prov = DeviceProviderFactory::find("v4l2deviceprovider")
106            .unwrap()
107            .load()
108            .unwrap()
109            .get()
110            .unwrap();
111
112        // Create the pipeline
113        let pipelines = Arc::new(RwLock::new(HashMap::new()));
114
115        //let bus = dev_mon.bus();
116        let bus = dev_prov.bus();
117
118        let calibrators: Arc<Mutex<HashMap<String, Calibrator>>> =
119            Arc::new(Mutex::new(HashMap::new()));
120
121        let mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>> =
122            Arc::new(Mutex::new(HashMap::new()));
123
124        let calibrators_ = calibrators.clone();
125        let mjpeg_streams_ = mjpeg_streams.clone();
126        let pipelines_ = pipelines.clone();
127        bus.set_sync_handler(move |_, msg| {
128            let calibrators = &calibrators_;
129            let mjpeg_streams = &mjpeg_streams_;
130            let pipelines = &pipelines_;
131
132            // Upgrade the weak ref to work with the pipeline
133            let pipeline = Pipeline::new();
134
135            match msg.view() {
136                MessageView::DeviceAdded(msg) => {
137                    let dev = msg.device();
138                    debug!("got a new device");
139
140                    if let Some(cam_configs) = &config.cameras {
141                        let id = dev
142                            .property::<Structure>("properties")
143                            .get::<String>("device.serial")
144                            .unwrap();
145                        if let Some(cam_config) =
146                            cam_configs.clone().iter().filter(|cam| cam.id == id).next()
147                        {
148                            debug!("found a config");
149                            dbg!(
150                                dev.list_properties()
151                                    .iter()
152                                    .map(|prop| prop.name().to_string())
153                                    .collect::<Vec<_>>()
154                            );
155                            dbg!(
156                                dev.property::<Structure>("properties")
157                                    .iter()
158                                    .map(|(k, v)| (k.to_string(), v.to_value()))
159                                    .collect::<Vec<_>>()
160                            );
161
162                            // Create the camera source
163                            let cam = dev.create_element(Some("camera")).unwrap();
164                            let mut extra_controls = Structure::new_empty("extra-controls");
165                            extra_controls.set(
166                                "auto_exposure",
167                                if cam_config.auto_exposure { 3 } else { 1 },
168                            );
169                            if let Some(manual_exposure) = cam_config.manual_exposure {
170                                extra_controls.set("exposure_time_absolute", &manual_exposure);
171                            }
172                            cam.set_property("extra-controls", extra_controls);
173
174                            // The camera preprocessing part:
175                            //   [src]> capsfilter -> queue -> tee -> ...
176
177                            // Create the elements
178                            let filter = ElementFactory::make("capsfilter")
179                                .name("capsfilter")
180                                .property(
181                                    "caps",
182                                    &Caps::builder("video/x-raw")
183                                        .field("width", &1280)
184                                        .field("height", &720)
185                                        .build(),
186                                )
187                                .build()
188                                .unwrap();
189                            //let queue = ElementFactory::make("queue").build().unwrap();
190                            let tee = ElementFactory::make("tee").build().unwrap();
191
192                            // Add them to the pipeline
193                            pipeline.add_many([&cam, &filter, &tee]).unwrap();
194
195                            // Link them
196                            Element::link_many([&cam, &filter, &tee]).unwrap();
197
198                            debug!("initializing calibrator");
199                            let calibrator = Self::add_calib(&pipeline, &tee, cam_config.clone());
200
201                            {
202                                debug!("adding calibrator");
203                                let mut calibrators =
204                                    tokio::task::block_in_place(|| calibrators.blocking_lock());
205                                (*calibrators).insert(id.clone(), calibrator);
206                                drop(calibrators);
207                                debug!("dropped lock");
208                            }
209
210                            debug!("initializing mjpeg stream");
211                            let mjpeg_stream = Self::add_mjpeg(&pipeline, &tee, cam_config.clone());
212
213                            {
214                                debug!("adding mjpeg stream");
215                                let mut mjpeg_streams =
216                                    tokio::task::block_in_place(|| mjpeg_streams.blocking_lock());
217                                (*mjpeg_streams).insert(id.clone(), mjpeg_stream);
218                                drop(mjpeg_streams);
219                                debug!("dropped lock");
220                            }
221
222                            if cam_config.calib.is_some() {
223                                Self::add_subsys::<CApriltagsDetector>(
224                                    &pipeline,
225                                    &tee,
226                                    cam_config.clone(),
227                                    nt.clone(),
228                                    cam_config.subsystems.capriltags.enabled,
229                                );
230                            }
231
232                            tokio::task::block_in_place(|| pipelines.blocking_write())
233                                .insert(id, pipeline);
234
235                            futures_executor::block_on(async {
236                                let mut streams = ManuallyDrop::new(
237                                    nt.publish(format!(
238                                        "/CameraPublisher/{}/streams",
239                                        cam_config.name
240                                    ))
241                                    .await
242                                    .unwrap(),
243                                );
244                                streams
245                                    .set(vec![format!(
246                                        "mjpeg:http://localhost:6942/stream/{}",
247                                        cam_config.id
248                                    )])
249                                    .await
250                                    .unwrap();
251                            });
252                        }
253                    }
254                }
255                _ => {}
256            }
257
258            BusSyncReply::Pass
259        });
260
261        // Start the device monitor
262        dev_prov.start().unwrap();
263
264        for (cam_name, pipeline) in pipelines.read().await.clone() {
265            // Start the pipeline
266            pipeline.set_state(State::Playing).unwrap();
267
268            // Get the pipeline's bus
269            let bus = pipeline.bus().unwrap();
270            // Hook up event handler for the pipeline
271            bus.set_sync_handler(|_, _| BusSyncReply::Pass);
272        }
273
274        Self {
275            dev_prov,
276            pipelines,
277            calibrators,
278            mjpeg_streams,
279        }
280    }
281
282    pub async fn update_pipeline(&self, dev_id: String) {
283        self.pause(dev_id.clone()).await;
284        {
285            let mut pipelines = self.pipelines.write().await;
286            if let Some(pipeline) = pipelines.get_mut(&dev_id) {
287                // Get a copy of the global configuration
288                let config = {
289                    let cfgg = Cfg.read().await;
290                    let ret = (*cfgg).clone();
291                    drop(cfgg);
292                    ret
293                };
294
295                if let Some(cam_configs) = &config.cameras {
296                    if let Some(cam_config) = cam_configs
297                        .clone()
298                        .iter()
299                        .filter(|cam| cam.id == dev_id)
300                        .next()
301                    {
302                        if let Some(settings) = &cam_config.settings {
303                            //pipeline.by_name("capsfilter").unwrap().set_property(
304                            //    "caps",
305                            //    &Caps::builder("video/x-raw")
306                            //        .field("width", &settings.width)
307                            //        .field("height", &settings.height)
308                            //        //.field(
309                            //        //    "framerate",
310                            //        //    &Fraction::new(
311                            //        //        settings.frame_rate.num as i32,
312                            //        //        settings.frame_rate.den as i32,
313                            //        //    ),
314                            //        //)
315                            //        .build(),
316                            //);
317
318                            let camera = pipeline.by_name("camera").unwrap();
319                            let mut extra_controls = camera.property::<Structure>("extra-controls");
320                            extra_controls.set(
321                                "auto_exposure",
322                                if cam_config.auto_exposure { 3 } else { 1 },
323                            );
324                            if let Some(manual_exposure) = cam_config.manual_exposure {
325                                extra_controls.set("exposure_time_absolute", &manual_exposure);
326                            }
327                            camera.set_property("extra-controls", extra_controls);
328                            pipeline
329                                .by_name("capriltags_valve")
330                                .unwrap()
331                                .set_property("drop", !cam_config.subsystems.capriltags.enabled);
332                        }
333                    }
334                }
335            }
336        }
337        self.start(dev_id).await;
338    }
339
340    pub async fn destroy_pipeline(&self, dev_id: String) {
341        let mut pipelines = self.pipelines.write().await;
342        unsafe {
343            pipelines
344                .get_mut(&dev_id)
345                .unwrap()
346                .set_state(State::Null)
347                .unwrap();
348            pipelines.get_mut(&dev_id).unwrap().run_dispose();
349        }
350        pipelines.remove(&dev_id);
351        self.pipelines.write().await.remove(&dev_id);
352    }
353
354    /// List connected cameras
355    pub fn devices(&self) -> Vec<config::Camera> {
356        let mut devices = Vec::new();
357
358        for dev in self.dev_prov.devices().iter() {
359            let id = dev
360                .property::<Structure>("properties")
361                .get::<String>("device.serial")
362                .unwrap();
363            devices.push(config::Camera {
364                id,
365                name: String::new(),
366                settings: None,
367                auto_exposure: true,
368                manual_exposure: None,
369                possible_settings: Some(
370                    dev.caps()
371                        .unwrap()
372                        .iter()
373                        .map(|cap| {
374                            let frame_rate = cap
375                                .get::<Fraction>("framerate")
376                                .unwrap_or_else(|_| Fraction::new(30, 1));
377                            CameraSettings {
378                                width: cap.get::<i32>("width").unwrap_or(1280) as u32,
379                                height: cap.get::<i32>("height").unwrap_or(720) as u32,
380                                frame_rate: CfgFraction {
381                                    num: frame_rate.numer() as u32,
382                                    den: frame_rate.denom() as u32,
383                                },
384                            }
385                        })
386                        .collect(),
387                ),
388                subsystems: config::Subsystems {
389                    capriltags: config::CAprilTagsSubsys {
390                        enabled: false,
391                        field_layout: None,
392                        gamma: None,
393                    },
394                    ml: config::MlSubsys { enabled: false },
395                },
396                calib: None,
397            });
398        }
399
400        devices
401    }
402    pub async fn calib_step(&self, name: String) -> usize {
403        self.calibrators().await.get_mut(&name).unwrap().step()
404    }
405    pub async fn mjpeg_stream(&self, name: String) -> MjpegStream {
406        self.mjpeg_streams().await.get(&name).unwrap().clone()
407    }
408
409    /// Add [`subsystem`](Subsystem) to pipeline
410    pub(crate) fn add_subsys<S: Subsystem>(
411        pipeline: &Pipeline,
412        cam: &Element,
413        cam_config: config::Camera,
414        nt: NtConn,
415        enabled: bool,
416    ) {
417        let target = format!("chalkydri::subsys::{}", S::NAME);
418
419        debug!(target: &target, "initializing preproc pipeline chunk subsystem...");
420        let (input, output) = S::preproc(cam_config.clone(), pipeline).unwrap();
421
422        let valve = ElementFactory::make("valve")
423            .name(&format!("{}_valve", S::NAME))
424            .property("drop", !enabled)
425            .build()
426            .unwrap();
427        let queue = ElementFactory::make("queue").build().unwrap();
428        let appsink = ElementFactory::make("appsink").build().unwrap();
429        pipeline.add_many([&valve, &queue, &appsink]).unwrap();
430
431        debug!(target: &target, "linking preproc pipeline chunk...");
432        Element::link_many([&cam, &valve, &queue, &input]).unwrap();
433        output.link(&appsink).unwrap();
434
435        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
436
437        let (tx, rx) = watch::channel(None);
438
439        let appsink_ = appsink.clone();
440
441        debug!(target: &target, "setting appsink callbacks...");
442        appsink.set_callbacks(
443            AppSinkCallbacks::builder()
444                .new_sample(move |_| {
445                    let sample = appsink_.pull_sample().unwrap();
446                    let buf = sample.buffer().unwrap();
447                    if let Err(err) = tx.send(Some(buf.to_owned())) {
448                        error!("failed to send frame to subsys appsink: {err:?}");
449                    }
450
451                    Ok(FlowSuccess::Ok)
452                })
453                .build(),
454        );
455        appsink.set_async(false);
456
457        debug!("linked subsys junk");
458
459        let nt_ = nt.clone();
460        let cam_config = cam_config.clone();
461        std::thread::spawn(move || {
462            debug!("capriltags worker thread started");
463            let nt = nt_;
464
465            futures_executor::block_on(async move {
466                debug!("initializing subsystem...");
467                let mut subsys = S::init(cam_config).await.unwrap();
468
469                debug!("starting subsystem...");
470                subsys.process(nt, rx).await.unwrap();
471            });
472        });
473    }
474
475    pub(crate) fn add_calib(
476        pipeline: &Pipeline,
477        cam: &Element,
478        cam_config: config::Camera,
479    ) -> Calibrator {
480        let target = format!("chalkydri::camera::{}", cam_config.id);
481
482        let valve = ElementFactory::make("valve")
483            .property("drop", false)
484            .build()
485            .unwrap();
486        let queue = ElementFactory::make("queue").build().unwrap();
487        let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
488        let filter = ElementFactory::make("capsfilter")
489            .property(
490                "caps",
491                &Caps::builder("video/x-raw")
492                    .field("width", &1280)
493                    .field("height", &720)
494                    .field("format", "RGB")
495                    .build(),
496            )
497            .build()
498            .unwrap();
499        let appsink = ElementFactory::make("appsink").build().unwrap();
500
501        pipeline
502            .add_many([&valve, &queue, &videoconvertscale, &filter, &appsink])
503            .unwrap();
504        Element::link_many([&cam, &valve, &queue, &videoconvertscale, &filter, &appsink]).unwrap();
505
506        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
507
508        let (tx, rx) = watch::channel(None);
509
510        debug!(target: &target, "setting appsink callbacks...");
511        appsink.set_callbacks(
512            AppSinkCallbacks::builder()
513                .new_sample(move |appsink| {
514                    let sample = appsink.pull_sample().unwrap();
515                    let buf = sample.buffer().unwrap();
516                    while let Err(err) = tx.send(Some(buf.to_owned())) {
517                        error!("error sending frame: {err:?}");
518                    }
519
520                    Ok(FlowSuccess::Ok)
521                })
522                .build(),
523        );
524
525        debug!("linked subsys junk");
526
527        Calibrator::new(valve.downgrade(), rx)
528    }
529
530    // gamma gamma=2.0 ! fpsdisplaysink ! videorate drop-only=true ! omxh264enc ! mpegtsenc !
531    pub(crate) fn add_mjpeg(
532        pipeline: &Pipeline,
533        cam: &Element,
534        cam_config: config::Camera,
535    ) -> MjpegStream {
536        let target = format!("chalkydri::camera::{}", cam_config.id);
537
538        let valve = ElementFactory::make("valve")
539            .property("drop", false)
540            .build()
541            .unwrap();
542        let queue = ElementFactory::make("queue").build().unwrap();
543        let videoconvertscale = ElementFactory::make("videoconvertscale")
544            .property_from_str("method", "nearest-neighbour")
545            .build()
546            .unwrap();
547        let filter = ElementFactory::make("capsfilter")
548            .property(
549                "caps",
550                &Caps::builder("video/x-raw")
551                    .field("width", &640)
552                    .field("height", &480)
553                    .field("format", "RGB")
554                    .build(),
555            )
556            .build()
557            .unwrap();
558        let jpegenc = ElementFactory::make("jpegenc")
559            .property("quality", &25)
560            .build()
561            .unwrap();
562        let appsink = ElementFactory::make("appsink").build().unwrap();
563
564        pipeline
565            .add_many([
566                &valve,
567                &queue,
568                &videoconvertscale,
569                &filter,
570                &jpegenc,
571                &appsink,
572            ])
573            .unwrap();
574        Element::link_many([
575            &cam,
576            &valve,
577            &queue,
578            &videoconvertscale,
579            &filter,
580            &jpegenc,
581            &appsink,
582        ])
583        .unwrap();
584
585        let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
586
587        let (tx, rx) = watch::channel(None);
588
589        debug!(target: &target, "setting appsink callbacks...");
590        appsink.set_callbacks(
591            AppSinkCallbacks::builder()
592                .new_sample(move |appsink| {
593                    let sample = appsink
594                        .pull_sample()
595                        .map_err(|_| Error::FailedToPullSample)
596                        .unwrap();
597                    match sample.buffer() {
598                        Some(buf) => {
599                            while let Err(err) = tx.send(Some(buf.to_owned())) {
600                                error!("error sending frame: {err:?}");
601                            }
602                        }
603                        None => {
604                            error!("failed to get buffer");
605                        }
606                    }
607
608                    Ok(FlowSuccess::Ok)
609                })
610                .build(),
611        );
612
613        debug!("linked subsys junk");
614
615        MjpegStream { rx }
616    }
617
618    pub async fn run(&self, name: String) -> Result<(), Box<dyn std::error::Error>> {
619        // Define the event loop or something?
620        self.pipelines
621            .read()
622            .await
623            .get(&name)
624            .unwrap()
625            .bus()
626            .unwrap()
627            .connect_message(Some("error"), move |_, msg| match msg.view() {
628                MessageView::Error(err) => {
629                    error!(
630                        "error received from element {:?}: {}",
631                        err.src().map(|s| s.path_string()),
632                        err.error()
633                    );
634                    debug!("{:?}", err.debug());
635                }
636                _ => unimplemented!(),
637            });
638
639        Ok(())
640    }
641
642    pub async fn start(&self, name: String) {
643        // Start the pipeline
644        if let Some(pipeline) = self.pipelines.read().await.get(&name) {
645            pipeline.set_state(State::Playing).unwrap();
646        }
647        //.expect("Unable to set the pipeline to the `Playing` state.");
648    }
649    pub async fn pause(&self, name: String) {
650        if let Some(pipeline) = self.pipelines.read().await.get(&name) {
651            pipeline
652                .set_state(State::Paused)
653                .expect("Unable to set the pipeline to the `Null` state.");
654        }
655    }
656
657    pub async fn calibrators(&self) -> MutexGuard<HashMap<String, Calibrator>> {
658        self.calibrators.lock().await
659    }
660    pub async fn mjpeg_streams(&self) -> MutexGuard<HashMap<String, MjpegStream>> {
661        self.mjpeg_streams.lock().await
662    }
663}