Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created May 20, 2024 03:17
Show Gist options
  • Save mooreniemi/bb23d386ce6eb84d764cbb834e4197fe to your computer and use it in GitHub Desktop.
Save mooreniemi/bb23d386ce6eb84d764cbb834e4197fe to your computer and use it in GitHub Desktop.

Revisions

  1. mooreniemi created this gist May 20, 2024.
    22 changes: 22 additions & 0 deletions Cargo.toml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    [package]
    name = "delta_fake"
    version = "0.1.0"
    edition = "2021"

    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

    [dependencies]
    anyhow = "1.0.86"
    dashmap = "5.5.3"
    fd-lock = "4.0.2"
    left-right = "0.11.5"
    memmap2 = "0.9.4"
    nix = "0.28.0"
    rand = "0.8.5"
    hyper = "1"
    serde = { version = "1.0", features = ["derive"] }
    serde_json = "1.0"
    tiny_http = "*"
    tokio = { version = "1", features = ["full"] }
    reqwest = { version = "0.12", features = ["json"] }

    99 changes: 99 additions & 0 deletions client.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,99 @@
    use rand::{distributions::Alphanumeric, seq::IteratorRandom, Rng};
    use reqwest::{Client, StatusCode};
    use serde_json::json;
    use std::collections::HashSet;
    use tokio;
    use tokio::time::{Duration, Instant};

    async fn send_key_value(
    client: &Client,
    submit_url: &str,
    lookup_url: &str,
    key: &str,
    value: &str,
    times: &mut Vec<Duration>,
    ) -> Result<(), reqwest::Error> {
    let body = json!({ "key": key, "value": value });

    // Measure time for submission
    let start = Instant::now();
    client
    .post(submit_url)
    .json(&body)
    .send()
    .await?
    .error_for_status()?; // Ensure the response status is checked
    let elapsed = start.elapsed();
    times.push(elapsed);
    //println!("Time to submit '{}': {:?}", key, elapsed);

    // Measure time for lookup
    let start = Instant::now();
    let response = client
    .get(format!("{}?key={}", lookup_url, key))
    .send()
    .await?
    .error_for_status()?;
    let elapsed = start.elapsed();
    times.push(elapsed);
    //println!("Time to lookup '{}': {:?}", key, elapsed);

    // Print the value if the lookup was successful
    if response.status() == StatusCode::OK {
    let r_value: String = response
    .json::<serde_json::Value>()
    .await?
    .get("value")
    .unwrap()
    .as_str() // This returns an Option<&str>
    .unwrap() // Unwrap the Option<&str> to &str
    .to_string(); // Convert &str to String
    assert_eq!(value, r_value.as_str(), "we got back what we sent in");
    //println!("Lookup value for '{}': {}", key, value);
    }

    Ok(())
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    let submit_url = "http://localhost:6000/submit";
    let lookup_url = "http://localhost:6000/lookup";
    let mut rng = rand::thread_rng();
    let mut keys: HashSet<String> = HashSet::new();
    let mut times: Vec<Duration> = Vec::new();

    for _ in 0..100 {
    // sometimes edit, sometimes create
    let key = if rng.gen_bool(0.05) && !keys.is_empty() {
    keys.iter().choose(&mut rng).unwrap().to_string()
    } else {
    let new_key: String = std::iter::repeat(())
    .map(|()| rng.sample(Alphanumeric))
    .map(char::from)
    .take(10)
    .collect();
    keys.insert(new_key.clone());
    new_key
    };

    let value: String = std::iter::repeat(())
    .map(|()| rng.sample(Alphanumeric))
    .map(char::from)
    .take(20)
    .collect();

    if let Err(e) =
    send_key_value(&client, submit_url, lookup_url, &key, &value, &mut times).await
    {
    eprintln!("Error sending or looking up key-value: {}", e);
    }
    }

    // Calculate and print the average times
    let average_time = times.iter().sum::<Duration>() / times.len() as u32;
    println!("Average time for operations: {:?}", average_time);

    Ok(())
    }
    366 changes: 366 additions & 0 deletions main.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,366 @@
    use dashmap::DashMap;
    use memmap2::{Mmap, MmapMut, MmapOptions};
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use std::fs::{File, OpenOptions};
    use std::io;
    use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
    use std::sync::mpsc::Sender;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tiny_http::{Header, Method, Request, Response, Server, StatusCode};

// Wire format of the /submit request body, and the message type sent over
// the mpsc channel to the writer thread. The value bytes are what get
// persisted into the mmap; the key lives only in the in-memory index.
#[derive(Deserialize, Serialize, Debug)]
struct KeyValue {
    key: String,
    value: String,
}

    fn handle_request(
    mut request: Request,
    sender: &Sender<KeyValue>,
    kv_store: &Arc<AtomicPtr<ManagedMemory>>,
    ) {
    match (
    request.method(),
    request.url().split('?').nth(0).unwrap_or("/uknown"),
    ) {
    // Handle POST requests to submit key-value pairs
    (&Method::Post, "/submit") => {
    let content_length = request
    .headers()
    .iter()
    .find(|h| h.field.as_str().to_ascii_lowercase() == "content-length")
    .and_then(|h| h.value.as_str().parse::<usize>().ok())
    .expect("need content length");

    let mut content = vec![0; content_length];
    request.as_reader().read_exact(&mut content).unwrap();
    println!("Received content: {}", String::from_utf8_lossy(&content)); // Print raw JSON content

    if let Ok(key_value) = serde_json::from_slice::<KeyValue>(&content) {
    sender
    .send(key_value)
    .expect("Failed to send key-value to the writer");
    let json = json!({"status": "submitted"});
    let response = Response::from_string(json.to_string())
    .with_header(
    Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
    )
    .with_status_code(StatusCode(200));

    request.respond(response).unwrap();
    } else {
    let json = json!({"status": "error: invalid data"});
    let response = Response::from_string(json.to_string())
    .with_header(
    Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
    )
    .with_status_code(StatusCode(400));

    request.respond(response).unwrap();
    }
    }
    // Handle GET requests to lookup key-value pairs
    (&Method::Get, "/lookup") => {
    let key = request
    .url()
    .split('?')
    .nth(1)
    .and_then(|q| q.split('=').nth(1))
    .unwrap_or("");

    let mm_ptr = kv_store.load(Ordering::SeqCst);
    let mm = unsafe { &*mm_ptr };
    if let Some(entry) = mm.locations.get(key) {
    let (pos, len) = *entry;
    let value = String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len]);
    println!("Return data for key: {}={:?}", key, value);
    let json = json!({"key": key, "value": value});
    let response = Response::from_string(json.to_string())
    .with_header(
    Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
    )
    .with_status_code(StatusCode(200));

    request.respond(response).unwrap();
    } else {
    eprintln!("expected to read {}", key);
    let json = json!({"status": format!("error: {} key not found", key)});
    let response = Response::from_string(json.to_string())
    .with_header(
    Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
    )
    .with_status_code(StatusCode(404));
    request.respond(response).unwrap();
    }
    }
    // Handle not found
    (_method, other) => {
    let json = json!({"status": format!("error: {} route not found", other)});
    let response = Response::from_string(json.to_string())
    .with_header(
    Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
    )
    .with_status_code(StatusCode(404));
    request.respond(response).unwrap();
    }
    }
    }

// An mmap-backed, append-only value store plus the index into it. Values are
// written sequentially; overwrites orphan the old region, which compaction
// later reclaims by rewriting the file.
struct ManagedMemory {
    // Writable view of the backing file, used by the writer thread.
    mmap: MmapMut,
    // Read-only view of the same file, used by lookup/reader threads.
    mmap_immutable: Mmap,
    // FIXME: just simple to use DashMap to start but could switch to LeftRight or whatever
    // key -> (byte offset, byte length) of the value within the mapping.
    locations: DashMap<String, (usize, usize)>,
    // Append cursor: next free byte offset in the mapping.
    highest_byte_position: AtomicUsize,
    // Bytes orphaned by overwrites; drives the compaction trigger.
    unused_bytes: AtomicUsize,
}

    impl ManagedMemory {
    fn from_compacted(
    file: File,
    new_locations: DashMap<String, (usize, usize)>,
    highest_byte_position: AtomicUsize,
    ) -> io::Result<Self> {
    let mmap = unsafe { MmapOptions::new().map_mut(&file).expect("open compacted") };
    let mmap_immutable = unsafe { MmapOptions::new().map(&file).expect("open compacted") };
    Ok(Self {
    mmap,
    mmap_immutable,
    locations: new_locations,
    highest_byte_position,
    unused_bytes: AtomicUsize::new(0),
    })
    }

    fn new(file_path: &str, size: u64) -> io::Result<Self> {
    let file = OpenOptions::new()
    .read(true)
    .write(true)
    .create(true)
    .open(file_path)
    .expect("creating new dat file");
    file.set_len(size)?;

    let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
    let mmap_immutable = unsafe { MmapOptions::new().map(&file)? };

    Ok(Self {
    mmap,
    mmap_immutable,
    locations: DashMap::new(),
    highest_byte_position: AtomicUsize::new(0),
    unused_bytes: AtomicUsize::new(0),
    })
    }

    fn room_to_write(&self, data: &Vec<u8>) -> bool {
    let data_len = data.len();
    let new_pos = self.highest_byte_position.load(Ordering::SeqCst) + data_len;
    new_pos + data_len < self.mmap.len()
    }

    fn write_data(&mut self, key: String, data: Vec<u8>) -> io::Result<()> {
    let data_len = data.len();
    let new_pos = self
    .highest_byte_position
    .fetch_add(data_len, Ordering::SeqCst);

    for (i, &byte) in data.iter().enumerate() {
    self.mmap[new_pos + i] = byte;
    }

    if let Some((_old_pos, old_len)) = self.locations.insert(key.clone(), (new_pos, data_len)) {
    self.unused_bytes.fetch_add(old_len, Ordering::SeqCst);
    }

    Ok(())
    }
    }

    fn setup_shared_memory(file_path: &str, size: u64) -> io::Result<Arc<AtomicPtr<ManagedMemory>>> {
    println!("will setup shared_memory at {}", file_path);
    let managed_memory = ManagedMemory::new(file_path, size)?;
    let managed_memory_box = Box::new(managed_memory);
    let managed_memory_ptr = Box::into_raw(managed_memory_box);
    Ok(Arc::new(AtomicPtr::new(managed_memory_ptr)))
    }

