非同期でファイルからデータを読み込んでRxPYで利用するためのソースを生成する処理を作成しました。 asyncore from __future__ import print_function import asyncore import os import rx class AsyncFileReader(asyncore.file_dispatcher, object): def __init__(self, file_name, observer, size=None): fd = os.open(file_name, os.O_RDONLY) super(AsyncFileReader, self).__init__(fd) self.observer = observer self.size = size self.data = "" def handle_read(self): self.data += self.recv(self.size) def handle_close(self): self.observer.on_next(self.data) self.close() def writable(self): return False def form_async_file_read(file_names, size=1024, timeout=30, use_poll=False): def subscribe(observer): map(lambda file_name: AsyncFileReader(file_name, observer, size), file_names) asyncore.loop(timeout, use_poll) observer.on_completed() return subscribe if __name__ == "__main__": rx.Observable.create(form_async_file_read(["a.txt", "b.txt"], use_poll=True)).subscribe(print)