Skip to content

Instantly share code, notes, and snippets.

@jprante
Last active January 31, 2024 09:47
Show Gist options
  • Select an option

  • Save jprante/b53e71aa55935ca35ba2dff5667f0821 to your computer and use it in GitHub Desktop.

Select an option

Save jprante/b53e71aa55935ca35ba2dff5667f0821 to your computer and use it in GitHub Desktop.

Revisions

  1. jprante revised this gist Jan 31, 2024. 1 changed file with 10 additions and 5 deletions.
    15 changes: 10 additions & 5 deletions OpenAlexAwsClientTest.java
    Original file line number Diff line number Diff line change
    @@ -4,6 +4,8 @@
    import java.io.InputStreamReader;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    @@ -29,9 +31,10 @@ public class OpenAlexAwsClientTest {
    private final AtomicLong counter = new AtomicLong();

    @Test
    public void downloadOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
    public void openAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
    AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create();
    String bucket = "openalex";
    List<S3Object> s3Objects = new CopyOnWriteArrayList<>();
    try (S3AsyncClient s3Client = S3AsyncClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(anonymousCredentialsProvider)
    @@ -41,11 +44,12 @@ public void downloadOpenAlexFiles() throws ExecutionException, InterruptedExcept
    .build();
    ListObjectsV2Publisher listObjectsV2Publisher =
    s3Client.listObjectsV2Paginator(listObjectsV2Request);
    listObjectsV2Publisher.contents().subscribe(s3Object ->
    log(s3Client, s3Object, bucket)
    ).get(1L, TimeUnit.MINUTES);
    listObjectsV2Publisher.contents().subscribe(s3Objects::add).get(1L, TimeUnit.MINUTES);
    logger.log(Level.INFO, "found " + s3Objects.size() + " S3 objects in bucket " + bucket);
    for (S3Object s3Object : s3Objects) {
    log(s3Client, s3Object, bucket);
    }
    }
    logger.log(Level.INFO, "found " + counter.get() + " S3 objects in bucket " + bucket);
    }

    private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
    @@ -87,6 +91,7 @@ private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket)
    }

    private void processPath(Path path) throws IOException {
    // unpack and log each line
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
    new GZIPInputStream(Files.newInputStream(path))))) {
    reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line));
  2. jprante revised this gist Jan 31, 2024. 1 changed file with 43 additions and 20 deletions.
    63 changes: 43 additions & 20 deletions OpenAlexAwsClientTest.java
    Original file line number Diff line number Diff line change
    @@ -4,79 +4,102 @@
    import java.io.InputStreamReader;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Comparator;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.zip.GZIPInputStream;
    import org.junit.jupiter.api.Test;
    import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.S3Object;
    import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

    public class OpenAlexAwsClientTest {

    private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName());

    private final AtomicLong counter = new AtomicLong();

    @Test
    public void downloadAllOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
    public void downloadOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
    AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create();
    String bucket = "openalex";
    try (S3AsyncClient s3Client = S3AsyncClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(anonymousCredentialsProvider)
    .build()) {
    ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder()
    ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder()
    .bucket(bucket)
    .build();
    s3Client.listObjects(listObjectsRequest).get(1L, TimeUnit.MINUTES)
    .contents()
    .stream()
    .sorted(Comparator.comparing(S3Object::lastModified))
    .forEach(s3Object -> downloadAndProcess(s3Client, s3Object, bucket));
    ListObjectsV2Publisher listObjectsV2Publisher =
    s3Client.listObjectsV2Paginator(listObjectsV2Request);
    listObjectsV2Publisher.contents().subscribe(s3Object ->
    log(s3Client, s3Object, bucket)
    ).get(1L, TimeUnit.MINUTES);
    }
    logger.log(Level.INFO, "found " + counter.get() + " S3 objects in bucket " + bucket);
    }

    private void downloadAndProcess(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
    try {
    private void log(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
    String regex = "data/works/.*?.gz";
    Pattern pattern = Pattern.compile(regex);
    Matcher matcher = pattern.matcher(s3Object.key());
    if (matcher.matches()) {
    logger.log(Level.INFO, "key = " + s3Object.key() +
    " size = " + s3Object.size() +
    " owner = " + s3Object.owner() +
    " last modified = " + s3Object.lastModified());
    counter.incrementAndGet();
    }
    }

    private void download(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
    Path path = null;
    try {
    path = Files.createTempFile(bucket + "_", "");
    // in this example, we download only files from data/works with gz suffix
    if (s3Object.key().startsWith("data/works/") && s3Object.key().endsWith(".gz")) {
    String regex = "data/works/.*?.gz";
    Pattern pattern = Pattern.compile(regex);
    Matcher matcher = pattern.matcher(s3Object.key());
    if (matcher.matches()) {
    GetObjectRequest getObjectRequest = GetObjectRequest
    .builder()
    .bucket(bucket)
    .key(s3Object.key())
    .build();
    logger.log(Level.INFO, "downloading " + s3Object.key());
    Path tempPath = Files.createTempFile("openalex_", ".json");
    s3Client.getObject(getObjectRequest, tempPath).get(1L, TimeUnit.HOURS);
    s3Client.getObject(getObjectRequest, path).get(1L, TimeUnit.HOURS);
    logger.log(Level.INFO, "completed " + s3Object.key());
    deletePath(processPath(tempPath));
    processPath(path);
    }
    } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
    logger.log(Level.SEVERE, e.getMessage(), e);
    } finally {
    deletePath(path);
    }
    }

    private Path processPath(Path path) throws IOException {
    private void processPath(Path path) throws IOException {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
    new GZIPInputStream(Files.newInputStream(path))))) {
    reader.lines().forEach(line -> logger.log(Level.INFO, "line = " + line));
    }
    return path;
    }

    private void deletePath(Path path) throws IOException {
    private void deletePath(Path path) {
    if (path != null) {
    Files.delete(path);
    try {
    Files.delete(path);
    } catch (IOException e) {
    logger.log(Level.WARNING, "unable to delete " + path);
    }
    }
    }
    }
  3. jprante created this gist Jan 30, 2024.
    82 changes: 82 additions & 0 deletions OpenAlexAwsClientTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,82 @@

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import java.util.zip.GZIPInputStream;
    import org.junit.jupiter.api.Test;
    import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
    import software.amazon.awssdk.services.s3.model.S3Object;

    /**
     * Lists the {@code openalex} S3 bucket with anonymous credentials, logs every object,
     * and downloads each gzip-compressed object below {@code data/works/} to a temporary
     * location, logging every decompressed line before removing the download again.
     */
    public class OpenAlexAwsClientTest {

        private static final Logger logger = Logger.getLogger(OpenAlexAwsClientTest.class.getName());

        /**
         * Lists all objects of the bucket, sorts them by last-modified time and hands each
         * one to {@link #downloadAndProcess(S3AsyncClient, S3Object, String)}.
         *
         * @throws ExecutionException if a list request fails
         * @throws InterruptedException if the thread is interrupted while waiting for a listing
         * @throws TimeoutException if a single list request takes longer than one minute
         */
        @Test
        public void downloadAllOpenAlexFiles() throws ExecutionException, InterruptedException, TimeoutException {
            AnonymousCredentialsProvider anonymousCredentialsProvider = AnonymousCredentialsProvider.create();
            String bucket = "openalex";
            try (S3AsyncClient s3Client = S3AsyncClient.builder()
                    .region(Region.US_EAST_1)
                    .credentialsProvider(anonymousCredentialsProvider)
                    .build()) {
                List<S3Object> s3Objects = listAllObjects(s3Client, bucket);
                s3Objects.sort(Comparator.comparing(S3Object::lastModified));
                for (S3Object s3Object : s3Objects) {
                    downloadAndProcess(s3Client, s3Object, bucket);
                }
            }
        }

        /**
         * Collects the objects of all result pages of the bucket listing.
         *
         * <p>A single list request returns only one page of keys, so the listing is repeated
         * with the last key of the previous page as marker until a page comes back empty.
         * Terminating on the empty page costs one extra request but needs nothing from the
         * response besides its contents.
         *
         * @param s3Client the client used for the list requests
         * @param bucket the bucket to list
         * @return a mutable list with the objects of every page, in listing order
         */
        private List<S3Object> listAllObjects(S3AsyncClient s3Client, String bucket)
                throws ExecutionException, InterruptedException, TimeoutException {
            List<S3Object> s3Objects = new ArrayList<>();
            String marker = null; // null leaves the marker unset for the first page
            while (true) {
                ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder()
                        .bucket(bucket)
                        .marker(marker)
                        .build();
                List<S3Object> page = s3Client.listObjects(listObjectsRequest)
                        .get(1L, TimeUnit.MINUTES)
                        .contents();
                if (page.isEmpty()) {
                    return s3Objects;
                }
                s3Objects.addAll(page);
                marker = page.get(page.size() - 1).key();
            }
        }

        /**
         * Logs the object's metadata and, if its key starts with {@code data/works/} and ends
         * with {@code .gz}, downloads it and logs its decompressed lines. Failures are logged
         * and do not abort the caller's loop; the temporary download is always cleaned up.
         *
         * @param s3Client the client used for the download
         * @param s3Object the listed object to inspect
         * @param bucket the bucket the object lives in
         */
        private void downloadAndProcess(S3AsyncClient s3Client, S3Object s3Object, String bucket) {
            String key = s3Object.key();
            logger.log(Level.INFO, "key = " + key +
                    " size = " + s3Object.size() +
                    " owner = " + s3Object.owner() +
                    " last modified = " + s3Object.lastModified());
            // in this example, we download only files from data/works with gz suffix
            if (!key.startsWith("data/works/") || !key.endsWith(".gz")) {
                return;
            }
            Path tempDir = null;
            Path tempPath = null;
            try {
                // The SDK refuses to write into a file that already exists, so reserve a
                // private directory and let the SDK create the file inside it.
                tempDir = Files.createTempDirectory("openalex_");
                tempPath = tempDir.resolve("download.gz");
                GetObjectRequest getObjectRequest = GetObjectRequest
                        .builder()
                        .bucket(bucket)
                        .key(key)
                        .build();
                logger.log(Level.INFO, "downloading " + key);
                s3Client.getObject(getObjectRequest, tempPath).get(1L, TimeUnit.HOURS);
                logger.log(Level.INFO, "completed " + key);
                processPath(tempPath);
            } catch (InterruptedException e) {
                // restore the flag so callers further up can still observe the interruption
                Thread.currentThread().interrupt();
                logger.log(Level.SEVERE, e.getMessage(), e);
            } catch (IOException | ExecutionException | TimeoutException e) {
                logger.log(Level.SEVERE, e.getMessage(), e);
            } finally {
                // file first, then its (now empty) directory
                deletePath(tempPath);
                deletePath(tempDir);
            }
        }

        /**
         * Decompresses the gzip file at the given path and logs each line, decoded as UTF-8.
         *
         * @param path the gzip-compressed file to read
         * @throws IOException if the file cannot be opened, decompressed or read
         */
        private void processPath(Path path) throws IOException {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(Files.newInputStream(path)), StandardCharsets.UTF_8))) {
                // readLine keeps read failures as IOException instead of UncheckedIOException
                String line;
                while ((line = reader.readLine()) != null) {
                    logger.log(Level.INFO, "line = " + line);
                }
            }
        }

        /**
         * Deletes the given file or empty directory if it exists. This is best-effort cleanup:
         * a failure is logged as a warning instead of being propagated.
         *
         * @param path the path to delete, may be {@code null}
         */
        private void deletePath(Path path) {
            if (path == null) {
                return;
            }
            try {
                Files.deleteIfExists(path);
            } catch (IOException e) {
                logger.log(Level.WARNING, "unable to delete " + path, e);
            }
        }
    }