Skip to content

Instantly share code, notes, and snippets.

@overvenus
Last active July 7, 2016 03:34
Show Gist options
  • Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.
Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.

Revisions

  1. overvenus revised this gist Jul 7, 2016. 3 changed files with 487 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions Cargo.toml
    Original file line number Diff line number Diff line change
    @@ -5,3 +5,6 @@ authors = ["overvenus <[email protected]>"]

    [dependencies]
    cadence = "0.5.0"
    log = "0.3"
    quick-error = "0.2"

    134 changes: 134 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -3,9 +3,14 @@

    extern crate test;
    extern crate cadence;
    #[macro_use]
    extern crate log;
    #[macro_use]
    extern crate quick_error;

    use std::io;
    use std::mem;
    use std::fmt;
    use std::str::from_utf8;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};
    @@ -14,6 +19,9 @@ use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};

    use cadence::{MetricSink, MetricResult, ErrorKind};

    pub mod worker;
    use worker::{Worker, Runnable};

    static mut CLIENT: *const Metric = &NopMetric;
    // IS_INITIALIZED indicates the state of CLIENT,
    // `false` for uninitialized, `true` for initialized.
    @@ -173,6 +181,92 @@ impl MetricSink for BufferedUdpMetricSink {

    impl Metric for BufferedUdpMetricSink {}

    // ========================================
    /// AsyncUdpMetricSink
    // ========================================

    const BUFFER_SIZE: usize = 1024;

    struct BufferSender {
    sink_addr: SocketAddr,
    socket: UdpSocket,

    buffer: Vec<u8>,
    flush_period: Duration,
    last_flush_time: Instant,
    }

    impl Runnable<String> for BufferSender {
    fn run(&mut self, metric: String) {

    // +1 for '\n'
    if (self.buffer.len() + metric.len() + 1) > self.buffer.capacity() ||
    (self.last_flush_time.elapsed() >= self.flush_period) {
    self.last_flush_time = Instant::now();

    if let Err(e) = self.socket.send_to(self.buffer.as_slice(), &self.sink_addr) {
    warn!("send metric failed {:?}", e);
    }
    self.buffer.clear()
    }

    self.buffer.extend_from_slice(metric.as_bytes());
    self.buffer.push(b'\n');
    }
    }

    pub struct AsyncUdpMetricSink {
    worker: Mutex<Worker<String>>,
    }

    impl AsyncUdpMetricSink {
    pub fn from<A>(sink_addr: A,
    socket: UdpSocket,
    flush_period: Duration)
    -> MetricResult<AsyncUdpMetricSink>
    where A: ToSocketAddrs
    {
    let mut addr_iter = try!(sink_addr.to_socket_addrs());
    let addr = try!(addr_iter.next()
    .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));

    let sender = BufferSender {
    sink_addr: addr,
    socket: socket,

    buffer:Vec::with_capacity(BUFFER_SIZE),
    flush_period: flush_period,
    last_flush_time: Instant::now(),
    };

    let mut worker = Worker::new("mteric-worker");
    worker.start(sender).unwrap();

    Ok(AsyncUdpMetricSink {
    worker: Mutex::new(worker),
    })
    }

    fn handle_to_sender(&self, metric: &str) -> io::Result<usize> {
    let worker = self.worker.lock().unwrap();

    if let Err(e) = worker.schedule(metric.to_owned()) {
    warn!("{}", e);
    }

    Ok(metric.len())
    }
    }

    impl MetricSink for AsyncUdpMetricSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
    self.handle_to_sender(metric)
    }
    }

    impl Metric for AsyncUdpMetricSink {}



    #[cfg(test)]
    mod tests {
    @@ -205,6 +299,12 @@ mod tests {
    BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap()
    }

    fn new_async_metric() -> AsyncUdpMetricSink {
    let host = "127.0.0.1:8125";
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    AsyncUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap()
    }

    #[bench]
    fn bench_single_thread_block_udp(b: &mut Bencher) {
    let sink = new_block_metric();
    @@ -223,6 +323,12 @@ mod tests {
    b.iter(|| sink.emit("foo:1|c"));
    }

    #[bench]
    fn bench_single_thread_async_udp(b: &mut Bencher) {
    let sink = new_async_metric();
    b.iter(|| sink.emit("foo:1|c"));
    }

    fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) {
    let (tx, rx) = channel();
    let (tx1, done) = channel();
    @@ -327,4 +433,32 @@ mod tests {
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }

    #[bench]
    fn bench_multi_thread_async_udp(b: &mut Bencher) {
    let sink = new_async_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker();
    let (t1, tx1, done1) = thread_worker();
    let (t2, tx2, done2) = thread_worker();

    b.iter(|| {
    let _ = tx0.send(5000).unwrap();
    let _ = tx1.send(5000).unwrap();
    let _ = tx2.send(5000).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
    let _ = done2.recv().unwrap();
    });

    let _ = tx0.send(0).unwrap();
    let _ = tx1.send(0).unwrap();
    let _ = tx2.send(0).unwrap();

    let _ = t0.join().unwrap();
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }
    }
    350 changes: 350 additions & 0 deletions mod.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,350 @@
    /// Worker contains all workers that do the expensive job in background.

    use std::io;
    use std::sync::Arc;
    use std::fmt::Display;
    use std::time::{Instant, Duration};
    use std::thread::{self, JoinHandle, Builder};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc::{self, Sender, Receiver};
    use std::result;

    macro_rules! slow_log {
    ($t:expr, $($arg:tt)*) => {{
    if $t.is_slow() {
    warn!($($arg)*);
    } else {
    trace!($($arg)*);
    }
    }}
    }

    pub struct SlowTimer {
    slow_time: Duration,
    t: Instant,
    }

    impl SlowTimer {
    pub fn new() -> SlowTimer {
    SlowTimer::default()
    }

    pub fn from(slow_time: Duration) -> SlowTimer {
    SlowTimer {
    slow_time: slow_time,
    t: Instant::now(),
    }
    }

    pub fn from_secs(secs: u64) -> SlowTimer {
    SlowTimer::from(Duration::from_secs(secs))
    }

    pub fn from_millis(millis: u64) -> SlowTimer {
    SlowTimer::from(Duration::from_millis(millis))
    }

    pub fn elapsed(&self) -> Duration {
    self.t.elapsed()
    }

    pub fn is_slow(&self) -> bool {
    self.elapsed() >= self.slow_time
    }
    }

    const DEFAULT_SLOW_SECS: u64 = 1;

    impl Default for SlowTimer {
    fn default() -> SlowTimer {
    SlowTimer::from_secs(DEFAULT_SLOW_SECS)
    }
    }

    quick_error! {
    #[derive(Debug)]
    pub enum Error {
    Stopped
    IoError(e: io::Error) {
    from()
    display("{}", e)
    }
    }
    }

    impl<T> From<mpsc::SendError<T>> for Error {
    fn from(_: mpsc::SendError<T>) -> Error {
    // do we need to return the failed data
    Error::Stopped
    }
    }

    pub type Result<T> = result::Result<T, Error>;

    pub trait Runnable<T: Display> {
    fn run(&mut self, t: T);
    }

    pub trait BatchRunnable<T: Display> {
    /// run a batch of tasks.
    ///
    /// Please note that ts will be clear after invoking this method.
    fn run_batch(&mut self, ts: &mut Vec<T>);
    }

    impl<T: Display, R: Runnable<T>> BatchRunnable<T> for R {
    fn run_batch(&mut self, ts: &mut Vec<T>) {
    for t in ts.drain(..) {
    let task_str = format!("{}", t);
    let timer = SlowTimer::new();
    self.run(t);
    slow_log!(timer,
    "task {} takes {:?} to finish.",
    task_str,
    timer.elapsed());
    }
    }
    }

    /// Scheduler provides interface to schedule task to underlying workers.
    pub struct Scheduler<T> {
    counter: Arc<AtomicUsize>,
    sender: Sender<Option<T>>,
    }

    impl<T: Display> Scheduler<T> {
    fn new(counter: AtomicUsize, sender: Sender<Option<T>>) -> Scheduler<T> {
    Scheduler {
    counter: Arc::new(counter),
    sender: sender,
    }
    }

    /// Schedule a task to run.
    ///
    /// If the worker is stopped, an error will return.
    pub fn schedule(&self, task: T) -> Result<()> {
    debug!("scheduling task {}", task);
    try!(self.sender.send(Some(task)));
    self.counter.fetch_add(1, Ordering::SeqCst);
    Ok(())
    }

    /// Check if underlying worker can't handle task immediately.
    pub fn is_busy(&self) -> bool {
    self.counter.load(Ordering::SeqCst) > 0
    }
    }

    impl<T: Display> Clone for Scheduler<T> {
    fn clone(&self) -> Scheduler<T> {
    Scheduler {
    counter: self.counter.clone(),
    sender: self.sender.clone(),
    }
    }
    }

    /// Create a scheduler that can't be scheduled any task.
    ///
    /// Useful for test purpose.
    #[cfg(test)]
    pub fn dummy_scheduler<T: Display>() -> Scheduler<T> {
    let (tx, _) = mpsc::channel();
    Scheduler::new(AtomicUsize::new(0), tx)
    }

    /// A worker that can schedule time consuming tasks.
    pub struct Worker<T: Display> {
    name: String,
    scheduler: Scheduler<T>,
    receiver: Option<Receiver<Option<T>>>,
    handle: Option<JoinHandle<()>>,
    }

    fn poll<R, T>(mut runner: R, rx: Receiver<Option<T>>, counter: Arc<AtomicUsize>, batch_size: usize)
    where R: BatchRunnable<T> + Send + 'static,
    T: Display + Send + 'static
    {
    let mut keep_going = true;
    let mut buffer = Vec::with_capacity(batch_size);
    while keep_going {
    let t = rx.recv();
    match t {
    Ok(Some(t)) => buffer.push(t),
    _ => return,
    }
    while buffer.len() < batch_size {
    match rx.try_recv() {
    Ok(None) => {
    keep_going = false;
    break;
    }
    Ok(Some(t)) => buffer.push(t),
    _ => break,
    }
    }
    counter.fetch_sub(buffer.len(), Ordering::SeqCst);
    runner.run_batch(&mut buffer);
    buffer.clear();
    }
    }

    impl<T: Display + Send + 'static> Worker<T> {
    /// Create a worker.
    pub fn new<S: Into<String>>(name: S) -> Worker<T> {
    let (tx, rx) = mpsc::channel();
    Worker {
    name: name.into(),
    scheduler: Scheduler::new(AtomicUsize::new(0), tx),
    receiver: Some(rx),
    handle: None,
    }
    }

    /// Start the worker.
    pub fn start<R: Runnable<T> + Send + 'static>(&mut self, runner: R) -> Result<()> {
    self.start_batch(runner, 1)
    }

    pub fn start_batch<R>(&mut self, runner: R, batch_size: usize) -> Result<()>
    where R: BatchRunnable<T> + Send + 'static
    {
    info!("starting working thread: {}", self.name);
    if self.receiver.is_none() {
    warn!("worker {} has been started.", self.name);
    return Ok(());
    }

    let rx = self.receiver.take().unwrap();
    let counter = self.scheduler.counter.clone();
    let h = try!(Builder::new()
    .name(self.name.clone())
    .spawn(move || poll(runner, rx, counter, batch_size)));
    self.handle = Some(h);
    Ok(())
    }

    /// Get a scheduler to schedule task.
    pub fn scheduler(&self) -> Scheduler<T> {
    self.scheduler.clone()
    }

    /// Schedule a task to run.
    ///
    /// If the worker is stopped, an error will return.
    pub fn schedule(&self, task: T) -> Result<()> {
    self.scheduler.schedule(task)
    }

    /// Check if underlying worker can't handle task immediately.
    pub fn is_busy(&self) -> bool {
    self.handle.is_none() || self.scheduler.is_busy()
    }

    /// Stop the worker thread.
    pub fn stop(&mut self) -> thread::Result<()> {
    let handler = match self.handle.take() {
    Some(h) => h,
    None => return Ok(()),
    };

    // close sender explicitly so the background thread will exit.
    info!("stoping {}", self.name);
    if let Err(e) = self.scheduler.sender.send(None) {
    warn!("failed to stop worker thread: {:?}", e);
    }
    handler.join()
    }
    }

    #[cfg(test)]
    mod test {
    use std::thread;
    use std::sync::Arc;
    use std::sync::atomic::*;
    use std::cmp;
    use std::time::Duration;

    use super::*;

    struct CountRunner {
    count: Arc<AtomicUsize>,
    }

    impl Runnable<u64> for CountRunner {
    fn run(&mut self, step: u64) {
    self.count.fetch_add(step as usize, Ordering::SeqCst);
    thread::sleep(Duration::from_millis(10));
    }
    }

    struct BatchRunner {
    count: Arc<AtomicUsize>,
    }

    impl BatchRunnable<u64> for BatchRunner {
    fn run_batch(&mut self, ms: &mut Vec<u64>) {
    let total = ms.iter().fold(0, |l, &r| l + r);
    self.count.fetch_add(total as usize, Ordering::SeqCst);
    let max_sleep = ms.iter().fold(0, |l, &r| cmp::max(l, r));
    thread::sleep(Duration::from_millis(max_sleep));
    }
    }

    #[test]
    fn test_worker() {
    let mut worker = Worker::new("test-worker");
    let count = Arc::new(AtomicUsize::new(0));
    worker.start(CountRunner { count: count.clone() }).unwrap();
    assert!(!worker.is_busy());
    worker.schedule(50).unwrap();
    worker.schedule(50).unwrap();
    worker.schedule(50).unwrap();
    assert!(worker.is_busy());
    for _ in 0..100 {
    if !worker.is_busy() {
    break;
    }
    thread::sleep(Duration::from_millis(10));
    }
    assert!(!worker.is_busy());
    worker.stop().unwrap();
    assert_eq!(count.load(Ordering::SeqCst), 150);
    // now worker can't handle any task
    assert!(worker.is_busy());
    }

    #[test]
    fn test_threaded() {
    let mut worker = Worker::new("test-worker-threaded");
    let count = Arc::new(AtomicUsize::new(0));
    worker.start(CountRunner { count: count.clone() }).unwrap();
    let scheduler = worker.scheduler();
    thread::spawn(move || {
    scheduler.schedule(100).unwrap();
    scheduler.schedule(100).unwrap();
    });
    for _ in 1..1000 {
    if worker.is_busy() {
    break;
    }
    thread::sleep(Duration::from_millis(1));
    }
    worker.stop().unwrap();
    assert_eq!(count.load(Ordering::SeqCst), 200);
    }

    #[test]
    fn test_batch() {
    let mut worker = Worker::new("test-worker-batch");
    let count = Arc::new(AtomicUsize::new(0));
    worker.start_batch(BatchRunner { count: count.clone() }, 10).unwrap();
    for _ in 0..20 {
    worker.schedule(50).unwrap();
    }
    worker.stop().unwrap();
    assert_eq!(count.load(Ordering::SeqCst), 20 * 50);
    }
    }
  2. overvenus revised this gist Jul 2, 2016. 1 changed file with 29 additions and 0 deletions.
    29 changes: 29 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -171,6 +171,7 @@ impl MetricSink for BufferedUdpMetricSink {
    }
    }

    impl Metric for BufferedUdpMetricSink {}


    #[cfg(test)]
    @@ -298,4 +299,32 @@ mod tests {
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }

    #[bench]
    fn bench_multi_thread_buffer_udp(b: &mut Bencher) {
    let sink = new_buffer_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker();
    let (t1, tx1, done1) = thread_worker();
    let (t2, tx2, done2) = thread_worker();

    b.iter(|| {
    let _ = tx0.send(5000).unwrap();
    let _ = tx1.send(5000).unwrap();
    let _ = tx2.send(5000).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
    let _ = done2.recv().unwrap();
    });

    let _ = tx0.send(0).unwrap();
    let _ = tx1.send(0).unwrap();
    let _ = tx2.send(0).unwrap();

    let _ = t0.join().unwrap();
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }
    }
  3. overvenus revised this gist Jul 1, 2016. 1 changed file with 75 additions and 0 deletions.
    75 changes: 75 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -6,6 +6,9 @@ extern crate cadence;

    use std::io;
    use std::mem;
    use std::str::from_utf8;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};

    @@ -111,10 +114,70 @@ impl MetricSink for BlockUdpMetricSink {

    impl Metric for BlockUdpMetricSink {}


    /// ================================
    /// Buffered NonblockUdpMetricSink
    /// ================================
    pub struct BufferedUdpMetricSink {
    sink: NonblockUdpMetricSink,

    buffer: Mutex<Vec<u8>>,
    last_flush_time: Mutex<Instant>,
    flush_period: Duration,
    }

    impl BufferedUdpMetricSink {
    pub fn from<A>(sink_addr: A,
    socket: UdpSocket,
    flush_period: Duration)
    -> MetricResult<BufferedUdpMetricSink>
    where A: ToSocketAddrs
    {
    Ok(BufferedUdpMetricSink {
    sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(),
    buffer: Mutex::new(Vec::with_capacity(1024)),
    flush_period: flush_period,
    last_flush_time: Mutex::new(Instant::now()),
    })
    }

    fn append_to_buffer(&self, metric: &str) -> io::Result<usize> {
    let mut buffer = self.buffer.lock().unwrap();
    let mut last_flush_time = self.last_flush_time.lock().unwrap();

    // +1 for '\n'
    let elapse = last_flush_time.elapsed();
    if (buffer.len() + metric.len() + 1) > buffer.capacity() ||
    (elapse >= self.flush_period) {
    self.flush(&mut buffer);
    *last_flush_time = Instant::now();
    }

    buffer.extend_from_slice(metric.as_bytes());
    buffer.push('\n' as u8);
    Ok(metric.len())
    }

    fn flush(&self, buffer: &mut Vec<u8>) {
    let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap());
    buffer.clear();
    }
    }

    impl MetricSink for BufferedUdpMetricSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
    self.append_to_buffer(metric)
    }
    }



    #[cfg(test)]
    mod tests {
    use super::*;
    use test::Bencher;
    use std::time::Duration;
    use std::net::UdpSocket;
    use std::thread::{self, JoinHandle};
    use std::sync::mpsc::{channel, Sender, Receiver};
    @@ -135,6 +198,12 @@ mod tests {
    BlockUdpMetricSink::from(host, socket).unwrap()
    }

    fn new_buffer_metric() -> BufferedUdpMetricSink {
    let host = "127.0.0.1:8125";
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap()
    }

    #[bench]
    fn bench_single_thread_block_udp(b: &mut Bencher) {
    let sink = new_block_metric();
    @@ -147,6 +216,12 @@ mod tests {
    b.iter(|| sink.emit("foo:1|c"));
    }

    #[bench]
    fn bench_single_thread_buffer_udp(b: &mut Bencher) {
    let sink = new_buffer_metric();
    b.iter(|| sink.emit("foo:1|c"));
    }

    fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) {
    let (tx, rx) = channel();
    let (tx1, done) = channel();
  4. overvenus revised this gist May 31, 2016. 1 changed file with 18 additions and 16 deletions.
    34 changes: 18 additions & 16 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -121,6 +121,8 @@ mod tests {

    use cadence::MetricSink;

    const MTABLE: &'static [ &'static str ] = &[ "foo:1|c", "bar:1|d", "foo:2|t" ];

    fn new_nonblock_metric() -> NonblockUdpMetricSink {
    let host = "127.0.0.1:8125";
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    @@ -145,19 +147,19 @@ mod tests {
    b.iter(|| sink.emit("foo:1|c"));
    }

    fn thread_worker(s: &str) -> (JoinHandle<()>, Sender<i64>, Receiver<i64>) {
    let s = s.to_owned();

    fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) {
    let (tx, rx) = channel();
    let (tx1, done) = channel();

    let t = thread::spawn(move || {
    let sink = __client().unwrap();
    let length = MTABLE.len();
    loop {
    let n = rx.recv().unwrap();
    if n == 0 { return }
    for _ in 0 .. n {
    let _ = sink.emit(&s).unwrap();
    let s = MTABLE[n%length];
    let _ = sink.emit(s).unwrap();
    }
    tx1.send(1).unwrap()
    }
    @@ -171,14 +173,14 @@ mod tests {
    let sink = new_block_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker("foo:1|c");
    let (t1, tx1, done1) = thread_worker("foo:1|c");
    let (t2, tx2, done2) = thread_worker("foo:1|c");
    let (t0, tx0, done0) = thread_worker();
    let (t1, tx1, done1) = thread_worker();
    let (t2, tx2, done2) = thread_worker();

    b.iter(|| {
    let _ = tx0.send(500).unwrap();
    let _ = tx1.send(500).unwrap();
    let _ = tx2.send(500).unwrap();
    let _ = tx0.send(5000).unwrap();
    let _ = tx1.send(5000).unwrap();
    let _ = tx2.send(5000).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
    @@ -199,14 +201,14 @@ mod tests {
    let sink = new_nonblock_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker("foo:1|c");
    let (t1, tx1, done1) = thread_worker("foo:1|c");
    let (t2, tx2, done2) = thread_worker("foo:1|c");
    let (t0, tx0, done0) = thread_worker();
    let (t1, tx1, done1) = thread_worker();
    let (t2, tx2, done2) = thread_worker();

    b.iter(|| {
    let _ = tx0.send(500).unwrap();
    let _ = tx1.send(500).unwrap();
    let _ = tx2.send(500).unwrap();
    let _ = tx0.send(5000).unwrap();
    let _ = tx1.send(5000).unwrap();
    let _ = tx2.send(5000).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
  5. overvenus revised this gist May 31, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion lib.rs
    Original file line number Diff line number Diff line change
    @@ -63,7 +63,7 @@ impl NonblockUdpMetricSink {
    .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));

    // Moves this UDP stream into nonblocking mode.
    let _ = socket.set_nonblocking(false).unwrap();
    let _ = socket.set_nonblocking(true).unwrap();

    Ok(NonblockUdpMetricSink {
    sink_addr: addr,
  6. overvenus created this gist May 31, 2016.
    7 changes: 7 additions & 0 deletions Cargo.toml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    [package]
    name = "bench_metric"
    version = "0.1.0"
    authors = ["overvenus <[email protected]>"]

    [dependencies]
    cadence = "0.5.0"
    224 changes: 224 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,224 @@
    #![feature(test)]
    #![feature(const_fn)]

    extern crate test;
    extern crate cadence;

    use std::io;
    use std::mem;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};

    use cadence::{MetricSink, MetricResult, ErrorKind};

    static mut CLIENT: *const Metric = &NopMetric;
    // IS_INITIALIZED indicates the state of CLIENT,
    // `false` for uninitialized, `true` for initialized.
    static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);

    pub trait Metric: MetricSink + Sync + Send{}

    pub struct NopMetric;

    impl MetricSink for NopMetric {
    fn emit(&self, _: &str) -> io::Result<usize> {Ok(0)}
    }

    impl Metric for NopMetric {}

    pub fn set_metric(metric: Box<Metric + Sync + Send>) -> Result<(), ()> {
    unsafe {
    if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false {
    return Err(());
    }

    CLIENT = mem::transmute(metric);
    Ok(())
    }
    }

    #[doc(hidden)]
    pub fn __client() -> Option<&'static Metric> {
    if IS_INITIALIZED.load(Ordering::SeqCst) != true {
    return None;
    }

    Some(unsafe { &*CLIENT })
    }

    /// =======================
    /// NonblockUdpMetricSink
    /// =======================
    pub struct NonblockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
    }

    impl NonblockUdpMetricSink {
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<NonblockUdpMetricSink>
    where A: ToSocketAddrs
    {
    let mut addr_iter = try!(sink_addr.to_socket_addrs());
    let addr = try!(addr_iter.next()
    .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));

    // Moves this UDP stream into nonblocking mode.
    let _ = socket.set_nonblocking(false).unwrap();

    Ok(NonblockUdpMetricSink {
    sink_addr: addr,
    socket: socket,
    })
    }
    }

    impl MetricSink for NonblockUdpMetricSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
    self.socket.send_to(metric.as_bytes(), &self.sink_addr)
    }
    }

    impl Metric for NonblockUdpMetricSink {}

    /// ===============================================
    /// BlockUdpMetricSink AKA cadence::UdpMetricSink
    /// ===============================================
    pub struct BlockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
    }

    impl BlockUdpMetricSink {
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<BlockUdpMetricSink>
    where A: ToSocketAddrs
    {
    let mut addr_iter = try!(sink_addr.to_socket_addrs());
    let addr = try!(addr_iter.next()
    .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));

    Ok(BlockUdpMetricSink {
    sink_addr: addr,
    socket: socket,
    })
    }
    }

    impl MetricSink for BlockUdpMetricSink {
    fn emit(&self, metric: &str) -> io::Result<usize> {
    self.socket.send_to(metric.as_bytes(), &self.sink_addr)
    }
    }

    impl Metric for BlockUdpMetricSink {}

    #[cfg(test)]
    mod tests {
    use super::*;
    use test::Bencher;
    use std::net::UdpSocket;
    use std::thread::{self, JoinHandle};
    use std::sync::mpsc::{channel, Sender, Receiver};

    use cadence::MetricSink;

    fn new_nonblock_metric() -> NonblockUdpMetricSink {
    let host = "127.0.0.1:8125";
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    NonblockUdpMetricSink::from(host, socket).unwrap()
    }

    fn new_block_metric() -> BlockUdpMetricSink {
    let host = "127.0.0.1:8125";
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    BlockUdpMetricSink::from(host, socket).unwrap()
    }

    #[bench]
    fn bench_single_thread_block_udp(b: &mut Bencher) {
    let sink = new_block_metric();
    b.iter(|| sink.emit("foo:1|c"));
    }

    #[bench]
    fn bench_single_thread_nonblock_udp(b: &mut Bencher) {
    let sink = new_nonblock_metric();
    b.iter(|| sink.emit("foo:1|c"));
    }

    fn thread_worker(s: &str) -> (JoinHandle<()>, Sender<i64>, Receiver<i64>) {
    let s = s.to_owned();

    let (tx, rx) = channel();
    let (tx1, done) = channel();

    let t = thread::spawn(move || {
    let sink = __client().unwrap();
    loop {
    let n = rx.recv().unwrap();
    if n == 0 { return }
    for _ in 0 .. n {
    let _ = sink.emit(&s).unwrap();
    }
    tx1.send(1).unwrap()
    }
    });

    (t, tx, done)
    }

    #[bench]
    fn bench_multi_thread_block_udp(b: &mut Bencher) {
    let sink = new_block_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker("foo:1|c");
    let (t1, tx1, done1) = thread_worker("foo:1|c");
    let (t2, tx2, done2) = thread_worker("foo:1|c");

    b.iter(|| {
    let _ = tx0.send(500).unwrap();
    let _ = tx1.send(500).unwrap();
    let _ = tx2.send(500).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
    let _ = done2.recv().unwrap();
    });

    let _ = tx0.send(0).unwrap();
    let _ = tx1.send(0).unwrap();
    let _ = tx2.send(0).unwrap();

    let _ = t0.join().unwrap();
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }

    #[bench]
    fn bench_multi_thread_nonblock_udp(b: &mut Bencher) {
    let sink = new_nonblock_metric();
    let _ = set_metric(Box::new(sink));

    let (t0, tx0, done0) = thread_worker("foo:1|c");
    let (t1, tx1, done1) = thread_worker("foo:1|c");
    let (t2, tx2, done2) = thread_worker("foo:1|c");

    b.iter(|| {
    let _ = tx0.send(500).unwrap();
    let _ = tx1.send(500).unwrap();
    let _ = tx2.send(500).unwrap();

    let _ = done0.recv().unwrap();
    let _ = done1.recv().unwrap();
    let _ = done2.recv().unwrap();
    });

    let _ = tx0.send(0).unwrap();
    let _ = tx1.send(0).unwrap();
    let _ = tx2.send(0).unwrap();

    let _ = t0.join().unwrap();
    let _ = t1.join().unwrap();
    let _ = t2.join().unwrap();
    }
    }