gRPCのサーバ/クライアントをScalaで実装する
2018-02-28
QiitaScalasbtgRPCこの記事はなに?
Scala で gRPC なアプリケーションを作るためのやりかた。
具体的には 4 つの通信方式、
- unary
- server straming
- client streaming
- bidirectional streaming
それぞれについてサーバ/クライアントを Scala で実装する。
参考
目次
- sbt の設定、マルチプロジェクト対応
- .proto ファイルを作成
- .proto から生成したクラスを Scala から使用する
- unary
- server straming
- client streaming
- bidirectional streaming
サンプルコードはpetitviolet/scala-grpc-pracに置いた。
sbt の設定、マルチプロジェクト対応
modules
配下にmain
, model
, protocol
という 3 つのプロジェクトを設定する。
gRPC の protocol buffer で使う.proto ファイルはmodules/protocol/protocol
配下に置けるようにする。
tree だとこんな感じ。
./modules/
├── main
│ └── src
│ └── main
├── model
│ └── src
│ └── main
└── protocol
└── protocol
└── my_service.proto
build.sbt に設定を記述する
だいたい全部を載せておく
// 共通設定
def commonSettings(_name: String) = Seq(
scalaVersion := "2.12.4",
version := "1.0.0",
libraryDependencies ++= commonDependencies,
name := _name,
)
// gRPC用の設定
def grpcProtocolSettings = {
import scalapb.compiler.Version
Seq(
PB.targets in Compile := Seq(
scalapb.gen(singleLineToProtoString = true) -> (sourceManaged in Compile).value
),
PB.protoSources in Compile +=
(baseDirectory in ThisProject).value / "protocol",
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % Version.scalapbVersion % "protobuf",
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % Version.scalapbVersion,
"io.grpc" % "grpc-netty" % Version.grpcJavaVersion
)
)
}
// rootプロジェクト
lazy val grpcPrac = (project in file("."))
.settings(commonSettings("grpcPrac"))
.aggregate(main)
// gRPCの.protoファイルを配置するプロジェクト
lazy val protocol = (project in file("modules/protocol"))
.settings(commonSettings("protocol"))
.settings(grpcProtocolSettings) // gRPC設定を適用
// gRPC非依存なプロジェクト
lazy val model = (project in file("modules/model"))
.settings(commonSettings("model"))
// gRPCなserverを実装するプロジェクト
lazy val main = (project in file("modules/main"))
.settings(
commonSettings("main"),
)
.dependsOn(model, protocol)
これで OK。
.proto ファイルを作成
Protocol Buffer の文法とかはLanguage Guide (proto3)を見たほうが早い。 今回のサンプルで使うのは以下の proto ファイル。
modules/protocol/protocol/MyService.proto
に置いてある。
syntax = "proto3";
// 出力されるScalaプログラムのpackageを指定
package net.petitviolet.prac.grpc.protocol;
service MyService {
// unary
rpc ShowOrganization (ShowOrganizationRequest) returns (Organization) {
};
// server stream
rpc ShowEmployees (ShowEmployeeRequest) returns (stream Employee) {
};
// client stream
rpc AddEmployee (stream Employee) returns (MessageResponse) {
};
// bidirectional stream
rpc Lottery(stream FetchRandomRequest) returns (stream Employee) {};
}
message MessageResponse {
string message = 1;
}
message ShowEmployeeRequest {
int32 organizationId = 1;
}
message ShowOrganizationRequest {
int32 organizationId = 1;
}
message FetchRandomRequest {
}
message Employee {
string name = 1;
int32 age = 2;
enum Post {
NoTitle = 0;
Manager = 1;
Officer = 2;
}
int32 organizationId = 3;
}
message Organization {
int32 id = 1;
string name = 2;
repeated Employee emproyees = 3;
}
これを置いた上でsbt compile
するとmodules/protocol/target
配下に Scala ファイルが吐かれる。
gRPC 使う側は import するだけで使用できる。
package
を指定して MyService.proto というファイル名なので、実際に使う際の import 文は以下になる。
import net.petitviolet.prac.grpc.protocol.MyService._
.proto から生成したクラスを Scala から使用する
grpc / gRPC Concepts#RPC life cycleにあるように RPC の方式は 4 つある。
- unary
- server streaming
- client streaming
- bidirectional streaming
それぞれについて server/client で実装してみる。
記事に書くにあたってサンプル実装とは一部変更してある。
共通
server
実装するべき I/F はこのようになっている。
class MyServiceImpl extends MyServiceGrpc.MyService {
// unary
def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
// server streaming
def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: StreamObserver[MyService.Employee]): Unit = ???
// client streaming
def addEmployee(responseObserver: StreamObserver[MyService.MessageResponse]): StreamObserver[MyService.Employee] = ???
// bidirectional streaming
def lottery(responseObserver: StreamObserver[MyService.Employee]): StreamObserver[MyService.FetchRandomRequest] = ???
}
これを実装してServiceBuilder#addService
に渡せば良い。
サンプル実装はこのあたり
client
こちらは既に実装されているので、利用するにあたっては I/F に合わせて利用するだけ。
MyServiceBlockingClient
とMyServiceStub
の 2 クラスが生成されている。
前者は名前の通り blocking するクライアントになっていて、unary, server streaming の 2 パターンしかサポートしていない。
後者のMyServiceStub
は全てサポートしていて以下のような I/F になっている。
class MyServiceStub(channel: io.grpc.Channel, options: io.grpc.CallOptions = io.grpc.CallOptions.DEFAULT) extends io.grpc.stub.AbstractStub[MyServiceStub](channel, options) with MyService {
// unary
override def showOrganization(request: MyService.ShowOrganizationRequest): Future[MyService.Organization] = ???
// server streaming
override def showEmployees(request: MyService.ShowEmployeeRequest, responseObserver: _root_.io.grpc.stub.StreamObserver[MyService.Employee]): Unit = ???
// client streaming
override def addEmployee(responseObserver: io.grpc.stub.StreamObserver[MyService.MessageResponse]): io.grpc.stub.StreamObserver[MyService.Employee] = ???
// bidirectional streaming
override def lottery(responseObserver: io.grpc.stub.StreamObserver[MyService.Employee]): io.grpc.stub.StreamObserver[MyService.FetchRandomRequest] = ???
}
上記のMyServiceStub
クラスのコンストラクタにあるio.grpc.Channel
インスタンスからクライアントは new できる。
val blockingClient: MyServiceGrpc.MyServiceBlockingClient = MyServiceGrpc.blockingStub(channel)
val asyncClient: MyServiceGrpc.MyServiceStub = MyServiceGrpc.stub(channel)
unary
1 リクエストに対して 1 レスポンスのもっともシンプルな通信。
server
普通に実装すればよく、特に言及することはない。
override def showOrganization(request: ShowOrganizationRequest): Future[Organization] = {
organizationRepository.findById(request.organizationId) // Future[Option[Organization]]
.map {
case Some(organization) => organization
case None =>
throw new RuntimeException(s"invalid request organization id = ${ request.organizationId }")
}
}
client
blocking/async どちらにも実装されているメソッドを呼び出すだけ。
val org: Organization = blockingClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
val orgF: Future[Organization] = asyncClient.showOrganization(new ShowOrganizationRequest(organizationId = 2))
client streaming
クライアントからの stream なリクエストに対してサーバから 1 レスポンスを返す方式。
server
引数と返り値が共にStreamObserver
となっていて一瞬混乱するかも。
注意点としては、サーバからは 1 レスポンスな通信方式なので、返り値のStreamObserver.onNext
を複数回呼ばないようにすること。
override def addEmployee(responseObserver: StreamObserver[MessageResponse]): StreamObserver[Employee] = {
new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error("addEmployee onError", t)
responseObserver.onError(t)
}
override def onCompleted(): Unit = {
// ここで一度だけonNextを実行し、続けてonCompleteを実行する
responseObserver.onNext(MessageResponse(s"addEmployee succeeded"))
responseObserver.onCompleted()
}
// ここがclientからのリクエストによって複数実行される可能性がある
override def onNext(employee: Employee): Unit = {
employeeRepository.store(employee)
.onComplete { // ここではresponseObserver.onNextは呼ばない
case Success(_) => logger.info(s"addEmployee onNext: ${ employee.name }")
case Failure(t) => onError(t)
}
}
}
}
client
async なクライアントしか使用することは出来ない。
注意点としては、asyncClient.addEmployee
の返り値のStreamObserver
のonCompleted
を呼ぶこと。
def addEmployee(name: String) = rpc {
val responseObserver = new StreamObserver[MessageResponse] {
override def onError(t: Throwable): Unit = logger.error("add failed to add employee", t)
override def onCompleted(): Unit = logger.info("add completed to add employee")
override def onNext(value: MessageResponse): Unit = logger.info("add onNext. message = ${value.message}")
}
val requestObserver: StreamObserver[Employee] = asyncClient.addEmployee(responseObserver)
(1 to 3).foreach { i =>
val employee = Employee(s"${ name }-$i", i * 10, i)
requestObserver.onNext(employee)
}
responseObserver.onCompleted()
requestObserver.onCompleted() // これを呼ぶ
}
requestObserver.onCompleted()
を実行しないとクライアントからの streaming が終わったことを通知する必要があるらしい。
そうしないと、クライアント側のManagedChannel.shutdown()
を実行した時にサーバ側のonError
が呼び出されてしまう。
上の実装だとlogger.error
のあとにresponseObserver.onError
も実行しているが、既に接続が切れているのでクライアント側には通知されない。
ちなみにonError
時の stacktrace を抜粋するとこんな感じ。
io.grpc.StatusException: CANCELLED
at io.grpc.Status.asException(Status.java:534)
at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:282)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$3.runInContext(ServerImpl.java:618)
server streaming
クライアントからの 1 リクエストに対してサーバから stream で複数レスポンスを返す方式。
server
クライアントからのリクエストに対して、responseObserver.onNext
を使って結果を複数返却する。
override def showEmployees(request: ShowEmployeeRequest, responseObserver: StreamObserver[Employee]): Unit = {
Option(request.organizationId)
.filterNot { _ == 0 } // デフォルト値を除外
.map { orgId =>
employeeRepository.findByOrganizationId(orgId) // Future[Seq[Employee]]
}.getOrElse {
employeeRepository.findAll() // Future[Seq[Employee]]
}.map { employees: Seq[Employee] =>
employees.foreach { employee: Employee =>
responseObserver.onNext(employee) // onNextを複数回呼んで良い
}
}.onComplete {
case Success(_) => responseObserver.onCompleted() // 全て終了したらonComplete
case Failure(t) => responseObserver.onError(t) // 何か失敗したらonError
}
}
onNext
を全て呼び終わったら、最後に一度だけonComplete
を実行する。
client
こちらはリクエストを 1 回送り、複数レスポンスをStreamObserver
で受け付ける。
def showEmployees(): Future[Unit] = rpc {
val promise = Promise[Unit]()
val responseObserver = new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error(s"showEmployee onError", t)
promise.failure(t)
}
override def onCompleted(): Unit = {
logger.info(s"showEmployee onComplete")
promise.success(())
}
override def onNext(value: Employee): Unit = {
logger.info(s"showEmployee onNext: $value")
}
}
asyncClient.showEmployees(new ShowEmployeeRequest(), responseObserver)
promise.future
}
onComplete
が呼ばれるタイミングはサーバ側に依存するがPromise
を使って表現することが出来る。
bidirectional streaming
クライアントとサーバ間で双方向にリクエストしあう方式。
server
実装としては client streaming の時の実装とよく似ている。
override def lottery(responseObserver: StreamObserver[Employee]): StreamObserver[FetchRandomRequest] = {
new StreamObserver[FetchRandomRequest] {
override def onError(t: Throwable): Unit = logger.error(s"lottery onError", t)
override def onCompleted(): Unit = {
logger.info(s"lottery onCompleted")
responseObserver.onCompleted()
}
override def onNext(value: FetchRandomRequest): Unit = {
employeeRepository.findAll() foreach { _.headOption { em =>
responseObserver.onNext(Employee(em.name, em.age, em.organization.id))
}
}
}
}
この実装だとしていないが、サーバ側でresponseObserver.onNext
はどんなタイミングで呼んでもよいものになっている。
サーバ側のonCompleted
でresponseObserver.onCompleted
を呼ぶように実装しておく。
client
こちらは server streaming の時の実装に似たものになる。
def lottery(): Future[Unit] = rpc {
val promise = Promise[Unit]()
val responseObserver = new StreamObserver[Employee] {
override def onError(t: Throwable): Unit = {
logger.error(s"lottery onError", t)
promise.failure(t)
}
override def onCompleted(): Unit = {
logger.info(s"lottery onComplete")
promise.success(())
}
override def onNext(value: Employee): Unit = logger.info(s"lottery onNext: $value")
}
val requestObserver: StreamObserver[FetchRandomRequest] = asyncClient.lottery(responseObserver)
(1 to 3).foreach { i =>
requestObserver.onNext(FetchRandomRequest())
}
requestObserver.onCompleted()
promise.future
}
server streaming の時と同様にPromise
を使って結果を受け取れるような実装にしてある。
クライアント側からrequestObserver.onCompleted
を呼ぶことで、サーバからresponseObserver.onCompleted
が実行されるように実装してあるのでこれで両方正常に終了出来る。
所感
gRPC、Protocol Buffer で IDL が提供されているのでサーバ・クライアント間のインタフェースが型安全に表現できて非常に便利。
ScalaPB のおかげで Scala からも特に違和感なく使うことが出来る。
ただ、4 つの通信方式それぞれで知っておいたほうがいいこともあるようなので、当たり前だけど慎重に実装する必要はある。
from: https://qiita.com/petitviolet/items/a82c446edbd81c884246