状態機械を合成してデッドロックを検出できる Go 言語パッケージを作ってみました
はじめに
マルチスレッドで動作するプログラムの設計は難しい問題です。個々のスレッドの動作は単純に見えても、複数が並行して動作する場合の動作は組み合わせ論的に複雑になります。また、タイミングに依存する不具合は狙って再現することが難しく、通常の単体テストによる検出にも限界があります。
そんなとき、有効な手法がモデル検査です。システムの取りうる状態をあらかじめ網羅的に探索することで、「実際に動作させた際にごく低い確率で踏むバグ」であっても、動作させることなく設計段階で発見することが可能になります。
ところでちょうど先日、デッドロック発見器を自作するハンズオンに参加する機会がありました。内容は非常にシンプルなモデル検査器を実装するというもので、せっかくなのでそのときの成果物を Go のパッケージとしてまとめたものを以下に公開しました。
以下、このパッケージで何ができるのかを具体例とともに紹介します。
例 1: 食事する哲学者の問題
まず、上に貼った GitHub の README にも挙げてある食事する哲学者の問題を試してみましょう。サンプルコードはこちらです。
このパッケージは、状態機械の遷移規則を定義するための DSL を提供します。例えば哲学者を表す状態機械は以下のようにして構築できます。
philo := func(me int, left, right vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(left).Is(0)). Let("up_l", do.Set(me).ToVar(left)).MoveTo("1")). Define(rule.At("1").Only(when.Var(right).Is(0)). Let("up_r", do.Set(me).ToVar(right)).MoveTo("2")). Define(rule.At("2").Only(when.Var(right).Is(me)). Let("down_r", do.Set(0).ToVar(right)).MoveTo("3")). Define(rule.At("3").Only(when.Var(left).Is(me)). Let("down_l", do.Set(0).ToVar(left)).MoveTo("0")) }
単一のプロセスは deadlock.Process
で表現されます。プロセスたちは int
に値を取ることができる変数を共有しており、この変数を書き換えることで処理を行っています。ここでは哲学者を複数配置することを考えて、哲学者を識別する番号と左右のフォークを表す変数をパラメータとして与えられるようにしてあります。
定義されている遷移規則を読み下すと、個々の哲学者は以下のように動作することが主張されています。
- 実行は状態 0 からスタート
- 自分の左側のフォークを表す変数
left
の値が 0 なら、誰にも占有されていないと判断し、そこに自分の番号を代入して状態 1 に移行 - 同様に右側のフォーク
right
も確保して状態 2 に移行 - 取った時とは逆順で、右側のフォークの占有を解除するために 0 を代入して状態 3 に移行
- 残った左側のフォークも占有を解除して初期状態 0 に戻る
なお、実際のプロセスは状態 2 において獲得したフォークを使用する何らかのタスクを行うはずですが、ロックの取得や解放には直接関係しないため、ここではモデリングに含めていません。
さて、それでは定義を確認するために、まずは哲学者が一人しかいない場合について遷移グラフを書いてみましょう。検査を行うには、deadlock.System
に変数の初期値と Process
を登録します。以下は哲学者 P1
が一人だけいて、左右のフォークの状態を変数 f1
および f2
に割り当てています。
system := deadlock.NewSystem(). Declare(vars.Shared{"f1": 0, "f2": 0}). Register("P1", philo(1, "f1", "f2"))
検査器本体は deadlock.Detector
です。定義した System
を引数として Detect
を呼び出すことで検査が実行されて結果が返ります。
report, err := deadlock.NewDetector().Detect(system)
返ってきた Report
構造体から直接値を取り出すことも可能ですが、Graphviz の dot 形式で出力するためのプリティプリンタも提供されています。
_, err = deadlock.NewPrinter(os.Stdout).Print(report)
ここまでのコードを main()
内に記述し、得られた出力を dot
コマンドに渡すと次のような画像が得られるはずです。なお、Go 言語の map が順序不定になっている関係で、出力は実行ごとに若干異なる可能性があります。
各時点でプロセスがいる状態と変数の値が図示されています。青い部分が初期状態で、意図した通り、up_l
、up_r
、down_r
、down_l
の順に遷移して初期状態に戻ってきていることがわかります。
デッドロックの検出
次に、同じ動作をする哲学者が複数いる場合を考えましょう。よく知られている通り、この場合はデッドロックが発生します。
哲学者 philo
はあらかじめパラメータ化してあったので、同じ動作をするプロセスを複製して複数登録することも簡単です。
system := deadlock.NewSystem(). Declare(vars.Shared{"f1": 0, "f2": 0}). Register("P1", philo(1, "f1", "f2")). Register("P2", philo(2, "f2", "f1"))
二人目の哲学者 P2
は一人目の P1
と向かい合う形で座っていて、彼の左側にはフォーク f2
が、右側にはフォーク f1
があるとします。P1
から見たフォークとは左右が逆になっています。
同じようにして図示させてみると、デッドロックを表す赤い状態とそこへのエラートレース(のうち最初に見つけたもの)が表示されます。初期状態から P1.up_l
、P2.up_l
の順、すなわち「哲学者 1 が左のフォークを取る」「哲学者 2 が左のフォークを取る」と遷移した場合にデッドロックになることが検出されました。実際、デッドロック状態からは次に出て行く遷移がないことがわかります。
デッドロックの解消
それでは、哲学者の動作を少し変えて、このデッドロック状態を解消できるようにしてみましょう。
philo := func(me int, left, right vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(left).Is(0)). Let("up_l", do.Set(me).ToVar(left)).MoveTo("1")). Define(rule.At("1").Only(when.Var(right).Is(0)). Let("up_r", do.Set(me).ToVar(right)).MoveTo("2")). // Add the new rule Define(rule.At("1").Only(when.Var(right).IsNot(0)). Let("down_l", do.Set(0).ToVar(left)).MoveTo("0")). Define(rule.At("2").Only(when.Var(right).Is(me)). Let("down_r", do.Set(0).ToVar(right)).MoveTo("3")). Define(rule.At("3").Only(when.Var(left).Is(me)). Let("down_l", do.Set(0).ToVar(left)).MoveTo("0")) }
追加された 2 行は「状態 1、すなわち左側のフォークを確保した時点で、もし右側のフォークが占有中であれば、確保した左側のフォークを一旦解放して戻る」という規則を表現しています。結果を図示してみると赤い状態がなくなっており、先ほどのデッドロックが解消したことがわかります。
なおこの修正を入れた場合であっても、すべての哲学者が「左側のフォークを確保」「右側のフォークが確保できないのでやり直し」だけを繰り返す、いわゆるライブロックの問題は解決していません。ライブロックの不在を保証する活性の検査は、現状スコープ外です。
例 2: 排他制御
先ほどの哲学者は二本のフォークで食事した後で最初に戻って無限にループしていましたが、今度は停止するタイプのプロセスを書いてみましょう。サンプルコードはこちらです。
二つのプロセスが一つのグローバル変数を共有していて、それぞれのプロセスはこの変数を 1 だけインクリメントするとします。より具体的には、各プロセスは次のように動作します。
コードは以下のようになります。HaltAt
に指定された状態は受理状態と見なされ、デッドロックとして検出されなくなります。複数のプロセスを合成した結果に対しては、全てのプロセスが受理状態にあるとき全体として受理状態であると見なされます。
proc := func(global, local vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0"). Let("read", do.CopyVar(global).ToVar(local)).MoveTo("1")). Define(rule.At("1"). Let("incr", do.Add(1).ToVar(local)).MoveTo("2")). Define(rule.At("2"). Let("write", do.CopyVar(local).ToVar(global)).MoveTo("3")). HaltAt("3") } system := deadlock.NewSystem(). Declare(vars.Shared{"var": 0, "tmp1": 0, "tmp2": 0}). Register("P", proc("var", "tmp1")). Register("Q", proc("var", "tmp2"))
得られた図が以下です。受理状態は二重丸として表現されており、出て行く遷移がないにも関わらず赤く表示されてはいません。
図中には三つの受理状態がありますが、よく見ると最終的に var = 1
になる場合(中央)と var = 2
になる場合(左右)があることがわかります。二つのプロセスが独立にインクリメントした結果として意図される値は 2 ですが、第一のプロセスが値を書き戻す (P.write
) より前に第二のプロセスの読み込み (Q.read
) が発生すると、書き込んだ値が上書きされてしまうためです。
ミューテックスによる直列化
この問題を解決するために、変数 mutex
を新しく導入し、値を操作する際にはロックを取得させるようにします。
proc := func(global, local, mutex vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). // Add the new rule Define(rule.At("0").Only(when.Var(mutex).Is(0)). Let("lock", do.Set(1).ToVar(mutex)).MoveTo("1")). Define(rule.At("1"). Let("read", do.CopyVar(global).ToVar(local)).MoveTo("2")). Define(rule.At("2"). Let("incr", do.Add(1).ToVar(local)).MoveTo("3")). Define(rule.At("3"). Let("write", do.CopyVar(local).ToVar(global)).MoveTo("4")). // Add the new rule Define(rule.At("4"). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("5")). HaltAt("5") } system := deadlock.NewSystem(). Declare(vars.Shared{"var": 0, "tmp1": 0, "tmp2": 0, "mut": 0}). Register("P", proc("var", "tmp1", "mut")). Register("Q", proc("var", "tmp2", "mut"))
動作の最初でロックを取得し、最後に解放する動作を追加しています。表示された図の受理状態ではいずれも var = 2
となっており、各プロセスがread
、incr
、write
の遷移を連続して割り込まれることなく実行していることがわかります。
例 3: 生産者・消費者問題
今度はもう少し複雑な動作をモデリングしてみましょう。
二つのプロセス「生産者」と「消費者」がキューを挟んで接続されている状況を考えます。生産者がキューに要素を詰めるプロセス、消費者はそれを取り出すプロセスです。
キューが容量いっぱいの時、生産者は要素の生成をブロックされます。逆に消費者は、キューが空の場合には要素が取り出せないためブロックされます。ただし、単にブロックされているのではなく、待ち状態に移行して待機するとします。
この待ち状態は、条件変数に送られるシグナルを検知することで解除されます。つまり、生産者は「キューから要素が取り出され空きができた」という条件変数、消費者は「キューに要素が投入された」という条件変数をそれぞれ監視します。
より具体的には、生産者は以下のように動作します。
- まずキューにアクセスするためにロックを取得
- キューを確認し、容量に空きがなければロックを解放して待ち状態に入る
- 待ち状態では条件変数へのシグナルを待ち、受信したら最初に戻る
- キュー容量に空きがあった場合、要素を一つ追加
- 条件変数にシグナルを送り、消費者が待ち状態にいれば起動させる
- キューに対するロックを解放し、最初に戻る
同様に、消費者は以下のように動作します。
- まずキューにアクセスするためにロックを取得
- キューを確認し、一つも要素がなければロックを解放して待ち状態に入る
- 待ち状態では条件変数へのシグナルを待ち、受信したら最初に戻る
- キューに要素が存在した場合、その要素を一つ削除
- 条件変数にシグナルを送り、生産者が待ち状態にいれば起動させる
- キューに対するロックを解放し、最初に戻る
これをそのまま実装したソースコードは次のようになります。
- キュー内の要素の個数:
queue
- キューへのアクセスを保護するミューテックス:
mutex
- キューが容量一杯になっているフラグ:
over
- キューが空になっているフラグ:
under
capacity
はキューの最大容量です。今回はキュー内の要素が何かは考えず、要素の個数だけを問題にします。
capacity = 1 producer := func(queue, mutex, over, under vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(mutex).Is(0)). Let("lock", do.Set(1).ToVar(mutex)).MoveTo("1")). Define(rule.At("1").Only(when.Var(queue).Is(capacity)). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("2")). Define(rule.At("2"). Let("wait", do.Set(1).ToVar(over)).MoveTo("3")). Define(rule.At("3").Only(when.Var(over).Is(0)). Let("wakeup", do.Nothing()).MoveTo("0")). Define(rule.At("1").Only(when.Var(queue).IsLessThan(capacity)). Let("produce", do.Add(1).ToVar(queue)).MoveTo("4")). Define(rule.At("4"). Let("signal", do.Set(0).ToVar(under)).MoveTo("5")). Define(rule.At("5"). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("0")) } consumer := func(queue, mutex, over, under vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(mutex).Is(0)). Let("lock", do.Set(1).ToVar(mutex)).MoveTo("1")). Define(rule.At("1").Only(when.Var(queue).Is(0)). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("2")). Define(rule.At("2"). Let("wait", do.Set(1).ToVar(under)).MoveTo("3")). Define(rule.At("3").Only(when.Var(under).Is(0)). Let("wakeup", do.Nothing()).MoveTo("0")). Define(rule.At("1").Only(when.Var(queue).IsGreaterThan(0)). Let("consume", do.Add(-1).ToVar(queue)).MoveTo("4")). Define(rule.At("4"). Let("signal", do.Set(0).ToVar(over)).MoveTo("5")). Define(rule.At("5"). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("0")) } system := deadlock.NewSystem(). Declare(vars.Shared{"que": 0, "mut": 0, "over": 0, "under": 0}). Register("P", producer("que", "mut", "over", "under")). Register("C", consumer("que", "mut", "over", "under"))
この実装では、キューの状態を確認して駄目ならロックを解放する遷移 unlock
と自分が待ち状態に入ったフラグを立てる遷移 wait
が、連続して発生するとは限らないことに注意してください。この二動作の間に、相手プロセスから割り込まれる可能性があります。
検査した結果が上の図です。例えば右のほうのデッドロックに至る経緯を観察すると、P.unlock
、C.lock
、C.consume
、C.signal
、P.wait
というシーケンス、すなわち生産者がロックを解放した直後、待ち状態に入る前に消費者がシグナルを発信していることがわかります。このとき、生産者はシグナルを受け取ることができず、そのまま待ち状態で固定されてしまいます。
アトミック操作による割り込みの禁止
そこで、unlock
と wait
の間で他のプロセスが動かないように一つの遷移にまとめてしまうことを考えます。現状、deadlock
パッケージが提供する DSL の範囲では一回の遷移で複数の変数を変更することができませんが、直接書き下すことにより実装が可能です。ロックを解放したと同時に待ち状態に入る遷移は以下のようになります。
waitConditionVar := func(mutex, cond vars.Name) do.Action { return func(vs vars.Shared) (vars.Shared, error) { newVars := vs.Clone() newVars[mutex] = 0 newVars[cond] = 1 return newVars, nil } }
これを用いて、先ほどの生産者と消費者の定義にあった「ロックの解放」「待ち状態への遷移」を一つの不可分な遷移に置き換えます。検査してみると、先程までのデッドロックが解消していることがわかります。ちなみに、キューの容量 capacity
を増やしてもやはりデッドロックが発生することはありません。
producer := func(queue, mutex, over, under vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(mutex).Is(0)). Let("lock", do.Set(1).ToVar(mutex)).MoveTo("1")). // Replaced Define(rule.At("1").Only(when.Var(queue).Is(capacity)). Let("wait", waitConditionVar(mutex, over)).MoveTo("3")). Define(rule.At("3").Only(when.Var(over).Is(0)). Let("wakeup", do.Nothing()).MoveTo("0")). Define(rule.At("1").Only(when.Var(queue).IsLessThan(capacity)). Let("produce", do.Add(1).ToVar(queue)).MoveTo("4")). Define(rule.At("4"). Let("signal", do.Set(0).ToVar(under)).MoveTo("5")). Define(rule.At("5"). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("0")) } consumer := func(queue, mutex, over, under vars.Name) deadlock.Process { return deadlock.NewProcess(). EnterAt("0"). Define(rule.At("0").Only(when.Var(mutex).Is(0)). Let("lock", do.Set(1).ToVar(mutex)).MoveTo("1")). // Replaced Define(rule.At("1").Only(when.Var(queue).Is(0)). Let("wait", waitConditionVar(mutex, under)).MoveTo("3")). Define(rule.At("3").Only(when.Var(under).Is(0)). Let("wakeup", do.Nothing()).MoveTo("0")). Define(rule.At("1").Only(when.Var(queue).IsGreaterThan(0)). Let("consume", do.Add(-1).ToVar(queue)).MoveTo("4")). Define(rule.At("4"). Let("signal", do.Set(0).ToVar(over)).MoveTo("5")). Define(rule.At("5"). Let("unlock", do.Set(0).ToVar(mutex)).MoveTo("0")) }
まとめ
以上、Go 言語でグローバル変数を共有した複数の状態機械を定義し、並行動作させた場合に生じうるデッドロックを検出するデモを紹介しました。トイモデルではありますが、これだけでも意外と色々なものが実装できて、遷移グラフを描いてみると割と面白い結果が出ます。よかったら遊んでみて、もし気に入ったらスターを付けてやってください。
実際にはまだ色々直したいところはあって、
- 最初の n 個のデッドロック状態を発見した時点で探索を打ち切る
- デッドロック以外にもユーザが与えたアサーションに対する違反を検出する
- 結果表示の見た目をオプションで変更できるようにする
- 状態のハッシュ値を使いまわすことで、内部の無駄な計算を削る
など、時間を見つけて改良したいと思っていますが、それはまた別の話。