Last active
January 31, 2024 09:47
-
-
Save jprante/b53e71aa55935ca35ba2dff5667f0821 to your computer and use it in GitHub Desktop.
Revisions
-
jprante revised this gist
Jan 31, 2024 . 1 changed file with 10 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,6 +4,8 @@ import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -29,9 +31,10 @@ public class OpenAlexAwsClientTest { private final AtomicLong counter = new AtomicLong(); @Test public void openAlexFiles() throws ExecutionException, InterruptedException, TimeoutException { AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create(); String bucket = "openalex"; List<S3Object> s3Objects = new CopyOnWriteArrayList<>(); try (S3AsyncClient s3Client = S3AsyncClient.builder() .region(Region.US_EAST_1) .credentialsProvider(anonymousCredentialsProvider) @@ -41,11 +44,12 @@ public void downloadOpenAlexFiles() throws ExecutionException, InterruptedExcept .build(); ListObjectsV2Publisher listObjectsV2Publisher = s3Client.listObjectsV2Paginator(listObjectsV2Request); listObjectsV2Publisher.contents().subscribe(s3Objects::add).get(1L, TimeUnit.MINUTES); logger.log(Level.INFO, "found " + s3Objects.size() + " S3 objects in bucket " + bucket); for (S3Object s3Object : s3Objects) { log(s3Client, s3Object, bucket); } } } private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) { @@ -87,6 +91,7 @@ private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket) } private void processPath(Path path) throws IOException { // unpack and log each line try (BufferedReader reader = new BufferedReader(new InputStreamReader( new GZIPInputStream(Files.newInputStream(path))))) { reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line)); -
jprante revised this gist
Jan 31, 2024 . 1 changed file with 43 additions and 20 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,79 +4,102 @@ import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; public class OpenAlexAwsClientTest { private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName()); private final AtomicLong counter = new AtomicLong(); @Test public void downloadOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException { AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create(); String bucket = "openalex"; try (S3AsyncClient s3Client = S3AsyncClient.builder() .region(Region.US_EAST_1) .credentialsProvider(anonymousCredentialsProvider) .build()) { ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder() .bucket(bucket) .build(); ListObjectsV2Publisher listObjectsV2Publisher = s3Client.listObjectsV2Paginator(listObjectsV2Request); listObjectsV2Publisher.contents().subscribe(s3Object -> log(s3Client, s3Object, bucket) ).get(1L, TimeUnit.MINUTES); } logger.log(Level.INFO, "found " + counter.get() + " S3 objects in bucket " + bucket); } private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) { String regex = "data/works/.*?.gz"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(s3Object.key()); if (matcher.matches()) { logger.log(Level.INFO, "key = " + s3Object.key() + " size = " + s3Object.size() + " last modified = " + s3Object.lastModified()); counter.incrementAndGet(); } } private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket) { Path path = null; try { path = Files.createTempFile(bucket + "_", ""); // in this example, we download only files from data/works with gz suffix String regex = "data/works/.*?.gz"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(s3Object.key()); if (matcher.matches()) { GetObjectRequest getObjectRequest = GetObjectRequest .builder() .bucket(bucket) .key(s3Object.key()) .build(); logger.log(Level.INFO, "downloading " + s3Object.key()); s3Client.getObject(getObjectRequest, path).get(1L, TimeUnit.HOURS); logger.log(Level.INFO, "completed " + s3Object.key()); processPath(path); } } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { logger.log(Level.SEVERE, e.getMessage(), e); } finally { deletePath(path); } } private void processPath(Path path) throws IOException { try (BufferedReader reader = new BufferedReader(new InputStreamReader( new GZIPInputStream(Files.newInputStream(path))))) { reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line)); } } private void deletePath(Path path) { if (path != null) { try { Files.delete(path); } catch (IOException e) { logger.log(Level.WARNING, "unable to delete " + path); } } } } -
jprante created this gist
Jan 30, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,82 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.GZIPInputStream; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.S3Object; public class OpenAlexAwsClientTest { private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName()); @Test public void downloadAllOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException { AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create(); String bucket = "openalex"; try (S3AsyncClient s3Client = S3AsyncClient.builder() .region(Region.US_EAST_1) .credentialsProvider(anonymousCredentialsProvider) .build()) { ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() .bucket(bucket) .build(); s3Client.listObjects(listObjectsRequest).get(1L, TimeUnit.MINUTES) .contents() .stream() .sorted(Comparator.comparing(S3Object::lastModified)) .forEach(s3Object -> downloadAndProcess(s3Client, s3Object, bucket)); } } private void downloadAndProcess(S3AsyncClient s3Client, S3Object s3Object, String bucket) { try { logger.log(Level.INFO, "key = " + s3Object.key() + " size = " + s3Object.size() + " owner = " + s3Object.owner() + " last modified = " + s3Object.lastModified()); // in this example, we download only files from data/works with gz suffix if (s3Object.key().startsWith("data/works/") && s3Object.key().endsWith(".gz")) { GetObjectRequest getObjectRequest = GetObjectRequest .builder() .bucket(bucket) .key(s3Object.key()) .build(); logger.log(Level.INFO, "downloading " + s3Object.key()); Path tempPath = Files.createTempFile("openalex_", ".json"); s3Client.getObject(getObjectRequest, tempPath).get(1L, TimeUnit.HOURS); logger.log(Level.INFO, "completed " + s3Object.key()); deletePath(processPath(tempPath)); } } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { logger.log(Level.SEVERE, e.getMessage(), e); } } private Path processPath(Path path) throws IOException { try (BufferedReader reader = new BufferedReader(new InputStreamReader( new GZIPInputStream(Files.newInputStream(path))))) { reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line)); } return path; } private void deletePath(Path path) throws IOException { if (path != null) { Files.delete(path); } } }