Ruby
チュートリアル: Thread

«チュートリアル: 正規表現 Top RAAリニューアル»

# チュートリアル: Thread

Threadとは

Threadとはひとつのプログラムの中で複数の制御の流れを扱うことが出来る機能です.OSで提供されるプロセスとは違ってthreadではメモリ空間が共有されます.

Rubyで使われているthreadはユーザレベルthreadと呼ばれるもので,rubyインタプリタ自身が自分でthreadの切替えを行っています.この方法は,OSで実装されているものよりも効率が低い,マルチCPUを活かすことが出来ない,というデメリットがありますが,その代わり移植性が高いというメリットがあります.

Threadの生成

新しいThreadを作るためにはThread.startというメソッドを使います.使い方は以下の通りです.

Thread.start { .... }

Thread.startは新しいthreadを作り,そのthreadでイテレータブロックを評価します.簡単なプログラムでthreadが動く様子を見てみましょう.

1  Thread.start {
2     while true
3        print "thread 1\n"
4     end
5  }
6
7  while true
8     print "thread 2\n"
9  end

このプログラムを動かすと「thread 1」と「thread 2」が混じって表示されるので,二つの無限ループが同時に動作しているのが分かると思います.このプログラムを終了させるためにはCtrl-Cを押してください.

Threadの操作

Threadクラスのメソッドは以下の通りです.

Thread.start {...}
Thread.new {...}
新しいthreadを生成し,その中でイテレータブロックを評価する.新しく生成されたthreadオブジェクトを返す.newはstart の別名.
Thread.current
現在実行しているthreadオブジェクトを返す.
Thread.exit
現在実行しているthreadを終了させる.
Thread.join thread
指定したthreadの実行が終了するまで,現在のthreadを停止させる.
Thread.kill thread
指定したthreadの実行を終了させる.
Thread.pass
実行可能な他のthreadに明示的に制御を渡す.
Thread.stop
現在のtheadの実行を停止する.他のthreadがthread#runを実行するまで停止し続ける
Thread#exit
レシーバのthreadの実行を終了させる.
Thread#run
レシーバの実行を再開させる.
Thread#stop
レシーバの実行を停止させる.
Thread#status
レシーバがまだ生きていれば真を返す.例外によってthreadが終了していればその例外を発生させる.
Thread#value
レシーバのイテレータブロックを評価した結果を返す.まだイテレータブロックの評価が終了していない時にはそのthreadが終了するまで待つ.

Thread間の同期

Threadはメモリ空間を共有しているのでThread間のデータのやりとりは普通の変数を使って行うことができますが,動作するタイミングを合わせるために同期を行う必要があります.この同期に失敗すると,来るはずの無いデータを待って永遠に待ち続けるデッドロックと呼ばれる状態になったり,期待するのと違うデータを受け取って見付けにくいバグの元になったりします.

Rubyのthreadライブラリでは二つの同期方法を提供しています.ひとつは同期だけを行うMutexとデータの受渡しも行うQueueです.これらのライブラリを使うためにはプログラムの先頭で

require "thread"

を呼び出しておく必要があります.

Mutex

Mutexとはmutual-exclusion lock(相互排他ロック)の略です.Mutexをロックしようとした時にすでにロックされていれば, threadはロックが解除されるまで停止します.

並行アクセスから共有データを保護するためには以下のようなコードを用いて行います(ここでmをMutexのインスタンスとします).

begin
   m.lock
   # mで保護される共有データへのアクセス
ensure
   m.unlock
end

同じことをより簡単に行うためMutexにはsynchronizeというメソッドがあります.

m.synchronize {
   # mで保護される共有データへのアクセス
}

例として簡単なプログラムを用意してみましょう.

 1  require "thread"
 2
 3  m = Mutex.new
 4  v = 0;                # mで保護されるデータ
 5
 6  Thread.start {
 7     while true
 8        m.synchronize {
 9          v = v + 100
10        }
11     end
12  }
13
14  while true
15     m.synchronize {
16        v = v - 33
17    }
18  end

このプログラムをMutexで保護しないと,タイミングによっては vの値を取り出してから代入までの間に他のthreadによって値が変更されてしまう可能性があります.

