import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; public class OpenAlexAwsClientTest { private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName()); private final AtomicLong counter = new AtomicLong(); @Test public void openAlexFiles() throws ExecutionException, InterruptedException, TimeoutException { AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create(); String bucket = "openalex"; List s3Objects = new CopyOnWriteArrayList<>(); try (S3AsyncClient s3Client = S3AsyncClient.builder() .region(Region.US_EAST_1) .credentialsProvider(anonymousCredentialsProvider) .build()) { ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder() .bucket(bucket) .build(); ListObjectsV2Publisher listObjectsV2Publisher = s3Client.listObjectsV2Paginator(listObjectsV2Request); listObjectsV2Publisher.contents().subscribe(s3Objects::add).get(1L, TimeUnit.MINUTES); logger.log(Level.INFO, "found " + s3Objects.size() + " S3 objects in bucket " + bucket); for (S3Object s3Object : s3Objects) { log(s3Client, s3Object, bucket); } } } private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) { String regex = "data/works/.*?.gz"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(s3Object.key()); if (matcher.matches()) { logger.log(Level.INFO, "key = " + s3Object.key() + " size = " + s3Object.size() + " last modified = " + s3Object.lastModified()); counter.incrementAndGet(); } } private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket) { Path path = null; try { path = Files.createTempFile(bucket + "_", ""); // in this example, we download only files from data/works with gz suffix String regex = "data/works/.*?.gz"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(s3Object.key()); if (matcher.matches()) { GetObjectRequest getObjectRequest = GetObjectRequest .builder() .bucket(bucket) .key(s3Object.key()) .build(); logger.log(Level.INFO, "downloading " + s3Object.key()); s3Client.getObject(getObjectRequest, path).get(1L, TimeUnit.HOURS); logger.log(Level.INFO, "completed " + s3Object.key()); processPath(path); } } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { logger.log(Level.SEVERE, e.getMessage(), e); } finally { deletePath(path); } } private void processPath(Path path) throws IOException { // unpack and log each line try (BufferedReader reader = new BufferedReader(new InputStreamReader( new GZIPInputStream(Files.newInputStream(path))))) { reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line)); } } private void deletePath(Path path) { if (path != null) { try { Files.delete(path); } catch (IOException e) { logger.log(Level.WARNING, "unable to delete " + path); } } } }