Swift Concurrencyでセマフォを作る

こんにちは、レシピサービス開発部と技術部兼務のヴァンサン(@vincentisambart)です。

Swift Concurrencyに関する中級の記事がまだ多くない気がしていたので、そういう記事を書くことにしました。

Swift Concurrencyの理解を深めたい人にはWWDC21の「Swift concurrency: Behind the scenes」がおすすめです。そのプレゼンの中でDispatchSemaphoreをSwift Concurrencyで使うべきではないと述べられました。

Preserving the runtime contract - Forward progress

Swift Concurrencyに提供されているツールを見ると、セマフォがありません。でも提供されたものでセマフォを作れないでしょうか?セマフォを使いたい場面が多いわけではありませんが、良い勉強になると思います。

どういうツールが標準で提供されているのでしょうか?safe(安全に使えるもの)はactorTaskTaskGroupasync letAsyncStreamCheckedContinuation、くらいですかね。

セマフォ自体の説明をすると長くなるので、下記はセマフォをある程度知っている前提で書いています。セマフォのメソッド名はセマフォの説明でたまに使われる分かりにくいPVではなく、DispatchSemaphoreも使っているwait()(待つ)とsignal()(合図を送る)を使います。

セマフォの値が0以下だったらwait()が次のsignal()まで待つ必要があるので、待たせる仕組み、もう待たなくて良いという合図を送る仕組み、が必要です。Swift Concurrencyのそれぞれのツールで実装できないか検討してみましょう。

actor

actor自体でできることを色々見ても、何かを待たせるすべはなさそうです(ビジーウェイトは論外)。

とはいえ、セマフォの状態を正しく保つには良いかもしれません。別のツールと合わせてなら役に立てるかもしれません。

TaskGroupasync let

async letのevolution proposalを見ると、TaskGroupと比較して紹介されていて、2つのユースケースが似ています:処理をいくつかの子タスクに分けて、最後に親タスクが結果をまとめます。async letは子タスクの数を動的に変えられないけどもっと使いやすい感じですかね。

特定のセマフォのwait()signal()を呼んでいるタスクが親子や兄弟であると限らないので、TaskGroupasync letをもっと詳しく調べなくても2つとも向いていなそうです。

AsyncStream

AsyncStreamの紹介事例は基本的に既存のSwift Concurrencyを使わないコードをSwift Concurrencyの世界に持ってきます。ゼロからSwift Concurrencyを使ってセマフォを作ろうとしているので、AsyncStreamは向いていないのでは?と思うかもしれませんが、もう少し見てみましょう。

  • AsyncStreamAsyncSequenceなので、非同期に値を順番に生み出します。次の値を入手するにはawaitを使う必要があるので、セマフォのwait()に近いかもしれません
  • AsyncStreamはクロージャーを渡して作成します:AsyncStream { continuation in ... }。クロージャーに渡されるAsyncStream.Continuationyield())がsignal()に少し似ているかもしれません

wait()signal()の実装に使えそうなものを見つけたので、本当に実装できるのかまずもう少しドキュメントを見てみましょう。

signal()に使えそうなAsyncSequence.Continution.yield()は並行で複数のタスクから呼んでも問題ないようです(この場合、値が取り出される順序が保証されませんが)。AsyncStream.init(_:bufferingPolicy:_:)のドキュメント)から抜粋:

The AsyncStream.Continuation received by the build closure is appropriate for use in concurrent contexts. It is thread safe to send and finish; all calls to the continuation are serialized. However, calling this from multiple concurrent contexts could result in out-of-order delivery.

ですが、残念ながら、wait()の実装はできないようです。並行で複数のタスクからAsyncStreamの次の値をawaitすることはできません。AsyncStream.Iteratorのドキュメントから抜粋:

This type doesn’t conform to Sendable. Don’t use it from multiple concurrent contexts. It is a programmer error to invoke next() from a concurrent context that constends with another such call, which results in a call to fatalError().

上記に書いてある時点で実際のコードで動いたとしても使うべきではありませんが、遊び感覚で試してみました。IteratorSendableでないためタスク間で共有できないが、並行で複数のタスクからvar iterator = stream.makeAsyncIterator(); await iterator.next()をしてみたら、記述の通りfatalError()が起きました。

_Concurrency/AsyncStreamBuffer.swift:253: Fatal error: attempt to await next() on more than one task

このため、今回のユースケースには向きません。とはいえ、AsyncStreamが既存のコードをSwift Concurrencyの世界に持っていくためだけのツールではないことを見られたと思います。

CheckedContinuation

CheckedContinuationとは

AsyncStream同様、CheckedContinuationは既存のコードをSwift Concurrencyの世界に持っていくためのツールとしてよく紹介されています。コールバックを使っているメソッドをawaitできるようにするためのツールですが、今回のユースケースで使えないでしょうか?

