1
/
5

実践 Pub/Sub Messaging - Cloud Pub/Sub を活用するための開発基盤を作った話

はじめに

こんにちは、Wantedly の Infrastructure Team でエンジニアをしている南(@south37)です。

先日、Wantedly では「システム全体の信頼性向上」と「開発生産性の向上」を両立するために Event-Driven Architecture を採用しているというブログを書かせて頂きました。

開発生産性と信頼性の両立を目指すための Event-Driven Architecture - より良いマイクロサービスアーキテクチャを求めて | Wantedly Engineer Blog
こんにちは、Wantedly の Infrastructure Team でエンジニアをしている南( south37 )です。 Wantedly では、マイクロサービスアーキテクチャを採用しています。 マイクロサービスで開発を進める上で、重要な関心ごとの1つが「多数のマイクロサービスの協調動作をどう実現するか」です。そもそもマイクロサービスとは、多数の小さなサービスが協調動作をすることでシステムとしての振る舞いを実現するものです。そのため、「マイクロサービス同士の連携方法」は、システムにとって極めて重要な
https://www.wantedly.com/companies/wantedly/post_articles/306549

Event-Driven Architecture の実現にあたって、気になるトピックの1つが「Event router(あるいは Pub/Sub)ミドルウェアとして何を利用しているか」という部分です。Wantedly では、この用途では Google Cloud Platform(GCP)Cloud Pub/Sub というサービスを利用しています。また、シンプルな Message Queue という用途においても、Cloud Pub/Sub を利用しています。

今日は、この Cloud Pub/Sub を対象に、「Cloud Pub/Sub とは何なのか」、「Cloud Pub/Sub を上手く活用するために開発基盤としてどういったものを用意したのか」についてご紹介したいと思います。特に、Cloud Pub/Sub を活用するための基盤としては、「subeegcpc といった OSS のライブラリ」も開発しています。そういった OSS のご紹介もいたします。

Cloud Pub/Sub とは何なのか

Cloud Pub/Sub は、一言でいえば Pub/Sub Messaging を実現するためのマネージドサービスです。「Pub/Sub Messaging とは何なのか」という部分については、AWS が公開する Document の1つである Pub/Sub Messaging - AWS が分かりやすいでしょう。この Document においては、Pub/Sub Messaging は以下の図を利用して紹介されています。

図1. Pub/Sub Messaging の模式図。出典: Pub/Sub Messaging - AWS

上記の図を見ていただくと分かる通り、Pub/Sub Messaging においては Publisher が Message(Event-Driven Architecture の文脈では Event)を Topic に対して Publish すると、その Message に興味のある複数の Subscriber がその Message を受け取ることができる、という仕組みになっています。同じ Message を複数の Subscriber に届けることができる、というのがポイントです。

「Pub/Sub Messaging」と、前回のブログで紹介した「Event-Driven Architecture」の関係性はややこしいのですが、自分は「Pub/Sub Messaging は通信方法あるいはそのためのミドルウェアを指す名称」、「Event-Driven Architecture は、Pub/Sub Messaging などを利用して実現するシステムアーキテクチャ」だと捉えています。実際に、AWS の Document においても以下のように「Pub/Sub Messaging は Event-Driven Architecture 実現に利用できる」という説明がなされています。

Pub/sub messaging can be used to enable event-driven architectures, or to decouple applications in order to increase performance, reliability and scalability.

引用: Pub/Sub Messaging - AWS

Pub/Sub Messaing を実現するミドルウェアやマネージドサービスはいくつかあります。そのうち代表的なものは以下でしょう。

この中でも、Wantedly では「運用負荷を下げるためにマネージドサービスサービスを利用したい」、「Pull 型での Subscribe を実現したい」という理由から Cloud Pub/Sub を利用しています。また、社内では「Pub/Sub Messaging」と「シンプルな Message Queue」のそれぞれにニーズがあり、その両方を1つのサービスで実現出来るというのも Cloud Pub/Sub を選定する理由となっています。特に、後述するように、社内では Cloud Pub/Sub を上手く利用するためのライブラリ開発を行っているため、1つのサービスに絞ることで基盤開発をやりやすくしています。

