Created
August 12, 2015 06:06
-
-
Save marsprobe/81f0045fb64e4c3590f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.linkedin.camus.etl.kafka.coders; | |
| import com.google.gson.JsonObject; | |
| import com.linkedin.camus.coders.CamusWrapper; | |
| import com.linkedin.camus.coders.Message; | |
| import com.linkedin.camus.coders.MessageDecoder; | |
| import org.apache.log4j.Logger; | |
| import java.io.UnsupportedEncodingException; | |
| import java.util.Properties; | |
| /** | |
| * 转换Kafka中的Payload为CSV String(即原来就是String) | |
| * | |
| * - MessageDecoder class that will convert the payload into a String object, | |
| * - System.currentTimeMillis() will be used to set CamusWrapper's | |
| * timestamp property | |
| * @todo System.currentTimeMillis的返回如何划分partitions | |
| * @todo 为何使用byte[],而不是Message message? | |
| */ | |
| public class CSVStringMessageDecoder extends MessageDecoder<Message, String> { | |
| private static final Logger log = Logger.getLogger(CSVStringMessageDecoder.class); | |
| @Override | |
| public void init(Properties props, String topicName) { | |
| this.props = props; | |
| this.topicName = topicName; | |
| } | |
| @Override | |
| public CamusWrapper<String> decode(Message message) { | |
| long timestamp = 0; | |
| String payloadString; | |
| try { | |
| payloadString = new String(message.getPayload(), "UTF-8"); | |
| } catch (UnsupportedEncodingException e) { | |
| log.error("Unable to load UTF-8 encoding, falling back to system default", e); | |
| payloadString = new String(message.getPayload()); | |
| } | |
| timestamp = System.currentTimeMillis(); | |
| return new CamusWrapper<String>(payloadString, timestamp); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment