wantedly/subee
✉️ 🐝 It's not only a bee, but a message - Pub/Sub Worker Framework Implementation - wantedly/subee
https://github.com/wantedly/subee
People tribe / Backend squad の @izumin5210 です。来月以降もこのペースでブログを書いていけるといいですね。
この記事は Go3 Advent Calendar 2019 の25日目です。長い1ヶ月だった。
5月に開催された Go Conference 2019 Spring にて "Subee: Pub/Sub Worker Framework Implementation" という発表がありました。Pub/Sub というパラダイムをどのように活用するか・Go で如何に効率よく Subscriber worker を実装するかに焦点を当て、それを実装するためのフレームワーク Subee を紹介したプレゼンです。
この発表は @hlts2 (Twitter: @hiroto_hlts2)が Wantedly People チームでのインターンシップの成果をまとめてくれたものです。この成果は現在も Wantedly Visit・People 両方のチームで超有効に活用されています。
本記事ではそんな @hlts2 と Wantedly の Pub/Sub working group のメンバーで作り上げた Cloud Pub/Sub 導入のための基盤開発について紹介していきます。
まずは Pub/Sub について、Go Conference での発表の復習も兼ねつつ簡単に見ていきましょう。
Pub/Sub とはなにか、 Wikipedia では次のように紹介されています。
非同期メッセージングパラダイムの一種であり、メッセージの送信者(出版側)が特定の受信者(購読側)を想定せずにメッセージを送るようプログラムされたものである。出版されたメッセージはクラス分けされ、購読者に関する知識を持たない。購読側は興味のあるクラスを指定しておき、そのクラスに属するメッセージだけを受け取り、出版者についての知識を持たない。出版側と購読側の結合度が低いため、スケーラビリティがよく、動的なネットワーク構成に対応可能である。
- 出版-購読型モデル - Wikipedia (2019/05/27, 強調は筆者による)
Apache Kafka や、マネージドサービスであれば Amazon SNS やGoogle の Cloud Pub/Sub などが存在します。Wantedly では Cloud Pub/Sub を利用しています。
また、Cloud Pub/Sub のドキュメントでは一般的なユースケースとして7つほど紹介されています。Wantedly では大まかにはこのユースケースに沿いつつ、とくに N publishers - 1 subscriber な Queue としての利用がメインになっています。
Cloud Pub/Sub のユースケースの中では「ネットワーク クラスタでの負荷のバランス調整」「非同期ワークフローの実装」などが該当するでしょう。
たとえば「大量の Push 通知を送信したい」パターン:
いままでは Push 通知をしたいドメインの都合で大量のリクエストを送ってしまい、Push 通知サーバに過剰負荷をかけてしまう、といったことがありました。時間をずらして負荷分散する?処理が狙った時間からズレるのが許容できるのか…考えることが余計に増えてしまいます。
また、元のアーキテクチャでは App server が同期的にリクエストを送っています。いくら App server が独自にキューを持っていたとしても、Push server が落ちていればキューは徐々に詰まっていってしまいます。システムが自律的・安定的に動作するためにも、この依存関係はなるべく疎であるべきでしょう。
ここで右のように Pub/Sub をキューとして使うことを考えます。 Cloud Pub/Sub はメッセージの受信に Pull 型と Push 型を選択することができ、とくに Pull 型は自分のペースで必要なだけメッセージを受信することが出来ます。処理できる数だけメッセージを Pull し、処理が終わったら次のメッセージを Pull して…、このようにして、Push 通知サーバは自分のペースでリクエストを取得し処理することができるようになります。通知を送りたいドメインでも、Push 通知サーバがどれくらいのワークロードに耐えられるかを気にすることなく Push を送ることができるようになります。
ここで挙げたのはあくまで一例ですが、並行・分散システム化を推進していくにあたって分散キューは不可欠ということで、 Wantedly で Cloud Pub/Sub を導入しました(より大局的な話は技術書典で頒布している Wantedly Tech Book 7 をご覧ください)。
さて、そんなこんなで Cloud Pub/Sub が導入されました。最初は Go で書かれたサービスが Subscriber になるものでした。それ自体はなんとか本番に導入されたのですが、当然ながら Cloud Pub/Sub の SDK では足りない、本番で運用するために必要な機能がいくつか存在します。例を挙げると:
また、単純に「よく書く定形処理」みたいなものもいくつか存在します:
これらを新しい Subscriber worker が出てくるたびに書くのは大変です。Pub/Sub の導入(= 非同期コミュニケーション化)を推進するためにも、もっと気軽に Subscriber を追加できるようにする必要がありました。
そこで @hlts2 と一緒に設計・開発したのが、「gRPC server の開発体験を Pub/Sub subscriber に持ってくる」ことを目標としたフレームワークである Subee です(読み方は /ˈsʌbiː/
)。名前は手紙を運ぶ働き蜂のイメージだそうです。
subee 自体は非常にミニマルで、最低限の機能しか持ちません。
CLI は構造体のパッケージと名前を指定することで、 Pub/Sub のペイロードを構造体に Unmarshal するコードまでを生成します。
% subee g subscriber book -p ./api -m Book -e protobuf
✔ pkg/consumer/book_consumer.go
✔ pkg/consumer/book_consumer_adapter.go
✔ cmd/book-subscriber/run.go
✔ cmd/book-subscriber/main.go
Scaffold book-subscriber successfully 🎉
▸ At first, you should implement createSubscriber() in ./cmd/book-subscriber/run.go
▸ You can implement a messages handler in ./pkg/consumer/book_consumer.go
▸ You can run subscribers with subee start command
この例では、開発者は基本的に pkg/consumer/book_consumer.go
にメインのロジックを実装していくことになります。飛んでくるメッセージはすべて subee g subscriber
コマンドで指定した構造体に変換されます。このあたりも gRPC の開発体験を参考にしています。
// BookConsumer is a consumer interface for api_pb.Book.
type BookConsumer interface {
Consume(context.Context, *api_pb.Book) error
}
// NewBookConsumer creates a new consumer instance.
func NewBookConsumer() BookConsumer {
return &bookConsumerImpl{}
}
type bookConsumerImpl struct{}
func (c *bookConsumerImpl) Consume(ctx context.Context, msg *api_pb.Book) error {
return errors.New("Consume() has not been implemented yet")
}
Wantedly では通常の Web API の定義も Protocol Buffers の DSL で定義しています。これにあわせて Pub/Sub でも Protocol Buffers を利用することで、.proto
を元にほぼ同じ interface で JSON Web API server (もしくは gRPC server)と Pub/Sub subscriber の両方をセットアップできるようになっています。
複数メッセージのメッセージをまとめて処理するほうが効率が良いことがあります。RDB への INSERT などはそうなりがちですね。そういうときのために、複数メッセージをまとめて受けられる interface を利用できるようになっています。先程の CLI であれば、 --batch というオプションを付けることで対応コードが生成されます。
// BookBatchConsumer is a batch consumer interface for api_pb.Book.
type BookBatchConsumer interface {
BatchConsume(context.Context, []*api_pb.Book) error
}
// NewBookBatchConsumer creates a new consumer instance.
func NewBookBatchConsumer() BookBatchConsumer {
return &bookBatchConsumerImpl{}
}
type bookBatchConsumerImpl struct{}
func (c *bookBatchConsumerImpl) BatchConsume(ctx context.Context, msgs []*api_pb.Book) error {
return errors.New("BatchConsume() has not been implemented yet")
}
内部的には Cloud Pub/Sub とは別に Queue を持っていて、そこで一定時間経過もしくは一定数溜まるまでメッセージをバッファリングすることで実現しています。
他にも gRPC server っぽい感じで Interceptor や StatsHandler の機構を備えており、本番で使うときのいろんな設定に活用できるようになっています。その他の機能や使い方に関してはサンプルアプリやミドルウェアの実装がリポジトリにあるので、興味がある方は見てみてください。
Publisher 側も Subscriber と同じように Interceptor を挟んだりしたい欲求がありますね。そういうときのために、兄弟パッケージとして Pubee というものも作ってみました(これは趣味)。Go で Cloud Pub/Sub を使う人はこちらもぜひ試してみてください。
ちなみにまだ Wantedly ではつかってません。
マイクロサービスでなくとも、ある程度以上の規模のシステムではキューを用いた非同期 API / バックグラウンドジョブを扱うシステムは不可欠でしょう。Go でそれが必要になったときは Cloud Pub/Sub と Subee のことを思い出してもらえると嬉しいです。
Subee は Wantedly People の Backend Engineer の長期インターンだった @hlts2 くんと一緒に作り、公開した OSS です。途中から「最終成果発表は春の GoCon Spring で!」とか言ってたんですが、本当にそうなりました。(メンターだった自分が言うのもなんですが、)かなり良い成果・良い発表だったのではないでしょうか。メンターのめちゃくちゃな無茶振りに着いてきてくれてありがとうございました👏
こんな感じで、技術力を活かしプロダクトを良くしつつあわよくば世の Backend Engineer, Gopher, ... に貢献するようなことしたい、もしくは「自分ならもっとイイもの作れるぜ!」って人、ぜひ話を聞きに来てみてください!