NIFI-12224: add support for update many in PutMongo processor
These changes allow the processor to use the Mongo updateMany operation in
update mode.

The new property uses a different name in PutMongo than in PutMongoRecord,
because the name "Update Mode" is already taken by an existing PutMongo property.

Test cases for `PutMongo` were updated to use the Map and Document classes
instead of string-based JSON input.

Signed-off-by: Umar Hussain <umarhussain.work@gmail.com>
umarhussain15 committed May 12, 2024
1 parent 15696ad commit 98179f7
Showing 5 changed files with 392 additions and 152 deletions.
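
For context, here is a minimal driver-level sketch of what the new option maps to. This sketch is not taken from the commit; the connection string, database, collection, and field names are illustrative.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import org.bson.Document;

public class UpdateManySketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client.getDatabase("demo").getCollection("users");
            UpdateOptions options = new UpdateOptions().upsert(false);

            // 'one': modifies only the first document matching the filter
            collection.updateOne(Filters.eq("status", "new"), Updates.set("status", "seen"), options);

            // 'many': modifies every document matching the filter
            collection.updateMany(Filters.eq("status", "new"), Updates.set("status", "seen"), options);
        }
    }
}

In PutMongo, the choice between the two calls is driven by the new 'Mongo Update Query Mode' property or, when that property defers to the flowfile, by the 'mongodb.update.mode' attribute of the incoming flowfile, as shown in the diff below.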
AbstractMongoProcessor.java
@@ -28,7 +28,9 @@
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -46,13 +48,13 @@
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;

public abstract class AbstractMongoProcessor extends AbstractProcessor {
public static final String ATTRIBUTE_MONGODB_UPDATE_MODE = "mongodb.update.mode";

protected static final String JSON_TYPE_EXTENDED = "Extended";
protected static final String JSON_TYPE_STANDARD = "Standard";
protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
@@ -159,11 +161,42 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
static final List<PropertyDescriptor> descriptors;

static {
List<PropertyDescriptor> _temp = new ArrayList<>();
_temp.add(CLIENT_SERVICE);
_temp.add(DATABASE_NAME);
_temp.add(COLLECTION_NAME);
descriptors = Collections.unmodifiableList(_temp);
descriptors = List.of(CLIENT_SERVICE, DATABASE_NAME, COLLECTION_NAME);
}

public enum MongoUpdateOption implements DescribedValue {
UPDATE_ONE("one", "Update One", "Updates only the first document that matches the query."),
UPDATE_MANY("many", "Update Many", "Updates every document that matches the query."),
UPDATE_FF_ATTRIBUTE("flowfile-attribute", "Use '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' flowfile attribute.",
"Use the value of the '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
private final String value;
private final String displayName;
private final String description;

MongoUpdateOption(final String value, final String displayName, final String description) {
this.value = value;
this.displayName = displayName;
this.description = description;
}

@Override
public String getValue() {
return value;
}

@Override
public String getDisplayName() {
return displayName;
}

@Override
public String getDescription() {
return description;
}

public boolean matches(String value) {
return this.value.equals(value);
}
}

protected ObjectMapper objectMapper;
@@ -235,4 +268,20 @@ protected synchronized void configureMapper(String setting, String dateFormat) {
objectMapper.setDateFormat(df);
}
}

/**
* Checks whether the given update mode option matches the processor's effective update mode,
* resolving the 'mongodb.update.mode' flowfile attribute when the processor is configured to use it.
* @param updateModeToMatch the update mode option to compare against
* @param processorMode the value of the processor's update mode property
* @param flowFile the incoming flowfile, whose attribute supplies the mode when the property defers to it
* @return true if the effective update mode matches updateModeToMatch
*/
public boolean updateModeMatches(
MongoUpdateOption updateModeToMatch, PropertyValue processorMode, FlowFile flowFile) {
String updateMode = processorMode.getValue();

return updateModeToMatch.matches(updateMode) || (MongoUpdateOption.UPDATE_FF_ATTRIBUTE.matches(updateMode) && updateModeToMatch.getValue()
.equalsIgnoreCase(flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE)));
}

}
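
As an illustration, the sketch below mirrors the logic of updateModeMatches with plain strings standing in for NiFi's PropertyValue and FlowFile. The harness and its values are hypothetical, not part of the commit; it assumes the nifi-mongodb-processors module is on the classpath.

import org.apache.nifi.processors.mongodb.AbstractMongoProcessor.MongoUpdateOption;

public class UpdateModeSketch {
    public static void main(String[] args) {
        // Hypothetical setup: the processor property defers to the flowfile attribute,
        // and the incoming flowfile carries mongodb.update.mode=many.
        String configuredMode = MongoUpdateOption.UPDATE_FF_ATTRIBUTE.getValue(); // "flowfile-attribute"
        String attributeValue = "many";

        boolean useMany = MongoUpdateOption.UPDATE_MANY.matches(configuredMode)
                || (MongoUpdateOption.UPDATE_FF_ATTRIBUTE.matches(configuredMode)
                        && MongoUpdateOption.UPDATE_MANY.getValue().equalsIgnoreCase(attributeValue));

        System.out.println(useMany); // prints true, so updateMany would be used
    }
}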
PutMongo.java
@@ -21,14 +21,18 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -42,14 +46,14 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,12 +62,21 @@
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@WritesAttributes({
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MATCH_COUNT, description = "The match count from result if update/upsert is performed, otherwise not set."),
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPDATE_MODIFY_COUNT, description = "The modify count from result if update/upsert is performed, otherwise not set."),
@WritesAttribute(attribute = PutMongo.ATTRIBUTE_UPSERT_ID, description = "The '_id' hex value if upsert is performed, otherwise not set.")
})
public class PutMongo extends AbstractMongoProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();

static final String ATTRIBUTE_UPDATE_MATCH_COUNT = "mongo.put.update.match.count";
static final String ATTRIBUTE_UPDATE_MODIFY_COUNT = "mongo.put.update.modify.count";
static final String ATTRIBUTE_UPSERT_ID = "mongo.put.upsert.id";
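// Editor's illustration (hypothetical result, not part of the commit): an upsert whose query matches no
// existing document inserts a new one and would yield mongo.put.update.match.count=0,
// mongo.put.update.modify.count=0, and mongo.put.upsert.id set to the new document's _id.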

static final String MODE_INSERT = "insert";
static final String MODE_UPDATE = "update";

@@ -82,39 +95,51 @@ public class PutMongo extends AbstractMongoProcessor {
.description("When true, inserts a document if no document matches the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored")
.required(true)
.dependsOn(MODE, MODE_UPDATE)
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
static final PropertyDescriptor UPDATE_QUERY_KEY = new PropertyDescriptor.Builder()
.name("Update Query Key")
.description("Key name used to build the update query criteria; this property is valid only when using update mode, "
+ "otherwise it is ignored. Example: _id")
.description("Comma separated key names used to build the update query criteria. Their values are taken from incoming flowfile. Example: _id")
.required(false)
.dependsOn(MODE, MODE_UPDATE)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
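// Editor's illustration (hypothetical values, not part of the commit): with Update Query Key set to
// "_id,type" and an incoming document {"_id": 1, "type": "a", "count": 5}, the update filter becomes
// {"_id": 1, "type": "a"} and those keys are removed from the document before the update is applied.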
static final PropertyDescriptor UPDATE_QUERY = new PropertyDescriptor.Builder()
.name("putmongo-update-query")
.displayName("Update Query")
.description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert.")
.description("Specify a full MongoDB query to be used for the lookup query to do an update/upsert. NOTE: this field is ignored if the '%s' value is not empty."
.formatted(UPDATE_QUERY_KEY.getDisplayName()))
.required(false)
.dependsOn(MODE, MODE_UPDATE)
.addValidator(JsonValidator.INSTANCE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
static final PropertyDescriptor UPDATE_OPERATION_MODE = new PropertyDescriptor.Builder()
.displayName("Update Mode")
.name("put-mongo-update-mode")
.required(true)
.dependsOn(MODE, MODE_UPDATE)
.allowableValues(UPDATE_WITH_DOC, UPDATE_WITH_OPERATORS)
.defaultValue(UPDATE_WITH_DOC.getValue())
.defaultValue(UPDATE_WITH_DOC)
.description("Choose an update mode. You can either supply a JSON document to use as a direct replacement " +
"or specify a document that contains update operators like $set, $unset, and $inc. " +
"When Operators mode is enabled, the flowfile content is expected to be the operator part " +
"for example: {$set:{\"key\": \"value\"},$inc:{\"count\":1234}} and the update query will come " +
"from the configured Update Query property.")
.build();
static final PropertyDescriptor MONGO_UPDATE_MODE = new PropertyDescriptor.Builder()
.name("Mongo Update Query Mode")
.displayName("Mongo Update Query Mode")
.dependsOn(UPDATE_OPERATION_MODE, UPDATE_WITH_OPERATORS)
.description("Choose between 'updateOne' or 'updateMany' Mongo documents per incoming flow file.")
.allowableValues(MongoUpdateOption.class)
.defaultValue(MongoUpdateOption.UPDATE_ONE)
.build();
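// Editor's illustration (not part of the commit): when this property is set to the flowfile-attribute
// option, a 'mongodb.update.mode' attribute of 'one' or 'many' on the incoming flowfile selects the
// operation at runtime; any other value raises a ProcessException in onTrigger below.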
static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the data is encoded")
@@ -127,20 +152,17 @@ public class PutMongo extends AbstractMongoProcessor {
private final static List<PropertyDescriptor> propertyDescriptors;

static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(descriptors);
_propertyDescriptors.add(MODE);
_propertyDescriptors.add(UPSERT);
_propertyDescriptors.add(UPDATE_QUERY_KEY);
_propertyDescriptors.add(UPDATE_QUERY);
_propertyDescriptors.add(UPDATE_MODE);
_propertyDescriptors.add(UPDATE_OPERATION_MODE);
_propertyDescriptors.add(MONGO_UPDATE_MODE);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}

@Override
@@ -183,16 +205,16 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final ComponentLog logger = getLogger();

final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
final String mode = context.getProperty(MODE).getValue();
final String updateMode = context.getProperty(UPDATE_MODE).getValue();
final String processorMode = context.getProperty(MODE).getValue();
final String flowfileType = context.getProperty(UPDATE_OPERATION_MODE).getValue();
final WriteConcern writeConcern = clientService.getWriteConcern();

try {
@@ -202,32 +224,50 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));

// parse
final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && flowfileType.equals(UPDATE_WITH_DOC.getValue())))
? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset));

if (MODE_INSERT.equalsIgnoreCase(mode)) {
if (MODE_INSERT.equals(processorMode)) {
collection.insertOne((Document)doc);
logger.info("inserted {} into MongoDB", new Object[] { flowFile });
} else {
// update
final boolean upsert = context.getProperty(UPSERT).asBoolean();
final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
final Document query;
final Document updateQuery;

if (!StringUtils.isBlank(updateKey)) {
query = parseUpdateKey(updateKey, (Map)doc);
if (StringUtils.isNotBlank(updateKey)) {
updateQuery = parseUpdateKey(updateKey, (Map)doc);
removeUpdateKeys(updateKey, (Map)doc);
} else {
query = Document.parse(filterQuery);
updateQuery = Document.parse(filterQuery);
}

if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
collection.replaceOne(query, (Document)doc, new ReplaceOptions().upsert(upsert));
UpdateResult updateResult;
if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
updateResult = collection.replaceOne(updateQuery, (Document)doc, new ReplaceOptions().upsert(upsert));
} else {
BasicDBObject update = (BasicDBObject)doc;
update.remove(updateKey);
collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
UpdateOptions updateOptions = new UpdateOptions().upsert(upsert);
PropertyValue updateQueryMode = context.getProperty(MONGO_UPDATE_MODE);

if (this.updateModeMatches(MongoUpdateOption.UPDATE_ONE, updateQueryMode, flowFile)) {
updateResult = collection.updateOne(updateQuery, update, updateOptions);
} else if (this.updateModeMatches(MongoUpdateOption.UPDATE_MANY, updateQueryMode, flowFile)) {
updateResult = collection.updateMany(updateQuery, update, updateOptions);
} else {
String flowfileUpdateMode = flowFile.getAttribute(ATTRIBUTE_MONGODB_UPDATE_MODE);
throw new ProcessException("Unrecognized '" + ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
}
}

flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
BsonValue upsertedId = updateResult.getUpsertedId();
if (upsertedId != null) {
String id = upsertedId.isString() ? upsertedId.asString().getValue() : upsertedId.asObjectId().getValue().toString();
flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPSERT_ID, id);
}
logger.info("updated {} into MongoDB", new Object[] { flowFile });
}
