Skip to content

Instantly share code, notes, and snippets.

@codefromthecrypt
Last active July 12, 2018 19:56
Show Gist options
  • Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.
Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.

Revisions

  1. @adriancole adriancole revised this gist May 2, 2017. 1 changed file with 18 additions and 14 deletions.
    32 changes: 18 additions & 14 deletions kakfa-oneway.java
    Original file line number Diff line number Diff line change
    @@ -12,7 +12,7 @@ public class KafkaExampleIT {

    InMemoryStorage storage = new InMemoryStorage();

    Tracer tracer = Tracer.newBuilder()
    Tracing tracing = Tracing.newBuilder()
    .localEndpoint(Endpoint.builder().serviceName("producer").build())
    .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
    .build();
    @@ -23,7 +23,7 @@ public class KafkaExampleIT {

    @Before public void setup() {
    producer = kafka.helper()
    .createProducer(new SpanSerializer(), new StringSerializer(), null);
    .createProducer(new SpanSerializer(tracing), new StringSerializer(), null);
    consumer = kafka.helper()
    .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null);
    kafkaEndpoint =
    @@ -35,34 +35,33 @@ public void startWithOneTracerAndStopWithAnother() throws Exception {
    String topic = "startWithOneTracerAndStopWithAnother";
    consumer.subscribe(Collections.singletonList(topic));

    Span span = tracer.newTrace().annotate(CLIENT_SEND).remoteEndpoint(kafkaEndpoint);
    Span span = tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint);
    producer.send(new ProducerRecord<>(topic, span, "foo")).get();
    span.flush();
    producer.close();

    consumer.poll(500L).forEach(record -> {
    record.key()
    .name(record.value())
    .annotate(SERVER_RECV)
    .remoteEndpoint(kafkaEndpoint)
    .flush();
    .name(record.value())
    .kind(Span.Kind.SERVER)
    .remoteEndpoint(kafkaEndpoint)
    .flush();
    });
    consumer.close();
    }
    }

    /** This class simulates a consumer being on a different process, by not sharing a tracer */
    final class SpanDeserializer implements Deserializer<Span> {
    TraceContext.Extractor<Properties> extractor =
    Propagation.B3_STRING.extractor(Properties::getProperty);

    final Tracer tracer;
    final Tracing tracing;
    final TraceContext.Extractor<Properties> extractor;

    SpanDeserializer(InMemoryStorage storage) {
    tracer = Tracer.newBuilder()
    tracing = Tracing.newBuilder()
    .localEndpoint(Endpoint.builder().serviceName("consumer").build())
    .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
    .build();
    extractor = tracing.propagation().extractor(Properties::getProperty);
    }

    @Override public void configure(Map<String, ?> map, boolean b) {
    @@ -73,6 +72,7 @@ final class SpanDeserializer implements Deserializer<Span> {
    try {
    Properties properties = new Properties();
    properties.load(new ByteArrayInputStream(bytes));
    // in Brave 4.3 this will be simplified to tracing.tracer().nextSpan(extractor, properties)
    TraceContextOrSamplingFlags result = extractor.extract(properties);
    return result.context() != null
    ? tracer.joinSpan(result.context())
    @@ -85,9 +85,13 @@ final class SpanDeserializer implements Deserializer<Span> {
    @Override public void close() {
    }
    }

    final class SpanSerializer implements Serializer<Span> {
    TraceContext.Injector<Properties> injector =
    Propagation.B3_STRING.injector(Properties::setProperty);
    final TraceContext.Injector<Properties> injector;

    SpanSerializer(Tracing tracing) {
    injector = tracing.propagation().injector(Properties::setProperty);
    }

    @Override public void configure(Map<String, ?> map, boolean b) {
    }
  2. @adriancole adriancole created this gist Jan 5, 2017.
    109 changes: 109 additions & 0 deletions kakfa-oneway.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,109 @@
    /**
    * This is an example of a one-way or "messaging span", which is possible by use of the {@link
    * Span#flush()} operator.
    *
    * <p>Note that this uses a span as a kafka key, not because it is recommended, rather as it is
    * convenient for demonstration, since kafka doesn't have message properties.
    *
    * <p>See https://github.com/openzipkin/zipkin/issues/1243
    */
    public class KafkaExampleIT {
    @Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create());

    InMemoryStorage storage = new InMemoryStorage();

    Tracer tracer = Tracer.newBuilder()
    .localEndpoint(Endpoint.builder().serviceName("producer").build())
    .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
    .build();

    KafkaProducer<Span, String> producer;
    KafkaConsumer<Span, String> consumer;
    Endpoint kafkaEndpoint;

    @Before public void setup() {
    producer = kafka.helper()
    .createProducer(new SpanSerializer(), new StringSerializer(), null);
    consumer = kafka.helper()
    .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null);
    kafkaEndpoint =
    Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build();
    }

    @Test
    public void startWithOneTracerAndStopWithAnother() throws Exception {
    String topic = "startWithOneTracerAndStopWithAnother";
    consumer.subscribe(Collections.singletonList(topic));

    Span span = tracer.newTrace().annotate(CLIENT_SEND).remoteEndpoint(kafkaEndpoint);
    producer.send(new ProducerRecord<>(topic, span, "foo")).get();
    span.flush();
    producer.close();

    consumer.poll(500L).forEach(record -> {
    record.key()
    .name(record.value())
    .annotate(SERVER_RECV)
    .remoteEndpoint(kafkaEndpoint)
    .flush();
    });
    consumer.close();
    }
    }

    /** This class simulates a consumer being on a different process, by not sharing a tracer */
    final class SpanDeserializer implements Deserializer<Span> {
    TraceContext.Extractor<Properties> extractor =
    Propagation.B3_STRING.extractor(Properties::getProperty);

    final Tracer tracer;

    SpanDeserializer(InMemoryStorage storage) {
    tracer = Tracer.newBuilder()
    .localEndpoint(Endpoint.builder().serviceName("consumer").build())
    .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
    .build();
    }

    @Override public void configure(Map<String, ?> map, boolean b) {
    }

    /** Extract the span from the key or start a new trace if any problem. */
    @Override public Span deserialize(String s, byte[] bytes) {
    try {
    Properties properties = new Properties();
    properties.load(new ByteArrayInputStream(bytes));
    TraceContextOrSamplingFlags result = extractor.extract(properties);
    return result.context() != null
    ? tracer.joinSpan(result.context())
    : tracer.newTrace(result.samplingFlags());
    } catch (RuntimeException | IOException e) {
    return tracer.newTrace(); // return a new trace upon failure of any kind
    }
    }

    @Override public void close() {
    }
    }
    final class SpanSerializer implements Serializer<Span> {
    TraceContext.Injector<Properties> injector =
    Propagation.B3_STRING.injector(Properties::setProperty);

    @Override public void configure(Map<String, ?> map, boolean b) {
    }

    @Override public byte[] serialize(String s, Span span) {
    Properties properties = new Properties();
    injector.inject(span.context(), properties);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
    properties.store(out, "zipkin");
    } catch (IOException e) {
    throw new AssertionError(e);
    }
    return out.toByteArray();
    }

    @Override public void close() {
    }
    }