import android.database.Cursor; import android.support.annotation.NonNull; import com.raizlabs.android.dbflow.runtime.DBTransactionInfo; import com.raizlabs.android.dbflow.runtime.TransactionManager; import com.raizlabs.android.dbflow.runtime.transaction.BaseTransaction; import com.raizlabs.android.dbflow.runtime.transaction.QueryTransaction; import com.raizlabs.android.dbflow.runtime.transaction.TransactionListener; import com.raizlabs.android.dbflow.sql.builder.Condition; import com.raizlabs.android.dbflow.sql.builder.ConditionQueryBuilder; import com.raizlabs.android.dbflow.sql.language.Update; import com.raizlabs.android.dbflow.sql.language.Where; import com.raizlabs.android.dbflow.structure.Model; import rx.Observable; import rx.Subscriber; import rx.schedulers.Schedulers; public class UpdateModelInBatchOnSubscribe implements Observable.OnSubscribe, TransactionListener { private final Where where; private final Class modelType; private Subscriber subscriber; /** * From DBFlow Update in batch to Observable * * @param whereConditionBuilder example: new ConditionQueryBuilder<>(Conversation.class, Condition.column(Conversation$Table.CONVERSATIONID).in(headId, tailIdArray)) * @param setConditions example: Condition.column(Conversation$Table.READ).eq(Boolean.TRUE) */ public UpdateModelInBatchOnSubscribe(@NonNull ConditionQueryBuilder whereConditionBuilder, Class modelType, Condition... setConditions) { this.modelType = modelType; this.where = (new Update<>(whereConditionBuilder.getTableClass())).set(setConditions).where(whereConditionBuilder); } private static Observable createObservable(UpdateModelInBatchOnSubscribe instance) { return Observable.create(instance).subscribeOn(Schedulers.io()); } public Observable toObservable() { return createObservable(this); } @Override public void call(Subscriber subscriber) { this.subscriber = subscriber; if (!this.subscriber.isUnsubscribed()) { // Logger.d("Starting BatchUpdate TRANSACTION for: %s\nQuery: %s", where.getTable(), where.toString()); TransactionManager.getInstance().addTransaction(new QueryTransaction(DBTransactionInfo.create(BaseTransaction.PRIORITY_NORMAL), this.where, this)); } } @Override public void onResultReceived(Cursor cursor) { // Logger.d("BatchUpdate TRANSACTION for %s completed", where.getTable()); if (!this.subscriber.isUnsubscribed()) { this.subscriber.onNext(null); this.subscriber.onCompleted(); } } @Override public boolean onReady(BaseTransaction baseTransaction) { return true; } @Override public boolean hasResult(BaseTransaction baseTransaction, Cursor cursor) { return true; } }