import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; import java.util.Comparator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.Serializer; import cascading.CascadingException; import cascading.tuple.Comparison; import cascading.tuple.Fields; import cascading.tuple.StreamComparator; import cascading.tuple.Tuple; import cascading.tuple.TupleEntry; import cascading.tuple.hadoop.SerializationToken; import cascading.tuple.hadoop.TupleSerialization; import cascading.tuple.hadoop.io.BufferedInputStream; import cascading.tuple.hadoop.io.TupleDeserializer; import cascading.tuple.hadoop.io.TupleSerializer; import com.google.common.base.Throwables; /** * Serialization implementation that is required for Hadoop {@link SequenceFile} serialization. */ @SerializationToken(tokens = { 223 }, classNames = { "cascading.tuple.TupleEntry" }) public class TupleEntrySerialization extends Configured implements Comparison, Serialization { /** * Delegate for recursively contained {@link Tuple} values. */ private TupleSerialization tupleSerialization; @Override public void setConf(Configuration conf) { super.setConf(conf); this.tupleSerialization = new TupleSerialization(conf); } @Override public boolean accept(Class c) { return TupleEntry.class.isAssignableFrom(c); } @Override public Serializer getSerializer(Class c) { final TupleSerializer tupleSerializer = (TupleSerializer) tupleSerialization.getSerializer(Tuple.class); return new TupleEntrySerializer(tupleSerializer); } @Override public Deserializer getDeserializer(Class c) { final TupleDeserializer tupleDeserializer = (TupleDeserializer) tupleSerialization.getDeserializer(Tuple.class); return new TupleEntryDeserializer(tupleDeserializer); } @Override public Comparator getComparator(Class arg0) { return new TupleEntryComparator(); } /** * {@code TupleEntry} serializer implementation. */ public static class TupleEntrySerializer implements Serializer { private DataOutputStream out; private final TupleSerializer delegate; public TupleEntrySerializer(TupleSerializer tupleSerializer) { this.delegate = tupleSerializer; } @Override public void open(OutputStream out) throws IOException { delegate.open(out); if(out instanceof DataOutputStream) { this.out = (DataOutputStream) out; } else { this.out = new DataOutputStream(out); } } @Override public void serialize(TupleEntry t) throws IOException { ObjectOutputStream output = new ObjectOutputStream(out); // Creating a new stream each call seems to be required output.writeObject(t.getFields()); delegate.serialize(t.getTuple()); } @Override public void close() throws IOException { try { delegate.close(); } finally { out.close(); } } } /** * {@code TupleEntry} deserializer implementation. */ public static class TupleEntryDeserializer implements Deserializer { private DataInputStream in; private final TupleDeserializer delegate; public TupleEntryDeserializer(TupleDeserializer tupleDeserializer) { this.delegate = tupleDeserializer; } @Override public void open(InputStream in) throws IOException { delegate.open(in); if(in instanceof DataInputStream) { this.in = (DataInputStream) in; } else { this.in = new DataInputStream(in); } } @Override public TupleEntry deserialize(TupleEntry t) throws IOException { ObjectInputStream input = new ObjectInputStream(in); // Creating a new stream each call seems to be required TupleEntry tupleEntry = null; try { Fields fields = (Fields) input.readObject(); Tuple tuple = delegate.deserialize(null); tupleEntry = new TupleEntry(fields, tuple); } catch(ClassNotFoundException e) { Throwables.propagate(e); } return tupleEntry; } @Override public void close() throws IOException { try { delegate.close(); } finally { in.close(); } } } public static class TupleEntryComparator implements StreamComparator, Comparator, Serializable { @Override public int compare(TupleEntry lhs, TupleEntry rhs) { if(lhs == null) { return -1; } if(rhs == null) { return 1; } return 0; } @Override public int compare(BufferedInputStream lhsStream, BufferedInputStream rhsStream) { try { if(lhsStream == null && rhsStream == null) { return 0; } if(lhsStream == null) { return -1; } if(rhsStream == null) { return 1; } String lhsString = WritableUtils.readString(new DataInputStream(lhsStream)); String rhsString = WritableUtils.readString(new DataInputStream(rhsStream)); return lhsString.compareTo(rhsString); } catch(IOException exception) { throw new CascadingException(exception); } } } }