package app

import io.elasticsearch.DocumentIndex
import io.mongodb.DocumentCollection
import org.bson.Document
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.metadata.AliasMetaData
import org.elasticsearch.common.collect.ImmutableOpenMap
import org.elasticsearch.index.query.QueryBuilder
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.index.reindex.UpdateByQueryAction
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder
import org.elasticsearch.script.Script
import org.elasticsearch.script.ScriptService
import util.PayoutFailsafe

/**
 * Created by dangnh@eway.vn on 11/3/17.
 *
 * One-off maintenance job: for every index behind the 'clicks' alias, sets the
 * {@code offer_guarantee} and {@code inhouse} fields on click documents, one
 * (offer_id, publisher_id) pair at a time, using update-by-query.
 *
 * The flag values come from two MongoDB lookups done once at start-up: the ids of
 * offers with {@code guarantee: true} and of publishers with {@code inhouse: true}.
 */
class UpdateClicks {

    // NOTE(review): the MongoDB credentials are embedded in these URIs. They are kept
    // as-is so the job behaves exactly as before, but they should be moved to
    // configuration/environment and rotated.
    private static final String OFFERS_URI =
            'mongodb://adflexmeta_ro:a45leXmRtarW@localhost:57017/adflex_meta.offers'
    private static final String PUBLISHERS_URI =
            'mongodb://adflexmeta_ro:a45leXmRtarW@localhost:57017/adflex_meta.publishers'

    /**
     * Entry point. Walks the indices behind the 'clicks' alias in sorted name order,
     * updates each (offer_id, publisher_id) pair found in an index, and sleeps 60
     * seconds after finishing each index.
     *
     * @param args unused
     */
    static void main(String[] args) {
        def guaranteeOffers = getOfferGuarantee()
        def inhousePublishers = getPublisherInhouse()

        DocumentIndex clickIndex = new DocumentIndex('es://localhost:19300/clicks')
        def clickIndices = getIndicesFromAlias('clicks', clickIndex.delegate).sort()

        clickIndices.each { clickIndexName ->
            // The query carries LIMIT 10000, so at most that many grouped rows are processed per index.
            def query = """ SELECT * FROM $clickIndexName GROUP BY offer_id, publisher_id LIMIT 10000 """
            def offerPublishers = clickIndex.prepareQueryDocument(query).get().rows

            offerPublishers.each { offerPublisher ->
                def offerId = offerPublisher.offer_id as String
                def publisherId = offerPublisher.publisher_id as String
                // NOTE(review): membership is tested with String ids; this assumes the `id`
                // values loaded from MongoDB are Strings as well — confirm.
                def params = [
                        offer_guarantee: offerId in guaranteeOffers,
                        inhouse        : publisherId in inhousePublishers
                ]
                updateClicksForPair(clickIndex, clickIndexName as String, offerId, publisherId, params)
            }

            // Pause between indices before starting the next one.
            sleep(60000)
        }
    }

    /**
     * Runs one update-by-query that writes the given flag values onto every click in
     * {@code clickIndexName} matching the offer/publisher pair.
     *
     * If the response reports indexing failures they are printed and the method returns;
     * otherwise, if it reports search failures those are printed and the method returns.
     * In both cases only this pair is abandoned — the caller carries on with the next
     * pair. The success line is printed only when neither kind of failure is reported.
     *
     * @param clickIndex     index wrapper whose {@code delegate} client executes the request
     * @param clickIndexName concrete index to update
     * @param offerId        value matched against {@code offer_id}
     * @param publisherId    value matched against {@code publisher_id}
     * @param params         script parameters ({@code offer_guarantee}, {@code inhouse})
     */
    private static void updateClicksForPair(DocumentIndex clickIndex, String clickIndexName,
                                            String offerId, String publisherId, Map params) {
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery('offer_id', offerId))
                .must(QueryBuilders.matchQuery('publisher_id', publisherId))

        def script = """ ctx._source.offer_guarantee = offer_guarantee;ctx._source.inhouse = inhouse; """

        // PayoutFailsafe is a project helper; presumably it adds retry/fail-safe handling
        // around the call — TODO confirm.
        def result = PayoutFailsafe.instance.get {
            UpdateByQueryRequestBuilder updateRequestBuilder = UpdateByQueryAction.INSTANCE.newRequestBuilder(clickIndex.delegate)
            updateRequestBuilder.source(clickIndexName)
                    .filter(queryBuilder)
                    .script(new Script(script, ScriptService.ScriptType.INLINE, null, params))
                    .get()
        }

        if (result.indexingFailures) {
            result.indexingFailures.each { println it.toString() }
            return
        }
        if (result.searchFailures) {
            result.searchFailures.each { println it.toString() }
            return
        }
        println "Updated index $clickIndexName with pub_id = $publisherId and offer_id = $offerId"
    }

    /**
     * Lists the names of the indices that the given alias points to.
     *
     * @param alias  alias name to look up
     * @param client Elasticsearch client used for the admin request
     * @return a new mutable list of index names, in the key iteration order of the response
     */
    static List<String> getIndicesFromAlias(String alias, Client client) {
        ImmutableOpenMap<String, List<AliasMetaData>> aliases =
                ((GetAliasesResponse) client.admin().indices().getAliases(new GetAliasesRequest(alias)).actionGet()).getAliases()
        List<String> allIndices = new ArrayList<String>()
        aliases.keysIt().forEachRemaining { index ->
            allIndices.add(index)
        }
        return allIndices
    }

    /**
     * Loads the ids of all offers whose {@code guarantee} field is true.
     *
     * @return set of offer ids
     */
    static Set getOfferGuarantee() {
        return findIdsWhereTrue(OFFERS_URI, 'guarantee')
    }

    /**
     * Loads the ids of all publishers whose {@code inhouse} field is true.
     *
     * @return set of publisher ids
     */
    static Set getPublisherInhouse() {
        return findIdsWhereTrue(PUBLISHERS_URI, 'inhouse')
    }

    /**
     * Queries one MongoDB collection for documents where {@code flagField} is true and
     * collects their {@code id} values, blocking until the lookup completes.
     *
     * @param collectionUri connection URI of the collection
     * @param flagField     name of the boolean field to filter on
     * @return set of the matching documents' {@code id} values
     */
    private static Set findIdsWhereTrue(String collectionUri, String flagField) {
        DocumentCollection collection = new DocumentCollection(collectionUri)
        def filter = [(flagField): true] as Document
        def projection = [id: 1] as Document
        collection.find(filter).projection(projection).intoDocuments().thenApply { documents ->
            return (documents*.id as List).toSet()
        }.join()
    }
}