Skip to content

Instantly share code, notes, and snippets.

@daschl
Last active November 23, 2019 16:19
Show Gist options
  • Select an option

  • Save daschl/e32e05e6abc31e450f67c23fe30c3826 to your computer and use it in GitHub Desktop.

Select an option

Save daschl/e32e05e6abc31e450f67c23fe30c3826 to your computer and use it in GitHub Desktop.

Revisions

  1. daschl revised this gist Nov 23, 2019. 3 changed files with 163 additions and 15 deletions.
    35 changes: 35 additions & 0 deletions BaseCouchbaseContainerTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    package org.testcontainers.couchbase;

    import com.couchbase.client.java.Bucket;
    import com.couchbase.client.java.Cluster;
    import com.couchbase.client.java.CouchbaseCluster;
    import com.couchbase.client.java.document.JsonDocument;
    import com.couchbase.client.java.document.json.JsonObject;
    import com.couchbase.client.java.env.CouchbaseEnvironment;
    import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class BaseCouchbaseContainerTest {

    @ClassRule
    public static CouchbaseContainer container = new CouchbaseContainer();

    @Test
    public void shouldInsertAndGet() {
    CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
    .bootstrapCarrierDirectPort(container.getKvPort())
    .bootstrapHttpDirectPort(container.getHttpPort())
    .build();
    Cluster cluster = CouchbaseCluster.create(env);
    cluster.authenticate(container.getUsername(), container.getPassword());
    Bucket bucket = cluster.openBucket("foobar");

    bucket.insert(JsonDocument.create("foo", JsonObject.empty()));
    System.err.println(bucket.get("foo"));

    cluster.disconnect();
    env.shutdown();
    }

    }
    65 changes: 65 additions & 0 deletions ClusterSpec.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,65 @@
    package org.testcontainers.couchbase;

    import java.util.EnumSet;
    import java.util.Set;

    public class ClusterSpec {

    private final Set<Service> enabledServices;
    private final int memoryQuota;

    public Set<Service> getEnabledServices() {
    return enabledServices;
    }

    public int getMemoryQuota() {
    return memoryQuota;
    }

    private ClusterSpec(Builder builder) {
    this.enabledServices = builder.enabledServices;
    this.memoryQuota = builder.memoryQuota;
    }

    public static ClusterSpec.Builder builder() {
    return new Builder();
    }

    public static ClusterSpec fromDefaults() {
    return builder().build();
    }

    public static class Builder {

    private Set<Service> enabledServices = EnumSet.of(Service.KV);
    private int memoryQuota = 300;

    public Builder enabledServices(Set<Service> enabled) {
    this.enabledServices = enabled;
    return this;
    }

    public Builder memoryQuota(int memoryQuota) {
    this.memoryQuota = memoryQuota;
    return this;
    }

    public ClusterSpec build() {
    return new ClusterSpec(this);
    }
    }

    enum Service {
    KV("kv");

    private String identifier;

    Service(String identifier) {
    this.identifier = identifier;
    }

    public String identifier() {
    return identifier;
    }
    }
    }
    78 changes: 63 additions & 15 deletions CouchbaseContainer.java
    Original file line number Diff line number Diff line change
    @@ -21,7 +21,8 @@
    import org.testcontainers.containers.Network;
    import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
    import org.testcontainers.images.builder.Transferable;
    import org.testcontainers.shaded.com.trilead.ssh2.crypto.Base64;
    import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
    import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
    import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
    import org.testcontainers.utility.ThrowingFunction;

    @@ -32,13 +33,19 @@
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Base64;
    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import static java.net.HttpURLConnection.HTTP_OK;


    public class CouchbaseContainer<SELF extends CouchbaseContainer<SELF>> extends GenericContainer<SELF> {

    public static final ObjectMapper MAPPER = new ObjectMapper();

    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);

    private static final String STATIC_CONFIG = "/opt/couchbase/etc/couchbase/static_config";
    @@ -69,11 +76,20 @@ public CouchbaseContainer(final String imageName) {
    );
    }

    public int getKvPort() {
    return getMappedPort(Ports.MEMCACHED.getOriginalPort(containerPortOffset));
    }

    public int getHttpPort() {
    return getMappedPort(Ports.REST.getOriginalPort(containerPortOffset));
    }

    @Override
    protected void doStart() {
    super.doStart();
    try {
    initializeCluster();
    createBuckets();
    } catch (Exception ex) {
    throw new ContainerLaunchException("Could not launch couchbase container", ex);
    }
    @@ -83,8 +99,16 @@ protected void doStart() {
    String clusterUsername = "Administrator";
    String clusterPassword = "password";

    public String getUsername() {
    return clusterUsername;
    }

    public String getPassword() {
    return clusterPassword;
    }

    private void initializeCluster() throws Exception {
    urlBase = String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(Ports.REST));
    urlBase = String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(Ports.REST.getOriginalPort(containerPortOffset)));

    String poolURL = "/pools/default";
    String poolPayload = "memoryQuota="
    @@ -104,27 +128,47 @@ private void initializeCluster() throws Exception {
    callCouchbaseRestAPI(setupServicesURL, setupServiceContent);
    callCouchbaseRestAPI(webSettingsURL, webSettingsContent);

    // todo...
    //createNodeWaitStrategy().waitUntilReady(this);
    createNodeWaitStrategy().waitUntilReady(this);
    }

    private void createBuckets() {

    }

    private HttpWaitStrategy createNodeWaitStrategy() {
    return new HttpWaitStrategy()
    .forPath("/pools/default/")
    .withBasicCredentials(clusterUsername, clusterPassword)
    .forPort(Ports.REST.getOriginalPort(containerPortOffset))
    .forStatusCode(HTTP_OK)
    .forResponsePredicate(response -> {
    try {
    return Optional.of(MAPPER.readTree(response))
    .map(n -> n.at("/nodes/0/status"))
    .map(JsonNode::asText)
    .map("healthy"::equals)
    .orElse(false);
    } catch (IOException e) {
    //logger().error("Unable to parse response {}", response);
    return false;
    }
    });
    }


    public void callCouchbaseRestAPI(String url, String payload) throws IOException {
    /* String fullUrl = urlBase + url;
    String fullUrl = urlBase + url;
    HttpURLConnection httpConnection = (HttpURLConnection) ((new URL(fullUrl).openConnection()));
    httpConnection.setDoOutput(true);
    httpConnection.setRequestMethod("POST");
    httpConnection.setRequestProperty("Content-Type",
    "application/x-www-form-urlencoded");
    String encoded = Base64.encode((clusterUsername + ":" + clusterPassword).getBytes("UTF-8"));
    String encoded = Base64.getEncoder().encodeToString((clusterUsername + ":" + clusterPassword).getBytes(StandardCharsets.UTF_8));
    httpConnection.setRequestProperty("Authorization", "Basic " + encoded);
    DataOutputStream out = new DataOutputStream(httpConnection.getOutputStream());
    out.writeBytes(payload);
    out.flush();
    httpConnection.getResponseCode();*/
    }

    private int getMappedPort(final Ports ports) {
    return ports.getOriginalPort(containerPortOffset);
    httpConnection.getResponseCode();
    }

    @Override
    @@ -143,14 +187,14 @@ private void patchConfig(String configLocation, ThrowingFunction<String, String>

    private String addMappedPorts(String originalConfig) {
    String portConfig = Stream.of(Ports.values())
    .map(port -> String.format("{%s, %d}.", port.name, getMappedPort(port)))
    .map(port -> String.format("{%s, %d}.", port.name, port.getOriginalPort(containerPortOffset)))
    .collect(Collectors.joining("\n"));
    return String.format("%s\n%s", originalConfig, portConfig);
    }

    private String replaceCapiPort(String originalConfig) {
    return Arrays.stream(originalConfig.split("\n"))
    .map(s -> (s.matches("port\\s*=\\s*" + Ports.CAPI.getOriginalPort(containerPortOffset))) ? "port = " + getMappedPort(Ports.CAPI) : s)
    .map(s -> (s.matches("port\\s*=\\s*" + Ports.CAPI.getOriginalPort())) ? "port = " + Ports.CAPI.getOriginalPort(containerPortOffset) : s)
    .collect(Collectors.joining("\n"));
    }

    @@ -170,9 +214,9 @@ private enum Ports {
    CBAS_SSL("cbas_ssl_port", 18095),
    EVENTING_SSL("eventing_ssl_port", 18096);

    final String name;
    private final String name;

    final int originalPort;
    private final int originalPort;

    Ports(String name, int originalPort) {
    this.name = name;
    @@ -183,6 +227,10 @@ public int getOriginalPort(int offset) {
    return originalPort + offset;
    }

    public int getOriginalPort() {
    return originalPort;
    }

    }

    }
  2. daschl created this gist Nov 23, 2019.
    188 changes: 188 additions & 0 deletions CouchbaseContainer.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,188 @@
    /*
    * Copyright (c) 2019 Couchbase, Inc.
    *
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */

    package org.testcontainers.couchbase;

    import org.testcontainers.containers.ContainerLaunchException;
    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.containers.Network;
    import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
    import org.testcontainers.images.builder.Transferable;
    import org.testcontainers.shaded.com.trilead.ssh2.crypto.Base64;
    import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
    import org.testcontainers.utility.ThrowingFunction;

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.net.URLEncoder;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;


    public class CouchbaseContainer<SELF extends CouchbaseContainer<SELF>> extends GenericContainer<SELF> {

    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);

    private static final String STATIC_CONFIG = "/opt/couchbase/etc/couchbase/static_config";
    private static final String CAPI_CONFIG = "/opt/couchbase/etc/couchdb/default.d/capi.ini";
    private static final String CACHED_CONFIG = "/opt/couchbase/var/lib/couchbase/config/config.dat";

    private static final String IMAGE = "couchbase/server";
    private static final String VERSION = "enterprise-6.0.3";

    private int containerPortOffset;

    private ClusterSpec clusterSpec = ClusterSpec.fromDefaults();

    public CouchbaseContainer() {
    this(IMAGE + ":" + VERSION);
    }

    public CouchbaseContainer(final String imageName) {
    super(imageName);
    containerPortOffset = INSTANCE_COUNT.getAndIncrement();

    withNetwork(Network.SHARED);
    Arrays.stream(Ports.values()).map(p -> p.getOriginalPort(containerPortOffset)).forEach(this::addExposedPort);
    setWaitStrategy(new HttpWaitStrategy()
    .forPort(Ports.REST.getOriginalPort(containerPortOffset))
    .forPath("/pools")
    .forStatusCode(200)
    );
    }

    @Override
    protected void doStart() {
    super.doStart();
    try {
    initializeCluster();
    } catch (Exception ex) {
    throw new ContainerLaunchException("Could not launch couchbase container", ex);
    }
    }

    String urlBase;
    String clusterUsername = "Administrator";
    String clusterPassword = "password";

    private void initializeCluster() throws Exception {
    urlBase = String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(Ports.REST));

    String poolURL = "/pools/default";
    String poolPayload = "memoryQuota="
    + URLEncoder.encode(Integer.toString(clusterSpec.getMemoryQuota()), "UTF-8");

    String setupServicesURL = "/node/controller/setupServices";
    StringBuilder servicePayloadBuilder = new StringBuilder();
    for (ClusterSpec.Service service : clusterSpec.getEnabledServices()) {
    servicePayloadBuilder.append(service.identifier()).append(",");
    }
    String setupServiceContent = "services=" + URLEncoder.encode(servicePayloadBuilder.toString(), "UTF-8");

    String webSettingsURL = "/settings/web";
    String webSettingsContent = "username=" + URLEncoder.encode(clusterUsername, "UTF-8") + "&password=" + URLEncoder.encode(clusterPassword, "UTF-8") + "&port=8091";

    callCouchbaseRestAPI(poolURL, poolPayload);
    callCouchbaseRestAPI(setupServicesURL, setupServiceContent);
    callCouchbaseRestAPI(webSettingsURL, webSettingsContent);

    // todo...
    //createNodeWaitStrategy().waitUntilReady(this);
    }

    public void callCouchbaseRestAPI(String url, String payload) throws IOException {
    /* String fullUrl = urlBase + url;
    HttpURLConnection httpConnection = (HttpURLConnection) ((new URL(fullUrl).openConnection()));
    httpConnection.setDoOutput(true);
    httpConnection.setRequestMethod("POST");
    httpConnection.setRequestProperty("Content-Type",
    "application/x-www-form-urlencoded");
    String encoded = Base64.encode((clusterUsername + ":" + clusterPassword).getBytes("UTF-8"));
    httpConnection.setRequestProperty("Authorization", "Basic " + encoded);
    DataOutputStream out = new DataOutputStream(httpConnection.getOutputStream());
    out.writeBytes(payload);
    out.flush();
    httpConnection.getResponseCode();*/
    }

    private int getMappedPort(final Ports ports) {
    return ports.getOriginalPort(containerPortOffset);
    }

    @Override
    protected void containerIsCreated(String containerId) {
    patchConfig(STATIC_CONFIG, this::addMappedPorts);
    // capi needs a special configuration, see https://developer.couchbase.com/documentation/server/current/install/install-ports.html
    patchConfig(CAPI_CONFIG, this::replaceCapiPort);
    copyFileToContainer(Transferable.of(new byte[]{}), CACHED_CONFIG);
    }

    private void patchConfig(String configLocation, ThrowingFunction<String, String> patchFunction) {
    String patchedConfig = copyFileFromContainer(configLocation,
    inputStream -> patchFunction.apply(IOUtils.toString(inputStream, StandardCharsets.UTF_8)));
    copyFileToContainer(Transferable.of(patchedConfig.getBytes(StandardCharsets.UTF_8)), configLocation);
    }

    private String addMappedPorts(String originalConfig) {
    String portConfig = Stream.of(Ports.values())
    .map(port -> String.format("{%s, %d}.", port.name, getMappedPort(port)))
    .collect(Collectors.joining("\n"));
    return String.format("%s\n%s", originalConfig, portConfig);
    }

    private String replaceCapiPort(String originalConfig) {
    return Arrays.stream(originalConfig.split("\n"))
    .map(s -> (s.matches("port\\s*=\\s*" + Ports.CAPI.getOriginalPort(containerPortOffset))) ? "port = " + getMappedPort(Ports.CAPI) : s)
    .collect(Collectors.joining("\n"));
    }

    private enum Ports {
    REST("rest_port", 8091),
    CAPI("capi_port", 8092),
    QUERY("query_port", 8093),
    FTS("fts_http_port", 8094),
    CBAS("cbas_http_port", 8095),
    EVENTING("eventing_http_port", 8096),
    MEMCACHED_SSL("memcached_ssl_port", 11207),
    MEMCACHED("memcached_port", 11210),
    REST_SSL("ssl_rest_port", 18091),
    CAPI_SSL("ssl_capi_port", 18092),
    QUERY_SSL("ssl_query_port", 18093),
    FTS_SSL("fts_ssl_port", 18094),
    CBAS_SSL("cbas_ssl_port", 18095),
    EVENTING_SSL("eventing_ssl_port", 18096);

    final String name;

    final int originalPort;

    Ports(String name, int originalPort) {
    this.name = name;
    this.originalPort = originalPort;
    }

    public int getOriginalPort(int offset) {
    return originalPort + offset;
    }

    }

    }