chalkydri/subsystems/python/
mod.rs1mod api;
2
3use std::{any::Any, collections::HashMap, ffi::CStr, sync::Arc};
4
5use gstreamer::{
6 Caps, Element, ElementFactory,
7 prelude::{GstBinExt, GstBinExtManual},
8};
9use minint::NtTopic;
10use numpy::{
11 ndarray::{self, ShapeBuilder},
12};
13use tokio::sync::RwLock;
14
15use crate::{Cfg, Nt, config, error::Error, subsystems::Subsystem};
16
17use pyo3::prelude::*;
18
19use super::frame_proc_loop;
20
21#[derive(Clone)]
22pub struct PythonSubsys;
23impl Subsystem for PythonSubsys {
24 const NAME: &'static str = "python";
25
26 type Error = Error;
27 type Config = Vec<config::CustomSubsystem>;
28 type Output = ();
29
30 async fn init() -> Result<Self, Self::Error> {
31 Ok::<Self, Self::Error>(PythonSubsys)
32 }
33
34 async fn process(
35 &self,
36 manager: super::SubsysManager,
37 nt: minint::NtConn,
38 cam_config: crate::config::Camera,
39 rx: tokio::sync::watch::Receiver<Option<Vec<u8>>>,
40 ) -> Result<Self::Output, Self::Error> {
41 let handle = tokio::runtime::Handle::current();
42
43 Python::with_gil(|py| -> PyResult<()> {
44 let mut modules = Vec::new();
45
46 for camera in futures_executor::block_on(Cfg.read())
47 .cameras
48 .clone()
49 .unwrap()
50 {
51 for subsys in camera.subsystems.custom {
52 let subsystems = futures_executor::block_on(Cfg.read())
54 .custom_subsystems
55 .clone();
56 if let Some(subsys) = subsystems.get(&subsys) {
57 let code = [subsys.code.as_bytes(), &[0u8]].concat();
59 let file_name = [b"custom_code.py".as_slice(), &[0u8]].concat();
60 let module_name = [b"custom_code".as_slice(), &[0u8]].concat();
61
62 let code = CStr::from_bytes_with_nul(&code).unwrap();
64 let file_name = CStr::from_bytes_with_nul(&file_name).unwrap();
65 let module_name = CStr::from_bytes_with_nul(&module_name).unwrap();
66
67 let module = PyModule::from_code(py, code, file_name, module_name).unwrap();
69 let module = module.unbind();
71
72 modules.push(module);
74 }
75 }
76 }
77
78 py.allow_threads(move || {
79 let handle_ = handle.clone();
80 handle.spawn(async move {
81 let topics = Arc::new(RwLock::new(HashMap::<String, NtTopic<f64>>::new()));
82 frame_proc_loop(rx, async move |buf| {
83 if let Some(settings) = &cam_config.settings {
84 let py_ret = Python::with_gil(|py| -> PyResult<()> {
85 let arr = ndarray::Array::from_shape_vec(
86 (settings.height as usize, settings.width as usize, 3usize),
87 buf,
88 )
89 .expect("something is really braken");
90 let nparr = numpy::PyArray::from_array(py, &arr);
91
92 for module in &modules {
93 let ret: HashMap<String, f64> = module
94 .getattr(py, "run")?
95 .call1(py, (nparr.clone(),))?
96 .extract(py)?;
97
98 for (k, v) in ret {
99 let (k, v) = (k.clone(), v.clone());
100 let topics = topics.clone();
101 handle_.spawn(async move {
102 let topic_name = format!("/chalkydri/subsystems/{k}");
103
104 let mut topics = topics.write().await;
105
106 if let Some(topic) = topics.get_mut(&k) {
107 topic.set(v).await.unwrap();
108 } else {
109 let mut topic = Nt
110 .publish::<f64>(topic_name.clone())
111 .await
112 .unwrap();
113 topic.set(v).await.unwrap();
114 topics.insert(topic_name, topic);
115 }
116 });
117 }
118 }
119
120 Ok(())
121 });
122
123 if let Err(err) = py_ret {
124 error!("{err}");
125 }
126 }
127 })
128 .await;
129 });
130 });
131
132 Ok(())
133 })
134 .unwrap();
135
136 Ok::<Self::Output, Self::Error>(())
137 }
138
139 fn preproc(
140 config: crate::config::Camera,
141 pipeline: &gstreamer::Pipeline,
142 ) -> Result<(gstreamer::Element, gstreamer::Element), Self::Error> {
143 let videoconvertscale = ElementFactory::make("videoconvertscale").build().unwrap();
145 let filter = ElementFactory::make("capsfilter")
146 .property(
147 "caps",
148 &Caps::builder("video/x-raw").field("format", "BGR").build(),
149 )
150 .build()
151 .unwrap();
152
153 pipeline.add_many([&videoconvertscale, &filter]).unwrap();
155
156 Element::link_many([&videoconvertscale, &filter]).unwrap();
158
159 Ok((videoconvertscale, filter))
160 }
161}