コンテンツへスキップ
ものがたり
戻る

Rx覚え書き (Observable以外編)

mono-reactiveを実装していた時の経験をもとに、Rxの諸クラスについて書けることをつらつら書いてみようと思う。とりあえずObservableクラスは膨大なので、それ以外について書く。例によってRxの入門的な記事はぐぐればいくつか出てくると思うので、それらを読んでもらえればと思う。

System.Reactive.Concurrency

ISchedulerの実装がこのネームスペースで公開されている。

ISchedulerのObservableクラスにおける利用には、2つのパターンがある:

ImmediateSchedulerとCurrentThreadSchedulerは、現在のスレッド上で同期的に指定されたタスクを実行する(つまり後者)。この2つの違いは、再帰的なタスクの扱いにあるのだけど、FAQなので詳しい説明は余所に丸投げする。

EventLoopScheduler, NewThreadScheduler, TaskPoolScheduler, ThreadPoolSchedulerは、それぞれの方法で生成されたスレッド上でタスクを実行するために存在するスケジューラーだ(つまり後者)。

SynchronizationContextSchedulerは、指定されたタスクを、コンストラクタ引数のSynchronizationContext上で実行する(つまり後者)。その利点は例示で説明してきた通りだ。

HistoricalScheduler(Base)とVirtualTimeScheduler(Base)、そしてTestScheduler (Microsoft.Reactive.TestingでもMono.Reactive.Testingでも)は、時間を任意に進めることが出来るスケジューラーだ(つまり前者)。もしIObservableの実装クラスが、そのSubscribe()の内部において、適切にIScheduler.Schedule()を使用して時間依存の処理を実装していれば、これらのスケジューラーを使うことで、「時間」を好きなように制御しながらタスクを実行することができる。「1分待ってタイムアウトする」コードをテストするのに、実際に1分待つ必要はなくなる。

ISchedulerで時間を制御するために必要なこと

独自のObservable拡張メソッドを定義する場合など、IObservableを返すコードを実装する場合、時間を制御するISchedulerを有効に活用するためには、以下の点に注意する必要がある。

(mono-reactiveのObservable実装も、これらを考慮している。)

System.Reactive.Disposables

さまざまなIDisposableの実装がこのネームスペースで公開されている。

IDisposableはRxでは重要なインターフェースであり、様々な場面で利用されている。

System.Reactive.Disposablesの各クラスは、Rxの機能を使うだけの人にとっては、あまり使う機会が無い、かもしれない。IObservableを実装したり、Rxの諸機能を拡張したりする人が活用するためにあると言える。

DisposableプロパティとIsDisposedのメリット

SingleAssignmentDisposable, MultipleAssignmentDisposable, SerialDisposable, CompositeDisposableのもう一つの大きな利点は、Dispose()を遅延評価出来る点である。

たとえば、EventLoopSchedulerにdueTime付きでFuncを登録した場合、一定時間が経過したらそのfuncを呼び出してIDisposableをdisposeする処理対象にしなければならない。一方でdueTimeに至る前にSchedule()の戻り値がキャンセルされた場合は、そのfunc自体が呼び出されないことになるので、結果的にIDisposableも返されず、処理する必要がない。

Rxではこういう場面が多々存在するが、その度に if (childDisposable != null) childDisposable.Dispose() のようなコードを書くのは煩雑だ。また、Schedule()の戻り値が先にDispose()されてから、タスクがIDisposableを生成する(返す)ような状況が発生することもしばしばある。このような場面では、「Disposableプロパティに設定されたオブジェクトがあれば、それをDispose()する。既に自身がDispose()されていたら、その後Disposableプロパティに設定されたIDisposableも直ちにDispose()する」という動作が、処理を簡潔にしてくれるというわけだ。

System.Reactive.Subjects

ここにはobservableでありかつobserverにもなるISubjectの型がある。

ISubjectの実装であるAsyncSubject, BehaviorSubject, ReplaySubjectの挙動の違いとそれがどのように活かされているかを知るには、IConnectableObservableを返すObservableのメソッドを見ると良い。以下のクラスが、それぞれ対称的に使用されている。

ISubjectは、他のIObservableで受け取ったnext/error/completedの各イベントを、そのまま他のIObserverにSubscribe()で転送できるので、mono-reactiveではObservableの実装で多大に活用している。(実際には受け取り済みのイベントをバッファリング出来るようにReplaySubjectが多く使用されているが、将来のバージョンで不必要なバッファリングを行わないように変更したいと考えている。)

その他のネームスペース

System.Reactive.Joinsネームスペースには、Observable.And()で使用されるPatternと、さらにThen()で返されるPlanがある。PlanはObservable.When()の引数になる(それ以外に用途が無い)。これらの型はpublic APIとしてはほとんど意味が無く、実装側としても特別に重要な存在ではない。ActionやFuncの引数の数に引きずられた結果として型が多すぎるので、ネームスペースを隔離したものと考えられる。

System.Reactive.Linqネームスペースには、RxのコアとなるObservableと、IGroupedObservableがある。ObservableはRxの中心的な存在であり、そのメソッドを個別に説明されるべきものだ。IGroupedObservableはGroupBy()で使われるkeyed observableで、それほど重要な存在ではない。

System.Reactive.Threading.Tasksネームスペースでは、Task Parallel Library (System.Threading.Tasks) に基づく機能を実装している。Silverlight4 / WP7.5以前の環境でRxを使えるようにするために切り離されたネームスペースということになろう。

System.Reactiveネームスペースには、「その他」とでも言うべきいくつかのクラスが存在している。

Systemネームスペースには、ObservableExtensionsのみ存在しており、大半がObserverのメソッドを呼び出すだけで実装出来ている。いずれもObservableクラスで拡張することは出来たはずだが、Systemネームスペースにある型のみで実現出来るメソッドは、このネームスペースで実装しておけば、System.Reactive.*をusingでインポートする必要もなくなる、という意図であろう。

続く…?

Observable以外編と書いたのだけど、Observableについて書くかどうかは今後の気分次第。


この記事を共有:

前の記事
updates
次の記事
mono-reactive v0.1 released