Last active
July 7, 2016 03:34
-
-
Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.
BlockUdpMetric VS NonBlockUdpMetric
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [package] | |
| name = "bench_metric" | |
| version = "0.1.0" | |
| authors = ["overvenus <[email protected]>"] | |
| [dependencies] | |
| cadence = "0.5.0" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #![feature(test)] | |
| #![feature(const_fn)] | |
| extern crate test; | |
| extern crate cadence; | |
| use std::io; | |
| use std::mem; | |
| use std::str::from_utf8; | |
| use std::sync::Mutex; | |
| use std::time::{Duration, Instant}; | |
| use std::sync::atomic::{AtomicBool, Ordering}; | |
| use std::net::{ToSocketAddrs, SocketAddr, UdpSocket}; | |
| use cadence::{MetricSink, MetricResult, ErrorKind}; | |
| static mut CLIENT: *const Metric = &NopMetric; | |
| // IS_INITIALIZED indicates the state of CLIENT, | |
| // `false` for uninitialized, `true` for initialized. | |
| static IS_INITIALIZED: AtomicBool = AtomicBool::new(false); | |
| pub trait Metric: MetricSink + Sync + Send{} | |
| pub struct NopMetric; | |
| impl MetricSink for NopMetric { | |
| fn emit(&self, _: &str) -> io::Result<usize> {Ok(0)} | |
| } | |
| impl Metric for NopMetric {} | |
| pub fn set_metric(metric: Box<Metric + Sync + Send>) -> Result<(), ()> { | |
| unsafe { | |
| if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false { | |
| return Err(()); | |
| } | |
| CLIENT = mem::transmute(metric); | |
| Ok(()) | |
| } | |
| } | |
| #[doc(hidden)] | |
| pub fn __client() -> Option<&'static Metric> { | |
| if IS_INITIALIZED.load(Ordering::SeqCst) != true { | |
| return None; | |
| } | |
| Some(unsafe { &*CLIENT }) | |
| } | |
| /// ======================= | |
| /// NonblockUdpMetricSink | |
| /// ======================= | |
| pub struct NonblockUdpMetricSink { | |
| sink_addr: SocketAddr, | |
| socket: UdpSocket, | |
| } | |
| impl NonblockUdpMetricSink { | |
| pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<NonblockUdpMetricSink> | |
| where A: ToSocketAddrs | |
| { | |
| let mut addr_iter = try!(sink_addr.to_socket_addrs()); | |
| let addr = try!(addr_iter.next() | |
| .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); | |
| // Moves this UDP stream into nonblocking mode. | |
| let _ = socket.set_nonblocking(true).unwrap(); | |
| Ok(NonblockUdpMetricSink { | |
| sink_addr: addr, | |
| socket: socket, | |
| }) | |
| } | |
| } | |
| impl MetricSink for NonblockUdpMetricSink { | |
| fn emit(&self, metric: &str) -> io::Result<usize> { | |
| self.socket.send_to(metric.as_bytes(), &self.sink_addr) | |
| } | |
| } | |
| impl Metric for NonblockUdpMetricSink {} | |
| /// =============================================== | |
| /// BlockUdpMetricSink AKA cadence::UdpMetricSink | |
| /// =============================================== | |
| pub struct BlockUdpMetricSink { | |
| sink_addr: SocketAddr, | |
| socket: UdpSocket, | |
| } | |
| impl BlockUdpMetricSink { | |
| pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<BlockUdpMetricSink> | |
| where A: ToSocketAddrs | |
| { | |
| let mut addr_iter = try!(sink_addr.to_socket_addrs()); | |
| let addr = try!(addr_iter.next() | |
| .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded"))); | |
| Ok(BlockUdpMetricSink { | |
| sink_addr: addr, | |
| socket: socket, | |
| }) | |
| } | |
| } | |
| impl MetricSink for BlockUdpMetricSink { | |
| fn emit(&self, metric: &str) -> io::Result<usize> { | |
| self.socket.send_to(metric.as_bytes(), &self.sink_addr) | |
| } | |
| } | |
| impl Metric for BlockUdpMetricSink {} | |
| /// ================================ | |
| /// Buffered NonblockUdpMetricSink | |
| /// ================================ | |
| pub struct BufferedUdpMetricSink { | |
| sink: NonblockUdpMetricSink, | |
| buffer: Mutex<Vec<u8>>, | |
| last_flush_time: Mutex<Instant>, | |
| flush_period: Duration, | |
| } | |
| impl BufferedUdpMetricSink { | |
| pub fn from<A>(sink_addr: A, | |
| socket: UdpSocket, | |
| flush_period: Duration) | |
| -> MetricResult<BufferedUdpMetricSink> | |
| where A: ToSocketAddrs | |
| { | |
| Ok(BufferedUdpMetricSink { | |
| sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(), | |
| buffer: Mutex::new(Vec::with_capacity(1024)), | |
| flush_period: flush_period, | |
| last_flush_time: Mutex::new(Instant::now()), | |
| }) | |
| } | |
| fn append_to_buffer(&self, metric: &str) -> io::Result<usize> { | |
| let mut buffer = self.buffer.lock().unwrap(); | |
| let mut last_flush_time = self.last_flush_time.lock().unwrap(); | |
| // +1 for '\n' | |
| let elapse = last_flush_time.elapsed(); | |
| if (buffer.len() + metric.len() + 1) > buffer.capacity() || | |
| (elapse >= self.flush_period) { | |
| self.flush(&mut buffer); | |
| *last_flush_time = Instant::now(); | |
| } | |
| buffer.extend_from_slice(metric.as_bytes()); | |
| buffer.push('\n' as u8); | |
| Ok(metric.len()) | |
| } | |
| fn flush(&self, buffer: &mut Vec<u8>) { | |
| let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap()); | |
| buffer.clear(); | |
| } | |
| } | |
| impl MetricSink for BufferedUdpMetricSink { | |
| fn emit(&self, metric: &str) -> io::Result<usize> { | |
| self.append_to_buffer(metric) | |
| } | |
| } | |
| #[cfg(test)] | |
| mod tests { | |
| use super::*; | |
| use test::Bencher; | |
| use std::time::Duration; | |
| use std::net::UdpSocket; | |
| use std::thread::{self, JoinHandle}; | |
| use std::sync::mpsc::{channel, Sender, Receiver}; | |
| use cadence::MetricSink; | |
| const MTABLE: &'static [ &'static str ] = &[ "foo:1|c", "bar:1|d", "foo:2|t" ]; | |
| fn new_nonblock_metric() -> NonblockUdpMetricSink { | |
| let host = "127.0.0.1:8125"; | |
| let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | |
| NonblockUdpMetricSink::from(host, socket).unwrap() | |
| } | |
| fn new_block_metric() -> BlockUdpMetricSink { | |
| let host = "127.0.0.1:8125"; | |
| let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | |
| BlockUdpMetricSink::from(host, socket).unwrap() | |
| } | |
| fn new_buffer_metric() -> BufferedUdpMetricSink { | |
| let host = "127.0.0.1:8125"; | |
| let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | |
| BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap() | |
| } | |
| #[bench] | |
| fn bench_single_thread_block_udp(b: &mut Bencher) { | |
| let sink = new_block_metric(); | |
| b.iter(|| sink.emit("foo:1|c")); | |
| } | |
| #[bench] | |
| fn bench_single_thread_nonblock_udp(b: &mut Bencher) { | |
| let sink = new_nonblock_metric(); | |
| b.iter(|| sink.emit("foo:1|c")); | |
| } | |
| #[bench] | |
| fn bench_single_thread_buffer_udp(b: &mut Bencher) { | |
| let sink = new_buffer_metric(); | |
| b.iter(|| sink.emit("foo:1|c")); | |
| } | |
| fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) { | |
| let (tx, rx) = channel(); | |
| let (tx1, done) = channel(); | |
| let t = thread::spawn(move || { | |
| let sink = __client().unwrap(); | |
| let length = MTABLE.len(); | |
| loop { | |
| let n = rx.recv().unwrap(); | |
| if n == 0 { return } | |
| for _ in 0 .. n { | |
| let s = MTABLE[n%length]; | |
| let _ = sink.emit(s).unwrap(); | |
| } | |
| tx1.send(1).unwrap() | |
| } | |
| }); | |
| (t, tx, done) | |
| } | |
| #[bench] | |
| fn bench_multi_thread_block_udp(b: &mut Bencher) { | |
| let sink = new_block_metric(); | |
| let _ = set_metric(Box::new(sink)); | |
| let (t0, tx0, done0) = thread_worker(); | |
| let (t1, tx1, done1) = thread_worker(); | |
| let (t2, tx2, done2) = thread_worker(); | |
| b.iter(|| { | |
| let _ = tx0.send(5000).unwrap(); | |
| let _ = tx1.send(5000).unwrap(); | |
| let _ = tx2.send(5000).unwrap(); | |
| let _ = done0.recv().unwrap(); | |
| let _ = done1.recv().unwrap(); | |
| let _ = done2.recv().unwrap(); | |
| }); | |
| let _ = tx0.send(0).unwrap(); | |
| let _ = tx1.send(0).unwrap(); | |
| let _ = tx2.send(0).unwrap(); | |
| let _ = t0.join().unwrap(); | |
| let _ = t1.join().unwrap(); | |
| let _ = t2.join().unwrap(); | |
| } | |
| #[bench] | |
| fn bench_multi_thread_nonblock_udp(b: &mut Bencher) { | |
| let sink = new_nonblock_metric(); | |
| let _ = set_metric(Box::new(sink)); | |
| let (t0, tx0, done0) = thread_worker(); | |
| let (t1, tx1, done1) = thread_worker(); | |
| let (t2, tx2, done2) = thread_worker(); | |
| b.iter(|| { | |
| let _ = tx0.send(5000).unwrap(); | |
| let _ = tx1.send(5000).unwrap(); | |
| let _ = tx2.send(5000).unwrap(); | |
| let _ = done0.recv().unwrap(); | |
| let _ = done1.recv().unwrap(); | |
| let _ = done2.recv().unwrap(); | |
| }); | |
| let _ = tx0.send(0).unwrap(); | |
| let _ = tx1.send(0).unwrap(); | |
| let _ = tx2.send(0).unwrap(); | |
| let _ = t0.join().unwrap(); | |
| let _ = t1.join().unwrap(); | |
| let _ = t2.join().unwrap(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment