Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active April 30, 2022 07:19
Show Gist options
  • Select an option

  • Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.

Select an option

Save Horusiath/acbbb53c332d54caf8cba192ab07dfd8 to your computer and use it in GitHub Desktop.

Revisions

  1. Horusiath revised this gist Apr 30, 2022. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions delta_encoding.rs
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,9 @@ use std::fmt::Formatter;
    #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
    pub struct Record {
    /// A number of milliseconds since UNIX_EPOCH.
    ///
    /// I'm using u64 for ease of comparison during tests, as `SystemTime::now()`
    /// has < 1ms precision, which will be cut off by this approach.
    timestamp: u64,
    data: Vec<u8>,
    }
  2. Horusiath revised this gist Apr 30, 2022. 1 changed file with 45 additions and 21 deletions.
    66 changes: 45 additions & 21 deletions delta_encoding.rs
    Original file line number Diff line number Diff line change
    @@ -1,17 +1,15 @@
    use std::fmt::Formatter;
    use std::ops::Add;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use serde::{Deserializer, Serializer, Serialize, Deserialize};
    use serde::de::{ Error, MapAccess, SeqAccess, Visitor};
    use serde::ser::{SerializeSeq, SerializeStruct};
    use crate::session::{SessionDeserialize, SessionSerialize};
    use serde::de::{Error, SeqAccess, Visitor};
    use serde::ser::SerializeStruct;
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::fmt::Formatter;

    /// A sample record of time-series data.
    #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
    pub struct Record {
    /// A number of milliseconds since UNIX_EPOCH.
    timestamp: u64,
    data: Vec<u8>
    data: Vec<u8>,
    }

    impl Record {
    @@ -23,7 +21,7 @@ impl Record {
    /// Session used during serialization/deserialization.
    #[derive(Default)]
    pub struct RecordSession {
    latest: i64
    latest: i64,
    }

    impl RecordSession {
    @@ -43,7 +41,14 @@ impl RecordSession {
    impl SessionSerialize for Record {
    type Session = RecordSession;

    fn session_serialize<S>(&self, session: &mut Self::Session, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer {
    fn session_serialize<S>(
    &self,
    session: &mut Self::Session,
    serializer: S,
    ) -> Result<S::Ok, S::Error>
    where
    S: Serializer,
    {
    let mut s = serializer.serialize_struct("Record", 2)?;
    let timestamp_delta = session.timestamp_to_delta(self.timestamp);
    s.serialize_field("timestamp", &timestamp_delta)?;
    @@ -55,22 +60,35 @@ impl SessionSerialize for Record {
    impl<'de> SessionDeserialize<'de> for Record {
    type Session = RecordSession;

    fn session_deserialize<D>(session: &mut Self::Session, deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de> {
    fn session_deserialize<D>(
    session: &mut Self::Session,
    deserializer: D,
    ) -> Result<Self, D::Error>
    where
    D: Deserializer<'de>,
    {
    const FIELDS: &'static [&'static str] = &["timestamp", "data"];
    struct RecordVisitor<'s, 'de> {
    session: &'s mut <Record as SessionDeserialize<'de>>::Session,
    }
    impl<'s, 'de> Visitor<'de> for RecordVisitor<'s,'de> {
    impl<'s, 'de> Visitor<'de> for RecordVisitor<'s, 'de> {
    type Value = Record;

    fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
    write!(formatter, "struct Record")
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> where A: SeqAccess<'de> {
    let timestamp_delta = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(0, &self))?;
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
    A: SeqAccess<'de>,
    {
    let timestamp_delta = seq
    .next_element()?
    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
    let timestamp = self.session.timestamp_from_delta(timestamp_delta);
    let data = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(1, &self))?;
    let data = seq
    .next_element()?
    .ok_or_else(|| A::Error::invalid_length(1, &self))?;
    Ok(Record::new(timestamp, data))
    }
    }
    @@ -81,18 +99,22 @@ impl<'de> SessionDeserialize<'de> for Record {

    #[cfg(test)]
    mod test {
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};
    use serde::{Deserialize, Serialize};
    use crate::delta_encoding::{Record, RecordSession};
    use crate::session::{SessionDeserialize, SessionSerialize};
    use serde::{Deserialize, Serialize};
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};

    fn now() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
    SystemTime::now()
    .duration_since(UNIX_EPOCH)
    .unwrap()
    .as_millis() as u64
    }

    fn generate_sample(len: usize) -> Vec<Record> {
    (0..len).into_iter()
    (0..len)
    .into_iter()
    .map(|i| Record::new(now(), vec![i as u8]))
    .collect()
    }
    @@ -130,7 +152,9 @@ mod test {
    let mut serializer = rmp_serde::Serializer::new(&mut buf);
    input.len().serialize(&mut serializer).unwrap();
    for record in input.iter() {
    record.session_serialize(&mut session, &mut serializer).unwrap();
    record
    .session_serialize(&mut session, &mut serializer)
    .unwrap();
    }

    println!("delta payload size: {} bytes", buf.len()); // 409 bytes
    @@ -144,4 +168,4 @@ mod test {
    assert_eq!(record, input[i]);
    }
    }
    }
    }
  3. Horusiath created this gist Apr 30, 2022.
    147 changes: 147 additions & 0 deletions delta_encoding.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,147 @@
    use std::fmt::Formatter;
    use std::ops::Add;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};
    use serde::{Deserializer, Serializer, Serialize, Deserialize};
    use serde::de::{ Error, MapAccess, SeqAccess, Visitor};
    use serde::ser::{SerializeSeq, SerializeStruct};
    use crate::session::{SessionDeserialize, SessionSerialize};

    /// A sample record of time-series data.
    #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
    pub struct Record {
    /// A number of milliseconds since UNIX_EPOCH.
    timestamp: u64,
    data: Vec<u8>
    }

    impl Record {
    pub fn new(timestamp: u64, data: Vec<u8>) -> Self {
    Record { timestamp, data }
    }
    }

    /// Session used during serialization/deserialization.
    #[derive(Default)]
    pub struct RecordSession {
    latest: i64
    }

    impl RecordSession {
    fn timestamp_to_delta(&mut self, timestamp: u64) -> i64 {
    let ms = timestamp as i64;
    let delta = ms - self.latest;
    self.latest = ms;
    delta
    }

    fn timestamp_from_delta(&mut self, timestamp_delta: i64) -> u64 {
    self.latest += timestamp_delta;
    self.latest as u64
    }
    }

    impl SessionSerialize for Record {
    type Session = RecordSession;

    fn session_serialize<S>(&self, session: &mut Self::Session, serializer: S) -> Result<S::Ok, S::Error> where S: Serializer {
    let mut s = serializer.serialize_struct("Record", 2)?;
    let timestamp_delta = session.timestamp_to_delta(self.timestamp);
    s.serialize_field("timestamp", &timestamp_delta)?;
    s.serialize_field("data", &self.data)?;
    s.end()
    }
    }

    impl<'de> SessionDeserialize<'de> for Record {
    type Session = RecordSession;

    fn session_deserialize<D>(session: &mut Self::Session, deserializer: D) -> Result<Self, D::Error> where D: Deserializer<'de> {
    const FIELDS: &'static [&'static str] = &["timestamp", "data"];
    struct RecordVisitor<'s, 'de> {
    session: &'s mut <Record as SessionDeserialize<'de>>::Session,
    }
    impl<'s, 'de> Visitor<'de> for RecordVisitor<'s,'de> {
    type Value = Record;

    fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
    write!(formatter, "struct Record")
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> where A: SeqAccess<'de> {
    let timestamp_delta = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(0, &self))?;
    let timestamp = self.session.timestamp_from_delta(timestamp_delta);
    let data = seq.next_element()?.ok_or_else(|| A::Error::invalid_length(1, &self))?;
    Ok(Record::new(timestamp, data))
    }
    }
    let visitor = RecordVisitor { session };
    deserializer.deserialize_struct("Record", FIELDS, visitor)
    }
    }

    #[cfg(test)]
    mod test {
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};
    use serde::{Deserialize, Serialize};
    use crate::delta_encoding::{Record, RecordSession};
    use crate::session::{SessionDeserialize, SessionSerialize};

    fn now() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
    }

    fn generate_sample(len: usize) -> Vec<Record> {
    (0..len).into_iter()
    .map(|i| Record::new(now(), vec![i as u8]))
    .collect()
    }

    #[test]
    fn standard_serialize() {
    let input = generate_sample(100);
    let mut buf = Vec::new();

    // serialize 100 records using length-prefix encoding
    let mut serializer = rmp_serde::Serializer::new(&mut buf);
    input.len().serialize(&mut serializer).unwrap();
    for record in input.iter() {
    record.serialize(&mut serializer).unwrap();
    }

    println!("standard payload size: {} bytes", buf.len()); // 1201 bytes

    // deserialize previously serialized input
    let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
    let len = usize::deserialize(&mut deserializer).unwrap();
    for i in 0..len {
    let record = Record::deserialize(&mut deserializer).unwrap();
    assert_eq!(record, input[i]);
    }
    }

    #[test]
    fn session_serialize() {
    let input = generate_sample(100);
    let mut buf = Vec::new();

    // serialize 100 records using length-prefix encoding with delta session
    let mut session = RecordSession::default();
    let mut serializer = rmp_serde::Serializer::new(&mut buf);
    input.len().serialize(&mut serializer).unwrap();
    for record in input.iter() {
    record.session_serialize(&mut session, &mut serializer).unwrap();
    }

    println!("delta payload size: {} bytes", buf.len()); // 409 bytes

    // deserialize previously serialized input using new session (as we emulate remote end)
    let mut session = RecordSession::default();
    let mut deserializer = rmp_serde::Deserializer::new(Cursor::new(buf));
    let len = usize::deserialize(&mut deserializer).unwrap();
    for i in 0..len {
    let record = Record::session_deserialize(&mut session, &mut deserializer).unwrap();
    assert_eq!(record, input[i]);
    }
    }
    }