1use actix_web::web::Bytes;
7use futures_core::Stream;
8use gstreamer::{
9 Buffer, BusSyncReply, Caps, DeviceProvider, DeviceProviderFactory, Element, ElementFactory,
10 FlowSuccess, Fraction, MessageView, Pipeline, State, Structure, glib::WeakRef, prelude::*,
11};
12
13use gstreamer_app::{AppSink, AppSinkCallbacks};
14use minint::{NtConn, NtTopic};
15#[cfg(feature = "rerun")]
16use re_types::archetypes::EncodedImage;
17use std::{collections::HashMap, mem::ManuallyDrop, sync::Arc, task::Poll};
18use tokio::sync::{Mutex, MutexGuard, RwLock, watch};
19
20#[cfg(feature = "rerun")]
21use crate::Rerun;
22use crate::{
23 Cfg,
24 calibration::Calibrator,
25 config::{self, CameraSettings, CfgFraction},
26 error::Error,
27 subsys::capriltags::CApriltagsDetector,
28 subsystem::Subsystem,
29};
30
31#[derive(Clone)]
32pub struct CameraCtx {
33 cfgg: config::Camera,
34 tee: WeakRef<Element>,
35}
36
37#[derive(Clone)]
38pub struct MjpegStream {
39 rx: watch::Receiver<Option<Buffer>>,
40}
41impl Stream for MjpegStream {
42 type Item = Result<Bytes, Error>;
43 fn poll_next(
44 self: std::pin::Pin<&mut Self>,
45 cx: &mut std::task::Context<'_>,
46 ) -> std::task::Poll<Option<Self::Item>> {
47 loop {
48 match self.rx.has_changed() {
49 Ok(true) => {
50 info!("working!!!");
51 let mut bytes = Vec::new();
52 bytes.clear();
53 if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
54 bytes.extend_from_slice(
55 &[
56 b"--frame\r\nContent-Length: ",
57 frame.size().to_string().as_bytes(),
58 b"\r\nContent-Type: image/jpeg\r\n\r\n",
59 ]
60 .concat(),
61 );
62 bytes.extend_from_slice(
63 frame
64 .map_readable()
65 .map_err(|_| Error::FailedToMapBuffer)?
66 .as_slice(),
67 );
68 }
69
70 return Poll::Ready(Some(Ok(bytes.into())));
71 }
72 Ok(false) => {}
73 Err(err) => {
74 error!("error getting frame: {err:?}");
75
76 return Poll::Ready(None);
77 }
78 }
79 }
80 }
81}
82
83#[derive(Clone)]
84pub struct CameraManager {
85 dev_prov: DeviceProvider,
87 pipelines: Arc<RwLock<HashMap<String, Pipeline>>>,
88 calibrators: Arc<Mutex<HashMap<String, Calibrator>>>,
89 mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>>,
90}
91impl CameraManager {
92 pub async fn new(nt: NtConn) -> Self {
93 gstreamer::assert_initialized();
95
96 let config = {
98 let cfgg = Cfg.read().await;
99 let ret = (*cfgg).clone();
100 drop(cfgg);
101 ret
102 };
103
104 let dev_prov = DeviceProviderFactory::find("v4l2deviceprovider")
106 .unwrap()
107 .load()
108 .unwrap()
109 .get()
110 .unwrap();
111
112 let pipelines = Arc::new(RwLock::new(HashMap::new()));
114
115 let bus = dev_prov.bus();
117
118 let calibrators: Arc<Mutex<HashMap<String, Calibrator>>> =
119 Arc::new(Mutex::new(HashMap::new()));
120
121 let mjpeg_streams: Arc<Mutex<HashMap<String, MjpegStream>>> =
122 Arc::new(Mutex::new(HashMap::new()));
123
124 let calibrators_ = calibrators.clone();
125 let mjpeg_streams_ = mjpeg_streams.clone();
126 let pipelines_ = pipelines.clone();
127 bus.set_sync_handler(move |_, msg| {
128 let calibrators = &calibrators_;
129 let mjpeg_streams = &mjpeg_streams_;
130 let pipelines = &pipelines_;
131
132 let pipeline = Pipeline::new();
134
135 match msg.view() {
136 MessageView::DeviceAdded(msg) => {
137 let dev = msg.device();
138 debug!("got a new device");
139
140 if let Some(cam_configs) = &config.cameras {
141 let id = dev
142 .property::<Structure>("properties")
143 .get::<String>("device.serial")
144 .unwrap();
145 if let Some(cam_config) =
146 cam_configs.clone().iter().filter(|cam| cam.id == id).next()
147 {
148 debug!("found a config");
149 dbg!(
150 dev.list_properties()
151 .iter()
152 .map(|prop| prop.name().to_string())
153 .collect::<Vec<_>>()
154 );
155 dbg!(
156 dev.property::<Structure>("properties")
157 .iter()
158 .map(|(k, v)| (k.to_string(), v.to_value()))
159 .collect::<Vec<_>>()
160 );
161
162 let cam = dev.create_element(Some("camera")).unwrap();
164 let mut extra_controls = Structure::new_empty("extra-controls");
165 extra_controls.set(
166 "auto_exposure",
167 if cam_config.auto_exposure { 3 } else { 1 },
168 );
169 if let Some(manual_exposure) = cam_config.manual_exposure {
170 extra_controls.set("exposure_time_absolute", &manual_exposure);
171 }
172 cam.set_property("extra-controls", extra_controls);
173
174 let filter = ElementFactory::make("capsfilter")
179 .name("capsfilter")
180 .property(
181 "caps",
182 &Caps::builder("video/x-raw")
183 .field("width", &1280)
184 .field("height", &720)
185 .build(),
186 )
187 .build()
188 .unwrap();
189 let tee = ElementFactory::make("tee").build().unwrap();
191
192 pipeline.add_many([&cam, &filter, &tee]).unwrap();
194
195 Element::link_many([&cam, &filter, &tee]).unwrap();
197
198 debug!("initializing calibrator");
199 let calibrator = Self::add_calib(&pipeline, &tee, cam_config.clone());
200
201 {
202 debug!("adding calibrator");
203 let mut calibrators =
204 tokio::task::block_in_place(|| calibrators.blocking_lock());
205 (*calibrators).insert(id.clone(), calibrator);
206 drop(calibrators);
207 debug!("dropped lock");
208 }
209
210 debug!("initializing mjpeg stream");
211 let mjpeg_stream = Self::add_mjpeg(&pipeline, &tee, cam_config.clone());
212
213 {
214 debug!("adding mjpeg stream");
215 let mut mjpeg_streams =
216 tokio::task::block_in_place(|| mjpeg_streams.blocking_lock());
217 (*mjpeg_streams).insert(id.clone(), mjpeg_stream);
218 drop(mjpeg_streams);
219 debug!("dropped lock");
220 }
221
222 if cam_config.calib.is_some() {
223 Self::add_subsys::<CApriltagsDetector>(
224 &pipeline,
225 &tee,
226 cam_config.clone(),
227 nt.clone(),
228 cam_config.subsystems.capriltags.enabled,
229 );
230 }
231
232 tokio::task::block_in_place(|| pipelines.blocking_write())
233 .insert(id, pipeline);
234
235 futures_executor::block_on(async {
236 let mut streams = ManuallyDrop::new(
237 nt.publish(format!(
238 "/CameraPublisher/{}/streams",
239 cam_config.name
240 ))
241 .await
242 .unwrap(),
243 );
244 streams
245 .set(vec![format!(
246 "mjpeg:http://localhost:6942/stream/{}",
247 cam_config.id
248 )])
249 .await
250 .unwrap();
251 });
252 }
253 }
254 }
255 _ => {}
256 }
257
258 BusSyncReply::Pass
259 });
260
261 dev_prov.start().unwrap();
263
264 for (cam_name, pipeline) in pipelines.read().await.clone() {
265 pipeline.set_state(State::Playing).unwrap();
267
268 let bus = pipeline.bus().unwrap();
270 bus.set_sync_handler(|_, _| BusSyncReply::Pass);
272 }
273
274 Self {
275 dev_prov,
276 pipelines,
277 calibrators,
278 mjpeg_streams,
279 }
280 }
281
282 pub async fn update_pipeline(&self, dev_id: String) {
283 self.pause(dev_id.clone()).await;
284 {
285 let mut pipelines = self.pipelines.write().await;
286 if let Some(pipeline) = pipelines.get_mut(&dev_id) {
287 let config = {
289 let cfgg = Cfg.read().await;
290 let ret = (*cfgg).clone();
291 drop(cfgg);
292 ret
293 };
294
295 if let Some(cam_configs) = &config.cameras {
296 if let Some(cam_config) = cam_configs
297 .clone()
298 .iter()
299 .filter(|cam| cam.id == dev_id)
300 .next()
301 {
302 if let Some(settings) = &cam_config.settings {
303 let camera = pipeline.by_name("camera").unwrap();
319 let mut extra_controls = camera.property::<Structure>("extra-controls");
320 extra_controls.set(
321 "auto_exposure",
322 if cam_config.auto_exposure { 3 } else { 1 },
323 );
324 if let Some(manual_exposure) = cam_config.manual_exposure {
325 extra_controls.set("exposure_time_absolute", &manual_exposure);
326 }
327 camera.set_property("extra-controls", extra_controls);
328 pipeline
329 .by_name("capriltags_valve")
330 .unwrap()
331 .set_property("drop", !cam_config.subsystems.capriltags.enabled);
332 }
333 }
334 }
335 }
336 }
337 self.start(dev_id).await;
338 }
339
340 pub async fn destroy_pipeline(&self, dev_id: String) {
341 let mut pipelines = self.pipelines.write().await;
342 unsafe {
343 pipelines
344 .get_mut(&dev_id)
345 .unwrap()
346 .set_state(State::Null)
347 .unwrap();
348 pipelines.get_mut(&dev_id).unwrap().run_dispose();
349 }
350 pipelines.remove(&dev_id);
351 self.pipelines.write().await.remove(&dev_id);
352 }
353
354 pub fn devices(&self) -> Vec<config::Camera> {
356 let mut devices = Vec::new();
357
358 for dev in self.dev_prov.devices().iter() {
359 let id = dev
360 .property::<Structure>("properties")
361 .get::<String>("device.serial")
362 .unwrap();
363 devices.push(config::Camera {
364 id,
365 name: String::new(),
366 settings: None,
367 auto_exposure: true,
368 manual_exposure: None,
369 possible_settings: Some(
370 dev.caps()
371 .unwrap()
372 .iter()
373 .map(|cap| {
374 let frame_rate = cap
375 .get::<Fraction>("framerate")
376 .unwrap_or_else(|_| Fraction::new(30, 1));
377 CameraSettings {
378 width: cap.get::<i32>("width").unwrap_or(1280) as u32,
379 height: cap.get::<i32>("height").unwrap_or(720) as u32,
380 frame_rate: CfgFraction {
381 num: frame_rate.numer() as u32,
382 den: frame_rate.denom() as u32,
383 },
384 }
385 })
386 .collect(),
387 ),
388 subsystems: config::Subsystems {
389 capriltags: config::CAprilTagsSubsys {
390 enabled: false,
391 field_layout: None,
392 gamma: None,
393 },
394 ml: config::MlSubsys { enabled: false },
395 },
396 calib: None,
397 });
398 }
399
400 devices
401 }
402 pub async fn calib_step(&self, name: String) -> usize {
403 self.calibrators().await.get_mut(&name).unwrap().step()
404 }
405 pub async fn mjpeg_stream(&self, name: String) -> MjpegStream {
406 self.mjpeg_streams().await.get(&name).unwrap().clone()
407 }
408
409 pub(crate) fn add_subsys<S: Subsystem>(
411 pipeline: &Pipeline,
412 cam: &Element,
413 cam_config: config::Camera,
414 nt: NtConn,
415 enabled: bool,
416 ) {
417 let target = format!("chalkydri::subsys::{}", S::NAME);
418
419 debug!(target: &target, "initializing preproc pipeline chunk subsystem...");
420 let (input, output) = S::preproc(cam_config.clone(), pipeline).unwrap();
421
422 let valve = ElementFactory::make("valve")
423 .name(&format!("{}_valve", S::NAME))
424 .property("drop", !enabled)
425 .build()
426 .unwrap();
427 let queue = ElementFactory::make("queue").build().unwrap();
428 let appsink = ElementFactory::make("appsink").build().unwrap();
429 pipeline.add_many([&valve, &queue, &appsink]).unwrap();
430
431 debug!(target: &target, "linking preproc pipeline chunk...");
432 Element::link_many([&cam, &valve, &queue, &input]).unwrap();
433 output.link(&appsink).unwrap();
434
435 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
436
437 let (tx, rx) = watch::channel(None);
438
439 let appsink_ = appsink.clone();
440
441 debug!(target: &target, "setting appsink callbacks...");
442 appsink.set_callbacks(
443 AppSinkCallbacks::builder()
444 .new_sample(move |_| {
445 let sample = appsink_.pull_sample().unwrap();
446 let buf = sample.buffer().unwrap();
447 if let Err(err) = tx.send(Some(buf.to_owned())) {
448 error!("failed to send frame to subsys appsink: {err:?}");
449 }
450
451 Ok(FlowSuccess::Ok)
452 })
453 .build(),
454 );
455 appsink.set_async(false);
456
457 debug!("linked subsys junk");
458
459 let nt_ = nt.clone();
460 let cam_config = cam_config.clone();
461 std::thread::spawn(move || {
462 debug!("capriltags worker thread started");
463 let nt = nt_;
464
465 futures_executor::block_on(async move {
466 debug!("initializing subsystem...");
467 let mut subsys = S::init(cam_config).await.unwrap();
468
469 debug!("starting subsystem...");
470 subsys.process(nt, rx).await.unwrap();
471 });
472 });
473 }
474
475 pub(crate) fn add_calib(
476 pipeline: &Pipeline,
477 cam: &Element,
478 cam_config: config::Camera,
479 ) -> Calibrator {
480 let target = format!("chalkydri::camera::{}", cam_config.id);
481
482 let valve = ElementFactory::make("valve")
483 .property("drop", false)
484 .build()
485 .unwrap();
486 let queue = ElementFactory::make("queue").build().unwrap();
487 let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
488 let filter = ElementFactory::make("capsfilter")
489 .property(
490 "caps",
491 &Caps::builder("video/x-raw")
492 .field("width", &1280)
493 .field("height", &720)
494 .field("format", "RGB")
495 .build(),
496 )
497 .build()
498 .unwrap();
499 let appsink = ElementFactory::make("appsink").build().unwrap();
500
501 pipeline
502 .add_many([&valve, &queue, &videoconvertscale, &filter, &appsink])
503 .unwrap();
504 Element::link_many([&cam, &valve, &queue, &videoconvertscale, &filter, &appsink]).unwrap();
505
506 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
507
508 let (tx, rx) = watch::channel(None);
509
510 debug!(target: &target, "setting appsink callbacks...");
511 appsink.set_callbacks(
512 AppSinkCallbacks::builder()
513 .new_sample(move |appsink| {
514 let sample = appsink.pull_sample().unwrap();
515 let buf = sample.buffer().unwrap();
516 while let Err(err) = tx.send(Some(buf.to_owned())) {
517 error!("error sending frame: {err:?}");
518 }
519
520 Ok(FlowSuccess::Ok)
521 })
522 .build(),
523 );
524
525 debug!("linked subsys junk");
526
527 Calibrator::new(valve.downgrade(), rx)
528 }
529
530 pub(crate) fn add_mjpeg(
532 pipeline: &Pipeline,
533 cam: &Element,
534 cam_config: config::Camera,
535 ) -> MjpegStream {
536 let target = format!("chalkydri::camera::{}", cam_config.id);
537
538 let valve = ElementFactory::make("valve")
539 .property("drop", false)
540 .build()
541 .unwrap();
542 let queue = ElementFactory::make("queue").build().unwrap();
543 let videoconvertscale = ElementFactory::make("videoconvertscale")
544 .property_from_str("method", "nearest-neighbour")
545 .build()
546 .unwrap();
547 let filter = ElementFactory::make("capsfilter")
548 .property(
549 "caps",
550 &Caps::builder("video/x-raw")
551 .field("width", &640)
552 .field("height", &480)
553 .field("format", "RGB")
554 .build(),
555 )
556 .build()
557 .unwrap();
558 let jpegenc = ElementFactory::make("jpegenc")
559 .property("quality", &25)
560 .build()
561 .unwrap();
562 let appsink = ElementFactory::make("appsink").build().unwrap();
563
564 pipeline
565 .add_many([
566 &valve,
567 &queue,
568 &videoconvertscale,
569 &filter,
570 &jpegenc,
571 &appsink,
572 ])
573 .unwrap();
574 Element::link_many([
575 &cam,
576 &valve,
577 &queue,
578 &videoconvertscale,
579 &filter,
580 &jpegenc,
581 &appsink,
582 ])
583 .unwrap();
584
585 let appsink = appsink.dynamic_cast::<AppSink>().unwrap();
586
587 let (tx, rx) = watch::channel(None);
588
589 debug!(target: &target, "setting appsink callbacks...");
590 appsink.set_callbacks(
591 AppSinkCallbacks::builder()
592 .new_sample(move |appsink| {
593 let sample = appsink
594 .pull_sample()
595 .map_err(|_| Error::FailedToPullSample)
596 .unwrap();
597 match sample.buffer() {
598 Some(buf) => {
599 while let Err(err) = tx.send(Some(buf.to_owned())) {
600 error!("error sending frame: {err:?}");
601 }
602 }
603 None => {
604 error!("failed to get buffer");
605 }
606 }
607
608 Ok(FlowSuccess::Ok)
609 })
610 .build(),
611 );
612
613 debug!("linked subsys junk");
614
615 MjpegStream { rx }
616 }
617
618 pub async fn run(&self, name: String) -> Result<(), Box<dyn std::error::Error>> {
619 self.pipelines
621 .read()
622 .await
623 .get(&name)
624 .unwrap()
625 .bus()
626 .unwrap()
627 .connect_message(Some("error"), move |_, msg| match msg.view() {
628 MessageView::Error(err) => {
629 error!(
630 "error received from element {:?}: {}",
631 err.src().map(|s| s.path_string()),
632 err.error()
633 );
634 debug!("{:?}", err.debug());
635 }
636 _ => unimplemented!(),
637 });
638
639 Ok(())
640 }
641
642 pub async fn start(&self, name: String) {
643 if let Some(pipeline) = self.pipelines.read().await.get(&name) {
645 pipeline.set_state(State::Playing).unwrap();
646 }
647 }
649 pub async fn pause(&self, name: String) {
650 if let Some(pipeline) = self.pipelines.read().await.get(&name) {
651 pipeline
652 .set_state(State::Paused)
653 .expect("Unable to set the pipeline to the `Null` state.");
654 }
655 }
656
657 pub async fn calibrators(&self) -> MutexGuard<HashMap<String, Calibrator>> {
658 self.calibrators.lock().await
659 }
660 pub async fn mjpeg_streams(&self) -> MutexGuard<HashMap<String, MjpegStream>> {
661 self.mjpeg_streams.lock().await
662 }
663}