petitviolet blog

    gRPCのサーバ/クライアントをScalaで実装する

    2018-02-28

    QiitaScalasbtgRPC

    この記事はなに?

    Scala で gRPC なアプリケーションを作るためのやりかた。

    具体的には 4 つの通信方式、

    • unary
    • server straming
    • client streaming
    • bidirectional streaming

    それぞれについてサーバ/クライアントを Scala で実装する。

    参考

    目次

    • sbt の設定、マルチプロジェクト対応
    • .proto ファイルを作成
    • .proto から生成したクラスを Scala から使用する
      • unary
      • server straming
      • client streaming
      • bidirectional streaming

    サンプルコードはpetitviolet/scala-grpc-pracに置いた。

    sbt の設定、マルチプロジェクト対応

    modules配下にmain, model, protocolという 3 つのプロジェクトを設定する。

    gRPC の protocol buffer で使う.proto ファイルはmodules/protocol/protocol配下に置けるようにする。 tree だとこんな感じ。

    ./modules/
    ├── main
    │   └── src
    │       └── main
    ├── model
    │   └── src
    │       └── main
    └── protocol
        └── protocol
            └── my_service.proto
    

    build.sbt に設定を記述する

    だいたい全部を載せておく

    // 共通設定
    def commonSettings(_name: String) = Seq(
      scalaVersion := "2.12.4",
      version := "1.0.0",
      libraryDependencies ++= commonDependencies,
      name := _name,
    )
    
    // gRPC用の設定
    def grpcProtocolSettings = {
      import scalapb.compiler.Version
    
      Seq(
        PB.targets in Compile := Seq(
          scalapb.gen(singleLineToProtoString = true) -> (sourceManaged in Compile).value
        ),
        PB.protoSources in Compile +=
          (baseDirectory in ThisProject).value / "protocol",
    
        libraryDependencies ++= Seq(
          "com.thesamet.scalapb" %% "scalapb-runtime" % Version.scalapbVersion % "protobuf",
          "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % Version.scalapbVersion,
          "io.grpc" % "grpc-netty" % Version.grpcJavaVersion
        )
      )
    }
    
    // rootプロジェクト
    lazy val grpcPrac = (project in file("."))
      .settings(commonSettings("grpcPrac"))
      .aggregate(main)
    
    // gRPCの.protoファイルを配置するプロジェクト
    lazy val protocol = (project in file("modules/protocol"))
      .settings(commonSettings("protocol"))
      .settings(grpcProtocolSettings)  // gRPC設定を適用
    
    // gRPC非依存なプロジェクト
    lazy val model = (project in file("modules/model"))
      .settings(commonSettings("model"))
    
    // gRPCなserverを実装するプロジェクト
    lazy val main = (project in file("modules/main"))
      .settings(
        commonSettings("main"),
      )
      .dependsOn(model, protocol)
    

    これで OK。

    .proto ファイルを作成

    Protocol Buffer の文法とかはLanguage Guide (proto3)を見たほうが早い。 今回のサンプルで使うのは以下の proto ファイル。

    modules/protocol/protocol/MyService.protoに置いてある。

    syntax = "proto3";
    
    // 出力されるScalaプログラムのpackageを指定
    package net.petitviolet.prac.grpc.protocol;
    
    service MyService {
        // unary
        rpc ShowOrganization (ShowOrganizationRequest) returns (Organization) {
        };
        // server stream
        rpc ShowEmployees (ShowEmployeeRequest) returns (stream Employee) {
        };
        // client stream
        rpc AddEmployee (stream Employee) returns (MessageResponse) {
        };
        // bidirectional stream
        rpc Lottery(stream FetchRandomRequest) returns (stream Employee) {};
    }
    
    message MessageResponse {
        string message = 1;
    }
    
    message ShowEmployeeRequest {
        int32 organizationId = 1;
    }
    
    message ShowOrganizationRequest {
        int32 organizationId = 1;
    }
    
    message FetchRandomRequest {
    }
    
    message Employee {
        string name = 1;
        int32 age = 2;
        enum Post {
            NoTitle = 0;
            Manager = 1;
            Officer = 2;
        }
        int32 organizationId = 3;
    }
    
    message Organization {
        int32 id = 1;
        string name = 2;
        repeated Employee emproyees = 3;
    }
    

    これを置いた上でsbt compileするとmodules/protocol/target配下に Scala ファイルが吐かれる。 gRPC 使う側は import するだけで使用できる。

    packageを指定して MyService.proto というファイル名なので、実際に使う際の import 文は以下になる。

    import net.petitviolet.prac.grpc.protocol.MyService._
    

    .proto から生成したクラスを Scala から使用する

    grpc / gRPC Concepts#RPC life cycleにあるように RPC の方式は 4 つある。

    • unary
    • server streaming
    • client streaming
    • bidirectional streaming

    それぞれについて server/client で実装してみる。

    記事に書くにあたってサンプル実装とは一部変更してある。

    共通

    server

    実装するべき I/F はこのようになっている。

    class MyServiceImpl extends MyServiceGrpc.MyService {
      // unary
      def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
      // server streaming
      def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: StreamObserver[MyService.Employee]): Unit = ???
      // client streaming
      def addEmployee(responseObserver: StreamObserver[MyService.MessageResponse]): StreamObserver[MyService.Employee] = ???
      // bidirectional streaming
      def lottery(responseObserver: StreamObserver[MyService.Employee]): StreamObserver[MyService.FetchRandomRequest] = ???
    }
    

    これを実装してServiceBuilder#addServiceに渡せば良い。
    サンプル実装はこのあたり

    client

    こちらは既に実装されているので、利用するにあたっては I/F に合わせて利用するだけ。 MyServiceBlockingClientMyServiceStubの 2 クラスが生成されている。 前者は名前の通り blocking するクライアントになっていて、unary, server streaming の 2 パターンしかサポートしていない。 後者のMyServiceStubは全てサポートしていて以下のような I/F になっている。

    class MyServiceStub(channel: io.grpc.Channel, options: io.grpc.CallOptions = io.grpc.CallOptions.DEFAULT) extends io.grpc.stub.AbstractStub[MyServiceStub](channel, options) with MyService {
      // unary
      override def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
      // server streaming
      override def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: _root_.io.grpc.stub.StreamObserver[MyService.Employee]): Unit = ???
      // client streaming
      override def addEmployee(responseObserver: io.grpc.stub.StreamObserver[MyService.MessageResponse]): io.grpc.stub.StreamObserver[MyService.Employee] = ???
      // bidirectional streaming
      override def lottery(responseObserver: io.grpc.stub.StreamObserver[MyService.Employee]): io.grpc.stub.StreamObserver[MyService.FetchRandomRequest] = ???
    }
    

    上記のMyServiceStubクラスのコンストラクタにあるio.grpc.Channelインスタンスからクライアントは new できる。

    val blockingClient: MyServiceGrpc.MyServiceBlockingClient = MyServiceGrpc.blockingStub(channel)
    val asyncClient: MyServiceGrpc.MyServiceStub = MyServiceGrpc.stub(channel)
    

    unary

    1 リクエストに対して 1 レスポンスのもっともシンプルな通信。

    server

    普通に実装すればよく、特に言及することはない。

    override def showOrganization(request: ShowOrganizationRequest): Future[Organization] = {
      organizationRepository.findById(request.organizationId) // Future[Option[Organization]]
        .map {
          case Some(organization) => organization
          case None =>
            throw new RuntimeException(s"invalid request organization id = ${ request.organizationId }")
        }
      }
    

    client

    blocking/async どちらにも実装されているメソッドを呼び出すだけ。

    val org: Organization = blockingClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
    val orgF: Future[Organization] = asyncClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
    

    client streaming

    クライアントからの stream なリクエストに対してサーバから 1 レスポンスを返す方式。

    server

    引数と返り値が共にStreamObserverとなっていて一瞬混乱するかも。 注意点としては、サーバからは 1 レスポンスな通信方式なので、返り値のStreamObserver.onNextを複数回呼ばないようにすること。

    override def addEmployee(responseObserver: StreamObserver[MessageResponse]): StreamObserver[Employee] = {
      new StreamObserver[Employee] {
        override def onError(t: Throwable): Unit =  {
          logger.error("addEmployee onError", t)
          responseObserver.onError(t)
        }
    
        override def onCompleted(): Unit = {
          // ここで一度だけonNextを実行し、続けてonCompleteを実行する
          responseObserver.onNext(MessageResponse(s"addEmployee succeeded"))
          responseObserver.onCompleted()
        }
    
        // ここがclientからのリクエストによって複数実行される可能性がある
        override def onNext(employee: Employee): Unit = {
          employeeRepository.store(employee)
            .onComplete { // ここではresponseObserver.onNextは呼ばない
              case Success(_) => logger.info(s"addEmployee onNext: ${ employee.name }")
              case Failure(t) => onError(t)
            }
        }
      }
    }
    

    client

    async なクライアントしか使用することは出来ない。 注意点としては、asyncClient.addEmployeeの返り値のStreamObserveronCompletedを呼ぶこと。

    def addEmployee(name: String) = rpc {
      val responseObserver = new StreamObserver[MessageResponse] {
        override def onError(t: Throwable): Unit = logger.error("add failed to add employee", t)
    
        override def onCompleted(): Unit = logger.info("add completed to add employee")
    
        override def onNext(value: MessageResponse): Unit = logger.info("add onNext. message = ${value.message}")
      }
    
      val requestObserver: StreamObserver[Employee] = asyncClient.addEmployee(responseObserver)
      (1 to 3).foreach { i =>
        val employee = Employee(s"${ name }-$i", i * 10, i)
        requestObserver.onNext(employee)
      }
    
      responseObserver.onCompleted()
      requestObserver.onCompleted() // これを呼ぶ
    }
    

    requestObserver.onCompleted()を実行しないとクライアントからの streaming が終わったことを通知する必要があるらしい。 そうしないと、クライアント側のManagedChannel.shutdown()を実行した時にサーバ側のonErrorが呼び出されてしまう。

    上の実装だとlogger.errorのあとにresponseObserver.onErrorも実行しているが、既に接続が切れているのでクライアント側には通知されない。 ちなみにonError時の stacktrace を抜粋するとこんな感じ。

    io.grpc.StatusException: CANCELLED
            at io.grpc.Status.asException(Status.java:534)
            at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
            at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:282)
            at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$3.runInContext(ServerImpl.java:618)
    

    server streaming

    クライアントからの 1 リクエストに対してサーバから stream で複数レスポンスを返す方式。

    server

    クライアントからのリクエストに対して、responseObserver.onNextを使って結果を複数返却する。

    override def showEmployees(request: ShowEmployeeRequest, responseObserver: StreamObserver[Employee]): Unit = {
      Option(request.organizationId)
        .filterNot { _ == 0 } // デフォルト値を除外
        .map { orgId =>
          employeeRepository.findByOrganizationId(orgId) // Future[Seq[Employee]]
        }.getOrElse {
          employeeRepository.findAll()  // Future[Seq[Employee]]
        }.map { employees: Seq[Employee] =>
          employees.foreach { employee: Employee =>
            responseObserver.onNext(employee) // onNextを複数回呼んで良い
          }
        }.onComplete {
          case Success(_) => responseObserver.onCompleted() // 全て終了したらonComplete
          case Failure(t) => responseObserver.onError(t) // 何か失敗したらonError
        }
    }
    

    onNextを全て呼び終わったら、最後に一度だけonCompleteを実行する。

    client

    こちらはリクエストを 1 回送り、複数レスポンスをStreamObserverで受け付ける。

    def showEmployees(): Future[Unit] = rpc {
      val promise = Promise[Unit]()
      val responseObserver = new StreamObserver[Employee] {
        override def onError(t: Throwable): Unit = {
          logger.error(s"showEmployee onError", t)
          promise.failure(t)
        }
        override def onCompleted(): Unit = {
          logger.info(s"showEmployee onComplete")
          promise.success(())
        }
    
        override def onNext(value: Employee): Unit = {
          logger.info(s"showEmployee onNext: $value")
        }
      }
      asyncClient.showEmployees(new ShowEmployeeRequest(), responseObserver)
      promise.future
    }
    

    onCompleteが呼ばれるタイミングはサーバ側に依存するがPromiseを使って表現することが出来る。

    bidirectional streaming

    クライアントとサーバ間で双方向にリクエストしあう方式。

    server

    実装としては client streaming の時の実装とよく似ている。

    override def lottery(responseObserver: StreamObserver[Employee]): StreamObserver[FetchRandomRequest] = {
      new StreamObserver[FetchRandomRequest] {
        override def onError(t: Throwable): Unit = logger.error(s"lottery onError", t)
    
        override def onCompleted(): Unit = {
          logger.info(s"lottery onCompleted")
          responseObserver.onCompleted()
        }
    
        override def onNext(value: FetchRandomRequest): Unit = {
          employeeRepository.findAll() foreach { _.headOption { em =>
            responseObserver.onNext(Employee(em.name, em.age, em.organization.id))
          }
        }
      }
    }
    

    この実装だとしていないが、サーバ側でresponseObserver.onNextはどんなタイミングで呼んでもよいものになっている。
    サーバ側のonCompletedresponseObserver.onCompletedを呼ぶように実装しておく。

    client

    こちらは server streaming の時の実装に似たものになる。

    def lottery(): Future[Unit] = rpc {
      val promise = Promise[Unit]()
      val responseObserver = new StreamObserver[Employee] {
        override def onError(t: Throwable): Unit = {
          logger.error(s"lottery onError", t)
          promise.failure(t)
        }
        override def onCompleted(): Unit = {
          logger.info(s"lottery onComplete")
          promise.success(())
        }
        override def onNext(value: Employee): Unit = logger.info(s"lottery onNext: $value")
      }
    
      val requestObserver: StreamObserver[FetchRandomRequest] = asyncClient.lottery(responseObserver)
    
      (1 to 3).foreach { i =>
        requestObserver.onNext(FetchRandomRequest())
      }
      requestObserver.onCompleted()
      promise.future
    }
    

    server streaming の時と同様にPromiseを使って結果を受け取れるような実装にしてある。
    クライアント側からrequestObserver.onCompletedを呼ぶことで、サーバからresponseObserver.onCompletedが実行されるように実装してあるのでこれで両方正常に終了出来る。

    所感

    gRPC、Protocol Buffer で IDL が提供されているのでサーバ・クライアント間のインタフェースが型安全に表現できて非常に便利。
    ScalaPB のおかげで Scala からも特に違和感なく使うことが出来る。
    ただ、4 つの通信方式それぞれで知っておいたほうがいいこともあるようなので、当たり前だけど慎重に実装する必要はある。

    from: https://qiita.com/petitviolet/items/a82c446edbd81c884246