Last active
July 12, 2018 19:56
-
-
Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.
Revisions
-
adriancole revised this gist
May 2, 2017 . 1 changed file with 18 additions and 14 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,7 +12,7 @@ public class KafkaExampleIT { InMemoryStorage storage = new InMemoryStorage(); Tracing tracing = Tracing.newBuilder() .localEndpoint(Endpoint.builder().serviceName("producer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); @@ -23,7 +23,7 @@ public class KafkaExampleIT { @Before public void setup() { producer = kafka.helper() .createProducer(new SpanSerializer(tracing), new StringSerializer(), null); consumer = kafka.helper() .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null); kafkaEndpoint = @@ -35,34 +35,33 @@ public void startWithOneTracerAndStopWithAnother() throws Exception { String topic = "startWithOneTracerAndStopWithAnother"; consumer.subscribe(Collections.singletonList(topic)); Span span = tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint); producer.send(new ProducerRecord<>(topic, span, "foo")).get(); span.flush(); producer.close(); consumer.poll(500L).forEach(record -> { record.key() .name(record.value()) .kind(Span.Kind.SERVER) .remoteEndpoint(kafkaEndpoint) .flush(); }); consumer.close(); } } /** This class simulates a consumer being on a different process, by not sharing a tracer */ final class SpanDeserializer implements Deserializer<Span> { final Tracing tracing; final TraceContext.Extractor<Properties> extractor; SpanDeserializer(InMemoryStorage storage) { tracing = Tracing.newBuilder() .localEndpoint(Endpoint.builder().serviceName("consumer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); extractor = tracing.propagation().extractor(Properties::getProperty); } @Override public void configure(Map<String, ?> map, boolean b) { @@ -73,6 +72,7 @@ final class SpanDeserializer implements Deserializer<Span> { try { Properties properties = new Properties(); properties.load(new ByteArrayInputStream(bytes)); // in Brave 4.3 this will be simplified to tracing.tracer().nextSpan(extractor, properties) TraceContextOrSamplingFlags result = extractor.extract(properties); return result.context() != null ? tracer.joinSpan(result.context()) @@ -85,9 +85,13 @@ final class SpanDeserializer implements Deserializer<Span> { @Override public void close() { } } final class SpanSerializer implements Serializer<Span> { final TraceContext.Injector<Properties> injector; SpanSerializer(Tracing tracing) { injector = tracing.propagation().injector(Properties::setProperty); } @Override public void configure(Map<String, ?> map, boolean b) { } -
adriancole created this gist
Jan 5, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,109 @@ /** * This is an example of a one-way or "messaging span", which is possible by use of the {@link * Span#flush()} operator. * * <p>Note that this uses a span as a kafka key, not because it is recommended, rather as it is * convenient for demonstration, since kafka doesn't have message properties. * * <p>See https://github.com/openzipkin/zipkin/issues/1243 */ public class KafkaExampleIT { @Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create()); InMemoryStorage storage = new InMemoryStorage(); Tracer tracer = Tracer.newBuilder() .localEndpoint(Endpoint.builder().serviceName("producer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); KafkaProducer<Span, String> producer; KafkaConsumer<Span, String> consumer; Endpoint kafkaEndpoint; @Before public void setup() { producer = kafka.helper() .createProducer(new SpanSerializer(), new StringSerializer(), null); consumer = kafka.helper() .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null); kafkaEndpoint = Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build(); } @Test public void startWithOneTracerAndStopWithAnother() throws Exception { String topic = "startWithOneTracerAndStopWithAnother"; consumer.subscribe(Collections.singletonList(topic)); Span span = tracer.newTrace().annotate(CLIENT_SEND).remoteEndpoint(kafkaEndpoint); producer.send(new ProducerRecord<>(topic, span, "foo")).get(); span.flush(); producer.close(); consumer.poll(500L).forEach(record -> { record.key() .name(record.value()) .annotate(SERVER_RECV) .remoteEndpoint(kafkaEndpoint) .flush(); }); consumer.close(); } } /** This class simulates a consumer being on a different process, by not sharing a tracer */ final class SpanDeserializer implements Deserializer<Span> { TraceContext.Extractor<Properties> extractor = Propagation.B3_STRING.extractor(Properties::getProperty); final Tracer tracer; SpanDeserializer(InMemoryStorage storage) { tracer = Tracer.newBuilder() .localEndpoint(Endpoint.builder().serviceName("consumer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); } @Override public void configure(Map<String, ?> map, boolean b) { } /** Extract the span from the key or start a new trace if any problem. */ @Override public Span deserialize(String s, byte[] bytes) { try { Properties properties = new Properties(); properties.load(new ByteArrayInputStream(bytes)); TraceContextOrSamplingFlags result = extractor.extract(properties); return result.context() != null ? tracer.joinSpan(result.context()) : tracer.newTrace(result.samplingFlags()); } catch (RuntimeException | IOException e) { return tracer.newTrace(); // return a new trace upon failure of any kind } } @Override public void close() { } } final class SpanSerializer implements Serializer<Span> { TraceContext.Injector<Properties> injector = Propagation.B3_STRING.injector(Properties::setProperty); @Override public void configure(Map<String, ?> map, boolean b) { } @Override public byte[] serialize(String s, Span span) { Properties properties = new Properties(); injector.inject(span.context(), properties); ByteArrayOutputStream out = new ByteArrayOutputStream(); try { properties.store(out, "zipkin"); } catch (IOException e) { throw new AssertionError(e); } return out.toByteArray(); } @Override public void close() { } }