Last active
July 7, 2016 03:34
-
-
Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.
Revisions
-
overvenus revised this gist
Jul 7, 2016 . 3 changed files with 487 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,3 +5,6 @@ authors = ["overvenus <[email protected]>"] [dependencies] cadence = "0.5.0" log = "0.3" quick-error = "0.2" This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,9 +3,14 @@ extern crate test; extern crate cadence; #[macro_use] extern crate log; #[macro_use] extern crate quick_error; use std::io; use std::mem; use std::fmt; use std::str::from_utf8; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -14,6 +19,9 @@ use std::net::{ToSocketAddrs, SocketAddr, UdpSocket}; use cadence::{MetricSink, MetricResult, ErrorKind}; pub mod worker; use worker::{Worker, Runnable}; static mut CLIENT: *const Metric = &NopMetric; // IS_INITIALIZED indicates the state of CLIENT, // `false` for uninitialized, `true` for initialized. @@ -173,6 +181,92 @@ impl MetricSink for BufferedUdpMetricSink { impl Metric for BufferedUdpMetricSink {} // ======================================== /// AsyncUdpMetricSink // ======================================== const BUFFER_SIZE: usize = 1024; struct BufferSender { sink_addr: SocketAddr, socket: UdpSocket, buffer: Vec<u8>, flush_period: Duration, last_flush_time: Instant, } impl Runnable<String> for BufferSender { fn run(&mut self, metric: String) { // +1 for '\n' if (self.buffer.len() + metric.len() + 1) > self.buffer.capacity() || (self.last_flush_time.elapsed() >= self.flush_period) { self.last_flush_time = Instant::now(); if let Err(e) = self.socket.send_to(self.buffer.as_slice(), &self.sink_addr) { warn!("send metric failed {:?}", e); } self.buffer.clear() } self.buffer.extend_from_slice(metric.as_bytes()); self.buffer.push(b'\n'); } } pub struct AsyncUdpMetricSink { worker: Mutex<Worker<String>>, } impl AsyncUdpMetricSink { pub fn from<A>(sink_addr: A, socket: UdpSocket, flush_period: Duration) -> MetricResult<AsyncUdpMetricSink> where A: ToSocketAddrs { let mut addr_iter = try!(sink_addr.to_socket_addrs()); let addr = try!(addr_iter.next() .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); let sender = BufferSender { sink_addr: addr, socket: socket, buffer:Vec::with_capacity(BUFFER_SIZE), flush_period: flush_period, last_flush_time: Instant::now(), }; let mut worker = Worker::new("mteric-worker"); worker.start(sender).unwrap(); Ok(AsyncUdpMetricSink { worker: Mutex::new(worker), }) } fn handle_to_sender(&self, metric: &str) -> io::Result<usize> { let worker = self.worker.lock().unwrap(); if let Err(e) = worker.schedule(metric.to_owned()) { warn!("{}", e); } Ok(metric.len()) } } impl MetricSink for AsyncUdpMetricSink { fn emit(&self, metric: &str) -> io::Result<usize> { self.handle_to_sender(metric) } } impl Metric for AsyncUdpMetricSink {} #[cfg(test)] mod tests { @@ -205,6 +299,12 @@ mod tests { BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap() } fn new_async_metric() -> AsyncUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); AsyncUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap() } #[bench] fn bench_single_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); @@ -223,6 +323,12 @@ mod tests { b.iter(|| sink.emit("foo:1|c")); } #[bench] fn bench_single_thread_async_udp(b: &mut Bencher) { let sink = new_async_metric(); b.iter(|| sink.emit("foo:1|c")); } fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) { let (tx, rx) = channel(); let (tx1, done) = channel(); @@ -327,4 +433,32 @@ mod tests { let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } #[bench] fn bench_multi_thread_async_udp(b: &mut Bencher) { let sink = new_async_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,350 @@ /// Worker contains all workers that do the expensive job in background. use std::io; use std::sync::Arc; use std::fmt::Display; use std::time::{Instant, Duration}; use std::thread::{self, JoinHandle, Builder}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{self, Sender, Receiver}; use std::result; macro_rules! slow_log { ($t:expr, $($arg:tt)*) => {{ if $t.is_slow() { warn!($($arg)*); } else { trace!($($arg)*); } }} } pub struct SlowTimer { slow_time: Duration, t: Instant, } impl SlowTimer { pub fn new() -> SlowTimer { SlowTimer::default() } pub fn from(slow_time: Duration) -> SlowTimer { SlowTimer { slow_time: slow_time, t: Instant::now(), } } pub fn from_secs(secs: u64) -> SlowTimer { SlowTimer::from(Duration::from_secs(secs)) } pub fn from_millis(millis: u64) -> SlowTimer { SlowTimer::from(Duration::from_millis(millis)) } pub fn elapsed(&self) -> Duration { self.t.elapsed() } pub fn is_slow(&self) -> bool { self.elapsed() >= self.slow_time } } const DEFAULT_SLOW_SECS: u64 = 1; impl Default for SlowTimer { fn default() -> SlowTimer { SlowTimer::from_secs(DEFAULT_SLOW_SECS) } } quick_error! { #[derive(Debug)] pub enum Error { Stopped IoError(e: io::Error) { from() display("{}", e) } } } impl<T> From<mpsc::SendError<T>> for Error { fn from(_: mpsc::SendError<T>) -> Error { // do we need to return the failed data Error::Stopped } } pub type Result<T> = result::Result<T, Error>; pub trait Runnable<T: Display> { fn run(&mut self, t: T); } pub trait BatchRunnable<T: Display> { /// run a batch of tasks. /// /// Please note that ts will be clear after invoking this method. fn run_batch(&mut self, ts: &mut Vec<T>); } impl<T: Display, R: Runnable<T>> BatchRunnable<T> for R { fn run_batch(&mut self, ts: &mut Vec<T>) { for t in ts.drain(..) { let task_str = format!("{}", t); let timer = SlowTimer::new(); self.run(t); slow_log!(timer, "task {} takes {:?} to finish.", task_str, timer.elapsed()); } } } /// Scheduler provides interface to schedule task to underlying workers. pub struct Scheduler<T> { counter: Arc<AtomicUsize>, sender: Sender<Option<T>>, } impl<T: Display> Scheduler<T> { fn new(counter: AtomicUsize, sender: Sender<Option<T>>) -> Scheduler<T> { Scheduler { counter: Arc::new(counter), sender: sender, } } /// Schedule a task to run. /// /// If the worker is stopped, an error will return. pub fn schedule(&self, task: T) -> Result<()> { debug!("scheduling task {}", task); try!(self.sender.send(Some(task))); self.counter.fetch_add(1, Ordering::SeqCst); Ok(()) } /// Check if underlying worker can't handle task immediately. pub fn is_busy(&self) -> bool { self.counter.load(Ordering::SeqCst) > 0 } } impl<T: Display> Clone for Scheduler<T> { fn clone(&self) -> Scheduler<T> { Scheduler { counter: self.counter.clone(), sender: self.sender.clone(), } } } /// Create a scheduler that can't be scheduled any task. /// /// Useful for test purpose. #[cfg(test)] pub fn dummy_scheduler<T: Display>() -> Scheduler<T> { let (tx, _) = mpsc::channel(); Scheduler::new(AtomicUsize::new(0), tx) } /// A worker that can schedule time consuming tasks. pub struct Worker<T: Display> { name: String, scheduler: Scheduler<T>, receiver: Option<Receiver<Option<T>>>, handle: Option<JoinHandle<()>>, } fn poll<R, T>(mut runner: R, rx: Receiver<Option<T>>, counter: Arc<AtomicUsize>, batch_size: usize) where R: BatchRunnable<T> + Send + 'static, T: Display + Send + 'static { let mut keep_going = true; let mut buffer = Vec::with_capacity(batch_size); while keep_going { let t = rx.recv(); match t { Ok(Some(t)) => buffer.push(t), _ => return, } while buffer.len() < batch_size { match rx.try_recv() { Ok(None) => { keep_going = false; break; } Ok(Some(t)) => buffer.push(t), _ => break, } } counter.fetch_sub(buffer.len(), Ordering::SeqCst); runner.run_batch(&mut buffer); buffer.clear(); } } impl<T: Display + Send + 'static> Worker<T> { /// Create a worker. pub fn new<S: Into<String>>(name: S) -> Worker<T> { let (tx, rx) = mpsc::channel(); Worker { name: name.into(), scheduler: Scheduler::new(AtomicUsize::new(0), tx), receiver: Some(rx), handle: None, } } /// Start the worker. pub fn start<R: Runnable<T> + Send + 'static>(&mut self, runner: R) -> Result<()> { self.start_batch(runner, 1) } pub fn start_batch<R>(&mut self, runner: R, batch_size: usize) -> Result<()> where R: BatchRunnable<T> + Send + 'static { info!("starting working thread: {}", self.name); if self.receiver.is_none() { warn!("worker {} has been started.", self.name); return Ok(()); } let rx = self.receiver.take().unwrap(); let counter = self.scheduler.counter.clone(); let h = try!(Builder::new() .name(self.name.clone()) .spawn(move || poll(runner, rx, counter, batch_size))); self.handle = Some(h); Ok(()) } /// Get a scheduler to schedule task. pub fn scheduler(&self) -> Scheduler<T> { self.scheduler.clone() } /// Schedule a task to run. /// /// If the worker is stopped, an error will return. pub fn schedule(&self, task: T) -> Result<()> { self.scheduler.schedule(task) } /// Check if underlying worker can't handle task immediately. pub fn is_busy(&self) -> bool { self.handle.is_none() || self.scheduler.is_busy() } /// Stop the worker thread. pub fn stop(&mut self) -> thread::Result<()> { let handler = match self.handle.take() { Some(h) => h, None => return Ok(()), }; // close sender explicitly so the background thread will exit. info!("stoping {}", self.name); if let Err(e) = self.scheduler.sender.send(None) { warn!("failed to stop worker thread: {:?}", e); } handler.join() } } #[cfg(test)] mod test { use std::thread; use std::sync::Arc; use std::sync::atomic::*; use std::cmp; use std::time::Duration; use super::*; struct CountRunner { count: Arc<AtomicUsize>, } impl Runnable<u64> for CountRunner { fn run(&mut self, step: u64) { self.count.fetch_add(step as usize, Ordering::SeqCst); thread::sleep(Duration::from_millis(10)); } } struct BatchRunner { count: Arc<AtomicUsize>, } impl BatchRunnable<u64> for BatchRunner { fn run_batch(&mut self, ms: &mut Vec<u64>) { let total = ms.iter().fold(0, |l, &r| l + r); self.count.fetch_add(total as usize, Ordering::SeqCst); let max_sleep = ms.iter().fold(0, |l, &r| cmp::max(l, r)); thread::sleep(Duration::from_millis(max_sleep)); } } #[test] fn test_worker() { let mut worker = Worker::new("test-worker"); let count = Arc::new(AtomicUsize::new(0)); worker.start(CountRunner { count: count.clone() }).unwrap(); assert!(!worker.is_busy()); worker.schedule(50).unwrap(); worker.schedule(50).unwrap(); worker.schedule(50).unwrap(); assert!(worker.is_busy()); for _ in 0..100 { if !worker.is_busy() { break; } thread::sleep(Duration::from_millis(10)); } assert!(!worker.is_busy()); worker.stop().unwrap(); assert_eq!(count.load(Ordering::SeqCst), 150); // now worker can't handle any task assert!(worker.is_busy()); } #[test] fn test_threaded() { let mut worker = Worker::new("test-worker-threaded"); let count = Arc::new(AtomicUsize::new(0)); worker.start(CountRunner { count: count.clone() }).unwrap(); let scheduler = worker.scheduler(); thread::spawn(move || { scheduler.schedule(100).unwrap(); scheduler.schedule(100).unwrap(); }); for _ in 1..1000 { if worker.is_busy() { break; } thread::sleep(Duration::from_millis(1)); } worker.stop().unwrap(); assert_eq!(count.load(Ordering::SeqCst), 200); } #[test] fn test_batch() { let mut worker = Worker::new("test-worker-batch"); let count = Arc::new(AtomicUsize::new(0)); worker.start_batch(BatchRunner { count: count.clone() }, 10).unwrap(); for _ in 0..20 { worker.schedule(50).unwrap(); } worker.stop().unwrap(); assert_eq!(count.load(Ordering::SeqCst), 20 * 50); } } -
overvenus revised this gist
Jul 2, 2016 . 1 changed file with 29 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -171,6 +171,7 @@ impl MetricSink for BufferedUdpMetricSink { } } impl Metric for BufferedUdpMetricSink {} #[cfg(test)] @@ -298,4 +299,32 @@ mod tests { let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } #[bench] fn bench_multi_thread_buffer_udp(b: &mut Bencher) { let sink = new_buffer_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } } -
overvenus revised this gist
Jul 1, 2016 . 1 changed file with 75 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,6 +6,9 @@ extern crate cadence; use std::io; use std::mem; use std::str::from_utf8; use std::sync::Mutex; use std::time::{Duration, Instant}; use std::sync::atomic::{AtomicBool, Ordering}; use std::net::{ToSocketAddrs, SocketAddr, UdpSocket}; @@ -111,10 +114,70 @@ impl MetricSink for BlockUdpMetricSink { impl Metric for BlockUdpMetricSink {} /// ================================ /// Buffered NonblockUdpMetricSink /// ================================ pub struct BufferedUdpMetricSink { sink: NonblockUdpMetricSink, buffer: Mutex<Vec<u8>>, last_flush_time: Mutex<Instant>, flush_period: Duration, } impl BufferedUdpMetricSink { pub fn from<A>(sink_addr: A, socket: UdpSocket, flush_period: Duration) -> MetricResult<BufferedUdpMetricSink> where A: ToSocketAddrs { Ok(BufferedUdpMetricSink { sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(), buffer: Mutex::new(Vec::with_capacity(1024)), flush_period: flush_period, last_flush_time: Mutex::new(Instant::now()), }) } fn append_to_buffer(&self, metric: &str) -> io::Result<usize> { let mut buffer = self.buffer.lock().unwrap(); let mut last_flush_time = self.last_flush_time.lock().unwrap(); // +1 for '\n' let elapse = last_flush_time.elapsed(); if (buffer.len() + metric.len() + 1) > buffer.capacity() || (elapse >= self.flush_period) { self.flush(&mut buffer); *last_flush_time = Instant::now(); } buffer.extend_from_slice(metric.as_bytes()); buffer.push('\n' as u8); Ok(metric.len()) } fn flush(&self, buffer: &mut Vec<u8>) { let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap()); buffer.clear(); } } impl MetricSink for BufferedUdpMetricSink { fn emit(&self, metric: &str) -> io::Result<usize> { self.append_to_buffer(metric) } } #[cfg(test)] mod tests { use super::*; use test::Bencher; use std::time::Duration; use std::net::UdpSocket; use std::thread::{self, JoinHandle}; use std::sync::mpsc::{channel, Sender, Receiver}; @@ -135,6 +198,12 @@ mod tests { BlockUdpMetricSink::from(host, socket).unwrap() } fn new_buffer_metric() -> BufferedUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap() } #[bench] fn bench_single_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); @@ -147,6 +216,12 @@ mod tests { b.iter(|| sink.emit("foo:1|c")); } #[bench] fn bench_single_thread_buffer_udp(b: &mut Bencher) { let sink = new_buffer_metric(); b.iter(|| sink.emit("foo:1|c")); } fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) { let (tx, rx) = channel(); let (tx1, done) = channel(); -
overvenus revised this gist
May 31, 2016 . 1 changed file with 18 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -121,6 +121,8 @@ mod tests { use cadence::MetricSink; const MTABLE: &'static [ &'static str ] = &[ "foo:1|c", "bar:1|d", "foo:2|t" ]; fn new_nonblock_metric() -> NonblockUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -145,19 +147,19 @@ mod tests { b.iter(|| sink.emit("foo:1|c")); } fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) { let (tx, rx) = channel(); let (tx1, done) = channel(); let t = thread::spawn(move || { let sink = __client().unwrap(); let length = MTABLE.len(); loop { let n = rx.recv().unwrap(); if n == 0 { return } for _ in 0 .. n { let s = MTABLE[n%length]; let _ = sink.emit(s).unwrap(); } tx1.send(1).unwrap() } @@ -171,14 +173,14 @@ mod tests { let sink = new_block_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); @@ -199,14 +201,14 @@ mod tests { let sink = new_nonblock_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker(); let (t1, tx1, done1) = thread_worker(); let (t2, tx2, done2) = thread_worker(); b.iter(|| { let _ = tx0.send(5000).unwrap(); let _ = tx1.send(5000).unwrap(); let _ = tx2.send(5000).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); -
overvenus revised this gist
May 31, 2016 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -63,7 +63,7 @@ impl NonblockUdpMetricSink { .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); // Moves this UDP stream into nonblocking mode. let _ = socket.set_nonblocking(true).unwrap(); Ok(NonblockUdpMetricSink { sink_addr: addr, -
overvenus created this gist
May 31, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,7 @@ [package] name = "bench_metric" version = "0.1.0" authors = ["overvenus <[email protected]>"] [dependencies] cadence = "0.5.0" This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,224 @@ #![feature(test)] #![feature(const_fn)] extern crate test; extern crate cadence; use std::io; use std::mem; use std::sync::atomic::{AtomicBool, Ordering}; use std::net::{ToSocketAddrs, SocketAddr, UdpSocket}; use cadence::{MetricSink, MetricResult, ErrorKind}; static mut CLIENT: *const Metric = &NopMetric; // IS_INITIALIZED indicates the state of CLIENT, // `false` for uninitialized, `true` for initialized. static IS_INITIALIZED: AtomicBool = AtomicBool::new(false); pub trait Metric: MetricSink + Sync + Send{} pub struct NopMetric; impl MetricSink for NopMetric { fn emit(&self, _: &str) -> io::Result<usize> {Ok(0)} } impl Metric for NopMetric {} pub fn set_metric(metric: Box<Metric + Sync + Send>) -> Result<(), ()> { unsafe { if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false { return Err(()); } CLIENT = mem::transmute(metric); Ok(()) } } #[doc(hidden)] pub fn __client() -> Option<&'static Metric> { if IS_INITIALIZED.load(Ordering::SeqCst) != true { return None; } Some(unsafe { &*CLIENT }) } /// ======================= /// NonblockUdpMetricSink /// ======================= pub struct NonblockUdpMetricSink { sink_addr: SocketAddr, socket: UdpSocket, } impl NonblockUdpMetricSink { pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<NonblockUdpMetricSink> where A: ToSocketAddrs { let mut addr_iter = try!(sink_addr.to_socket_addrs()); let addr = try!(addr_iter.next() .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); // Moves this UDP stream into nonblocking mode. let _ = socket.set_nonblocking(false).unwrap(); Ok(NonblockUdpMetricSink { sink_addr: addr, socket: socket, }) } } impl MetricSink for NonblockUdpMetricSink { fn emit(&self, metric: &str) -> io::Result<usize> { self.socket.send_to(metric.as_bytes(), &self.sink_addr) } } impl Metric for NonblockUdpMetricSink {} /// =============================================== /// BlockUdpMetricSink AKA cadence::UdpMetricSink /// =============================================== pub struct BlockUdpMetricSink { sink_addr: SocketAddr, socket: UdpSocket, } impl BlockUdpMetricSink { pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<BlockUdpMetricSink> where A: ToSocketAddrs { let mut addr_iter = try!(sink_addr.to_socket_addrs()); let addr = try!(addr_iter.next() .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); Ok(BlockUdpMetricSink { sink_addr: addr, socket: socket, }) } } impl MetricSink for BlockUdpMetricSink { fn emit(&self, metric: &str) -> io::Result<usize> { self.socket.send_to(metric.as_bytes(), &self.sink_addr) } } impl Metric for BlockUdpMetricSink {} #[cfg(test)] mod tests { use super::*; use test::Bencher; use std::net::UdpSocket; use std::thread::{self, JoinHandle}; use std::sync::mpsc::{channel, Sender, Receiver}; use cadence::MetricSink; fn new_nonblock_metric() -> NonblockUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); NonblockUdpMetricSink::from(host, socket).unwrap() } fn new_block_metric() -> BlockUdpMetricSink { let host = "127.0.0.1:8125"; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); BlockUdpMetricSink::from(host, socket).unwrap() } #[bench] fn bench_single_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); b.iter(|| sink.emit("foo:1|c")); } #[bench] fn bench_single_thread_nonblock_udp(b: &mut Bencher) { let sink = new_nonblock_metric(); b.iter(|| sink.emit("foo:1|c")); } fn thread_worker(s: &str) -> (JoinHandle<()>, Sender<i64>, Receiver<i64>) { let s = s.to_owned(); let (tx, rx) = channel(); let (tx1, done) = channel(); let t = thread::spawn(move || { let sink = __client().unwrap(); loop { let n = rx.recv().unwrap(); if n == 0 { return } for _ in 0 .. n { let _ = sink.emit(&s).unwrap(); } tx1.send(1).unwrap() } }); (t, tx, done) } #[bench] fn bench_multi_thread_block_udp(b: &mut Bencher) { let sink = new_block_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker("foo:1|c"); let (t1, tx1, done1) = thread_worker("foo:1|c"); let (t2, tx2, done2) = thread_worker("foo:1|c"); b.iter(|| { let _ = tx0.send(500).unwrap(); let _ = tx1.send(500).unwrap(); let _ = tx2.send(500).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } #[bench] fn bench_multi_thread_nonblock_udp(b: &mut Bencher) { let sink = new_nonblock_metric(); let _ = set_metric(Box::new(sink)); let (t0, tx0, done0) = thread_worker("foo:1|c"); let (t1, tx1, done1) = thread_worker("foo:1|c"); let (t2, tx2, done2) = thread_worker("foo:1|c"); b.iter(|| { let _ = tx0.send(500).unwrap(); let _ = tx1.send(500).unwrap(); let _ = tx2.send(500).unwrap(); let _ = done0.recv().unwrap(); let _ = done1.recv().unwrap(); let _ = done2.recv().unwrap(); }); let _ = tx0.send(0).unwrap(); let _ = tx1.send(0).unwrap(); let _ = tx2.send(0).unwrap(); let _ = t0.join().unwrap(); let _ = t1.join().unwrap(); let _ = t2.join().unwrap(); } }