// Publishes `new_memory` as the active store by swapping the shared pointer,
// then frees the previous ManagedMemory (unmapping its file views).
//
// NOTE(review): freeing the old allocation immediately is unsound if any
// other thread loaded the old pointer and still holds a `&ManagedMemory`
// into it — the reader and HTTP threads do exactly that — leaving them with
// a dangling reference. A safe reclamation scheme (arc-swap, epoch GC,
// hazard pointers) is needed here; confirm before relying on this in
// anything beyond a demo.
fn swap_memory(atomic_managed_memory: &Arc<AtomicPtr<ManagedMemory>>, new_memory: ManagedMemory) {
    println!("swapping managed memory pointer");
    let new_memory_box = Box::new(new_memory);
    let new_memory_ptr = Box::into_raw(new_memory_box);
    let old_memory_ptr = atomic_managed_memory.swap(new_memory_ptr, Ordering::SeqCst);
    println!("swapped managed memory pointer");

    unsafe {
        // drop old memory
        // SAFETY: the pointer originated from Box::into_raw and is swapped
        // out exactly once, so it is freed at most once — but see the
        // concurrent-reader caveat above.
        let _ = Box::from_raw(old_memory_ptr);
    }
}

    fn compact_file(
    original_file: &str,
    new_file: &str,
    locations: &DashMap<String, (usize, usize)>,
    file_size: u64,
    ) -> io::Result<(File, DashMap<String, (usize, usize)>, usize)> {
    let mut new_file = OpenOptions::new()
    .write(true)
    .read(true)
    .create(true)
    .open(new_file)?;
    let mut current_pos = 0;
    let new_locations = DashMap::new();

    for entry in locations.iter() {
    let (key, (pos, len)) = entry.pair();
    let mut buffer = vec![0u8; *len];
    let old_file = File::open(original_file)?;

    match std::os::unix::fs::FileExt::read_exact_at(&old_file, &mut buffer, *pos as u64) {
    Ok(_) => {
    io::Write::write_all(&mut new_file, &buffer)?;
    new_locations.insert(key.clone(), (current_pos, *len));
    current_pos += len;
    }
    Err(e) => {
    eprintln!("Failed to read data for key {}: {}", key, e);
    continue;
    }
    }
    }

    println!("compacted to {}", current_pos as f32 / file_size as f32);
    // otherwise the file is only as big as you compacted to...
    new_file.set_len(file_size)?;

    Ok((new_file, new_locations, current_pos))
    }

// True while the compaction thread is rebuilding the data file; the writer
// thread spins on this flag and defers writes until it clears.
static COMPACTING: AtomicBool = AtomicBool::new(false);
// Monotonic counter naming successive ./data/example.{N}.dat files.
static VERSION: AtomicUsize = AtomicUsize::new(0);

// NOTE: smaller values while testing is helpful
static MAX_FILE_SIZE: u64 = 1024 * 1024; // 1 MiB backing file
static COMPACTION_PERCENT_TRIGGER: f32 = 0.3; // compact when >30% of bytes are dead

static COMPACTION_SLEEP_SECS: u64 = 2;
static READ_SAMPLE_KEYS_INTERVAL_SECS: u64 = 10;