Mutexのメソッドは以下の通りです.

Mutex.new
新しいロックを生成する
Mutex#lock
ロックする.すでにロックされている場合にはロックが解除されるまで待つ.
Mutex#unlock
ロックを解除する.ロックを待っている他のthreadがあればそちらを走らせる.
Mutex#synchronize
ロックの獲得から解除までを行うイテレータ.
Mutex#try_lock
ロックを獲得する.すでにロックされている場合には停止せず FALSEを返す.

Queue

Queueはデータを読み書きするパイプのようなものです.データを提供するthreadは一方からデータを書き込み,読み出すthreadはもう一方からデータを取り出します.Queueに読み出すデータが残っていない時には読み出そうとしたthreadはデータが来るまで停止します.

Queueを使った簡単なプログラムは以下のようになります.

 1 require "thread"
 2 
 3 q = Queue.new
 4 
 5 th = Thread.start {
 6    while line = q.pop
 7       print line
 8    end
 9 }
10 
11 while gets
12    q.push $_
13 end
14 q.push nil	# 終了の印
15 th.join

このプログラムではひとつのthreadが読み込んだ行をもうひとつの threadが出力しています.3行目を「q = []」などとして配列に変えてみるとthread間の同期が取れず,正しく動かないことが分かるでしょう.

Queueのメソッドは以下の通りです.

Queue.new
新しいQueueを生成します.
Queue.empty?
Queueが空の時真を返します.
Queue.push value
Queueにvalueを追加します.
Queue.pop [non_block]
Queueからデータを取り出します.偽でない引数non_blockが与えられた場合にはQueueが空の時に例外を発生させます.それ以外の場合にはQueueが空の時にはQueueにデータが追加されるまで読み出したthreadを停止させます.

例題

並列プログラミングの世界では昔から有名な「哲学者の食事」問題を作ってみましょう.

「哲学者の食事」問題とは以下のような状況で哲学者がどうやって同期をとるかという問題です.

N人の哲学者が丸いテーブルに座っています.テーブルの真中には大きなスパゲティの皿が置いてあります.またN本のフォークがあって哲学者と哲学者の席の間に置いてあります.哲学者は思索を続けていますが,お腹がすくと両側のフォークを取ってスパゲティを食べます.お腹が一杯になると食べるのを止めてフォークを返します.哲学者は紳士ですから,お腹が空いていても両方のフォークが手に入るまでは待ちます.

このプログラムを実行すると現在の状態を次々と表示します.各文字の意味は以下の通りです.

o:
考えている哲学者
*:
仕事している哲学者
-:
使われていないフォーク
|:
使われているフォーク

哲学者が考えている時間と食事している時間は乱数で決めています.

 1 #
 2 # The Dining Philosophers - thread example
 3 #
 4 require "thread"
 5 
 6 N=7    # number of philosophers
 7 $forks = []
 8 for i in 0..N-1
 9   $forks[i] = Mutex.new
10 end
11 $state = "-o"*N
12 
13 def wait
14   sleep rand(20)/10.0
15 end
16 
17 def think(n)
18   wait();
19 end
20 
21 def eat(n)
22   wait();
23 end
24 
25 def philosopher(n)
26   while true
27     think n
28     $forks[n].lock
29     if not $forks[(n+1)%N].try_lock
30       $forks[n].unlock    # avoid deadlock
31       next
32     end
33     $state[n*2] = ?|;
34     $state[(n+1)%N*2] = ?|;
35     $state[n*2+1] = ?*;
36     print $state, "\n"
37     eat(n)
38     $state[n*2] = ?-;
39     $state[(n+1)%N*2] = ?-;
40     $state[n*2+1] = ?o;
41     print $state, "\n"
42     $forks[n].unlock
43     $forks[(n+1)%N].unlock
44   end
45 end
46 
47 for i in 0..N-1
48   Thread.start{philosopher(i)}
49   sleep 0.1
50 end
51 sleep
52 exit
Last update on December 14, 2002 10:02

«チュートリアル: 正規表現 Top RAAリニューアル»