chalkydri/cameras/
mjpeg.rs1use std::{sync::Arc, task::Poll};
2
3use actix_web::web::Bytes;
4use futures_core::Stream;
5use gstreamer::{
6 Caps, Element, ElementFactory, FlowSuccess, Pipeline, glib::object::Cast,
7 prelude::GstBinExtManual,
8};
9use gstreamer_app::{AppSink, AppSinkCallbacks};
10use tokio::sync::watch;
11
12use crate::{error::Error, subsystems::NoopSubsys};
13
14use super::pipeline::Preprocessor;
15
16#[derive(Clone)]
19pub struct MjpegProc {
20 videorate: Arc<Element>,
21 videoconvertscale: Arc<Element>,
22 filter: Arc<Element>,
23 tx: watch::Sender<Option<Vec<u8>>>,
24 rx: watch::Receiver<Option<Vec<u8>>>,
25}
26impl Preprocessor for MjpegProc {
27 type Subsys = NoopSubsys<Self>;
28 type Frame = Vec<u8>;
29
30 fn new(pipeline: &Pipeline) -> Self {
31 let videorate = ElementFactory::make("videorate")
32 .property("max-rate", 20)
33 .property("drop-only", true)
34 .build()
35 .unwrap();
36
37 let videoconvertscale = ElementFactory::make("videoconvertscale")
38 .property_from_str("method", "nearest-neighbour")
39 .build()
40 .unwrap();
41
42 let filter = ElementFactory::make("capsfilter")
43 .property(
44 "caps",
45 &Caps::builder("video/x-raw")
46 .field("width", &640)
47 .field("height", &480)
48 .field("format", "RGB")
49 .build(),
50 )
51 .build()
52 .unwrap();
53
54 pipeline
55 .add_many([&videorate, &videoconvertscale, &filter])
56 .unwrap();
57
58 let (tx, rx) = watch::channel(None);
59
60 MjpegProc {
61 videorate: videorate.into(),
62 videoconvertscale: videoconvertscale.into(),
63 filter: filter.into(),
64 tx,
65 rx,
66 }
67 }
68 fn link(&self, src: Element, sink: Element) {
69 Element::link_many([
70 &src,
71 &self.videorate,
72 &self.videoconvertscale,
73 &self.filter,
74 &sink,
75 ])
76 .unwrap();
77 }
78 fn unlink(&self, src: Element, sink: Element) {
79 Element::unlink_many([
80 &src,
81 &self.videorate,
82 &self.videoconvertscale,
83 &self.filter,
84 &sink,
85 ]);
86 }
87 fn sampler(
88 appsink: &AppSink,
89 tx: watch::Sender<Option<Arc<Self::Frame>>>,
90 ) -> Result<Option<()>, Error> {
91 let sample = appsink
92 .pull_sample()
93 .map_err(|_| Error::FailedToPullSample)
94 .unwrap();
95 match sample.buffer() {
96 Some(buf) => {
97 let jpeg = turbojpeg::compress(
98 turbojpeg::Image {
99 width: 640,
100 height: 480,
101 pitch: 640 * 3,
102 format: turbojpeg::PixelFormat::RGB,
103 pixels: buf
104 .to_owned()
105 .into_mapped_buffer_readable()
106 .unwrap()
107 .to_vec()
108 .as_slice(),
109 },
110 50,
111 turbojpeg::Subsamp::None,
112 )
113 .unwrap();
114 while let Err(err) = tx.send(Some(jpeg.to_vec().into())) {
115 error!("error sending frame: {err:?}");
116 }
117 }
118 None => {
119 error!("failed to get buffer");
120 }
121 }
122 Ok(Some(()))
123 }
124}
125impl Stream for MjpegProc {
126 type Item = Result<Bytes, Error>;
127
128 fn poll_next(
129 self: std::pin::Pin<&mut Self>,
130 _cx: &mut std::task::Context<'_>,
131 ) -> std::task::Poll<Option<Self::Item>> {
132 loop {
133 match self.rx.has_changed() {
134 Ok(true) => {
135 info!("working!!!");
136
137 let bytes =
138 if let Some(frame) = self.get_mut().rx.borrow_and_update().as_deref() {
139 [
140 b"--frame\r\nContent-Length: ",
141 frame.len().to_string().as_bytes(),
142 b"\r\nContent-Type: image/jpeg\r\n\r\n",
143 frame,
144 ]
145 .concat()
146 } else {
147 Vec::new()
148 };
149
150 return Poll::Ready(Some(Ok(bytes.into())));
151 }
152 Ok(false) => {}
153 Err(err) => {
154 error!("error getting frame: {err:?}");
155
156 return Poll::Ready(None);
157 }
158 }
159 }
160 }
161}