use dashmap::DashMap; use memmap2::{Mmap, MmapMut, MmapOptions}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::fs::{File, OpenOptions}; use std::io; use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; use std::sync::Arc; use std::thread; use std::time::Duration; use tiny_http::{Header, Method, Request, Response, Server, StatusCode}; #[derive(Deserialize, Serialize, Debug)] struct KeyValue { key: String, value: String, } fn handle_request( mut request: Request, sender: &Sender, kv_store: &Arc>, ) { match ( request.method(), request.url().split('?').nth(0).unwrap_or("/uknown"), ) { // Handle POST requests to submit key-value pairs (&Method::Post, "/submit") => { let content_length = request .headers() .iter() .find(|h| h.field.as_str().to_ascii_lowercase() == "content-length") .and_then(|h| h.value.as_str().parse::().ok()) .expect("need content length"); let mut content = vec![0; content_length]; request.as_reader().read_exact(&mut content).unwrap(); println!("Received content: {}", String::from_utf8_lossy(&content)); // Print raw JSON content if let Ok(key_value) = serde_json::from_slice::(&content) { sender .send(key_value) .expect("Failed to send key-value to the writer"); let json = json!({"status": "submitted"}); let response = Response::from_string(json.to_string()) .with_header( Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(), ) .with_status_code(StatusCode(200)); request.respond(response).unwrap(); } else { let json = json!({"status": "error: invalid data"}); let response = Response::from_string(json.to_string()) .with_header( Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(), ) .with_status_code(StatusCode(400)); request.respond(response).unwrap(); } } // Handle GET requests to lookup key-value pairs (&Method::Get, "/lookup") => { let key = request .url() .split('?') .nth(1) .and_then(|q| q.split('=').nth(1)) .unwrap_or(""); let mm_ptr = kv_store.load(Ordering::SeqCst); let mm = unsafe { &*mm_ptr }; if let Some(entry) = mm.locations.get(key) { let (pos, len) = *entry; let value = String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len]); println!("Return data for key: {}={:?}", key, value); let json = json!({"key": key, "value": value}); let response = Response::from_string(json.to_string()) .with_header( Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(), ) .with_status_code(StatusCode(200)); request.respond(response).unwrap(); } else { eprintln!("expected to read {}", key); let json = json!({"status": format!("error: {} key not found", key)}); let response = Response::from_string(json.to_string()) .with_header( Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(), ) .with_status_code(StatusCode(404)); request.respond(response).unwrap(); } } // Handle not found (_method, other) => { let json = json!({"status": format!("error: {} route not found", other)}); let response = Response::from_string(json.to_string()) .with_header( Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(), ) .with_status_code(StatusCode(404)); request.respond(response).unwrap(); } } } struct ManagedMemory { mmap: MmapMut, mmap_immutable: Mmap, // FIXME: just simple to use DashMap to start but could switch to LeftRight or whatever locations: DashMap, highest_byte_position: AtomicUsize, unused_bytes: AtomicUsize, } impl ManagedMemory { fn from_compacted( file: File, new_locations: DashMap, highest_byte_position: AtomicUsize, ) -> io::Result { let mmap = unsafe { MmapOptions::new().map_mut(&file).expect("open compacted") }; let mmap_immutable = unsafe { MmapOptions::new().map(&file).expect("open compacted") }; Ok(Self { mmap, mmap_immutable, locations: new_locations, highest_byte_position, unused_bytes: AtomicUsize::new(0), }) } fn new(file_path: &str, size: u64) -> io::Result { let file = OpenOptions::new() .read(true) .write(true) .create(true) .open(file_path) .expect("creating new dat file"); file.set_len(size)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; let mmap_immutable = unsafe { MmapOptions::new().map(&file)? }; Ok(Self { mmap, mmap_immutable, locations: DashMap::new(), highest_byte_position: AtomicUsize::new(0), unused_bytes: AtomicUsize::new(0), }) } fn room_to_write(&self, data: &Vec) -> bool { let data_len = data.len(); let new_pos = self.highest_byte_position.load(Ordering::SeqCst) + data_len; new_pos + data_len < self.mmap.len() } fn write_data(&mut self, key: String, data: Vec) -> io::Result<()> { let data_len = data.len(); let new_pos = self .highest_byte_position .fetch_add(data_len, Ordering::SeqCst); for (i, &byte) in data.iter().enumerate() { self.mmap[new_pos + i] = byte; } if let Some((_old_pos, old_len)) = self.locations.insert(key.clone(), (new_pos, data_len)) { self.unused_bytes.fetch_add(old_len, Ordering::SeqCst); } Ok(()) } } fn setup_shared_memory(file_path: &str, size: u64) -> io::Result>> { println!("will setup shared_memory at {}", file_path); let managed_memory = ManagedMemory::new(file_path, size)?; let managed_memory_box = Box::new(managed_memory); let managed_memory_ptr = Box::into_raw(managed_memory_box); Ok(Arc::new(AtomicPtr::new(managed_memory_ptr))) } fn swap_memory(atomic_managed_memory: &Arc>, new_memory: ManagedMemory) { println!("swapping managed memory pointer"); let new_memory_box = Box::new(new_memory); let new_memory_ptr = Box::into_raw(new_memory_box); let old_memory_ptr = atomic_managed_memory.swap(new_memory_ptr, Ordering::SeqCst); println!("swapped managed memory pointer"); unsafe { // drop old memory let _ = Box::from_raw(old_memory_ptr); } } fn compact_file( original_file: &str, new_file: &str, locations: &DashMap, file_size: u64, ) -> io::Result<(File, DashMap, usize)> { let mut new_file = OpenOptions::new() .write(true) .read(true) .create(true) .open(new_file)?; let mut current_pos = 0; let new_locations = DashMap::new(); for entry in locations.iter() { let (key, (pos, len)) = entry.pair(); let mut buffer = vec![0u8; *len]; let old_file = File::open(original_file)?; match std::os::unix::fs::FileExt::read_exact_at(&old_file, &mut buffer, *pos as u64) { Ok(_) => { io::Write::write_all(&mut new_file, &buffer)?; new_locations.insert(key.clone(), (current_pos, *len)); current_pos += len; } Err(e) => { eprintln!("Failed to read data for key {}: {}", key, e); continue; } } } println!("compacted to {}", current_pos as f32 / file_size as f32); // otherwise the file is only as big as you compacted to... new_file.set_len(file_size)?; Ok((new_file, new_locations, current_pos)) } static COMPACTING: AtomicBool = AtomicBool::new(false); static VERSION: AtomicUsize = AtomicUsize::new(0); // NOTE: smaller values while testing is helpful static MAX_FILE_SIZE: u64 = 1024 * 1024; static COMPACTION_PERCENT_TRIGGER: f32 = 0.3; static COMPACTION_SLEEP_SECS: u64 = 2; static READ_SAMPLE_KEYS_INTERVAL_SECS: u64 = 10; fn main() -> io::Result<()> { // TODO: just always starting from afresh rather than loading let shared_memory = setup_shared_memory( &format!("./data/example.{}.dat", VERSION.load(Ordering::Relaxed)), MAX_FILE_SIZE, )?; let (tx, rx) = std::sync::mpsc::channel::(); let sm_clone = shared_memory.clone(); // NOTE: this thread must not panic, or you have stranded the server from writes let writer_thread = thread::spawn(move || { while let Ok(KeyValue { key, value }) = rx.recv() { println!("will write key={}, value={}", key, value); // FIXME: poor man's write buffer is unbounded... while COMPACTING.load(Ordering::SeqCst).eq(&true) { println!("buffering a write due to compaction..."); thread::sleep(Duration::from_secs(1)); } let mm_ptr = sm_clone.load(Ordering::SeqCst); let mm = unsafe { &mut *mm_ptr }; let data = value.into_bytes(); // FIXME: probably not what we want here let mut attempts = 0; while !mm.room_to_write(&data) && attempts < 5 { println!("delaying a write because file was full..."); attempts += 1; thread::sleep(Duration::from_secs(2_i32.pow(attempts).try_into().unwrap())); } match mm.write_data(key.clone(), data) { Ok(_) => (), Err(e) => { // NOTE: could flush the write data to a dead letter queue type log... eprintln!("unrecoverably failed a write to {} due to {}", key, e) } } } }); let sm_clone_read = shared_memory.clone(); let compaction_thread = thread::spawn(move || loop { thread::sleep(Duration::from_secs(COMPACTION_SLEEP_SECS)); COMPACTING.swap(true, Ordering::SeqCst); let mm_ptr = sm_clone_read.load(Ordering::SeqCst); let mm = unsafe { &*mm_ptr }; let total = mm.unused_bytes.load(Ordering::Relaxed) as f32 / mm.mmap_immutable.len() as f32; if total > COMPACTION_PERCENT_TRIGGER { let current_version = VERSION.load(Ordering::SeqCst); let new_version = VERSION.fetch_add(1, Ordering::SeqCst) + 1; println!( "will increment dat version from {} to {}", current_version, new_version ); let (new_file, new_locations, max_pos) = compact_file( &format!("./data/example.{}.dat", current_version), &format!("./data/example.{}.dat", new_version), &mm.locations, MAX_FILE_SIZE, ) .expect("compacted"); let new_memory = ManagedMemory::from_compacted(new_file, new_locations, max_pos.into()) .expect("new mm"); swap_memory(&sm_clone_read, new_memory); } else { println!("compacting found {} dead, sleeping", total); } COMPACTING.swap(false, Ordering::SeqCst); }); // this is just a thread that's continously logging a sample of keys let sm_clone_read = shared_memory.clone(); let reader_thread = thread::spawn(move || { loop { thread::sleep(Duration::from_secs(READ_SAMPLE_KEYS_INTERVAL_SECS)); let mm_ptr = sm_clone_read.load(Ordering::SeqCst); let mm = unsafe { &*mm_ptr }; let keys: Vec<_> = mm .locations .iter() .take(3) .map(|entry| entry.key().clone()) .collect(); //let keys = vec!["key1", "key2", "key3"]; for key in keys { if let Some(entry) = mm.locations.get(&key) { let (pos, len) = *entry; println!( "Read data for key: {}={:?}", key, String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len]) ); } else { eprintln!("expected to read {}", key) } } } }); let sm_clone_read = shared_memory.clone(); let server_addr = "0.0.0.0:6000"; let server = Server::http(server_addr).unwrap(); thread::spawn(move || { println!("starting tiny-http server on: {}", server_addr); for request in server.incoming_requests() { handle_request(request, &tx, &sm_clone_read); } }); reader_thread.join().unwrap(); compaction_thread.join().unwrap(); writer_thread.join().unwrap(); Ok(()) }