/** * This is an example of a one-way or "messaging span", which is possible by use of the {@link * Span#flush()} operator. * *

Note that this uses a span as a kafka key, not because it is recommended, rather as it is * convenient for demonstration, since kafka doesn't have message properties. * *

See https://github.com/openzipkin/zipkin/issues/1243 */ public class KafkaExampleIT { @Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create()); InMemoryStorage storage = new InMemoryStorage(); Tracing tracing = Tracing.newBuilder() .localEndpoint(Endpoint.builder().serviceName("producer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); KafkaProducer producer; KafkaConsumer consumer; Endpoint kafkaEndpoint; @Before public void setup() { producer = kafka.helper() .createProducer(new SpanSerializer(tracing), new StringSerializer(), null); consumer = kafka.helper() .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null); kafkaEndpoint = Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build(); } @Test public void startWithOneTracerAndStopWithAnother() throws Exception { String topic = "startWithOneTracerAndStopWithAnother"; consumer.subscribe(Collections.singletonList(topic)); Span span = tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint); producer.send(new ProducerRecord<>(topic, span, "foo")).get(); span.flush(); producer.close(); consumer.poll(500L).forEach(record -> { record.key() .name(record.value()) .kind(Span.Kind.SERVER) .remoteEndpoint(kafkaEndpoint) .flush(); }); consumer.close(); } } /** This class simulates a consumer being on a different process, by not sharing a tracer */ final class SpanDeserializer implements Deserializer { final Tracing tracing; final TraceContext.Extractor extractor; SpanDeserializer(InMemoryStorage storage) { tracing = Tracing.newBuilder() .localEndpoint(Endpoint.builder().serviceName("consumer").build()) .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) .build(); extractor = tracing.propagation().extractor(Properties::getProperty); } @Override public void configure(Map map, boolean b) { } /** Extract the span from the key or start a new trace if any problem. */ @Override public Span deserialize(String s, byte[] bytes) { try { Properties properties = new Properties(); properties.load(new ByteArrayInputStream(bytes)); // in Brave 4.3 this will be simplified to tracing.tracer().nextSpan(extractor, properties) TraceContextOrSamplingFlags result = extractor.extract(properties); return result.context() != null ? tracer.joinSpan(result.context()) : tracer.newTrace(result.samplingFlags()); } catch (RuntimeException | IOException e) { return tracer.newTrace(); // return a new trace upon failure of any kind } } @Override public void close() { } } final class SpanSerializer implements Serializer { final TraceContext.Injector injector; SpanSerializer(Tracing tracing) { injector = tracing.propagation().injector(Properties::setProperty); } @Override public void configure(Map map, boolean b) { } @Override public byte[] serialize(String s, Span span) { Properties properties = new Properties(); injector.inject(span.context(), properties); ByteArrayOutputStream out = new ByteArrayOutputStream(); try { properties.store(out, "zipkin"); } catch (IOException e) { throw new AssertionError(e); } return out.toByteArray(); } @Override public void close() { } }