Rxjava操作符:第1篇 Creating Observables

本文基于Rxjava 2.x版本,介绍用于创建 Observable 对象的操作符。

Operators that originate new Observables.

  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Throw — create Observables that have very precise and limited behavior
  • From — convert some other object or data structure into an Observable
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert an object or a set of objects into an Observable that emits that or those objects
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that emits a single item after a given delay

create 操作符

通过 create 方法构造 Observable 对象,通过被观察者的 subcribe 方法建立起观察者与被观察者的联系。

create example

1
2
3
4
5
6
7
8
9
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
emitter.onError(new NullPointerException());
});
observable.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
create 操作符

defer 操作符

一个允许延迟操作的操作符,直到 ObservableSource 订阅了观察者 Observer,发射源 emitting item 才会发射数据。defer会为每一个 Observer 观察者对象创建新的 Observable ,所以下面两次打印数据是不同的。而 create 操作符观察者对象无论被订阅多少次,数据都是相同的。

defer example

1
2
3
4
5
6
7
8
9
Observable<Long> observable = Observable.defer(() -> {
long time = System.currentTimeMillis();
return Observable.just(time);
});

observable.subscribe(time -> System.out.println(time));
Thread.sleep(1000);
observable.subscribe(time -> System.out.println(time));
//print 1562638410886 1562638411891

empty 操作符

这个操作符会生成一个没有发射数据的 Observable 对象,只会且直接调用 Observer#onComplete 方法。

empty example

1
2
3
4
5
6
7
Observable<String> empty = Observable.empty();

empty.subscribe(
v -> System.out.println("This should never be printed!"),
error -> System.out.println("Or this!"),
() -> System.out.println("Done will be printed."));
//print Done will be printed.

never 操作符

never 操作符不会调用观察者对象的 onNext、onComplete 或者 onError 方法,它主要用来测试或者禁用组合操作符中的某些 Observable 对象。

never example

1
2
3
4
5
6
Observable<String> never = Observable.never();

never.subscribe(
v -> System.out.println("This should never be printed!"),
error -> System.out.println("Or this!"),
() -> System.out.println("This neither!"));

error 操作符

error 操作符只会调用 Observer#error 方法,常见 error 使用场景:

error example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable<String> results = Observable.fromCallable((Callable<String>) () -> {
if (Math.random() < 0.5) {
throw new IOException();
}
throw new IllegalArgumentException();
}).onErrorResumeNext(error -> {
if (error instanceof IllegalArgumentException) {
return Observable.empty();
}
return Observable.error(error);
});

for (int i = 0; i < 10; i++) {
results.subscribe(
v -> System.out.println("This should never be printed!"),
error -> error.printStackTrace(),
() -> System.out.println("Done"));
}

from 操作符

fromIterable 操作符

根据 Iterable (类似 List,Set,Collection 等) 对象创建可观察者模型。

fromIterable example

1
2
3
4
5
6
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

Observable<Integer> observable = Observable.fromIterable(list);

observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));

fromArray 操作符

根据数组创建可观察者模型。

1
2
3
4
5
6
7
8
9
Integer[] array = new Integer[10];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}

Observable<Integer> observable = Observable.fromArray(array);

observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));

fromCallable 操作符

当订阅事件发生,Callable中的方法会被调用,返回值会转发至观察者。

1
2
3
4
5
6
7
8
9
10
Callable<String> callable = () -> {
System.out.println("Hello World!");
return "Hello World!";
};
Observable<String> observable = Observable.fromCallable(callable);

observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
() -> System.out.println("Done"));

//print Hello World! Hello World! Done

fromAction 操作符

//待验证…

1
2
3
4
5
Action action = () -> System.out.println("Hello World!");

Completable completable = Completable.fromAction(action);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

fromRunnable 操作符

//待验证…

1
2
3
4
5
Runnable runnable = () -> System.out.println("Hello World!");

Completable completable = Completable.fromRunnable(runnable);

completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());

fromFuture 操作符

给定预先存在的,已经运行或已经完成的java.util.concurrent.Future,等待Future正常完成或以阻塞方式使用异常并将生成的值或异常转发给使用者。

1
2
3
4
5
6
7
8
9
10
11
12
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
//延迟1秒
Future<String> future = executor.schedule(() -> "Hello world!", 1, TimeUnit.SECONDS);

Observable<String> observable = Observable.fromFuture(future);

observable.subscribe(
item -> System.out.println(item),
error -> error.printStackTrace(),
() -> System.out.println("Done"));

executor.shutdown();

from{reactive type} 操作符

//待验证…

1
2
3
4
5
6
7
8
Flux<Integer> reactorFlux = Flux.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1));

Observable<Integer> observable = Observable.fromPublisher(reactorFlux);

observable.subscribe(
item -> System.out.println(item),
error -> error.printStackTrace(),
() -> System.out.println("Done"));

generate 操作符

//待验证…

1
2
3
4
5
6
7
8
int startValue = 1;
int incrementValue = 1;
Flowable<Integer> flowable = Flowable.generate(() -> startValue, (s, emitter) -> {
int nextValue = s + incrementValue;
emitter.onNext(nextValue);
return nextValue;
});
flowable.subscribe(value -> System.out.println(value));

interval 操作符

interval操作符指定定时发送发出数据的时间间隔,interval 有多个重载方法,最终都会调用下面的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @param initialDelay 发射第一条数据的延迟时间
* @param period 其它数据的延迟发射周期
* @param unit 时间单位
* @param scheduler 任务调度器
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

interval()的其它重载方法默认不设置 initialDleay 值时,initialDelay 的值将和 period 值保持一致,默认的调度器 Schedulers.computation()。在 java 中使用默认调度器的时候,interval 方法不会正常执行,程序会自动退出。interval 方法指定线程是未阻塞的,并不会阻止 JVM 退出程序。指定调度器 trampoline() 延长程序存活时间,interval 方法会正常执行。在Android中程序是在活跃的,不需要指定调度器。问题详情见 Rxjava issue

interval example

1
2
3
4
5
6
7
8
Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS, 						  Schedulers.trampoline());
clock.subscribe(time -> {
if (time % 2 == 0) {
System.out.println("Tick");
} else {
System.out.println("Tock");
}
});

just 操作符

根据 just 方法中顺序依次发射数据到下游观察者对象,just 方法可定义 1~9 个参数,但参数类型应保持一致。

just example

1
2
3
4
5
Observable<Object> observable = Observable.just("1", "A", "3.2", "def");

observable.subscribe(item -> System.out.print(item), error -> error.printStackTrace(),
() -> System.out.println());
//print 1A3.2def

range 操作符

发出指定范围的整数队列值,调用的是 onNext 方法。

range example

1
2
3
4
5
6
7
8
9
String greeting = "Hello World!";

Observable<Integer> indexes = Observable.range(0, greeting.length());

Observable<Character> characters = indexes
.map(index -> greeting.charAt(index));

characters.subscribe(character -> System.out.print(character), error -> error.printStackTrace(),
() -> System.out.println());

timer 操作符

timer(long, TimeUnit) 方法指定延迟时间发射出指定数据,不重复发出。

timer example

1
2
3
Observable<Long> eggTimer = Observable.timer(5, TimeUnit.MINUTES);

eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!"));

参考文章:

https://github.com/ReactiveX/RxJava/wiki/Creating-Observables

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×