CheckedContinuationwithCheckedContinuation(function:_:))(エラーが発生することがあればwithCheckedThrowingContinuation(function:_:)))を使って作られます。

一番シンプルなユースケースは以下のようにhogeWithCallbackawaitできるようにすることです。

// `hogeWithCallback(_:)`を`hoge(_:)`と命名しても大丈夫です。
func hogeWithCallback(_ callback: () -> Void) { ... }
func hoge() async {
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        hogeWithCallback {
            continuation.resume()
        }
    }
}

非同期のタスクがawait withCheckedContinuationに止まって、continuationを渡されたクロージャーが実行されて、hogeWithCallbackが呼ばれます。CheckedContinuationresumeメソッドが呼ばれたらawait withCheckedContinuationに止まっていたタスクが再開します。

セマフォはwait()側でawait withCheckedContinuationをして、signal()側でcontinuation.resume()を呼べたら実装できるかもしれません。同時に複数のタスクが待つ可能性があるので、1つのCheckedContinuationでは足りませんが、CheckedContinuationの配列を使えば良いでしょう。並行でさまざまなタスクからアクセスされても、セマフォの内部状態である配列と値の正当性を保つにはactorが向いていそうです。

セマフォの内部状態

では実装してみましょう。状態はとりあえず上記の説明にあった待機中のCheckedContinuationの配列とセマフォの値が必要です。

