第21章 並行処理
並行処理とは何か
並行処理(Concurrency)とは、複数のタスクが同時に進行するプログラミングのアプローチです。これは、タスクが交互に実行されることで、同時に進行しているように見えるプロセスです。
並行処理と並列処理の違い
- 並行処理(Concurrency)は、複数のタスクが時間的に重なり合って実行されるプロセスを管理することです。並行処理では、一つのCPUがタスク間で切り替わりながら作業を進めます。
- 並列処理(Parallelism)は、複数のタスクが文字通り同時に実行されることです。これは、マルチコアプロセッサ上で異なるタスクが同時に実行される場合に発生します。
- 並行実行: ゴルーチンは複数のタスクを並行して実行する能力を提供します。
- 通信: チャネルはゴルーチン間での通信手段を提供し、データの送受信を可能にします。
並行処理の利点と挑戦
利点には、効率的なリソース利用、応答性の向上、待機時間の短縮などがあります。しかし、デッドロック、競合状態、リソース共有の問題など、特有の挑戦も伴います。
ゴルーチン(Goroutines)
ゴルーチンはGo言語における軽量スレッドのような存在で、関数またはメソッドを並行して実行するための仕組みです。goキーワードを使用して簡単に起動でき、OSスレッドよりもずっと少ないオーバーヘッドで実行できます。ゴルーチンは独自の実行パスを持ち、他のゴルーチンと独立して動作します。
チャネル(Channels)
チャネルは、ゴルーチン間でのデータのやり取りをするためのパイプラインです。チャネルを通じて、一方のゴルーチンから別のゴルーチンに安全にデータを送信することができます。チャネルは、データ競合や不整合を防ぎながら並行処理を行うために設計されています。
ゴルーチンとチャネルの関係
ゴルーチンとチャネルは互いに補完し合う関係にあります。ゴルーチンは並行処理のための実行単位を提供し、チャネルはそれらのゴルーチン間でデータを安全に交換するメカニズムを提供します。この関係は以下のように要約できます
ゴルーチン
ゴルーチンはGo言語における強力な機能の一つで、軽量スレッドのように動作し、並行処理を簡単に扱えるようにします。ゴルーチンの使用方法を理解することは、Go言語で効率的なプログラムを書くために不可欠です。以下にゴルーチンの使用例を示します。
ゴルーチンの基本
ゴルーチンは、goキーワードを使って関数またはメソッドを非同期で実行します。以下の例では、say関数をゴルーチンで起動しています。
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
このプログラムは、say関数を2回呼び出しています。一度はメインの実行パスで直接呼び出し、もう一度はgoキーワードを使ってゴルーチンで呼び出しています。この結果、"hello"と"world"の出力が交互に現れます。
ゴルーチンの同期
ゴルーチンが終了する前にプログラムが終了しないように、sync.WaitGroupを使用してゴルーチンの完了を待つことができます。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(id int) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i)
}
wg.Wait() // すべてのゴルーチンが完了するのを待ちます
}
この例では、main関数がworkerゴルーチンの実行が完了するのを待ってから終了するようにしています。wg.Add(1)でカウンターを増やし、worker関数の最初でdefer wg.Done()を呼び出してカウンターを減らします。main関数の最後でwg.Wait()を呼び出すと、カウンターが0になるまでブロックされ、すべてのゴルーチンが完了するのを待ちます。
チャネル
チャネルはGo言語の並行処理の中核をなす機能で、ゴルーチン間でのデータのやり取りを可能にします。チャネルを使用することで、データ競合やその他の並行性に関連する問題を避けながら、安全にデータを交換できます。ここでは、チャネルの基本的な使い方と、それを活用したいくつかの具体的な例を紹介します。
チャネルの基本
チャネルは、chanキーワードを使用して作成します。チャネルを通じて、ゴルーチン間でメッセージを送受信できます。
package main
import "fmt"
func main() {
messages := make(chan string) // チャネルの作成
// 別のゴルーチンからチャネルにメッセージを送信
go func() { messages <- "ping" }()
// メインゴルーチンでチャネルからメッセージを受信
msg := <-messages
fmt.Println(msg) // pingを出力
}
バッファ付きチャネル
チャネルはバッファ(容量)を持たせることができ、バッファ内の空きがあればブロックせずにデータを送信できます。
package main
import "fmt"
func main() {
messages := make(chan string, 2) // バッファサイズ2のチャネルを作成
// バッファに空きがあるため、これらの送信はブロックされない
messages <- "buffered"
messages <- "channel"
// これらの受信はバッファからデータを取得する
fmt.Println(<-messages) // bufferedを出力
fmt.Println(<-messages) // channelを出力
}
チャネルのクローズ
チャネルは、送信側がこれ以上送信するデータがないことを受信側に伝えるためにクローズできます。クローズされたチャネルからは無限にデータを受信できますが、新しいデータが送信されることはありません。
package main
import (
"fmt"
)
func produce(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // データの送信が終わったのでチャネルをクローズ
}
func main() {
ch := make(chan int)
go produce(ch)
// チャネルがクローズされるまで受信を続ける
for i := range ch {
fmt.Println(i)
}
}
チャネルとselect文
select文を使用すると、複数のチャネル操作を待機し、準備ができた操作を実行できます。
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("Received", msg1)
case msg2 := <-c2:
fmt.Println("Received", msg2)
}
}
}
並行処理パターン
Go言語の並行処理パターンは、複雑な並行処理タスクをより管理しやすく効率的に実装するためのパワフルなテクニックを提供します。ここでは、特に「ファンインとファンアウト」、「タイムアウトとキャンセル」という二つの重要なパターンについて、具体的な例を交えて説明します。
ファンアウト
ファンアウトは、一つのタスクを複数のゴルーチンに分割して並行処理するパターンです。これにより、タスクの処理を並行して行うことができ、全体の処理時間を短縮できます。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "processing job", j)
time.Sleep(time.Second)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 3つのワーカーをファンアウトする
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// ジョブをキューに追加
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// 全ての結果を受け取る
for a := 1; a <= 9; a++ {
<-results
}
}
この例では、9つのジョブを3つのワーカーゴルーチンに分配しています。各ワーカーはジョブを並行して処理し、その結果をresultsチャネルに送信します。
ファンイン
ファンインは、複数のゴルーチンからの出力を一つのチャネルに統合するパターンです。これにより、並行処理の結果を簡単に収集・利用できます。
package main
import (
"fmt"
"sync"
)
func produce(ch chan<- int, start, end int) {
for i := start; i <= end; i++ {
ch <- i
}
}
func main() {
ch := make(chan int, 10)
go produce(ch, 1, 5)
go produce(ch, 6, 10)
// チャネルを閉じるためのWaitGroup
var wg sync.WaitGroup
wg.Add(2)
go func() {
wg.Wait()
close(ch)
}()
// 結果の印刷
for i := range ch {
fmt.Println(i)
}
}
この例では、produce関数を2つの異なる範囲で実行していますが、その結果は同じチャネルchに統合されています。
タイムアウトの実装
タイムアウトは、特定の時間が経過した後に処理を停止させるパターンです。例えば、外部APIへのリクエストが長引いてしまった場合に、処理を中断してリソースを解放することができます。
package main
import (
"context"
"fmt"
"time"
)
func operation(ctx context.Context) {
select {
case <-time.After(1 * time.Second): // 模擬的な重い処理
fmt.Println("Operation completed")
case <-ctx.Done():
fmt.Println("Operation canceled or timed out")
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go operation(ctx)
// 処理の終了を待つ
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
fmt.Println("Main: Operation timed out")
case context.Canceled:
fmt.Println("Main: Operation canceled")
}
}
time.Sleep(1 * time.Second) // 出力を見るために待つ
}
この例では、context.WithTimeoutを使用して、operation関数に500ミリ秒のタイムアウトを設定しています。タイムアウト時間内に処理が完了しない場合、ctx.Done()チャネルが閉じられ、処理が中断されます。
キャンセルの実装
キャンセルは、実行中の処理を任意のタイミングで停止させるパターンです。例えば、ユーザーが操作をキャンセルした場合や、アプリケーションが終了する際に、進行中のタスクを中断することができます。
package main
import (
"context"
"fmt"
"time"
)
func operation(ctx context.Context) {
select {
case <-time.After(2 * time.Second): // 模擬的な重い処理
fmt.Println("Operation completed")
case <-ctx.Done():
fmt.Println("Operation canceled")
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go operation(ctx)
// 処理をキャンセルする
time.Sleep(1 * time.Second) // 模擬的な処理時間
cancel()
time.Sleep(1 * time.Second) // 出力を見るために待つ
}
この例では、context.WithCancelを使用して、1秒後にoperation関数をキャンセルしています。cancel()関数が呼び出されると、ctx.Done()チャネルが閉じられ、進行中の処理が中断されます。
並行処理のデバッグとパフォーマンス
Go言語での並行処理プログラムは強力でありながら、デバッグとパフォーマンスの最適化は挑戦的な部分です。並行処理における問題はしばしば非決定的で、再現が難しいため、特別なアプローチが必要になります。ここでは、並行処理のデバッグとパフォーマンス最適化に役立つ具体的な戦略を紹介します。
デッドロックの検出
デッドロックは、複数のゴルーチンがお互いに待ち合わせていて、いずれも進行できなくなる状態です。Go言語は、デッドロックを検出するための機能を提供しています。
package main
func main() {
ch := make(chan int)
ch <- 1 // この送信は受信者がいないため、ブロックされる
<-ch // この受信は送信が完了することがないため、到達不可能
}
このプログラムは実行時にデッドロックに陥り、Goランタイムによってデッドロックが検出されます。
競合状態の検出
競合状態は、複数のゴルーチンがデータに同時にアクセスし、そのうち少なくとも一つがデータの書き込みを行う状況で発生します。競合状態を検出するには、go run、go build、go testコマンドに-raceフラグを使用します。
package main
import "sync"
var counter int
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
counter++ // 書き込み操作
}()
go func() {
defer wg.Done()
counter++ // 書き込み操作
}()
wg.Wait()
println(counter)
}
このコードを-raceフラグをつけて実行すると、競合状態が検出されます。
パフォーマンスプロファイリング
Go言語はパフォーマンスプロファイリングのための複数のツールを提供しています。これらは、プログラムのどの部分が最も時間を消費しているか、またはメモリを多く使用しているかを特定するのに役立ちます。
net/http/pprofパッケージを使用することで、アプリケーションにHTTPサーバーを組み込み、プロファイリング情報を収集できます。
package main
import (
"net/http"
_ "net/http/pprof"
"time"
)
func main() {
go func() {
for {
time.Sleep(time.Second)
}
}()
http.ListenAndServe("localhost:6060", nil)
}
このプログラムは、http://localhost:6060/debug/pprof/でプロファイリング情報にアクセスできるようになります。
実践的な並行処理の応用
Go言語における並行処理の応用は多岐にわたりますが、特にWebサーバーやデータ処理においてその真価を発揮します。ここでは、実践的な並行処理の応用として「並行処理を使ったWebサーバー」と「バッチ処理システムの並行化」に焦点を当てて、具体的な事例を交えて説明します。
並行処理を使ったWebサーバー
Go言語でのHTTPサーバーは、その設計から自然とリクエストを並行処理します。各リクエストは独自のゴルーチンで処理されるため、リクエストごとに独立した処理を行うことができます。
package main
import (
"fmt"
"net/http"
"time"
)
func handler(w http.ResponseWriter, r *http.Request) {
// 模擬的な重い処理
time.Sleep(2 * time.Second)
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
}
func main() {
http.HandleFunc("/", handler)
fmt.Println("Server is running on http://localhost:8080/")
http.ListenAndServe(":8080", nil)
}
このサンプルでは、handler関数が各HTTPリクエストを処理します。リクエストごとにhandlerが新しいゴルーチンで実行されるため、複数のリクエストを同時に並行処理することが可能です。
バッチ処理システムの並行化
大量のデータ処理を行うバッチ処理システムでも、並行処理を利用することでパフォーマンスを大幅に向上させることができます。
package main
import (
"fmt"
"sync"
"time"
)
// processData は模擬的なデータ処理を表します。
func processData(i int) {
fmt.Printf("Processing data %d\n", i)
time.Sleep(1 * time.Second) // 模擬的な処理時間
}
func main() {
var wg sync.WaitGroup
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for _, d := range data {
wg.Add(1)
go func(d int) {
defer wg.Done()
processData(d)
}(d)
}
wg.Wait() // すべてのゴルーチンが完了するまで待機
fmt.Println("All data processed.")
}
この例では、dataスライス内の各要素を並行して処理しています。sync.WaitGroupを使用することで、すべてのデータ処理が完了するまでプログラムの終了を待機しています。
練習問題1.
10個のゴルーチンを起動し、それぞれのゴルーチンから1から10までの整数を返すプログラムを作成してください。メインゴルーチンは、これらのゴルーチンから送られてくる整数を受け取り、合計値を計算して出力してください。
練習問題2.
バッファ付きチャネルを使用して、一つのゴルーチンから1から100までの整数を送信し、別のゴルーチンでこれらの整数を受け取って合計するプログラムを作成してください。最終的な合計値をメインゴルーチンで出力してください。
練習問題3.
複数のゴルーチンからデータを受け取り、それを一つのチャネルで集約(ファンイン)するプログラムを作成してください。具体的には、3つのゴルーチンがそれぞれ異なるメッセージをランダムな間隔で送信し、メインゴルーチンでこれらのメッセージを受け取って出力するようにしてください。