Skip to content

Instantly share code, notes, and snippets.

@overvenus
Last active July 7, 2016 03:34
Show Gist options
  • Select an option

  • Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.

Select an option

Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.
BlockUdpMetric VS NonBlockUdpMetric
[package]
name = "bench_metric"
version = "0.1.0"
authors = ["overvenus <[email protected]>"]
[dependencies]
cadence = "0.5.0"
#![feature(test)]
#![feature(const_fn)]
extern crate test;
extern crate cadence;
use std::io;
use std::mem;
use std::str::from_utf8;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicBool, Ordering};
use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};
use cadence::{MetricSink, MetricResult, ErrorKind};
// Process-wide metric client. It starts out pointing at the no-op `NopMetric`,
// is overwritten by `set_metric`, and is dereferenced by `__client`.
static mut CLIENT: *const Metric = &NopMetric;
// IS_INITIALIZED indicates the state of CLIENT,
// `false` for uninitialized, `true` for initialized.
// It is set to `true` by `set_metric` and checked by `__client` before
// `CLIENT` is dereferenced.
static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);
/// Marker trait for metric sinks that can be shared between threads.
///
/// It declares no methods of its own; it only bundles `MetricSink` with
/// `Send` and `Sync`, which is what the global `CLIENT` pointer stores.
pub trait Metric: MetricSink + Send + Sync {}
/// Metric sink that discards every metric it is given.
pub struct NopMetric;

impl MetricSink for NopMetric {
    /// Ignores the metric and reports that zero bytes were written.
    fn emit(&self, _metric: &str) -> io::Result<usize> {
        Ok(0)
    }
}

impl Metric for NopMetric {}
pub fn set_metric(metric: Box<Metric + Sync + Send>) -> Result<(), ()> {
unsafe {
if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false {
return Err(());
}
CLIENT = mem::transmute(metric);
Ok(())
}
}
/// Returns the global metric client, or `None` while `IS_INITIALIZED` is
/// still `false`.
#[doc(hidden)]
pub fn __client() -> Option<&'static Metric> {
    if IS_INITIALIZED.load(Ordering::SeqCst) {
        // `CLIENT` points either at the static `NopMetric` or at the sink
        // that `set_metric` leaked, so the reference can be `'static`.
        Some(unsafe { &*CLIENT })
    } else {
        None
    }
}
/// UDP metric sink whose socket is switched to non-blocking mode.
///
/// Each emitted metric is sent as its own datagram to `sink_addr`.
pub struct NonblockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
}

impl NonblockUdpMetricSink {
    /// Builds a sink that sends to the first address `sink_addr` resolves to,
    /// using `socket` after putting it into non-blocking mode.
    ///
    /// # Errors
    ///
    /// Fails if address resolution fails or yields no address, or if the
    /// socket cannot be switched to non-blocking mode.
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<NonblockUdpMetricSink>
        where A: ToSocketAddrs
    {
        let mut addr_iter = try!(sink_addr.to_socket_addrs());
        let addr = try!(addr_iter.next()
            .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));
        // Moves this UDP socket into nonblocking mode. A failure is reported
        // to the caller instead of panicking, since this constructor already
        // returns a `MetricResult`.
        try!(socket.set_nonblocking(true));
        Ok(NonblockUdpMetricSink {
            sink_addr: addr,
            socket: socket,
        })
    }
}

impl MetricSink for NonblockUdpMetricSink {
    /// Sends `metric` as one datagram and returns whatever `send_to` reports
    /// (the number of bytes sent, or the I/O error unchanged).
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.socket.send_to(metric.as_bytes(), &self.sink_addr)
    }
}

impl Metric for NonblockUdpMetricSink {}
/// UDP metric sink that leaves the socket in whatever mode the caller gave it.
///
/// The original author describes this as the equivalent of
/// `cadence::UdpMetricSink`. Each metric is sent as its own datagram.
pub struct BlockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
}

impl BlockUdpMetricSink {
    /// Builds a sink that sends to the first address `sink_addr` resolves to.
    ///
    /// # Errors
    ///
    /// Fails if address resolution fails or yields no address.
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<BlockUdpMetricSink>
        where A: ToSocketAddrs
    {
        // Only the first resolved address is used.
        let first = try!(sink_addr.to_socket_addrs()).next();
        let target = try!(first.ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));
        Ok(BlockUdpMetricSink {
            sink_addr: target,
            socket: socket,
        })
    }
}

impl MetricSink for BlockUdpMetricSink {
    /// Sends `metric` as one datagram and returns the result of `send_to`.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let payload = metric.as_bytes();
        self.socket.send_to(payload, &self.sink_addr)
    }
}

