#![feature(test)] #![feature(const_fn)] extern crate test; extern crate cadence; use std::io; use std::mem; use std::str::from_utf8; use std::sync::Mutex; use std::time::{Duration, Instant}; use std::sync::atomic::{AtomicBool, Ordering}; use std::net::{ToSocketAddrs, SocketAddr, UdpSocket}; use cadence::{MetricSink, MetricResult, ErrorKind}; static mut CLIENT: *const Metric = &NopMetric; // IS_INITIALIZED indicates the state of CLIENT, // `false` for uninitialized, `true` for initialized. static IS_INITIALIZED: AtomicBool = AtomicBool::new(false); pub trait Metric: MetricSink + Sync + Send{} pub struct NopMetric; impl MetricSink for NopMetric { fn emit(&self, _: &str) -> io::Result {Ok(0)} } impl Metric for NopMetric {} pub fn set_metric(metric: Box) -> Result<(), ()> { unsafe { if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false { return Err(()); } CLIENT = mem::transmute(metric); Ok(()) } } #[doc(hidden)] pub fn __client() -> Option<&'static Metric> { if IS_INITIALIZED.load(Ordering::SeqCst) != true { return None; } Some(unsafe { &*CLIENT }) } /// ======================= /// NonblockUdpMetricSink /// ======================= pub struct NonblockUdpMetricSink { sink_addr: SocketAddr, socket: UdpSocket, } impl NonblockUdpMetricSink { pub fn from(sink_addr: A, socket: UdpSocket) -> MetricResult where A: ToSocketAddrs { let mut addr_iter = try!(sink_addr.to_socket_addrs()); let addr = try!(addr_iter.next() .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); // Moves this UDP stream into nonblocking mode. let _ = socket.set_nonblocking(true).unwrap(); Ok(NonblockUdpMetricSink { sink_addr: addr, socket: socket, }) } } impl MetricSink for NonblockUdpMetricSink { fn emit(&self, metric: &str) -> io::Result { self.socket.send_to(metric.as_bytes(), &self.sink_addr) } } impl Metric for NonblockUdpMetricSink {} /// =============================================== /// BlockUdpMetricSink AKA cadence::UdpMetricSink /// =============================================== pub struct BlockUdpMetricSink { sink_addr: SocketAddr, socket: UdpSocket, } impl BlockUdpMetricSink { pub fn from(sink_addr: A, socket: UdpSocket) -> MetricResult where A: ToSocketAddrs { let mut addr_iter = try!(sink_addr.to_socket_addrs()); let addr = try!(addr_iter.next() .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); Ok(BlockUdpMetricSink { sink_addr: addr, socket: socket, }) } } impl MetricSink for BlockUdpMetricSink { fn emit(&self, metric: &str) -> io::Result { self.socket.send_to(metric.as_bytes(), &self.sink_addr) } } impl Metric for BlockUdpMetricSink {} /// ================================ /// Buffered NonblockUdpMetricSink /// ================================ pub struct BufferedUdpMetricSink { sink: NonblockUdpMetricSink, buffer: Mutex>, last_flush_time: Mutex, flush_period: Duration, } impl BufferedUdpMetricSink { pub fn from(sink_addr: A, socket: UdpSocket, flush_period: Duration) -> MetricResult where A: ToSocketAddrs { Ok(BufferedUdpMetricSink { sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(), buffer: Mutex::new(Vec::with_capacity(1024)), flush_period: flush_period, last_flush_time: Mutex::new(Instant::now()), }) } fn append_to_buffer(&self, metric: &str) -> io::Result { let mut buffer = self.buffer.lock().unwrap(); let mut last_flush_time = self.last_flush_time.lock().unwrap(); // +1 for '\n' let elapse = last_flush_time.elapsed(); if (buffer.len() + metric.len() + 1) > buffer.capacity() || (elapse >= self.flush_period) { self.flush(&mut buffer); *last_flush_time = Instant::now(); } buffer.extend_from_slice(metric.as_bytes()); buffer.push('\n' as u8); Ok(metric.len()) } fn flush(&self, buffer: &mut Vec) { let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap()); buffer.clear(); } } impl MetricSink for BufferedUdpMetricSink { fn emit(&self, metric: &str) -> io::Result { self.append_to_buffer(metric) } } #[cfg(test)] mod tests { use super::*; use test::Bencher; use std::time::Duration; use std::net::UdpSocket; use std::thread::{self, JoinHandle}; use std::sync::mpsc::{channel, Sender, Receiver}; use cadence::MetricSink; const MTABLE: &'static [ &'static str ] = &[ "foo:1|c", "bar:1|d", "foo:2|t" ]; fn new_nonblock_metric() -> NonblockUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); NonblockUdpMetricSink::from(host, socket).unwrap() } fn new_block_metric() -> BlockUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); BlockUdpMetricSink::from(host, socket).unwrap() } fn new_buffer_metric() -> BufferedUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap() } #[bench] fn bench_single_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); b.iter(|| sink.emit("foo:1|c")); } #[bench] fn bench_single_thread_nonblock_udp(b: &mut Bencher) { let sink = new_nonblock_metric(); b.iter(|| sink.emit("foo:1|c")); } #[bench] fn bench_single_thread_buffer_udp(b: &mut Bencher) { let sink = new_buffer_metric(); b.iter(|| sink.emit("foo:1|c")); } fn thread_worker() -> (JoinHandle<()>, Sender, Receiver) { let (tx, rx) = channel(); let (tx1, done) = channel(); let t = thread::spawn(move || { let sink = __client().unwrap(); let length = MTABLE.len(); loop { let n = rx.recv().unwrap(); if n == 0 { return } for _ in 0 .. n { let s = MTABLE[n%length]; let _ = sink.emit(s).unwrap(); } tx1.send(1).unwrap() } }); (t, tx, done) } #[bench] fn bench_multi_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } #[bench] fn bench_multi_thread_nonblock_udp(b: &mut Bencher) { let sink = new_nonblock_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } }