Skip to content

Instantly share code, notes, and snippets.

@viggin543
Created February 22, 2025 13:34
Show Gist options
  • Select an option

  • Save viggin543/16bc1fa683fb0f022e9ab5ee179d7c08 to your computer and use it in GitHub Desktop.

Select an option

Save viggin543/16bc1fa683fb0f022e9ab5ee179d7c08 to your computer and use it in GitHub Desktop.

Revisions

  1. viggin543 renamed this gist Feb 22, 2025. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. viggin543 created this gist Feb 22, 2025.
    109 changes: 109 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,109 @@
    /*
    * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
    */

    import net.snowflake.ingest.streaming.*;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    /**
    * Example on how to use the Streaming Ingest client APIs.
    *
    * <p>Please read the README.md file for detailed steps
    * https://github.com/snowflakedb/snowflake-ingest-java/blob/master/README.md
    */
    public class SnowflakeStreamingIngestExample {
    // Please follow the example in profile_streaming.json.example to see the required properties, or
    // if you have already set up profile.json with Snowpipe before, all you need is to add the "role"
    // property. If the "role" is not specified, the default user role will be applied.

    public static void main(String[] args) throws Exception {
    Properties props = getConnectionProps();

    try (SnowflakeStreamingIngestClient client =
    SnowflakeStreamingIngestClientFactory.
    builder("VISUALLY_STREAM").
    setProperties(props).
    build()) {

    // Create an open channel request on table MY_TABLE, note that the corresponding
    // db/schema/table needs to be present
    // Example: create or replace table MY_TABLE(c1 number);

    SnowflakeStreamingIngestChannel channel1 = client.openChannel(
    OpenChannelRequest.builder("MY_CHANNEL4")
    .setDBName("JITSU")
    .setSchemaName("PUBLIC")
    .setTableName("FOO")
    .setOnErrorOption(
    OpenChannelRequest.OnErrorOption.CONTINUE)
    .build());

    final int totalRowsInTable = 2;
    for (int val = 0; val < totalRowsInTable; val++) {
    Map<String, Object> row = new HashMap<>();

    row.put("USER_ANONYMOUS_ID", "testingeststream" + val);
    row.put("ALIAS", "SNOWSTREAM_TEST");
    row.put("DOC_PATH", "/test");

    // Insert the row with the current offset_token
    InsertValidationResponse response = channel1.insertRow(row, String.valueOf(val));
    if (response.hasErrors()) {
    // Simply throw if there is an exception, or you can do whatever you want with the
    // erroneous row
    throw response.getInsertErrors().get(0).getException();
    }
    }

    // If needed, you can check the offset_token registered in Snowflake to make sure everything
    // is committed
    final int expectedOffsetTokenInSnowflake = totalRowsInTable - 1; // 0 based offset_token
    final int maxRetries = 10;
    int retryCount = 0;

    do {
    String offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken();
    if (offsetTokenFromSnowflake != null
    && offsetTokenFromSnowflake.equals(String.valueOf(expectedOffsetTokenInSnowflake))) {
    System.out.println("SUCCESSFULLY inserted " + totalRowsInTable + " rows");
    break;
    }
    retryCount++;
    } while (retryCount < maxRetries);

    // Close the channel, the function internally will make sure everything is committed (or throw
    // an exception if there is any issue)
    channel1.close().get();
    }
    }

    private static Properties getConnectionProps() throws IOException {
    Properties props = new Properties();
    props.put("user", "xxx");
    props.put("port", "443");
    props.put("name", "stream");
    props.put("scheme", "https");
    props.put("ssl", "on");
    props.put("warehouse", "COMPUTE_WH");
    props.put("role", "SYSADMIN");
    props.put("database", "JISTU");
    props.put("connect_string", "jdbc:snowflake://xxx.us-central1.gcp.snowflakecomputing.com:443?warehouse=COMPUTE_WH");
    props.put("host", "xxx.us-central1.gcp.snowflakecomputing.com");
    props.put("schema", "PUBLIC");
    props.put("url", "https://xxx.us-central1.gcp.snowflakecomputing.com:443");
    props.put("account", "xxx");
    props.put("private_key", xxx
    -----BEGIN PRIVATE KEY-----
    ...
    -----END PRIVATE KEY-----
    """);
    return props;
    }



    }