package com.linkedin.camus.etl.kafka.coders;

import com.google.gson.JsonObject;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.Message;
import com.linkedin.camus.coders.MessageDecoder;
import org.apache.log4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * MessageDecoder that treats the Kafka payload as a CSV string: the payload
 * bytes are already plain text, so they are decoded as UTF-8 and passed
 * through unchanged.
 *
 * <p>The wall-clock time at decode time ({@link System#currentTimeMillis()})
 * is used as the {@link CamusWrapper} timestamp, so downstream partitioning
 * is based on ingestion time rather than any timestamp embedded in the
 * record itself.
 */
public class CSVStringMessageDecoder extends MessageDecoder {
    private static final Logger log = Logger.getLogger(CSVStringMessageDecoder.class);

    /**
     * Stores the job properties and topic name; no other configuration is
     * read by this decoder.
     *
     * @param props     Camus job properties
     * @param topicName name of the Kafka topic being consumed
     */
    @Override
    public void init(Properties props, String topicName) {
        this.props = props;
        this.topicName = topicName;
    }

    /**
     * Decodes the raw payload bytes as a UTF-8 string and wraps it with the
     * current wall-clock time as the timestamp.
     *
     * @param message Kafka message whose payload is UTF-8 text
     * @return the payload string wrapped with an ingestion-time timestamp
     */
    @Override
    public CamusWrapper decode(Message message) {
        // StandardCharsets.UTF_8 is guaranteed to exist on every JVM, so the
        // old UnsupportedEncodingException catch (which silently fell back to
        // the non-deterministic platform-default charset) is unnecessary.
        String payloadString = new String(message.getPayload(), StandardCharsets.UTF_8);
        return new CamusWrapper(payloadString, System.currentTimeMillis());
    }
}