/**
 * This is an example of a one-way or "messaging span", which is possible by use of the {@link
 * Span#flush()} operator.
 *
 * <p>Note that this uses a span as a kafka key, not because that is recommended, but because it is
 * convenient for demonstration, since kafka doesn't have message properties.
 *
 * <p>See https://github.com/openzipkin/zipkin/issues/1243
 */
public class KafkaExampleIT {
  /** JUnit rule wrapping an ephemeral kafka broker; tests reach it through {@code kafka.helper()}. */
  @Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create());

  /** Receives every span reported by {@link #tracing}; also handed to the consumer's deserializer. */
  InMemoryStorage storage = new InMemoryStorage();

  /** Tracing component for the "producer" service; each reported span is passed to {@link #storage}. */
  Tracing tracing = Tracing.newBuilder()
      .localEndpoint(Endpoint.builder().serviceName("producer").build())
      .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
      .build();

  // Keys are spans and values are strings, matching the (de)serializers supplied in setup().
  // Declaring the type arguments lets record.key() be used as a Span without casts.
  KafkaProducer<Span, String> producer;
  KafkaConsumer<Span, String> consumer;

  /** Endpoint named "kafka" on the broker's port, used as the remote endpoint of both span halves. */
  Endpoint kafkaEndpoint;

  /** Creates the producer, the consumer and the kafka endpoint used by the test. */
  @Before public void setup() {
    // NOTE(review): the trailing null is presumably "no config overrides" - confirm against the
    // kafka-junit helper.
    producer = kafka.helper()
        .createProducer(new SpanSerializer(tracing), new StringSerializer(), null);
    consumer = kafka.helper()
        .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null);
    kafkaEndpoint =
        Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build();
  }

  /**
   * Sends a CLIENT span as the key of a kafka record and flushes it on the producing side, then
   * polls the topic and flushes each received key as a SERVER span named after the record value.
   *
   * <p>Both kafka clients are closed even when sending or polling throws. Each one is closed
   * exactly once, and the producer is still closed before the consumer polls.
   *
   * @throws Exception if waiting on the send result fails
   */
  @Test
  public void startWithOneTracerAndStopWithAnother() throws Exception {
    String topic = "startWithOneTracerAndStopWithAnother";

    try {
      consumer.subscribe(Collections.singletonList(topic));

      try {
        Span span =
            tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint);
        // get() waits for the send result, so the span is only flushed after the record was sent.
        producer.send(new ProducerRecord<>(topic, span, "foo")).get();
        span.flush();
      } finally {
        producer.close();
      }

      // NOTE(review): if the poll returns no records the loop body never runs and the test still
      // passes; nothing here asserts that a record was actually consumed.
      consumer.poll(500L).forEach(record ->
          record.key()
              .name(record.value())
              .kind(Span.Kind.SERVER)
              .remoteEndpoint(kafkaEndpoint)
              .flush());
    } finally {
      consumer.close();
    }
  }
}
/** This class simulates a consumer being on a different process, by not sharing a tracer */
final class SpanDeserializer implements Deserializer {
final Tracing tracing;
final TraceContext.Extractor