ここまでが、「Cloud Pub/Sub とは何なのか、特にいくつかの Pub/Sub Messaging の中でなぜそれを選んだのか」という話でした。次は、Cloud Pub/Sub のアーキテクチャに踏み込んでさらに詳細な説明をしたいと思います。

Cloud Pub/Sub のアーキテクチャ

Cloud Pub/Sub について簡単にそのアーキテクチャを概観してみましょう。What Is Pub/Sub? - Cloud Pub/Sub というドキュメントでは、Cloud Pub/Sub のアーキテクチャが以下のような図で紹介されています。

図2. Cloud Pub/Sub のアーキテクチャ。What Is Pub/Sub? - Cloud Pub/Sub より引用

上図を見ていただくと、Cloud Pub/Sub の中には、Topic と Subscription という 2 種類のリソースが存在することが分かります。上図で "Topic C", "Subscription YC", "Subscription ZC" という名前がついているリソースが、Topic および Subscritption に該当します。

Message の Publish を行うには、まず Topic の作成を行います。Topic は、基本的には「Message の種類」ごとに作成します。次に、Message の Subscribe を行いたい場合には、「興味のある Message の Publish が行われる Topic に対して、それにひもづく Subscription の作成」を行います。こうすることで、Message の Publish が行われるたびに、Subscription に Message が流れてくるようになります。

Subscription は1つの Topic に対して幾つでも追加することが出来ます。そのため、「1つの Message を複数の Subscriber が独立に取得して、それぞれの用途で利用する」ということが出来ます。まさに、「Pub/Sub Messaging」の機能を実現している事が分かります。

Topic と Subscription は、それぞれ「イベントを発行するマイクロサービス(= Publisher)」と「イベントをチェックするマイクロサービス(= Subscriber)」から利用されます。Publisher は Topic に対して Message の Publish を行い、Subscriber は Subscription から Message を取得して必要な処理を行います。

ここで興味深いのが、Subscriber が Subscription から Message を取得する方法です。Cloud Pub/Sub では、Message の取得方法として「Push 型 / Pull 型」が選べるようになっています。Push 型では、Subscriber 側で Endpoint を用意しておくことで、Message が Publish された際に Message を HTTP request として受けとる事ができます。一方、Pull 型では、Subscriber として「Subscription に対して Message の取得を行う Worker Process」を立てておくことで、Polling での Message 取得(あるいは gRPC の Streaming Pull での Message 取得)を行うことができます。

「Push 型 / Pull 型」はそれぞれ Pros/Cons があります。Message を Pull 型で取得する場合には、「Subscriber の障害や過負荷」への心配を減らせるというメリットがあることから、Wantedly では主に Pull 型を利用しています。例えば、Subscriber で障害が起きてしばらく処理が止まってしまう場合などを考えてみましょう。未処理の Message は Subscription 内部の queue にたまっていて、それが Subscriber の復旧とともに DOS のように押し寄せてくる懸念があります。もちろん、Cloud Pub/Sub は配信量をコントロールする仕組みは持っています(詳細は Delivery rate - Cloud Pub/Sub を参照)が、Subscriber 側で完全にコントロール出来る方が扱いやすいと判断しています。

ここまで、Cloud Pub/Sub のアーキテクチャおよび利用方法について説明してきました。Wantedly では、この Cloud Pub/Sub を様々な箇所で利用しています。その際、開発を効率的に進めるために、「開発基盤としてのライブラリ整備」を行っています。次に、この「Cloud Pub/Sub を上手く活用するための開発基盤」についてご紹介します。

Cloud Pub/Sub を上手く活用するための開発基盤

Cloud Pub/Sub は、様々な言語で利用できるように公式ライブラリが提供されています。例えば、Quickstart: Using Client Libraries では以下の言語からの利用方法が記載されています。

  • Go, Ruby, Python, Node.js, C++, C#, Java, PHP

基本的には、公式ライブラリを利用するだけで Cloud Pub/Sub を利用し始めることが出来ます。しかし、実際に本番環境で Cloud Pub/Sub を利用しようと思うと、「Message の順序の入れ替わりや重複への対応」、「エラー発生時のエラーレポーティング」、「Observability のための Metrics 送信」など、様々なことを考える必要があります。そういった「ドメインロジックとは独立した関心ごとの実装」に時間を取られるのは、開発生産性を考えると避けたいものです。

