Skip to content

Instantly share code, notes, and snippets.

@keke1123
Last active April 9, 2023 08:37
Show Gist options
  • Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.
Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.

Revisions

  1. keke1123 revised this gist Jun 23, 2015. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions ElasticsearchJavaExample
    Original file line number Diff line number Diff line change
    @@ -1,8 +1,7 @@
    /**
    * Elasticsearch Java API Example
    */
    import com.epozen.tortiletorpex.util.TtDummyRvMsgGenerate;
    import com.epozen.tortiletorpex.util.TtRvMsgParser;

    import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
    import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
  2. keke1123 revised this gist Jun 23, 2015. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions ElasticsearchJavaExample
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,6 @@
    /**
    * Elasticsearch Java API Example
    */
    import com.epozen.tortiletorpex.util.TtDummyRvMsgGenerate;
    import com.epozen.tortiletorpex.util.TtRvMsgParser;
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
  3. keke1123 created this gist Jun 23, 2015.
    458 changes: 458 additions & 0 deletions ElasticsearchJavaExample
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,458 @@
    import com.epozen.tortiletorpex.util.TtDummyRvMsgGenerate;
    import com.epozen.tortiletorpex.util.TtRvMsgParser;
    import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
    import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
    import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
    import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
    import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
    import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.action.count.CountResponse;
    import org.elasticsearch.action.delete.DeleteResponse;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchType;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.metadata.IndexMetaData;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.index.query.QueryBuilder;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHitField;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    import static org.elasticsearch.index.query.QueryBuilders.*;

    /**
    * Created by SteveShin on 2015-05-08.
    */
    public class EsSample {

    Logger logger = LoggerFactory.getLogger(EsSample.class);

    static final ForkJoinPool forkJoinPool = new ForkJoinPool();
    static final int bytes = 1000;
    static String indexName = "size_sample";
    static String indexType = "sample";
    static int amount = 1000000;

    static final Map<String, Object> beforeSet = new HashMap<String, Object>(){
    {
    put("index.refresh_interval",-1);
    put("index.number_of_replicas",0);
    }
    };
    static final Map<String, Object> afterSet = new HashMap<String, Object>(){
    {
    put("index.refresh_interval","10s");
    put("index.number_of_replicas",0);
    }
    };
    //create Index(schema)
    public static void createIndex(Client client, String indexName) {
    System.out.println(indexName);
    IndicesExistsResponse res = client.admin().indices().prepareExists(indexName).execute().actionGet();
    if (!res.isExists()) {
    CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(indexName);
    createIndexRequestBuilder.setSettings(beforeSet);
    CreateIndexResponse response = createIndexRequestBuilder.execute().actionGet();
    if(response.isAcknowledged()) {
    String mappingJson = "{" +
    "\"_all\":{\"enabled\":false}" +
    ",\"_source\":{\"enabled\":false}" +
    ",\"properties\":{" +
    "\"body\":{\"type\":\"string\",\"store\":true}" +
    ",\"head\":{\"type\":\"object\",\"store\":false}" +
    "}}";
    PutMappingResponse mappingResponse = client.admin().indices()
    .preparePutMapping(indexName)
    .setType(indexType)
    .setSource(mappingJson)
    .execute().actionGet();
    System.out.println("mapping... " + mappingResponse.isAcknowledged());
    }
    System.out.println("CREATE index: " + response.isAcknowledged());
    }
    }

    //delete document
    public static void deleteDocument(Client client, String indexName, String typeName, String documentId) {
    DeleteResponse response = client.prepareDelete(indexName, typeName, documentId)
    .execute()//.setOperationThreades(false)//run on this thread
    .actionGet();
    }

    //delete type
    public static void deleteType(Client client, String indexName, String typeName) {
    DeleteMappingResponse response = client.admin().indices().prepareDeleteMapping(indexName).setType(typeName).execute().actionGet();

    }

    public static List<String> pumpup(List<String> logs, int loop) {
    List<String> result = new ArrayList<String>();
    System.out.println("Loop:" + loop + ", total size: " + logs.size() * loop);
    for (int i = 0; i < loop; i++) {
    result.addAll(logs);
    }
    return result;
    }

    public static void putTest(Client client) throws IOException, InterruptedException, ExecutionException {
    //prepare Bulk Processor
    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
    new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    //System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis()
    + ", size:" + response.getItems().length + "}");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable fail) {
    fail.printStackTrace();
    System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}");
    }
    })
    .setBulkActions(10000)
    .setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB))
    .setFlushInterval(TimeValue.timeValueSeconds(10))
    .setConcurrentRequests(10)
    .build();
    }

    public static void indexSetting(Client client, Map<String, Object> settings){
    UpdateSettingsResponse response = client.admin().indices().prepareUpdateSettings(indexName).setSettings(settings).execute().actionGet();
    System.out.println("update set:" + response.isAcknowledged());
    }

    public static void put(Client client) throws IOException, InterruptedException, ExecutionException {
    /* //get indice(schema)
    GetMappingsResponse res = client.admin().indices().getMappings(new GetMappingsRequest().indices(indexName)).actionGet();
    ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(indexName);
    for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
    System.out.println("index find: " + c.key + " = " + c.value.source());
    }*/

    // indexSetting(client, beforeSet);
    //indexSetting(client, beforeSet);
    //CreateIndexResponse createIndexResponse = client.admin().indices().create(new CreateIndexRequestBuilder. .createIndexRequest()).actionGet();

    //prepare Bulk Processor
    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
    new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
    //System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis()
    + ", size:" + response.getItems().length + "}");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable fail) {
    fail.printStackTrace();
    System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}");
    }
    })
    .setBulkActions(10000)
    .setBulkSize(new ByteSizeValue(150, ByteSizeUnit.MB))
    //.setFlushInterval(TimeValue.timeValueSeconds(120))
    .setConcurrentRequests(3)
    .build();

    //get sample logs
    List<String> message = pumpup(TtDummyRvMsgGenerate.read(), 1);
    TtRvMsgParser parser = new TtRvMsgParser();

    List<Map<String, Object>> listMapList = new ArrayList<>();
    for (String msg : message) {
    parser.init(msg);//.println();
    Map<String, Object> listMap = parser.result();
    listMapList.add(listMap);
    }

    int loop = 1;
    long start = new Date().getTime();
    while (loop <= amount) {
    //use not parsed
    // for(final String msg : message){
    // @SuppressWarnings("unchecked")
    // Map<String, String> fullMsg = new HashMap<String, String>(){{
    // put("body", TtDummyRvMsgGenerate.increaseByte(msg, 1000));
    // }};
    // bulkProcessor.add(new IndexRequest(indexName, indexType).source(fullMsg));
    // loop++;
    // }
    //use parsed
    for (Map<String, Object> listMap : listMapList) {
    //String a = (String)listMap.get("body");
    //System.out.println(a);
    //listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.getSampleWords(15)));
    //listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.randomKey(15)));
    //listMap.put("body", TtDummyRvMsgGenerate.increaseByte((String) listMap.get("body"), 500));

    //listMap.put("loop", loop);
    bulkProcessor.add(new IndexRequest(indexName, indexType, loop+"").source(listMap));
    loop++;
    }
    //System.exit(0);
    }
    long end = new Date().getTime();
    System.out.println("-------------------- ELAPSED:" + (end - start) + " millis");
    indexSetting(client, afterSet);

    //Bulk Process close
    bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    }

    public static void getDocument(Client client, String indexName, String typeName, String documentId) {
    GetResponse getResponse = client.prepareGet(indexName, typeName, documentId).execute().actionGet();
    Map<String, Object> source = getResponse.getSource();
    System.out.println("------------------------------");
    System.out.println("Index: " + getResponse.getIndex());
    System.out.println("Type: " + getResponse.getType());
    System.out.println("Id: " + getResponse.getId());
    System.out.println("Version: " + getResponse.getVersion());
    System.out.println(source);
    System.out.println("------------------------------");
    }

    public static void fullTextSearchCount(Client client, String IndexName, String typeName, String value, String... field) {
    QueryBuilder queryBuilder =
    //moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(1000);
    //wildcardQuery(field[0], value);
    queryStringQuery(field[0] + ":" + value);
    CountResponse response = client.prepareCount(IndexName).setTypes(typeName)
    .setQuery(queryBuilder)
    .execute().actionGet();
    System.out.println("\tCurrent More Like This count: " + response.getCount() + "");
    }

    public static void fullTextSearchDocument(Client client, String indexName, String typeName, String value, String... field) {
    QueryBuilder fieldQueryBuilder =
    //termQuery(field[0], value);
    matchQuery(field[0], value);
    //new WildcardQueryBuilder(field[0], value);
    //moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(25);
    //wildcardQuery(field[0], value);
    //queryStringQuery(field[0] + ":" + value);
    //multiMatchQuery(value, field);
    //SortBuilder sortBuilder = SortBuilders.fieldSort(field[0]);
    SearchResponse searchResponse = client.prepareSearch(indexName).setTypes(typeName).addFields(field[0])
    .setSearchType(SearchType.QUERY_THEN_FETCH)//.addSort(sortBuilder)
    .setQuery(fieldQueryBuilder)//.addSort(SortBuilders.fieldSort("loop").order(SortOrder.DESC))
    .setFrom(0).setSize(1000).setExplain(false).execute().actionGet();

    SearchHit[] result = searchResponse.getHits().getHits();

    System.out.println(searchResponse.getHits().getTotalHits() + " total count");
    //get result & print
    System.out.println("\tCurrent More Like This results: " + result.length + "");
    for (SearchHit hit : result) {
    Set<String> keys = hit.fields().keySet();
    Iterator<String> keyItr = keys.iterator();
    while(keyItr.hasNext()){
    SearchHitField hitField = hit.field(keyItr.next());
    System.out.println(hitField.getValue().toString());
    }
    // Map<String, Object> r = hit.getSource();
    // System.out.println(r);
    }
    }

    public static boolean isOpen(Client client, String indexName){
    ClusterState clusterState = client.admin().cluster().prepareState().setIndices(indexName).get().getState();
    IndexMetaData metaData = clusterState.getMetaData().index(indexName);
    final String open = "OPEN";
    return metaData.getState() != null && metaData.getState().toString().equals(open);
    }

    public static void searchDocument(Client client, String indexName, String typeName, String value, String... field) {

    //query using query builder
    QueryBuilder matchQuery = matchQuery(field[0], value);
    //System.out.println(matchQuery.toString()); //query json println
    QueryBuilder multiMatchQuery = multiMatchQuery(value, field);
    QueryBuilder stringQuery = queryStringQuery(field[0] + ":" + value);
    //search query execution
    SearchResponse searchResponse = client.prepareSearch(indexName)
    .setTypes(typeName)
    .setSearchType(SearchType.QUERY_THEN_FETCH) //search type. hold
    .setQuery(stringQuery)
    .setFrom(0).setSize(1000) //limit
    .setExplain(false) //query explain
    .execute().actionGet(); //execute
    SearchHit[] result = searchResponse.getHits().getHits();

    //get result & print
    System.out.println("Current Match results: " + result.length + "--------------------");
    for (SearchHit hit : result) {
    Map<String, Object> r = hit.getSource();
    //System.out.println(r);
    }
    }

    public static void count(Client client, String index, String type) {
    CountResponse response = client.prepareCount(index)
    .setQuery(termQuery("_type", type))
    .execute().actionGet();
    System.out.println("type:" + type + "`s total count: " + response.getCount());
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
    //ES connection set
    Settings settings = ImmutableSettings.settingsBuilder()
    .put("action.auto_create_index", true)
    .put("client.transport.sniff", true)
    .put("client.transport.ping_timeout", "60s")
    .put("network.tcp.block", true)
    //.put("node.name", "ES3")
    .put("cluster.name", "elasticsearch").build();

    //ES client
    //default native transport : 9300
    //default web/rest port : 9200
    Client client = new TransportClient(settings)
    .addTransportAddresses(
    // new InetSocketTransportAddress("192.168.10.229", 9300)
    new InetSocketTransportAddress("192.168.10.210", 9300)
    , new InetSocketTransportAddress("192.168.10.251", 9300)
    , new InetSocketTransportAddress("192.168.10.252", 9300)
    );
    // try {

    // ClusterStatsResponse res = client.admin().cluster().prepareClusterStats().execute().actionGet();
    //
    // PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("hdfs_repo1")
    // .setType("hdfs")
    // .setSettings(
    // ImmutableSettings.builder()
    // .put("uri", "hdfs://192.168.10.251:2181/")
    // .put("path", "es/snapshot")
    // .put("concurrent_streams", 4)
    // .put("compress", true)
    // .put("chunk_size", "40mb")).get();
    // System.out.println("repository put response:" + putRepositoryResponse.isAcknowledged());
    //
    // CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("hdfs_repo1", "map1_2million")
    // .setIncludeGlobalState(false)
    // .setIndices("map1")
    // .setRepository("hdfs_repo1")
    // .setSettings(ImmutableSettings.builder().put("compress", true))
    // .setWaitForCompletion(true)
    // .get();
    // System.out.println("success:" + createSnapshotResponse.getSnapshotInfo().successfulShards());
    //
    // GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("hdfs_repo1").get();
    // ImmutableList<SnapshotInfo> snapshotInfos = getSnapshotsResponse.getSnapshots();
    // for(SnapshotInfo info : snapshotInfos){
    // System.out.println(info.name());
    // }
    //
    // RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("hdfs_repo1", "map1_2million")
    // .setIndexSettings(
    // ImmutableSettings.builder().put("index.number_of_replicas", 0))
    // .setIndices("map1")
    // .setWaitForCompletion(true)
    // .setRenamePattern("map[.*]")
    // .setRenameReplacement("restore_map$1")
    // .setIgnoreIndexSettings("index.refresh_interval", "")
    // .get();
    //
    //
    // System.out.println(res.getClusterNameAsString());

    indexName = "disable_source";
    indexType = "test";
    amount = 10000;

    fullTextSearchDocument(client, indexName, indexType, "tmtm", "body");
    System.exit(0);
    //indexType = "test";
    createIndex(client, indexName);
    //Thread.sleep(1000);
    put(client);
    //System.out.println("loop : " + i);
    String repoName = "hdfs-repo";

    System.out.println("PUT END");



    // long start_ = new Date().getTime();
    // count(client, indexName, indexType);
    // long end_ = new Date().getTime();
    // System.out.println("--------------------------\nEnd count : " + (end_ - start_) + " time elapsed");
    // //getDocument(client, indexName, indexType, "1000");
    // //searchDocument(client, indexName, indexType, "155431.679016", "TXN_TIME", "TXN_KEY");
    List<String> query = Arrays.asList(
    "*ASDF1186*"
    , "*QLEGASD1800*"
    , "*WEFLKJASLD118*"
    , "*JGKLJDLSF1006*"
    , "*SLD2186*"
    , "*DLFKJQWEF1686*"
    , "*LKJFASD2718*"
    , "*LKJGQLEGASD235*"
    , "*KJQWEF1361*"
    , "*QWEFLKJASLD2*"
    , "*KLJGKLJDLSF ASDLKJFASD1*"
    );
    String index = "test3";
    String type = "rv_10000";
    //deleteType(client, "test", "ex_1000");
    // deleteType(client, "test2", "ex_10000_3");
    // System.out.println("delete complete");
    // System.exit(0);
    System.out.println(type + " full text search start");
    System.out.println("--------------------------------------------------------------------------------------------------");
    for (int i = 0; i <= 9; i++) {
    System.out.println("LOOP" + i + "test start");
    for (String q : query) {
    long start = new Date().getTime();
    String mergeQuery = q;
    //fullTextSearchCount(client, index, type, mergeQuery, "body");
    fullTextSearchDocument(client, index, type, mergeQuery, "body");
    long end = new Date().getTime();
    System.out.println("\t--------------End search : " + "query:" + mergeQuery + ", " + (end - start) + " time elapsed");
    }
    System.out.println("end of test---------------------------------------------------------------------------------------");
    // Thread.sleep(5000);
    }

    // long start1 = new Date().getTime();
    // fullTextSearchCount(client, indexName, indexType, " tmtm", "body");
    // long end2 = new Date().getTime();
    // System.out.println("--------------------------\nEnd count : " + (end2 - start1) + " time elapsed");
    // } catch (IOException | InterruptedException | ExecutionException e) {
    // e.printStackTrace();
    // }
    //Connection close
    client.close();
    }
    }