import static org.fest.assertions.api.Assertions.assertThat;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import cascading.CascadingTestCase;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.io.HadoopTupleInputStream;
import cascading.tuple.hadoop.io.HadoopTupleOutputStream;
import cascading.tuple.io.TupleInputStream;
import cascading.tuple.io.TupleOutputStream;

/**
 * Tests {@code TupleEntrySerialization}.
 */
public class TupleEntrySerializationTest extends CascadingTestCase {

  // JUnit requires @Rule fields to be public.
  @Rule
  public TemporaryFolder tmp = new TemporaryFolder();

  @Test
  public void test_tuple_entry_serialization() throws IOException {
    // Setup
    TupleSerialization tupleSerialization = new TupleSerialization(createJobConf());
    File file = tmp.newFile();

    // @formatter:off
    TupleEntry testEntry = new TupleEntry(
        new Fields("x", "y", "z"),
        new Tuple(
            new TupleEntry(
                new Fields("n1", "n2"),
                new Tuple(4, 5)
            ), // x
            2, // y
            3  // z
        )
    );
    // @formatter:on

    // Serialize the entry (wrapped in a tuple) to the file
    Tuple outputTuple = writeTuple(tupleSerialization, file, testEntry);

    // Deserialize the tuple back from the file
    Tuple inputTuple = readTuple(tupleSerialization, file);

    // Verify the first item in the round-tripped tuple is a TupleEntry and the contents match
    assertThat(inputTuple.getObject(0)).isInstanceOf(TupleEntry.class);
    assertThat(inputTuple.toString()).isEqualTo(outputTuple.toString());
  }

  /** Wraps the given entry in a tuple and writes it to {@code file} using Hadoop tuple serialization. */
  private Tuple writeTuple(TupleSerialization tupleSerialization, File file, TupleEntry tupleEntry)
      throws FileNotFoundException, IOException {
    TupleOutputStream output = new HadoopTupleOutputStream(new FileOutputStream(file, false),
        tupleSerialization.getElementWriter());
    Tuple outputTuple = new Tuple(tupleEntry);
    output.writeTuple(outputTuple);
    output.close();
    return outputTuple;
  }

  /** Reads a single tuple back from {@code file} using Hadoop tuple serialization. */
  private Tuple readTuple(TupleSerialization tupleSerialization, File file)
      throws FileNotFoundException, IOException {
    TupleInputStream input = new HadoopTupleInputStream(new FileInputStream(file),
        tupleSerialization.getElementReader());
    Tuple inputTuple = input.readTuple();
    input.close();
    return inputTuple;
  }

  /** Builds a {@link JobConf} that registers {@code TupleEntrySerialization} as the serialization implementation. */
  private JobConf createJobConf() {
    JobConf jobConf = new JobConf();
    jobConf.set("io.serializations", TupleEntrySerialization.class.getName());
    return jobConf;
  }
}