use crate::storage::StoreableItem; #[cfg(debug_assertions)] use crate::wrapper::time; use anyhow::bail; use embedded_svc::http::client::{Client, Request, RequestWrite, Response}; use embedded_svc::http::{SendHeaders, Status}; use embedded_svc::io; use esp_idf_svc::http::client::{EspHttpClient, EspHttpClientConfiguration}; use log::{debug, error}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::sync::mpsc::Receiver; use std::thread; use std::thread::JoinHandle; use std::time::Duration; const INFLUX_STACK_SIZE: usize = 12 * 1024; const BUFFER_SIZE: usize = 20; const POINT_PER_MESSAGE_LIMIT: usize = 20; pub struct InfluxClient { client: EspHttpClient, pub config: InfluxConfiguration, point_buffer: Vec, } #[derive(Clone, Serialize, Deserialize)] pub struct InfluxConfiguration { pub url: String, pub token: String, pub org: String, pub bucket: String, #[serde(skip)] pub precision: InfluxPrecision, } #[derive(Clone, Serialize, Deserialize)] pub enum InfluxPrecision { Seconds, MilliSeconds, } impl Default for InfluxPrecision { fn default() -> Self { Self::Seconds } } impl Display for InfluxPrecision { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { InfluxPrecision::Seconds => f.write_str("s"), InfluxPrecision::MilliSeconds => f.write_str("ms"), } } } impl StoreableItem for InfluxConfiguration { fn storage_field() -> &'static str { "influx_config" } } impl InfluxClient { pub fn new(config: InfluxConfiguration) -> anyhow::Result { let client = EspHttpClient::new(&EspHttpClientConfiguration { crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() })?; Ok(Self { client, config, point_buffer: Vec::with_capacity(BUFFER_SIZE), }) } pub fn spawn( config: InfluxConfiguration, point_receiver: Receiver, ) -> anyhow::Result> { Ok(thread::Builder::new() .name("Influx_Thread".to_string()) .stack_size(INFLUX_STACK_SIZE) .spawn(move || { let mut client = Self::new(config).expect("Setting up Influx client failed"); #[cfg(debug_assertions)] let mut current_sec = time(); loop { #[cfg(debug_assertions)] { let now = time(); if current_sec < now { debug!( "Second passed for Influx Thread {}, current_buffer_size: {}", current_sec, client.point_buffer.len() ); current_sec = now; } } for point in point_receiver.try_iter() { client.add(point); } client .flush_if_needed() .unwrap_or_else(|e| error!("Flushing failed {}", e)); thread::sleep(Duration::from_millis(40)); } })?) } pub fn add(&mut self, point: Point) { if point.is_empty() { return; } #[cfg(debug_assertions)] debug!( "Adding point {}, buffer size: {}", point.to_line_format(), self.point_buffer.len() ); self.point_buffer.push(point); } pub fn flush_if_needed(&mut self) -> anyhow::Result<()> { if self.point_buffer.len() >= BUFFER_SIZE { self.flush() } else { Ok(()) } } pub fn flush(&mut self) -> anyhow::Result<()> { let mut points = self.point_buffer.split_off(0); while !points.is_empty() { let mut message: String = String::new(); let mut points_in_message = 0; while !points.is_empty() && points_in_message < POINT_PER_MESSAGE_LIMIT { if let Some(point) = points.pop() { message.push_str(point.to_line_format().as_str()); points_in_message += 1; } } if message.is_empty() { bail!( "Writing Data to influx failed, message is empty, while points had been available" ); } self.send_message(message, true)?; } Ok(()) } fn send_message(&mut self, message: String, retry: bool) -> anyhow::Result<()> { let mut request = self .client .post(&self.get_post_to_url()) .map_err(|e| anyhow::Error::from(e).context("Opening Post request failed"))?; request.set_header( "Authorization", format!("Token {}", self.config.token).as_str(), ); let request_write = request .send_str(message.as_str()) .map_err(|e| anyhow::Error::from(e).context("Writing points to request failed"))?; match request_write.submit() { Ok(mut response) => match response.status() { 204 => {} s => { let mut body = [0_u8; 3048]; let (body, _) = io::read_max(response.reader(), &mut body)?; bail!( "Sending data to influx failed. Server responded with {}\nResponse:\n{:?}", s, String::from_utf8_lossy(body).into_owned() ); } }, Err(e) => { if retry { debug!("Connection was broken, reestablishing. {}", e); let client = EspHttpClient::new(&EspHttpClientConfiguration { crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach), ..Default::default() })?; self.client = client; self.send_message(message, false)?; } else { return Err(anyhow::Error::from(e).context("Submitting request failed")); } } } Ok(()) } fn get_post_to_url(&self) -> String { format!( "{}/api/v2/write?org={}&bucket={}&precision={}", self.config.url, self.config.org, self.config.bucket, self.config.precision ) } } pub struct Point { measurement: String, keys: HashMap, values: HashMap, timestamp: Option, } pub enum Value { String(String), Decimal(Decimal), Int(i32), Unsigned(u32), } /// One datapoint that is used to be send to an influx database /// /// Example: /// ``` /// let mut point = Point::new("sample_measurement"); /// point /// .add_key("id", 5) /// .add_value("voltage", 4.56) /// .add_value("temperature", 16.7); /// assert_eq!(point.to_line_format(), "sample_measurement,id=5 voltage=4.56,temperature=16.7"); /// ``` impl Point { pub fn new(measurement: &str) -> Self { Self { measurement: measurement.to_string(), keys: Default::default(), values: Default::default(), timestamp: None, } } pub fn add_key>(&mut self, name: &str, value: V) -> &mut Self { self.keys.insert(name.to_string(), value.into()); self } pub fn add_value>(&mut self, name: &str, value: V) -> &mut Self { self.values.insert(name.to_string(), value.into()); self } pub fn add_option>(&mut self, name: &str, option: Option) -> &mut Self { if let Some(value) = option { self.values.insert(name.to_string(), value.into()); } self } pub fn set_timestamp(&mut self, timestamp: i64) -> &mut Self { self.timestamp = Some(timestamp); self } pub fn is_empty(&self) -> bool { self.values.is_empty() } pub fn to_line_format(&self) -> String { let mut line = self.measurement.clone(); if !self.keys.is_empty() { line.push(','); line.push_str(Self::join_key_values(&self.keys).as_str()); } line.push(' '); line.push_str(Self::join_key_values(&self.values).as_str()); if let Some(timestamp) = self.timestamp.as_ref() { line.push(' '); line.push_str(timestamp.to_string().as_str()); } line.push_str("\n"); line } fn join_key_values(source: &HashMap) -> String { source .iter() .map(|(name, value)| format!("{}={}", name, value)) .intersperse(",".to_string()) .collect::() } } impl Display for Point { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(self.to_line_format().as_str()) } } impl Display for Value { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Value::String(s) => { f.write_str("\"")?; s.fmt(f)?; f.write_str("\"")?; Ok(()) } Value::Decimal(d) => d.fmt(f), Value::Int(i) => i.fmt(f), Value::Unsigned(i) => i.fmt(f), } } } impl From for Value { fn from(source: String) -> Self { Self::String(source) } } impl From<&str> for Value { fn from(source: &str) -> Self { Self::String(source.to_string()) } } impl From for Value { fn from(source: Decimal) -> Self { Self::Decimal(source) } } impl From<&Decimal> for Value { fn from(source: &Decimal) -> Self { Self::Decimal(*source) } } impl From for Value { fn from(source: f32) -> Self { Self::Decimal(Decimal::from_f32_retain(source).unwrap()) } } impl From for Value { fn from(source: i32) -> Self { Self::Int(source) } } impl From for Value { fn from(source: u32) -> Self { Self::Unsigned(source) } } impl From for Value { fn from(source: bool) -> Self { if source { Self::Unsigned(1) } else { Self::Unsigned(0) } } }