そこで、Wantedly は社内で Cloud Pub/Sub を利用していくにあたって、以下のような Go および Ruby のライブラリ(公式ライブラリを wrap したライブラリ)を用意して使うことにしています。社内のマイクロサービスは Go と Ruby で書かれてるものが多いので、この2つの言語向けにライブラリを用意しています。どちらのライブラリも、OSS として公開しています。

上記のライブラリはどちらも、「interceptor という形で機能を追加する枠組みを作る」ために利用しています。この intercptor というのは、「メインの処理を wrap する形で、任意の処理を差し込むための機構」です。Faraday Middleware や Rack Middleware、あるいは gRPC interceptor のようなもの、というと分かりやすいかもしれません。実際、実装時にはそれらの仕組みをかなり参考にしています。

具体的なコードを見てみましょう。例えば Ruby で gcpc gem を利用して subscribe を行う場合は、以下のようなコードを書きます。

class LogInterceptor < Gcpc::Subscriber::BaseInterceptor
  MyLogger = Logger.new(STDOUT)

  # @param [String] data
  # @param [Hash] attributes
  # @param [Google::Cloud::Pubsub::ReceivedMessage] message
  def handle(data, attributes, message)
    MyLogger.info "[Interceptor Log] subscribed a message: #{message}"
    yield data, attributes, message
  end
end

subscriber = Gcpc::Subscriber.new(
  project_id:   "<project id>",
  subscription: "<subscription name>",
  interceptors: [LogInterceptor],  # ここで interceptor を設定
  credentials:  "/path/to/credentials",
)

class NopHandler < Gcpc::Subscriber::BaseHandler
  # @param [String] data
  # @param [Hash] attributes
  # @param [Google::Cloud::Pubsub::ReceivedMessage] message
  def handle(data, attributes, message)
    # Do nothing. Consume only.
  end
end

subscriber.handle(NopHandler)  # Message の handler を登録
subscriber.run  # Subscribe 開始

注目したいのは LogInterceptor という class で、これは「標準出力に log を吐く機能」を持つ interceptor です。こういった interceptor を用意して、さらに Gcpc::Subscriber.new へ引数として interceptor を渡すと、Message を処理する際に自動でこれらの interceptor の処理が行われます。実際の Message 処理を行うドメインロジックは Handler class として実装しているのに対して、それとは独立した形で interceptor を用意することができます。

Wantedly では、interceptor も事前にライブラリとして用意しておくことで、本質的でないロジックの実装に時間を取られずに、重要なロジックの実装に注力する事ができるようにしています。具体的には、「社内で共通で設定して欲しい interceptor」はライブラリとして用意してあり、さらにそのライブラリを組み込んだ社内ライブラリも用意しています。

interceptor として実現している機能の一例は以下です。

  • Message の重複の有無の check およびハンドリング
  • Message の順序の入れ替わりの check およびハンドリング
  • Message 処理に失敗した場合の error reporting
  • etc.

こういった内容は、どの Subscriber でも漏れなく考慮されていて欲しいポイントです。これらの機能をライブラリとして提供することで、個々の開発者が高い生産性で開発を行えるようになっています。

なお、subeegcpc およびそれをベースに機能追加した社内ライブラリについては、最近はさらなる改善が進んでいます。個々のライブラリの詳細については、また後日ブログでご紹介したいと思います。

まとめ

GCP の Cloud Pub/Sub という Pub/Sub Messaging のためのマネージドサービスと、Wantedly が Cloud Pub/Sub を活用するために用意した「subeegcpc をベースとする開発基盤」についてご紹介しました。

以前ご紹介した Event-Driven Architecture などは1つの活用例ですが、Cloud Pub/Sub はとても強力で便利なサービスです。また、それを活用するための基盤である subeegcpc も、手前味噌ではありますが便利なライブラリだと考えています。どちらも、このブログを機会に興味を持っていただけると幸いです!

Invitation from Wantedly, Inc.
If this story triggered your interest, have a chat with the team?
Wantedly, Inc.'s job postings
19 Likes
19 Likes

Weekly ranking

Show other rankings
Like Nao Minami's Story
Let Nao Minami's company know you're interested in their content