# チュートリアル: Thread
Threadとは
Threadとはひとつのプログラムの中で複数の制御の流れを扱うことが出来る機能です.OSで提供されるプロセスとは違ってthreadではメモリ空間が共有されます.
Rubyで使われているthreadはユーザレベルthreadと呼ばれるもので,rubyインタプリタ自身が自分でthreadの切替えを行っています.この方法は,OSで実装されているものよりも効率が低い,マルチCPUを活かすことが出来ない,というデメリットがありますが,その代わり移植性が高いというメリットがあります.
Threadの生成
新しいThreadを作るためにはThread.startというメソッドを使います.使い方は以下の通りです.
Thread.start { .... }
Thread.startは新しいthreadを作り,そのthreadでイテレータブロックを評価します.簡単なプログラムでthreadが動く様子を見てみましょう.
1 Thread.start {
2 while true
3 print "thread 1\n"
4 end
5 }
6
7 while true
8 print "thread 2\n"
9 end
このプログラムを動かすと「thread 1」と「thread 2」が混じって表示されるので,二つの無限ループが同時に動作しているのが分かると思います.このプログラムを終了させるためにはCtrl-Cを押してください.
Threadの操作
Threadクラスのメソッドは以下の通りです.
Thread.start {...}Thread.new {...}- 新しいthreadを生成し,その中でイテレータブロックを評価する.新しく生成されたthreadオブジェクトを返す.newはstart の別名.
Thread.current- 現在実行しているthreadオブジェクトを返す.
Thread.exit- 現在実行しているthreadを終了させる.
Thread.join thread- 指定したthreadの実行が終了するまで,現在のthreadを停止させる.
Thread.kill thread- 指定したthreadの実行を終了させる.
Thread.pass- 実行可能な他のthreadに明示的に制御を渡す.
Thread.stop- 現在のtheadの実行を停止する.他のthreadがthread#runを実行するまで停止し続ける
Thread#exit- レシーバのthreadの実行を終了させる.
Thread#run- レシーバの実行を再開させる.
Thread#stop- レシーバの実行を停止させる.
Thread#status- レシーバがまだ生きていれば真を返す.例外によってthreadが終了していればその例外を発生させる.
Thread#value- レシーバのイテレータブロックを評価した結果を返す.まだイテレータブロックの評価が終了していない時にはそのthreadが終了するまで待つ.
Thread間の同期
Threadはメモリ空間を共有しているのでThread間のデータのやりとりは普通の変数を使って行うことができますが,動作するタイミングを合わせるために同期を行う必要があります.この同期に失敗すると,来るはずの無いデータを待って永遠に待ち続けるデッドロックと呼ばれる状態になったり,期待するのと違うデータを受け取って見付けにくいバグの元になったりします.
Rubyのthreadライブラリでは二つの同期方法を提供しています.ひとつは同期だけを行うMutexとデータの受渡しも行うQueueです.これらのライブラリを使うためにはプログラムの先頭で
require "thread"
を呼び出しておく必要があります.
Mutex
Mutexとはmutual-exclusion lock(相互排他ロック)の略です.Mutexをロックしようとした時にすでにロックされていれば, threadはロックが解除されるまで停止します.
並行アクセスから共有データを保護するためには以下のようなコードを用いて行います(ここでmをMutexのインスタンスとします).
begin m.lock # mで保護される共有データへのアクセス ensure m.unlock end
同じことをより簡単に行うためMutexにはsynchronizeというメソッドがあります.
m.synchronize {
# mで保護される共有データへのアクセス
}
例として簡単なプログラムを用意してみましょう.
1 require "thread"
2
3 m = Mutex.new
4 v = 0; # mで保護されるデータ
5
6 Thread.start {
7 while true
8 m.synchronize {
9 v = v + 100
10 }
11 end
12 }
13
14 while true
15 m.synchronize {
16 v = v - 33
17 }
18 end
このプログラムをMutexで保護しないと,タイミングによっては vの値を取り出してから代入までの間に他のthreadによって値が変更されてしまう可能性があります.
Mutexのメソッドは以下の通りです.
Mutex.new- 新しいロックを生成する
Mutex#lock- ロックする.すでにロックされている場合にはロックが解除されるまで待つ.
Mutex#unlock- ロックを解除する.ロックを待っている他のthreadがあればそちらを走らせる.
Mutex#synchronize- ロックの獲得から解除までを行うイテレータ.
Mutex#try_lock- ロックを獲得する.すでにロックされている場合には停止せず FALSEを返す.
Queue
Queueはデータを読み書きするパイプのようなものです.データを提供するthreadは一方からデータを書き込み,読み出すthreadはもう一方からデータを取り出します.Queueに読み出すデータが残っていない時には読み出そうとしたthreadはデータが来るまで停止します.
Queueを使った簡単なプログラムは以下のようになります.
1 require "thread"
2
3 q = Queue.new
4
5 th = Thread.start {
6 while line = q.pop
7 print line
8 end
9 }
10
11 while gets
12 q.push $_
13 end
14 q.push nil # 終了の印
15 th.join
このプログラムではひとつのthreadが読み込んだ行をもうひとつの threadが出力しています.3行目を「q = []」などとして配列に変えてみるとthread間の同期が取れず,正しく動かないことが分かるでしょう.
Queueのメソッドは以下の通りです.
Queue.new- 新しいQueueを生成します.
Queue.empty?- Queueが空の時真を返します.
Queue.push value- Queueにvalueを追加します.
Queue.pop [non_block]- Queueからデータを取り出します.偽でない引数non_blockが与えられた場合にはQueueが空の時に例外を発生させます.それ以外の場合にはQueueが空の時にはQueueにデータが追加されるまで読み出したthreadを停止させます.
例題
並列プログラミングの世界では昔から有名な「哲学者の食事」問題を作ってみましょう.
「哲学者の食事」問題とは以下のような状況で哲学者がどうやって同期をとるかという問題です.
N人の哲学者が丸いテーブルに座っています.テーブルの真中には大きなスパゲティの皿が置いてあります.またN本のフォークがあって哲学者と哲学者の席の間に置いてあります.哲学者は思索を続けていますが,お腹がすくと両側のフォークを取ってスパゲティを食べます.お腹が一杯になると食べるのを止めてフォークを返します.哲学者は紳士ですから,お腹が空いていても両方のフォークが手に入るまでは待ちます.
このプログラムを実行すると現在の状態を次々と表示します.各文字の意味は以下の通りです.
- o:
- 考えている哲学者
- *:
- 仕事している哲学者
- -:
- 使われていないフォーク
- |:
- 使われているフォーク
哲学者が考えている時間と食事している時間は乱数で決めています.
1 #
2 # The Dining Philosophers - thread example
3 #
4 require "thread"
5
6 N=7 # number of philosophers
7 $forks = []
8 for i in 0..N-1
9 $forks[i] = Mutex.new
10 end
11 $state = "-o"*N
12
13 def wait
14 sleep rand(20)/10.0
15 end
16
17 def think(n)
18 wait();
19 end
20
21 def eat(n)
22 wait();
23 end
24
25 def philosopher(n)
26 while true
27 think n
28 $forks[n].lock
29 if not $forks[(n+1)%N].try_lock
30 $forks[n].unlock # avoid deadlock
31 next
32 end
33 $state[n*2] = ?|;
34 $state[(n+1)%N*2] = ?|;
35 $state[n*2+1] = ?*;
36 print $state, "\n"
37 eat(n)
38 $state[n*2] = ?-;
39 $state[(n+1)%N*2] = ?-;
40 $state[n*2+1] = ?o;
41 print $state, "\n"
42 $forks[n].unlock
43 $forks[(n+1)%N].unlock
44 end
45 end
46
47 for i in 0..N-1
48 Thread.start{philosopher(i)}
49 sleep 0.1
50 end
51 sleep
52 exit
Ruby