chalkydri/subsystems/python/
mod.rs1mod api;
2
3use std::{collections::HashMap, ffi::CStr, sync::Arc};
4
5use gstreamer::{Caps, Element, ElementFactory, prelude::GstBinExtManual};
6use minint::NtTopic;
7use numpy::ndarray;
8use tokio::sync::RwLock;
9
10use crate::{
11 Cfg, Nt, cameras::pipeline::Preprocessor, config, error::Error, subsystems::Subsystem,
12};
13
14use pyo3::prelude::*;
15
16use super::frame_proc_loop;
17
18#[derive(Clone)]
19pub struct PythonSubsys;
20impl Subsystem for PythonSubsys {
21 const NAME: &'static str = "python";
22
23 type Error = Error;
24 type Config = Vec<config::CustomSubsystem>;
25 type Preproc = PythonPreproc;
26 type Output = ();
27
28 async fn init() -> Result<Self, Self::Error> {
29 Ok::<Self, Self::Error>(PythonSubsys)
30 }
31
32 async fn process(
33 &self,
34 nt: minint::NtConn,
35 cam_config: config::Camera,
36 rx: tokio::sync::watch::Receiver<
37 Option<<<Self as Subsystem>::Preproc as Preprocessor>::Frame>,
38 >,
39 ) -> Result<Self::Output, Self::Error> {
40 let handle = tokio::runtime::Handle::current();
41
42 Python::with_gil(|py| -> PyResult<()> {
43 let mut modules = Vec::new();
44
45 for camera in futures_executor::block_on(Cfg.read())
46 .cameras
47 .clone()
48 .unwrap()
49 {
50 for subsys in camera.subsystems.custom {
51 let subsystems = futures_executor::block_on(Cfg.read())
53 .custom_subsystems
54 .clone();
55 if let Some(subsys) = subsystems.get(&subsys) {
56 let code = [subsys.code.as_bytes(), &[0u8]].concat();
58 let file_name = [b"custom_code.py".as_slice(), &[0u8]].concat();
59 let module_name = [b"custom_code".as_slice(), &[0u8]].concat();
60
61 let code = CStr::from_bytes_with_nul(&code).unwrap();
63 let file_name = CStr::from_bytes_with_nul(&file_name).unwrap();
64 let module_name = CStr::from_bytes_with_nul(&module_name).unwrap();
65
66 let module = PyModule::from_code(py, code, file_name, module_name).unwrap();
68 let module = module.unbind();
70
71 modules.push(module);
73 }
74 }
75 }
76
77 py.allow_threads(move || {
78 let handle_ = handle.clone();
79 handle.spawn(async move {
80 let topics = Arc::new(RwLock::new(HashMap::<String, NtTopic<f64>>::new()));
81 frame_proc_loop::<Self::Preproc, _>(rx, async move |buf| {
82 if let Some(settings) = &cam_config.settings {
83 let py_ret = Python::with_gil(|py| -> PyResult<()> {
84 let nparr = numpy::PyArray::from_array(py, &buf);
85
86 for module in &modules {
87 let ret: HashMap<String, f64> = module
88 .getattr(py, "run")?
89 .call1(py, (nparr.clone(),))?
90 .extract(py)?;
91
92 for (k, v) in ret {
93 let (k, v) = (k.clone(), v.clone());
94 let topics = topics.clone();
95 handle_.spawn(async move {
96 let topic_name = format!("/chalkydri/subsystems/{k}");
97
98 let mut topics = topics.write().await;
99
100 if let Some(topic) = topics.get_mut(&k) {
101 topic.set(v).await.unwrap();
102 } else {
103 let mut topic = Nt
104 .publish::<f64>(topic_name.clone())
105 .await
106 .unwrap();
107 topic.set(v).await.unwrap();
108 topics.insert(topic_name, topic);
109 }
110 });
111 }
112 }
113
114 Ok(())
115 });
116
117 if let Err(err) = py_ret {
118 error!("{err}");
119 }
120 }
121 })
122 .await;
123 });
124 });
125
126 Ok(())
127 })
128 .unwrap();
129
130 Ok::<Self::Output, Self::Error>(())
131 }
132}
133
134pub struct PythonPreproc {
135 videoconvertscale: Arc<Element>,
136 filter: Arc<Element>,
137}
138impl Preprocessor for PythonPreproc {
139 type Frame = ndarray::Array3<u8>;
140 type Subsys = PythonSubsys;
141
142 fn new(pipeline: &gstreamer::Pipeline) -> Self {
143 let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
145 let filter = ElementFactory::make("capsfilter")
146 .property(
147 "caps",
148 &Caps::builder("video/x-raw").field("format", "BGR").build(),
149 )
150 .build()
151 .unwrap();
152
153 pipeline.add_many([&videoconvertscale, &filter]).unwrap();
155
156 Self {
157 videoconvertscale: videoconvertscale.into(),
158 filter: filter.into(),
159 }
160 }
161 fn link(&self, src: Element, sink: Element) {
162 Element::link_many([&src, &self.videoconvertscale, &self.filter, &sink]).unwrap();
163 }
164 fn unlink(&self, src: Element, sink: Element) {
165 Element::unlink_many([&src, &self.videoconvertscale, &self.filter, &sink]);
166 }
167 fn sampler(
168 appsink: &gstreamer_app::AppSink,
169 tx: tokio::sync::watch::Sender<Option<Arc<Self::Frame>>>,
170 ) -> Result<Option<()>, Error> {
171 let sample = appsink
172 .pull_sample()
173 .map_err(|_| Error::FailedToPullSample)?;
174 let buf = sample.buffer().unwrap();
175 let buf = buf
176 .to_owned()
177 .into_mapped_buffer_readable()
178 .unwrap()
179 .to_vec();
180
181 let arr = ndarray::Array::from_shape_vec(
182 (1280, 720, 3),
184 buf,
185 )
186 .expect("something is really braken");
187
188 tx.send(Arc::new(Some(arr))).unwrap();
189
190 Ok(Some(()))
191 }
192}