actor ContinuationSemaphore {
    // 待機中のタスクの`CheckedContinuation`です。
    // この`Void`はこの`CheckedContinuation`が値を返さないことを示します。
    // この`Never`はこの`CheckedContinuation`ではエラーが発生しないことを示します。
    private var waiters: [CheckedContinuation<Void, Never>] = []
    // セマフォの値です。
    // `value`が負であれば、`waiters.count == -value`が保証されます。
    private var value: Int

    init(value: Int) {
        // `DispatchSemaphore`同様、初期値が負であってはいけません。
        assert(value >= 0)
        self.value = value
    }

    // セマフォの正当性が保たれているのを確認するメソッドです。
    private func ensureValidState() {
        // セマフォの値が正であれば、待つタスクがあるべきではありません。
        // セマフォの値が負であれば、待つタスクの数が`-value`個であるべきです。
        assert((value >= 0 && waiters.isEmpty) || (waiters.count == -value))
    }

wait()

一番複雑なのがwait()の実装です。

コード自体が長いわけでもなく、シンプルにも見えますが、特別なことが起きています。

    func wait() async {
        value -= 1
        // 引き算後に値が0以上だったら待つ必要がありません。
        if value < 0 {
            await withCheckedContinuation { continuation in
                waiters.append(continuation)
                ensureValidState()
            }
        }
        ensureValidState()
    }

この実装は本当に大丈夫でしょうか?

awaitを使うたびに制御フローが別のタスクに移る可能性があります。wait()が呼ばれる時にvalue0で、waitersが空っぽだったのを仮定します。1を引かれたvalue-1になって、await withCheckedContinuationが呼ばれます。ここで制御フローが別のタスクに移るとしたら、別のタスクがセマフォを使えば、value-1なのにwaitersが空っぽなので不正状態ですよね…

また、withCheckedContinuationasync関数なのに、渡されるクロージャーが直接actorのプロパティwaitersを変更できているのは僕にとって少し不思議でした。

actor上でのクロージャーの制限を洗い出してみましょう。

actor上でのクロージャーの制限

特定のactorにとって、メソッドや関数はそのactorのisolatedな(孤立した)環境に属するかどうかで区別されます。このactorのisolatedな環境に属するメソッドや関数は以下の通りです:

  • actorの一部として実装されていて、nonisolatedでないもの
  actor MyActor {
    func anyMethod() {}
  }
  • actor外で実装されているが、actorを引数で受け取って、この引数にisolatedが明記されているもの
  actor MyActor {}
  func someFunction(myActor: isolated MyActor) {}

逆にactorのisolatedな環境に属さないメソッドや関数は以下の通りです:

  • actorの一部として実装されているがnonisolatedであるもの
  actor MyActor {
    nonisolated func anyMethod() {}
  }
  • actor外で実装されているが、このactorをisolated引数で受け取っていないもの
  actor MyActor {}
  func someFunction() {}

今回気になっているwithCheckedContinuationが特定のactorのisolatedな環境にも属しませんし、このwithCheckedContinuationがactorのisolatedな環境に属するメソッドから呼ばれてクロージャーを渡されるので、このユースケースに焦点を当てましょう。

現状SwiftのConcurrencyチェックがデフォルトで緩いので、Strict Concurrency Checkingを一番厳しい設定「Complete」にして色々試してみましょう。

Swift Compiler - Language - Strict Concurrency Checking

ひとまずは一番シンプルなケース、呼ばれる関数が普通である(asyncでない)場合を見てみましょう。

func normalFunctionTakingClosure(block: () -> Void) {}
actor MyActor {
    var value: Int = 0
    func anyMethod() async {
        normalFunctionTakingClosure { value = 1 }
    }
}

上記のコードでself.を明記せずにactorのプロパティをそのままアクセスしても何の警告が出ません。呼ばれる関数がasyncでないので、actorのisolatedな環境で実行されるので問題が起きる心配はありません。

async関数で試してみましょう。

func asyncFunctionTakingClosure(block: () -> Void) async {}
actor MyActor {
    var value: Int = 0
    func anyMethod() async {
        await asyncFunctionTakingClosure { value = 1 }
    }
}

上記のコードをビルドすると以下の警告が出ます。actorのisolatedな環境に属さないasyncメソッドや関数は別の環境で実行されます。() -> Voidは環境の境界線を渡れないと。(blockの型を() async -> Voidにしたとして同じです)

Non-sendable type () -> Void exiting actor-isolated context in call to non-isolated global function asyncFunctionTakingClosure(block:) cannot cross actor boundary

実行環境間、タスク間にクロージャーを送りたい場合、@Sendableをつける必要があるので試してみましょう。

func asyncFunctionTakingClosure(block: @Sendable () -> Void) async {}
actor MyActor {
    var value: Int = 0
    func anyMethod() async {
        await asyncFunctionTakingClosure { value = 1 }
    }
}

上記のコードをビルドしたら以下のエラーになってしまいました。

Actor-isolated property value can not be mutated from a Sendable closure

actorはisolatedな環境で実行されるものなので、プロパティを別の環境からアクセスできたらisolatedでなくなります。

withCheckedContinuationは上記のasyncFunctionTakingClosureに似ていそうなのに、警告なくクロージャーからactorのプロパティにアクセスできます。

withCheckedContinuationの定義を見てみると、クロージャーの型に@Sendableがついていませんし、不思議な@_unsafeInheritExecutorというのがついています。

この@_unsafeInheritExecutorの影響でwithCheckedContinuationが特別な振る舞いをします。async関数ではあるものの、actorから呼んでも実行環境(executor)が継承されます。渡されるクロージャーが@Sendableでもなく@escapingでもなく同じ実行環境のまま実行されます。もっと詳しい説明はこちらをご覧ください

注意:unsafeのついたものはリリースされるコードで使う場合、注意が必要です。UnsafeContinuationを利用することはできますが、安全なCheckedContinuationを使うべきです。@_unsafeInheritExecutorが分かりにくく、特別な振る舞いをするので一般的な開発において使う必要が出ることはないと思います。

好奇心を満たすためだけに試してみたら予想通り以下のコードで何の警告も出ません。

@_unsafeInheritExecutor
func asyncFunctionInheritExecutorTakingNonEscapingClosure(block: () -> Void) async {}
actor MyActor {
    var value: Int = 0
    func anyMethod() async {
        await asyncFunctionInheritExecutorTakingNonEscapingClosure { value = 1 }
    }
}

wait()再び

余談はここまでにして、ContinuationSemaphorewait()に戻りましょう。

    func wait() async {
        value -= 1
        // 引き算後に値が0以上だったら待つ必要がありません。
        if value < 0 {
            await withCheckedContinuation { continuation in
                waiters.append(continuation)
                ensureValidState()
            }
        }
        ensureValidState()
    }

「Strict Concurrency Checking」の厳しさを最大のCompleteにしても、上記のコードで警告が出ません。

(正確にはXcode 14.0.1でmacOS用にビルドすると出ますが、iOS用だと出ないし、Xcode 14.1だとmacOS用でも出ません。警告が出るのはバグで間違いなさそうです)。

改めてwait()が呼ばれる時にvalue0で、waitersが空っぽだったのを仮定して実行順を追ってみます。

  1. 実行環境がactorの環境に切り替わります。
  2. actorの実行環境上でvalue -= 1if value < 0 {が実行されます。
  3. withCheckedContinuationがactorに属さないとはいえ、@_unsafeInheritExecutorがついているので実行環境(executor)が継承され、withCheckedContinuationが実行されます。
  4. withCheckedContinuationの内部の動きが複雑ですが、簡単にまとめてみると、continuationが作成されて、actorの実行環境のままクロージャーが呼ばれるようです。
  5. value -= 1waiters.append(continuation)の間actorの実行環境を離れていないので、正当性が保たれるはずです。

signal()

signal()は割とシンプルです。待っているものがあれば、一番前から待っていたcontinuationを取り出してresume()を呼びます。

    func signal() {
        value += 1
        if value <= 0 {
            // 配列が空っぽの場合、`removeFirst()`によって異常終了されるが、
            // この`if`の条件が満たされていて`signal()`が呼ばれた時点ではvalueの値が-1以下のはずなので、
            // 待機中のタスクが1つ以上だったはずです。
            let waiter = waiters.removeFirst()
            // `waiters`から1つの`CheckedContinuation`を取り出して、正しい状態が保たれるはずです。
            ensureValidState()
            // 待っていたタスクが続行できます。
            waiter.resume()
        }
        ensureValidState()
    }
}

全体のコード

上記にContinuationSemaphoreの全てのコードを3つに分けて載せましたが、念のためまとめて載せます。意外と短いですね(正当性の確認を消すとなおさら)。

actor ContinuationSemaphore {
    private var waiters: [CheckedContinuation<Void, Never>] = []
    private var value: Int

    init(value: Int) {
        assert(value >= 0)
        self.value = value
    }

    private func ensureValidState() {
        assert((value >= 0 && waiters.isEmpty) || (waiters.count == -value))
    }

    func wait() async {
        value -= 1
        if value < 0 {
            await withCheckedContinuation { continuation in
                waiters.append(continuation)
                ensureValidState()
            }
        }
        ensureValidState()
    }

    func signal() {
        value += 1
        if value <= 0 {
            let waiter = waiters.removeFirst()
            ensureValidState()
            waiter.resume()
        }
        ensureValidState()
    }
}

注意事項

上記のコードは少し試したし、理解を深めるには良い例だと思いますが、そのままプロダクションで使うのをおすすめできません。あまりテストされていないことを除いても、以下の問題が未解決状態です。

  • ContinuationSemaphoreが解放される時、waitersが残っていれば何をすべきでしょうか?
    • DispatchSemaphoreは解放時のvalueが作成時に渡されたvalueより低かったら強制終了です。
  • タスクのキャンセルをどう扱うべきでしょうか?
  • 提供されている機能が限られています。wait()にタイムアウトを指定できるようにするとか、signal()に数字を渡せるようにするとか、多くのセマフォの実装に入っているけどここにはないさまざまな機能があります。

キャンセルを扱う場合、注意が必要です。wait()を呼んでいるコードがキャンセルを意識しなければ、キャンセルの影響でwait()が終わるとき、セマフォに守られているリソースが使えるのを勘違いしていれば困ります。Taskがキャンセルされるときにwait()throwするようにした方が良いかもしれません。

Task(ボーナスコンテンツ)

さまざまなツールを見てみましたが、Taskの話はまだしていませんでしたね。Taskは特定なタスクが終わるのを待つことはできます(await task.value)が、そのTask自体を止める方法がなさそうです(ビジーウェイトはもちろん論外)。

元々Taskに関してはこの数行だけで留まるつもりでしたが、Taskのドキュメントを改めて見てよく考えたら、ビジーウェイトには少し似ている邪道な方法を思いつきました。実行されたTaskをキャンセルできる機能とsleepさせる機能を利用すれば…

上記のContinuationSemaphoreCheckedContinuationの使い方が想定されたものだと思います。以下のTaskSemaphoreがそうでもありませんし、僕の気づいていない問題が隠れているかもしれません。こういう使い方をおすすめできないとはいえ、勉強になり得るので、とりあえず興味がある方のために載せてみます。全体の構成がCheckedContinuationを使ったバージョンに近いので、説明は少なめです。

actor TaskSemaphore {
    private var value: Int
    private var tasks: [Task<Void, Never>] = []

    init(value: Int) {
        assert(value >= 0)
        self.value = value
    }

    private func ensureValidState() {
        assert((value >= 0 && tasks.isEmpty) || (tasks.count == -value))
    }

    func wait() async {
        value -= 1
        if value < 0 {
            let task = Task {
                // キャンセルされない限り永遠にsleepさせます
                while !Task.isCancelled {
                    // タスクがキャンセルされたら、途中だったsleepがすぐ終わるはずです。
                    try? await Task.sleep(nanoseconds: 100_000_000_000 /* 適当に100秒 */)
                }
            }
            tasks.append(task)
            ensureValidState()
            await task.value
        }
    }

    func signal() {
        value += 1
        if value <= 0 {
            let task = tasks.removeFirst()
            ensureValidState()
            task.cancel() // 永遠にsleepさせているタスクをキャンセルすることで起こします。
        }
        ensureValidState()
    }
}

最後に

この記事からひとつだけ覚えるとしたら、既存のコードをSwift Concurrencyの世界から使えるためにあると紹介されているツールは他のユースケースでも利用できる場合があるということでしょう。CheckedContinuationAsyncStreamも最初からSwift Concurrencyを使っているコードでも役に立つ場面があります。

また、すべてのツールを洗い出して、セマフォを実装できましたが、現在存在する標準ライブラリのツールだけで実装できないものもあると思います。

この記事を読んでくれた皆さんがSwift Concurrencyに少しでも詳しくなっていたら嬉しく思います。