Last active
April 9, 2023 08:37
-
-
Save keke1123/7f130b62b1ea2c321aa7 to your computer and use it in GitHub Desktop.
Revisions
-
keke1123 revised this gist
Jun 23, 2015 . 1 changed file with 1 addition and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,8 +1,7 @@ /** * Elasticsearch Java API Example */ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -
keke1123 revised this gist
Jun 23, 2015 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,6 @@ /** * Elasticsearch Java API Example */ import com.epozen.tortiletorpex.util.TtDummyRvMsgGenerate; import com.epozen.tortiletorpex.util.TtRvMsgParser; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -
keke1123 created this gist
Jun 23, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,458 @@ import com.epozen.tortiletorpex.util.TtDummyRvMsgGenerate; import com.epozen.tortiletorpex.util.TtRvMsgParser; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.*; /** * Created by SteveShin on 2015-05-08. */ public class EsSample { Logger logger = LoggerFactory.getLogger(EsSample.class); static final ForkJoinPool forkJoinPool = new ForkJoinPool(); static final int bytes = 1000; static String indexName = "size_sample"; static String indexType = "sample"; static int amount = 1000000; static final Map<String, Object> beforeSet = new HashMap<String, Object>(){ { put("index.refresh_interval",-1); put("index.number_of_replicas",0); } }; static final Map<String, Object> afterSet = new HashMap<String, Object>(){ { put("index.refresh_interval","10s"); put("index.number_of_replicas",0); } }; //create Index(schema) public static void createIndex(Client client, String indexName) { System.out.println(indexName); IndicesExistsResponse res = client.admin().indices().prepareExists(indexName).execute().actionGet(); if (!res.isExists()) { CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(indexName); createIndexRequestBuilder.setSettings(beforeSet); CreateIndexResponse response = createIndexRequestBuilder.execute().actionGet(); if(response.isAcknowledged()) { String mappingJson = "{" + "\"_all\":{\"enabled\":false}" + ",\"_source\":{\"enabled\":false}" + ",\"properties\":{" + "\"body\":{\"type\":\"string\",\"store\":true}" + ",\"head\":{\"type\":\"object\",\"store\":false}" + "}}"; PutMappingResponse mappingResponse = client.admin().indices() .preparePutMapping(indexName) .setType(indexType) .setSource(mappingJson) .execute().actionGet(); System.out.println("mapping... " + mappingResponse.isAcknowledged()); } System.out.println("CREATE index: " + response.isAcknowledged()); } } //delete document public static void deleteDocument(Client client, String indexName, String typeName, String documentId) { DeleteResponse response = client.prepareDelete(indexName, typeName, documentId) .execute()//.setOperationThreades(false)//run on this thread .actionGet(); } //delete type public static void deleteType(Client client, String indexName, String typeName) { DeleteMappingResponse response = client.admin().indices().prepareDeleteMapping(indexName).setType(typeName).execute().actionGet(); } public static List<String> pumpup(List<String> logs, int loop) { List<String> result = new ArrayList<String>(); System.out.println("Loop:" + loop + ", total size: " + logs.size() * loop); for (int i = 0; i < loop; i++) { result.addAll(logs); } return result; } public static void putTest(Client client) throws IOException, InterruptedException, ExecutionException { //prepare Bulk Processor BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t"); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis() + ", size:" + response.getItems().length + "}"); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable fail) { fail.printStackTrace(); System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}"); } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(200, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(10)) .setConcurrentRequests(10) .build(); } public static void indexSetting(Client client, Map<String, Object> settings){ UpdateSettingsResponse response = client.admin().indices().prepareUpdateSettings(indexName).setSettings(settings).execute().actionGet(); System.out.println("update set:" + response.isAcknowledged()); } public static void put(Client client) throws IOException, InterruptedException, ExecutionException { /* //get indice(schema) GetMappingsResponse res = client.admin().indices().getMappings(new GetMappingsRequest().indices(indexName)).actionGet(); ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(indexName); for (ObjectObjectCursor<String, MappingMetaData> c : mapping) { System.out.println("index find: " + c.key + " = " + c.value.source()); }*/ // indexSetting(client, beforeSet); //indexSetting(client, beforeSet); //CreateIndexResponse createIndexResponse = client.admin().indices().create(new CreateIndexRequestBuilder. .createIndexRequest()).actionGet(); //prepare Bulk Processor BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //System.out.println("Bulk process ready... : { id:" + executionId + ", size:" + request.requests().size() + "}\t"); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println(" done... { id:" + executionId + ", time:" + response.getTookInMillis() + ", size:" + response.getItems().length + "}"); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable fail) { fail.printStackTrace(); System.out.println(" fail... { id:" + executionId + ", error:" + fail.getMessage() + "}"); } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(150, ByteSizeUnit.MB)) //.setFlushInterval(TimeValue.timeValueSeconds(120)) .setConcurrentRequests(3) .build(); //get sample logs List<String> message = pumpup(TtDummyRvMsgGenerate.read(), 1); TtRvMsgParser parser = new TtRvMsgParser(); List<Map<String, Object>> listMapList = new ArrayList<>(); for (String msg : message) { parser.init(msg);//.println(); Map<String, Object> listMap = parser.result(); listMapList.add(listMap); } int loop = 1; long start = new Date().getTime(); while (loop <= amount) { //use not parsed // for(final String msg : message){ // @SuppressWarnings("unchecked") // Map<String, String> fullMsg = new HashMap<String, String>(){{ // put("body", TtDummyRvMsgGenerate.increaseByte(msg, 1000)); // }}; // bulkProcessor.add(new IndexRequest(indexName, indexType).source(fullMsg)); // loop++; // } //use parsed for (Map<String, Object> listMap : listMapList) { //String a = (String)listMap.get("body"); //System.out.println(a); //listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.getSampleWords(15))); //listMap.put("body", ((String) listMap.get("body")).concat(TtDummyRvMsgGenerate.randomKey(15))); //listMap.put("body", TtDummyRvMsgGenerate.increaseByte((String) listMap.get("body"), 500)); //listMap.put("loop", loop); bulkProcessor.add(new IndexRequest(indexName, indexType, loop+"").source(listMap)); loop++; } //System.exit(0); } long end = new Date().getTime(); System.out.println("-------------------- ELAPSED:" + (end - start) + " millis"); indexSetting(client, afterSet); //Bulk Process close bulkProcessor.awaitClose(10, TimeUnit.MINUTES); } public static void getDocument(Client client, String indexName, String typeName, String documentId) { GetResponse getResponse = client.prepareGet(indexName, typeName, documentId).execute().actionGet(); Map<String, Object> source = getResponse.getSource(); System.out.println("------------------------------"); System.out.println("Index: " + getResponse.getIndex()); System.out.println("Type: " + getResponse.getType()); System.out.println("Id: " + getResponse.getId()); System.out.println("Version: " + getResponse.getVersion()); System.out.println(source); System.out.println("------------------------------"); } public static void fullTextSearchCount(Client client, String IndexName, String typeName, String value, String... field) { QueryBuilder queryBuilder = //moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(1000); //wildcardQuery(field[0], value); queryStringQuery(field[0] + ":" + value); CountResponse response = client.prepareCount(IndexName).setTypes(typeName) .setQuery(queryBuilder) .execute().actionGet(); System.out.println("\tCurrent More Like This count: " + response.getCount() + ""); } public static void fullTextSearchDocument(Client client, String indexName, String typeName, String value, String... field) { QueryBuilder fieldQueryBuilder = //termQuery(field[0], value); matchQuery(field[0], value); //new WildcardQueryBuilder(field[0], value); //moreLikeThisQuery(field).likeText(value).minTermFreq(1).maxQueryTerms(25); //wildcardQuery(field[0], value); //queryStringQuery(field[0] + ":" + value); //multiMatchQuery(value, field); //SortBuilder sortBuilder = SortBuilders.fieldSort(field[0]); SearchResponse searchResponse = client.prepareSearch(indexName).setTypes(typeName).addFields(field[0]) .setSearchType(SearchType.QUERY_THEN_FETCH)//.addSort(sortBuilder) .setQuery(fieldQueryBuilder)//.addSort(SortBuilders.fieldSort("loop").order(SortOrder.DESC)) .setFrom(0).setSize(1000).setExplain(false).execute().actionGet(); SearchHit[] result = searchResponse.getHits().getHits(); System.out.println(searchResponse.getHits().getTotalHits() + " total count"); //get result & print System.out.println("\tCurrent More Like This results: " + result.length + ""); for (SearchHit hit : result) { Set<String> keys = hit.fields().keySet(); Iterator<String> keyItr = keys.iterator(); while(keyItr.hasNext()){ SearchHitField hitField = hit.field(keyItr.next()); System.out.println(hitField.getValue().toString()); } // Map<String, Object> r = hit.getSource(); // System.out.println(r); } } public static boolean isOpen(Client client, String indexName){ ClusterState clusterState = client.admin().cluster().prepareState().setIndices(indexName).get().getState(); IndexMetaData metaData = clusterState.getMetaData().index(indexName); final String open = "OPEN"; return metaData.getState() != null && metaData.getState().toString().equals(open); } public static void searchDocument(Client client, String indexName, String typeName, String value, String... field) { //query using query builder QueryBuilder matchQuery = matchQuery(field[0], value); //System.out.println(matchQuery.toString()); //query json println QueryBuilder multiMatchQuery = multiMatchQuery(value, field); QueryBuilder stringQuery = queryStringQuery(field[0] + ":" + value); //search query execution SearchResponse searchResponse = client.prepareSearch(indexName) .setTypes(typeName) .setSearchType(SearchType.QUERY_THEN_FETCH) //search type. hold .setQuery(stringQuery) .setFrom(0).setSize(1000) //limit .setExplain(false) //query explain .execute().actionGet(); //execute SearchHit[] result = searchResponse.getHits().getHits(); //get result & print System.out.println("Current Match results: " + result.length + "--------------------"); for (SearchHit hit : result) { Map<String, Object> r = hit.getSource(); //System.out.println(r); } } public static void count(Client client, String index, String type) { CountResponse response = client.prepareCount(index) .setQuery(termQuery("_type", type)) .execute().actionGet(); System.out.println("type:" + type + "`s total count: " + response.getCount()); } public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { //ES connection set Settings settings = ImmutableSettings.settingsBuilder() .put("action.auto_create_index", true) .put("client.transport.sniff", true) .put("client.transport.ping_timeout", "60s") .put("network.tcp.block", true) //.put("node.name", "ES3") .put("cluster.name", "elasticsearch").build(); //ES client //default native transport : 9300 //default web/rest port : 9200 Client client = new TransportClient(settings) .addTransportAddresses( // new InetSocketTransportAddress("192.168.10.229", 9300) new InetSocketTransportAddress("192.168.10.210", 9300) , new InetSocketTransportAddress("192.168.10.251", 9300) , new InetSocketTransportAddress("192.168.10.252", 9300) ); // try { // ClusterStatsResponse res = client.admin().cluster().prepareClusterStats().execute().actionGet(); // // PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("hdfs_repo1") // .setType("hdfs") // .setSettings( // ImmutableSettings.builder() // .put("uri", "hdfs://192.168.10.251:2181/") // .put("path", "es/snapshot") // .put("concurrent_streams", 4) // .put("compress", true) // .put("chunk_size", "40mb")).get(); // System.out.println("repository put response:" + putRepositoryResponse.isAcknowledged()); // // CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("hdfs_repo1", "map1_2million") // .setIncludeGlobalState(false) // .setIndices("map1") // .setRepository("hdfs_repo1") // .setSettings(ImmutableSettings.builder().put("compress", true)) // .setWaitForCompletion(true) // .get(); // System.out.println("success:" + createSnapshotResponse.getSnapshotInfo().successfulShards()); // // GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("hdfs_repo1").get(); // ImmutableList<SnapshotInfo> snapshotInfos = getSnapshotsResponse.getSnapshots(); // for(SnapshotInfo info : snapshotInfos){ // System.out.println(info.name()); // } // // RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("hdfs_repo1", "map1_2million") // .setIndexSettings( // ImmutableSettings.builder().put("index.number_of_replicas", 0)) // .setIndices("map1") // .setWaitForCompletion(true) // .setRenamePattern("map[.*]") // .setRenameReplacement("restore_map$1") // .setIgnoreIndexSettings("index.refresh_interval", "") // .get(); // // // System.out.println(res.getClusterNameAsString()); indexName = "disable_source"; indexType = "test"; amount = 10000; fullTextSearchDocument(client, indexName, indexType, "tmtm", "body"); System.exit(0); //indexType = "test"; createIndex(client, indexName); //Thread.sleep(1000); put(client); //System.out.println("loop : " + i); String repoName = "hdfs-repo"; System.out.println("PUT END"); // long start_ = new Date().getTime(); // count(client, indexName, indexType); // long end_ = new Date().getTime(); // System.out.println("--------------------------\nEnd count : " + (end_ - start_) + " time elapsed"); // //getDocument(client, indexName, indexType, "1000"); // //searchDocument(client, indexName, indexType, "155431.679016", "TXN_TIME", "TXN_KEY"); List<String> query = Arrays.asList( "*ASDF1186*" , "*QLEGASD1800*" , "*WEFLKJASLD118*" , "*JGKLJDLSF1006*" , "*SLD2186*" , "*DLFKJQWEF1686*" , "*LKJFASD2718*" , "*LKJGQLEGASD235*" , "*KJQWEF1361*" , "*QWEFLKJASLD2*" , "*KLJGKLJDLSF ASDLKJFASD1*" ); String index = "test3"; String type = "rv_10000"; //deleteType(client, "test", "ex_1000"); // deleteType(client, "test2", "ex_10000_3"); // System.out.println("delete complete"); // System.exit(0); System.out.println(type + " full text search start"); System.out.println("--------------------------------------------------------------------------------------------------"); for (int i = 0; i <= 9; i++) { System.out.println("LOOP" + i + "test start"); for (String q : query) { long start = new Date().getTime(); String mergeQuery = q; //fullTextSearchCount(client, index, type, mergeQuery, "body"); fullTextSearchDocument(client, index, type, mergeQuery, "body"); long end = new Date().getTime(); System.out.println("\t--------------End search : " + "query:" + mergeQuery + ", " + (end - start) + " time elapsed"); } System.out.println("end of test---------------------------------------------------------------------------------------"); // Thread.sleep(5000); } // long start1 = new Date().getTime(); // fullTextSearchCount(client, indexName, indexType, " tmtm", "body"); // long end2 = new Date().getTime(); // System.out.println("--------------------------\nEnd count : " + (end2 - start1) + " time elapsed"); // } catch (IOException | InterruptedException | ExecutionException e) { // e.printStackTrace(); // } //Connection close client.close(); } }