"""Extract the regular files of tar archives with an Apache Beam pipeline.

Files matching ``INPUT`` are matched and read with ``fileio``. Every regular
member of each archive is streamed into ``OUTPUT_DIR`` through
``FileSystems``, and the path of each written file is emitted downstream.
"""

import logging
import posixpath
import shutil
import tarfile

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems

INPUT = '/home/pabloem/codes/test-tar/data/*'
OUTPUT_DIR = '/home/pabloem/codes/test-tar/data/scratchspace/'

# 1MB buffers
BUFFER_SIZE = 1024 * 1024


def _print_and_return(x):
    """Print *x* and return it unchanged (debugging helper for Map steps)."""
    print(x)
    return x


def _safe_member_name(name):
    """Return the normalized tar member *name*, or None if it is unsafe.

    A name is unsafe when, after normalization, it is absolute or climbs
    above the extraction root (e.g. ``../x``). Joining such a name onto the
    output directory could write outside it.
    """
    normalized = posixpath.normpath(name)
    if (normalized in ('.', '..')
            or normalized.startswith('/')
            or normalized.startswith('../')):
        return None
    return normalized


def _extract_tar_archive(archive, output_dir=OUTPUT_DIR):
    """Extract every regular file in *archive* into *output_dir*.

    Args:
        archive: An object with an ``open()`` method returning a readable
            binary file; ``run`` passes the elements produced by
            ``fileio.ReadMatches``.
        output_dir: Destination directory (any path ``FileSystems``
            understands). Defaults to ``OUTPUT_DIR``.

    Yields:
        The destination path of each extracted file, in archive order.

    Members that are not regular files (directories, links, devices) are
    ignored. Members whose names would escape *output_dir* are skipped
    with a warning.
    """
    # tarfile does not close a caller-supplied fileobj, so the archive
    # handle needs its own context manager.
    with archive.open() as archive_file, \
            tarfile.open(mode='r', fileobj=archive_file) as tf:
        for tarinfo in tf:
            if not tarinfo.isreg():
                continue
            member_name = _safe_member_name(tarinfo.name)
            if member_name is None:
                logging.warning(
                    'Skipping tar member with unsafe path: %r', tarinfo.name)
                continue
            remote_file_name = FileSystems.join(output_dir, member_name)
            # Write the bytes verbatim: with the default AUTO compression,
            # a member named e.g. "x.gz" would be gzip-compressed a second time.
            with tf.extractfile(tarinfo) as extracting_file, \
                    FileSystems.create(
                        remote_file_name,
                        compression_type=CompressionTypes.UNCOMPRESSED,
                    ) as remote_file:
                shutil.copyfileobj(extracting_file, remote_file, BUFFER_SIZE)
            yield remote_file_name


def run(input_pattern=INPUT, output_dir=OUTPUT_DIR):
    """Build and run the pipeline that extracts all archives matching a glob.

    Args:
        input_pattern: File pattern of tar archives to extract.
            Defaults to ``INPUT``.
        output_dir: Directory receiving the extracted files.
            Defaults to ``OUTPUT_DIR``.
    """
    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([input_pattern])
            | fileio.MatchAll()
            | fileio.ReadMatches()
            | beam.FlatMap(_extract_tar_archive, output_dir)
        )


if __name__ == '__main__':
    run()