/// Server entry point. Spins up four threads sharing one
/// `Arc<AtomicPtr<ManagedMemory>>`:
/// - a writer draining the mpsc channel and appending into the mmap,
/// - a compaction loop that periodically rewrites the file and swaps the pointer,
/// - a reader that logs a sample of keys,
/// - the tiny-http accept loop feeding the writer channel.
fn main() -> io::Result<()> {
    // TODO: just always starting from afresh rather than loading
    let shared_memory = setup_shared_memory(
        &format!("./data/example.{}.dat", VERSION.load(Ordering::Relaxed)),
        MAX_FILE_SIZE,
    )?;
    // HTTP thread -> writer thread hand-off; the writer is the sole mutator.
    let (tx, rx) = std::sync::mpsc::channel::<KeyValue>();

    let sm_clone = shared_memory.clone();
    // NOTE: this thread must not panic, or you have stranded the server from writes
    let writer_thread = thread::spawn(move || {
        while let Ok(KeyValue { key, value }) = rx.recv() {
            println!("will write key={}, value={}", key, value);
            // FIXME: poor man's write buffer is unbounded...
            // Spin (1s granularity) while compaction owns the file; pending
            // writes simply queue up in the mpsc channel meanwhile.
            while COMPACTING.load(Ordering::SeqCst).eq(&true) {
                println!("buffering a write due to compaction...");
                thread::sleep(Duration::from_secs(1));
            }
            // NOTE(review): if compaction swaps the pointer right after this
            // load, this write lands in the old, about-to-be-freed mapping
            // and is lost — confirm this race is acceptable for the demo.
            let mm_ptr = sm_clone.load(Ordering::SeqCst);
            let mm = unsafe { &mut *mm_ptr };
            let data = value.into_bytes();
            // FIXME: probably not what we want here
            // Exponential backoff (2s, 4s, ... for up to 5 attempts) waiting
            // for compaction to reclaim space; after that the write proceeds
            // and may fail.
            let mut attempts = 0;
            while !mm.room_to_write(&data) && attempts < 5 {
                println!("delaying a write because file was full...");
                attempts += 1;
                thread::sleep(Duration::from_secs(2_i32.pow(attempts).try_into().unwrap()));
            }
            match mm.write_data(key.clone(), data) {
                Ok(_) => (),
                Err(e) => {
                    // NOTE: could flush the write data to a dead letter queue type log...
                    eprintln!("unrecoverably failed a write to {} due to {}", key, e)
                }
            }
        }
    });

    let sm_clone_read = shared_memory.clone();
    // Periodically rewrites the data file without dead bytes and swaps the
    // shared pointer over to the compacted copy.
    let compaction_thread = thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(COMPACTION_SLEEP_SECS));
        // NOTE(review): the flag is raised even on cycles that end up not
        // compacting, briefly stalling writers each time.
        COMPACTING.swap(true, Ordering::SeqCst);
        let mm_ptr = sm_clone_read.load(Ordering::SeqCst);
        let mm = unsafe { &*mm_ptr };
        // Fraction of the mapping occupied by overwritten (dead) values.
        let total = mm.unused_bytes.load(Ordering::Relaxed) as f32 / mm.mmap_immutable.len() as f32;
        if total > COMPACTION_PERCENT_TRIGGER {
            let current_version = VERSION.load(Ordering::SeqCst);
            let new_version = VERSION.fetch_add(1, Ordering::SeqCst) + 1;
            println!(
                "will increment dat version from {} to {}",
                current_version, new_version
            );
            let (new_file, new_locations, max_pos) = compact_file(
                &format!("./data/example.{}.dat", current_version),
                &format!("./data/example.{}.dat", new_version),
                &mm.locations,
                MAX_FILE_SIZE,
            )
            .expect("compacted");
            let new_memory = ManagedMemory::from_compacted(new_file, new_locations, max_pos.into())
                .expect("new mm");
            swap_memory(&sm_clone_read, new_memory);
        } else {
            println!("compacting found {} dead, sleeping", total);
        }
        COMPACTING.swap(false, Ordering::SeqCst);
    });

    // this is just a thread that's continously logging a sample of keys
    let sm_clone_read = shared_memory.clone();
    let reader_thread = thread::spawn(move || {
        loop {
            thread::sleep(Duration::from_secs(READ_SAMPLE_KEYS_INTERVAL_SECS));
            let mm_ptr = sm_clone_read.load(Ordering::SeqCst);
            let mm = unsafe { &*mm_ptr };

            // Sample up to three keys (DashMap iteration order is arbitrary).
            let keys: Vec<_> = mm
                .locations
                .iter()
                .take(3)
                .map(|entry| entry.key().clone())
                .collect();
            for key in keys {
                if let Some(entry) = mm.locations.get(&key) {
                    let (pos, len) = *entry;
                    println!(
                        "Read data for key: {}={:?}",
                        key,
                        String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len])
                    );
                } else {
                    eprintln!("expected to read {}", key)
                }
            }
        }
    });

    let sm_clone_read = shared_memory.clone();
    let server_addr = "0.0.0.0:6000";
    let server = Server::http(server_addr).unwrap();
    // Accept loop: requests are handled serially on this one thread.
    thread::spawn(move || {
        println!("starting tiny-http server on: {}", server_addr);
        for request in server.incoming_requests() {
            handle_request(request, &tx, &sm_clone_read);
        }
    });

    // All three joined threads loop forever, so main effectively blocks here.
    reader_thread.join().unwrap();
    compaction_thread.join().unwrap();
    writer_thread.join().unwrap();

    Ok(())
}