Last active
April 30, 2022 07:19
-
-
Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.
Revisions
-
Horusiath revised this gist
Apr 30, 2022 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -8,6 +8,9 @@ use std::fmt::Formatter; #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct Record { /// A number of milliseconds since UNIX_EPOCH. /// /// I'm using u64 for ease of comparison during tests, as `SystemTime::now()` /// has < 1ms precision, which will be cut off by this approach. timestamp: u64, data: Vec<u8>, } -
Horusiath revised this gist
Apr 30, 2022 . 1 changed file with 45 additions and 21 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,17 +1,15 @@ use crate::session::{SessionDeserialize, SessionSerialize}; use serde::de::{Error, SeqAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::Formatter; /// A sample record of time-series data. #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct Record { /// A number of milliseconds since UNIX_EPOCH. timestamp: u64, data: Vec<u8>, } impl Record { @@ -23,7 +21,7 @@ impl Record { /// Session used during serialization/deserialization. #[derive(Default)] pub struct RecordSession { latest: i64, } impl RecordSession { @@ -43,7 +41,14 @@ impl RecordSession { impl SessionSerialize for Record { type Session = RecordSession; fn session_serialize<S>( &self, session: &mut Self::Session, serializer: S, ) -> Result<S::Ok, S::Error> where S: Serializer, { let mut s = serializer.serialize_struct("Record", 2)?; let timestamp_delta = session.timestamp_to_delta(self.timestamp); s.serialize_field("timestamp", ×tamp_delta)?; @@ -55,22 +60,35 @@ impl SessionSerialize for Record { impl<'de> SessionDeserialize<'de> for Record { type Session = RecordSession; fn session_deserialize<D>( session: &mut Self::Session, deserializer: D, ) -> Result<Self, D::Error> where D: Deserializer<'de>, { const FIELDS: &'static [&'static str] = &["timestamp", "data"]; struct RecordVisitor<'s, 'de> { session: &'s mut <Record as SessionDeserialize<'de>>::Session, } impl<'s, 'de> Visitor<'de> for RecordVisitor<'s, 'de> { type Value = Record; fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { write!(formatter, "struct Record") } fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> where A: SeqAccess<'de>, { let timestamp_delta = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(0, &self))?; let timestamp = self.session.timestamp_from_delta(timestamp_delta); let data = seq .next_element()? .ok_or_else(|| A::Error::invalid_length(1, &self))?; Ok(Record::new(timestamp, data)) } } @@ -81,18 +99,22 @@ impl<'de> SessionDeserialize<'de> for Record { #[cfg(test)] mod test { use crate::delta_encoding::{Record, RecordSession}; use crate::session::{SessionDeserialize, SessionSerialize}; use serde::{Deserialize, Serialize}; use std::io::Cursor; use std::time::{SystemTime, UNIX_EPOCH}; fn now() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64 } fn generate_sample(len: usize) -> Vec<Record> { (0..len) .into_iter() .map(|i| Record::new(now(), vec![i as u8])) .collect() } @@ -130,7 +152,9 @@ mod test { let mut serializer = rmp_serde::Serializer::new(&mut buf); input.len().serialize(&mut serializer).unwrap(); for record in input.iter() { record .session_serialize(&mut session, &mut serializer) .unwrap(); } println!("delta payload size: {} bytes", buf.len()); // 409 bytes @@ -144,4 +168,4 @@ mod test { assert_eq!(record, input[i]); } } } -
Horusiath created this gist
Apr 30, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,147 @@ use std::fmt::Formatter; use std::ops::Add; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use serde::{Deserializer, Serializer, Serialize, Deserialize}; use serde::de::{ Error, MapAccess, SeqAccess, Visitor}; use serde::ser::{SerializeSeq, SerializeStruct}; use crate::session::{SessionDeserialize, SessionSerialize}; /// A sample record of time-series data. #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct Record { /// A number of milliseconds since UNIX_EPOCH. timestamp: u64, data: Vec<u8> } impl Record { pub fn new(timestamp: u64, data: Vec<u8>) -> Self { Record { timestamp, data } } } /// Session used during serialization/deserialization. #[derive(Default)] pub struct RecordSession { latest: i64 } impl RecordSession { fn timestamp_to_delta(&mut self, timestamp: u64) -> i64 { let ms = timestamp as i64; let delta = ms - self.latest; self.latest = ms; delta } fn timestamp_from_delta(&mut self, timestamp_delta: i64) -> u64 { self.latest += timestamp_delta; self.latest as u64 } } impl SessionSerialize for Record { type Session = RecordSession; fn session_serialize<S>(&self, session: &mut Self::Session, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer { let mut s = serializer.serialize_struct("Record", 2)?; let timestamp_delta = session.timestamp_to_delta(self.timestamp); s.serialize_field("timestamp", ×tamp_delta)?; s.serialize_field("data", &self.data)?; s.end() } } impl<'de> SessionDeserialize<'de> for Record { type Session = RecordSession; fn session_deserialize<D>(session: &mut Self::Session, deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de> { const FIELDS: &'static [&'static str] = &["timestamp", "data"]; struct RecordVisitor<'s, 'de> { session: &'s mut <Record as SessionDeserialize<'de>>::Session, } impl<'s, 'de> Visitor<'de> for RecordVisitor<'s,'de> { type Value = Record; fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { write!(formatter, "struct Record") } fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> where A: SeqAccess<'de> { let timestamp_delta = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(0, &self))?; let timestamp = self.session.timestamp_from_delta(timestamp_delta); let data = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(1, &self))?; Ok(Record::new(timestamp, data)) } } let visitor = RecordVisitor { session }; deserializer.deserialize_struct("Record", FIELDS, visitor) } } #[cfg(test)] mod test { use std::io::Cursor; use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; use crate::delta_encoding::{Record, RecordSession}; use crate::session::{SessionDeserialize, SessionSerialize}; fn now() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64 } fn generate_sample(len: usize) -> Vec<Record> { (0..len).into_iter() .map(|i| Record::new(now(), vec![i as u8])) .collect() } #[test] fn standard_serialize() { let input = generate_sample(100); let mut buf = Vec::new(); // serialize 100 records using length-prefix encoding let mut serializer = rmp_serde::Serializer::new(&mut buf); input.len().serialize(&mut serializer).unwrap(); for record in input.iter() { record.serialize(&mut serializer).unwrap(); } println!("standard payload size: {} bytes", buf.len()); // 1201 bytes // deserialize previously serialized input let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf)); let len = usize::deserialize(&mut deserializer).unwrap(); for i in 0..len { let record = Record::deserialize(&mut deserializer).unwrap(); assert_eq!(record, input[i]); } } #[test] fn session_serialize() { let input = generate_sample(100); let mut buf = Vec::new(); // serialize 100 records using length-prefix encoding with delta session let mut session = RecordSession::default(); let mut serializer = rmp_serde::Serializer::new(&mut buf); input.len().serialize(&mut serializer).unwrap(); for record in input.iter() { record.session_serialize(&mut session, &mut serializer).unwrap(); } println!("delta payload size: {} bytes", buf.len()); // 409 bytes // deserialize previously serialized input using new session (as we emulate remote end) let mut session = RecordSession::default(); let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf)); let len = usize::deserialize(&mut deserializer).unwrap(); for i in 0..len { let record = Record::session_deserialize(&mut session, &mut deserializer).unwrap(); assert_eq!(record, input[i]); } } }