impl Metric for BlockUdpMetricSink {}
/// ================================
/// Buffered NonblockUdpMetricSink
/// ================================
pub struct BufferedUdpMetricSink {
sink: NonblockUdpMetricSink,
buffer: Mutex<Vec<u8>>,
last_flush_time: Mutex<Instant>,
flush_period: Duration,
}
impl BufferedUdpMetricSink {
pub fn from<A>(sink_addr: A,
socket: UdpSocket,
flush_period: Duration)
-> MetricResult<BufferedUdpMetricSink>
where A: ToSocketAddrs
{
Ok(BufferedUdpMetricSink {
sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(),
buffer: Mutex::new(Vec::with_capacity(1024)),
flush_period: flush_period,
last_flush_time: Mutex::new(Instant::now()),
})
}
fn append_to_buffer(&self, metric: &str) -> io::Result<usize> {
let mut buffer = self.buffer.lock().unwrap();
let mut last_flush_time = self.last_flush_time.lock().unwrap();
// +1 for '\n'
let elapse = last_flush_time.elapsed();
if (buffer.len() + metric.len() + 1) > buffer.capacity() ||
(elapse >= self.flush_period) {
self.flush(&mut buffer);
*last_flush_time = Instant::now();
}
buffer.extend_from_slice(metric.as_bytes());
buffer.push('\n' as u8);
Ok(metric.len())
}
fn flush(&self, buffer: &mut Vec<u8>) {
let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap());
buffer.clear();
}
}
impl MetricSink for BufferedUdpMetricSink {
fn emit(&self, metric: &str) -> io::Result<usize> {
self.append_to_buffer(metric)
}
}
#[cfg(test)]
mod tests {
use super::*;
use test::Bencher;
use std::time::Duration;
use std::net::UdpSocket;
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{channel, Sender, Receiver};
use cadence::MetricSink;
/// Sample metric lines emitted by the multi-threaded benchmark workers.
const MTABLE: &'static [&'static str] = &[
    "foo:1|c",
    "bar:1|d",
    "foo:2|t",
];
/// Builds a non-blocking sink aimed at `127.0.0.1:8125`, sending from a
/// socket bound to `0.0.0.0:0`. Panics if either step fails.
fn new_nonblock_metric() -> NonblockUdpMetricSink {
    let local_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    NonblockUdpMetricSink::from("127.0.0.1:8125", local_socket).unwrap()
}
/// Builds a blocking sink aimed at `127.0.0.1:8125`, sending from a socket
/// bound to `0.0.0.0:0`. Panics if either step fails.
fn new_block_metric() -> BlockUdpMetricSink {
    let local_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    BlockUdpMetricSink::from("127.0.0.1:8125", local_socket).unwrap()
}
/// Builds a buffered sink aimed at `127.0.0.1:8125` with a 10 ms flush
/// period, sending from a socket bound to `0.0.0.0:0`. Panics if any step
/// fails.
fn new_buffer_metric() -> BufferedUdpMetricSink {
    let flush_period = Duration::from_millis(10);
    let local_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    BufferedUdpMetricSink::from("127.0.0.1:8125", local_socket, flush_period).unwrap()
}
/// Measures one `emit` call on the blocking sink from a single thread.
#[bench]
fn bench_single_thread_block_udp(b: &mut Bencher) {
    let metric = new_block_metric();
    // The `emit` result is returned from the closure so the call is kept.
    b.iter(|| {
        metric.emit("foo:1|c")
    });
}
/// Measures one `emit` call on the non-blocking sink from a single thread.
#[bench]
fn bench_single_thread_nonblock_udp(b: &mut Bencher) {
    let metric = new_nonblock_metric();
    // The `emit` result is returned from the closure so the call is kept.
    b.iter(|| {
        metric.emit("foo:1|c")
    });
}
/// Measures one `emit` call on the buffered sink from a single thread.
#[bench]
fn bench_single_thread_buffer_udp(b: &mut Bencher) {
    let metric = new_buffer_metric();
    // The `emit` result is returned from the closure so the call is kept.
    b.iter(|| {
        metric.emit("foo:1|c")
    });
}
/// Spawns a worker thread that emits metrics through the global client.
///
/// Returns the thread handle, a sender for batch sizes, and a receiver that
/// yields `1` after each batch has been emitted. Sending `0` makes the worker
/// exit. The worker panics if no global client is installed, if an `emit`
/// fails, or if either channel is closed unexpectedly.
fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) {
    let (tx, rx) = channel();
    let (tx1, done) = channel();
    let t = thread::spawn(move || {
        let sink = __client().unwrap();
        loop {
            let n = rx.recv().unwrap();
            if n == 0 {
                return;
            }
            // Cycle through the sample table. Indexing with the batch size
            // `n` would pick the same entry on every iteration.
            for s in MTABLE.iter().cycle().take(n) {
                sink.emit(s).unwrap();
            }
            tx1.send(1).unwrap()
        }
    });
    (t, tx, done)
}
/// Measures three worker threads each emitting 5000 metrics per iteration
/// through the global client, after trying to install a blocking sink.
#[bench]
fn bench_multi_thread_block_udp(b: &mut Bencher) {
    // NOTE(review): the result is ignored, and `set_metric` only succeeds
    // once. If a client was already installed in this process, the workers
    // use that one instead of the sink built here.
    let _ = set_metric(Box::new(new_block_metric()));
    let workers = vec![thread_worker(), thread_worker(), thread_worker()];
    b.iter(|| {
        // Hand every worker its batch first, then wait for all of them.
        for worker in &workers {
            worker.1.send(5000).unwrap();
        }
        for worker in &workers {
            worker.2.recv().unwrap();
        }
    });
    // A batch size of zero tells each worker to exit.
    for worker in &workers {
        worker.1.send(0).unwrap();
    }
    for (handle, _tx, _done) in workers {
        handle.join().unwrap();
    }
}
/// Measures three worker threads each emitting 5000 metrics per iteration
/// through the global client, after trying to install a non-blocking sink.
#[bench]
fn bench_multi_thread_nonblock_udp(b: &mut Bencher) {
    // NOTE(review): the result is ignored, and `set_metric` only succeeds
    // once. If a client was already installed in this process, the workers
    // use that one instead of the sink built here.
    let _ = set_metric(Box::new(new_nonblock_metric()));
    let workers = vec![thread_worker(), thread_worker(), thread_worker()];
    b.iter(|| {
        // Hand every worker its batch first, then wait for all of them.
        for worker in &workers {
            worker.1.send(5000).unwrap();
        }
        for worker in &workers {
            worker.2.recv().unwrap();
        }
    });
    // A batch size of zero tells each worker to exit.
    for worker in &workers {
        worker.1.send(0).unwrap();
    }
    for (handle, _tx, _done) in workers {
        handle.join().unwrap();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment