public class RxBluetooth { private final Context context; private final BluetoothAdapter adapter; private static final Observable.Transformer BOND_STATUS_TRANSFORMER = statusObservable -> statusObservable.map(status -> { switch (status) { case BluetoothDevice.BOND_NONE: default: return BondStatus.NONE; case BluetoothDevice.BOND_BONDING: return BondStatus.BONDING; case BluetoothDevice.BOND_BONDED: return BondStatus.BONDED; } }); public enum BondStatus { NONE, BONDING, BONDED } public RxBluetooth(Context context, BluetoothAdapter bluetoothAdapter) { this.context = context.getApplicationContext(); this.adapter = bluetoothAdapter; } public Observable bondStatus(@NonNull final BluetoothDevice device) { return Observable.defer(() -> Observable.just(device.getBondState()).compose(BOND_STATUS_TRANSFORMER)); } public Observable bond(@NonNull final BluetoothDevice device) { return Observable.create(subscriber -> { bondStatus(device).subscribe(bondStatus -> { switch (bondStatus) { case NONE: observeDeviceBonding(context, device).compose(BOND_STATUS_TRANSFORMER).subscribe(subscriber); try { final boolean bonding = BluetoothCompat.createBondCompat(device); if (!bonding) { subscriber.onError(new BluetoothBondingException("Can't initiate a bonding operation!")); } } catch (Exception e) { subscriber.onError(new BluetoothIncompatibleBondingException(e)); } break; case BONDING: subscriber.onError(new BluetoothBondingException("device is already in the process of bonding")); break; case BONDED: subscriber.onNext(BondStatus.BONDED); subscriber.onCompleted(); break; } }); }); } public Observable removeBond(@NonNull final BluetoothDevice device) { return Observable.defer(() -> { for (BluetoothDevice bondedDevice : adapter.getBondedDevices()) { if (bondedDevice.getAddress().equals(device.getAddress())) { try { final boolean removeBond = BluetoothCompat.removeBondCompat(device); if (!removeBond) { return Observable.error(new BluetoothBondingException("Can't delete the bonding for this device!")); } } catch (Exception e) { return Observable.error(new BluetoothIncompatibleBondingException(e)); } } } return Observable.just(BondStatus.NONE); }); } private static Observable observeDeviceBonding(@NonNull final Context context, @NonNull final BluetoothDevice device) { return observeBroadcast(context, new IntentFilter(BluetoothDevice.ACTION_BOND_STATE_CHANGED)).filter(pair -> { BluetoothDevice bondingDevice = pair.getValue1().getParcelableExtra(BluetoothDevice.EXTRA_DEVICE); return bondingDevice.equals(device); }) .map(pair1 -> pair1.getValue1().getIntExtra(BluetoothDevice.EXTRA_BOND_STATE, -1)) .skipWhile(state -> state != BluetoothDevice.BOND_BONDING) .takeUntil(state -> state == BluetoothDevice.BOND_BONDED || state == BluetoothDevice.BOND_NONE); } private static Observable> observeBroadcast(final Context context, final IntentFilter filter) { return Observable.create(new Observable.OnSubscribe>() { @Override public void call(Subscriber> subscriber) { Enforcer.onMainThread(); final BroadcastReceiver receiver = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { subscriber.onNext(Pair.with(context, intent)); } }; context.registerReceiver(receiver, filter); subscriber.add(Subscriptions.create(() -> context.unregisterReceiver(receiver))); } }); } }