Skip to content

Instantly share code, notes, and snippets.

@hakanserce
Created December 7, 2016 08:48
Show Gist options
  • Save hakanserce/afb3ac125494bbaa5d0b26fdf77c6f8d to your computer and use it in GitHub Desktop.
Save hakanserce/afb3ac125494bbaa5d0b26fdf77c6f8d to your computer and use it in GitHub Desktop.
How Elasticsearch handles upsert scripts (v1.7)

# How Elasticsearch handles upsert scripts (v1.7)

Code copied from https://github.com/elastic/elasticsearch/blob/1.7/src/main/java/org/elasticsearch/action/update/UpdateHelper.java for reference.

/**
 * Resolves an {@link UpdateRequest} into the concrete operation to perform on the shard.
 *
 * <p>The current document is fetched first.
 * <ul>
 *   <li>If it does not exist, the request becomes an upsert. The document indexed is either
 *   {@code doc} (when {@code docAsUpsert} is set) or the explicit upsert request. When
 *   {@code scriptedUpsert} is enabled, that document is first passed through the update script.</li>
 *   <li>If it exists and no script is given, the partial {@code doc} is merged into the stored
 *   source.</li>
 *   <li>Otherwise the update script runs against a {@code ctx} map describing the stored
 *   document.</li>
 * </ul>
 *
 * @param request    the update request to resolve
 * @param indexShard the shard that holds the document
 * @return a {@link Result} wrapping an index request (UPSERT or INDEX), a delete request
 *         (DELETE), or an {@link UpdateResponse} when nothing should be written (NONE)
 * @throws DocumentMissingException if the document does not exist and neither an upsert
 *         document nor {@code docAsUpsert} was supplied
 * @throws DocumentSourceMissingException if the document exists but its source is unavailable
 * @throws ElasticsearchIllegalArgumentException if the update script fails to execute
 */
public Result prepare(UpdateRequest request, IndexShard indexShard) {
      // Taken before the get so the elapsed time can be subtracted from a preserved TTL below.
      long getDateNS = System.nanoTime();
      final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
              new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},
              true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE, false);

      if (!getResult.isExists()) {
          if (request.upsertRequest() == null && !request.docAsUpsert()) {
              throw new DocumentMissingException(new ShardId(indexShard.indexService().index().name(), request.shardId()), request.type(), request.id());
          }
          Long ttl = null;
          IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
          if (request.scriptedUpsert() && (request.script() != null)) {
              // Run the script to perform the create logic
              IndexRequest upsert = request.upsertRequest();
              Map<String, Object> upsertDoc = upsert.sourceAsMap();
              Map<String, Object> ctx = new HashMap<>(2);
              // Tell the script that this is a create and not an update
              ctx.put("op", "create");
              ctx.put("_source", upsertDoc);
              ctx = runUpdateScript(request, ctx);
              // Allow the script to set TTL using ctx._ttl
              ttl = getTTLFromScriptContext(ctx);
              // Allow the script to abort the create by setting "op" to "none"
              String scriptOpChoice = (String) ctx.get("op");

              // Only valid options for an upsert script are "create"
              // (the default) or "none", meaning abort upsert
              if (!"create".equals(scriptOpChoice)) {
                  if (!"none".equals(scriptOpChoice)) {
                      logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script);
                  }
                  UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
                          getResult.getVersion(), false);
                  update.setGetResult(getResult);
                  return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
              }
              indexRequest.source((Map) ctx.get("_source"));
          }

          indexRequest.index(request.index()).type(request.type()).id(request.id())
                  // it has to be a "create!"
                  .create(true)
                  .ttl(ttl)
                  .refresh(request.refresh())
                  .routing(request.routing())
                  .parent(request.parent())
                  .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
          indexRequest.operationThreaded(false);
          if (request.versionType() != VersionType.INTERNAL) {
              // in all but the internal versioning mode, we want to create the new document using the given version.
              indexRequest.version(request.version()).versionType(request.versionType());
          }
          return new Result(indexRequest, Operation.UPSERT, null, null);
      }

      long updateVersion = getResult.getVersion();

      if (request.versionType() != VersionType.INTERNAL) {
          assert request.versionType() == VersionType.FORCE;
          updateVersion = request.version(); // remember, match_any is excluded by the conflict test
      }

      if (getResult.internalSourceRef() == null) {
          // no source, we can't do anything, throw a failure...
          throw new DocumentSourceMissingException(new ShardId(indexShard.indexService().index().name(), request.shardId()), request.type(), request.id());
      }

      Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
      String operation = null;
      String timestamp = null;
      Long ttl = null;
      final Map<String, Object> updatedSourceAsMap;
      final XContentType updateSourceContentType = sourceAndContent.v1();
      String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
      String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null;

      if (request.script() == null && request.doc() != null) {
          // Partial-document update: merge request.doc() into the stored source.
          IndexRequest indexRequest = request.doc();
          updatedSourceAsMap = sourceAndContent.v2();
          if (indexRequest.ttl() > 0) {
              ttl = indexRequest.ttl();
          }
          timestamp = indexRequest.timestamp();
          if (indexRequest.routing() != null) {
              routing = indexRequest.routing();
          }
          if (indexRequest.parent() != null) {
              parent = indexRequest.parent();
          }
          boolean noop = !XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap(), request.detectNoop());
          // noop could still be true even if detectNoop isn't because update detects empty maps as noops.  BUT we can only
          // actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle
          // cases where users repopulating multi-fields or adding synonyms, etc.
          if (request.detectNoop() && noop) {
              operation = "none";
          }
      } else {
          // Scripted update: expose the stored document and its metadata to the script via ctx.
          Map<String, Object> ctx = new HashMap<>(16);
          Long originalTtl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
          Long originalTimestamp = getResult.getFields().containsKey(TimestampFieldMapper.NAME) ? (Long) getResult.field(TimestampFieldMapper.NAME).getValue() : null;
          ctx.put("_index", getResult.getIndex());
          ctx.put("_type", getResult.getType());
          ctx.put("_id", getResult.getId());
          ctx.put("_version", getResult.getVersion());
          ctx.put("_routing", routing);
          ctx.put("_parent", parent);
          ctx.put("_timestamp", originalTimestamp);
          ctx.put("_ttl", originalTtl);
          ctx.put("_source", sourceAndContent.v2());

          ctx = runUpdateScript(request, ctx);

          operation = (String) ctx.get("op");

          Object fetchedTimestamp = ctx.get("_timestamp");
          if (fetchedTimestamp != null) {
              timestamp = fetchedTimestamp.toString();
          } else if (originalTimestamp != null) {
              // No timestamp has been given in the update script, so we keep the previous timestamp if there is one
              timestamp = originalTimestamp.toString();
          }

          ttl = getTTLFromScriptContext(ctx);

          updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
      }

      // No TTL has been given by the request or the script, so we keep the previous TTL value
      // (reduced by the time elapsed since the get) if there is one.
      if (ttl == null) {
          ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
          if (ttl != null) {
              ttl = ttl - TimeValue.nsecToMSec(System.nanoTime() - getDateNS); // It is an approximation of exact TTL value, could be improved
          }
      }

      if (operation == null || "index".equals(operation)) {
          final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                  .source(updatedSourceAsMap, updateSourceContentType)
                  .version(updateVersion).versionType(request.versionType())
                  .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
                  .timestamp(timestamp).ttl(ttl)
                  .refresh(request.refresh());
          indexRequest.operationThreaded(false);
          return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
      } else if ("delete".equals(operation)) {
          DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                  .version(updateVersion).versionType(request.versionType())
                  .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
          deleteRequest.operationThreaded(false);
          return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
      } else if ("none".equals(operation)) {
          UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
          update.setGetResult(extractGetResult(request, indexShard.indexService().index().name(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
          return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
      } else {
          logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
          UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
          return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
      }
  }

  /**
   * Compiles and runs the update script carried by {@code request}, with {@code ctx} bound to
   * the script variable {@code ctx}, and returns the unwrapped context afterwards.
   *
   * <p>Shared by the scripted-upsert and scripted-update paths of {@code prepare}.
   *
   * @param request the update request supplying script language, source, type and params
   * @param ctx     the context map exposed to the script
   * @return the context as unwrapped by the script engine after execution
   * @throws ElasticsearchIllegalArgumentException wrapping any exception raised while compiling,
   *         running, or unwrapping the script
   */
  @SuppressWarnings("unchecked")
  private Map<String, Object> runUpdateScript(UpdateRequest request, Map<String, Object> ctx) {
      try {
          ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptType, ScriptContext.Standard.UPDATE, request.scriptParams);
          script.setNextVar("ctx", ctx);
          script.run();
          // we need to unwrap the ctx...
          return (Map<String, Object>) script.unwrap(ctx);
      } catch (Exception e) {
          throw new ElasticsearchIllegalArgumentException("failed to execute script", e);
      }
  }

  /**
   * Reads the TTL a script left in {@code ctx._ttl} and converts it to milliseconds.
   *
   * <p>A numeric value is taken as-is (as a {@code long}). Any other non-null value is rendered
   * with {@code toString()} and parsed as a time value such as {@code "1d"} or {@code "30m"}.
   *
   * @param ctx the script context after the script has run
   * @return the TTL in milliseconds, or {@code null} when {@code ctx._ttl} is absent or null
   */
  private Long getTTLFromScriptContext(Map<String, Object> ctx) {
      Object fetchedTTL = ctx.get("_ttl");
      if (fetchedTTL == null) {
          return null;
      }
      if (fetchedTTL instanceof Number) {
          return ((Number) fetchedTTL).longValue();
      }
      // toString() instead of a (String) cast: script engines may hand back string-like objects
      // that are not java.lang.String (e.g. a Groovy GString from "${n}d"). A cast would fail with
      // ClassCastException; toString() parses them. For real Strings it is the identity.
      return TimeValue.parseTimeValue(fetchedTTL.toString(), null).millis();
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment