読者です 読者をやめる 読者になる 読者になる

gRPCを使ってみた

gRPC for Javaをいじってみたので、使い方のまとめ

gRPCとは

Googleが開発した、RPC(Remote Procedure Call)を実装するためのフレームワークです。

Protocol Bufferを利用した高速通信、.protoによるインターフェース定義、JavaやNode.jsなどを含む、多数のプログラミング言語に対応していることなどが特徴。

仕組み

大雑把な流れは、以下の通りです。

  1. .proto形式でインターフェース定義を記述
  2. .protoから、利用したいプログラミング言語用の、サーバーのスケルトン実装とクライアントスタブを生成
    1. で生成されたものをベースにサーバー、クライアントを実装

クライアントでスタブのメソッドを呼び出すと、通信はフレームワークがよしなにやってくれます。

f:id:charlier_shoe:20160908135226p:plain

公式ドキュメント「What is gRPC?」http://www.grpc.io/docs/guides/ より抜粋

Javaで実際にやってみた

Javaで実際にやってみました。 (コードはこちら

.protoの作成

以下の様な感じで、.protoを作成します。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.oracle.jdt2016.hackathon.hr";
option java_outer_classname = "HrProto";
option objc_class_prefix = "HR";

import "google/protobuf/empty.proto";

package hr;

service Hr {
    rpc Employees(google.protobuf.Empty) returns (EmployeesReply) {}
}

message EmployeesReply {
    repeated Employee employee = 1;
}

message Employee {
    float commissionPct = 1;
    int64 departmentId = 2;
    string email = 3;
    int64 employeeId = 4;
    string firstName = 5;
    int64 hireDate = 6;
    string jobId = 7;
    string lastName = 8;
    int64 managerId = 9;
    string phoneNumber = 10;
    float salary = 11;
}

今回は、サーバーにあるダミーの従業員データを取得するインターフェースを作りました。引数は無し、返り値は従業員データの配列です。

"service"で始まる部分がメソッドの定義です。引数なしを定義するときは、"google/protobuf/empty.proto"をインポートして”google.protobuf.Empty”を引数に指定します。
返り値の型は”message”で始まる部分で定義しています。要素に配列を含みたい場合、”repeated"と記述すると、後続する型の配列だという宣言になります。

今回は、クライアント、サーバーとも同期的に処理を行なうインターフェースをとして定義していますが、非同期のインターフェースも使えます。
詳細は公式のドキュメントを参照して下さい。

サーバースケルトン、クライアントスタブの生成

今回はGradleを使います(Mavenでもいけるようです)。

まず、IDEを使うなりして適当なGradleプロジェクトを作成しておきます。

次に、src/main/protoフォルダを作成し、.protoファイルを配置します。

続いて、build.gradleを編集します。 今回使ったbuild.gradle(関連する部分を抜粋)は、以下の通りです。

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.0'
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'com.google.protobuf'def grpcVersion = '1.0.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    …
    compile "io.grpc:grpc-netty:${grpcVersion}"
    compile "io.grpc:grpc-protobuf:${grpcVersion}"
    compile "io.grpc:grpc-stub:${grpcVersion}"
}

protobuf {
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                // To generate deprecated interfaces and static bindService method,
                // turn the enable_deprecated option to true below:
                option 'enable_deprecated=false'
            }
        }
    }
}

以下のコマンドを実行すると、build/generated配下に目的のコードが生成されます。

 > gradle generateProto

生成されるコードのパッケージ名には、.protoの"option java_package"で宣言したものになります。

サーバーの実装

サーバーの実装は以下の通りです(ほとんど公式のHelloWorldサンプルから持ってきています)。

//importは省略

public class HrServer {
    /* The port on which the server should run */
    private int port = 50051;
    private Server server;

    private void start() throws IOException {
        server = ServerBuilder.forPort(port).addService(new HrImpl())
          .build().start();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HrServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() {
        if (server != null) {
          server.shutdown();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    /**
     * Main launches the server from the command line.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException {
        final HrServer server = new HrServer();
        server.start();
        server.blockUntilShutdown();
    }

    private class HrImpl extends HrGrpc.HrImplBase {
        @Override
        public void employees(
                Empty request, StreamObserver<EmployeesReply> responseObserver) {
            // DBから従業員データを取得する
            EntityManager em = EntityManagerUtils.getEntityManager();
            @SuppressWarnings("unchecked")
            List<Employee> entities =
                    em.createNamedQuery("Employee.findAll").getResultList();
            em.close();
            // 返り値を作成
            EmployeesReply.Builder replyBuilder = EmployeesReply.newBuilder();
            for (Employee entity : entities) {
                com.oracle.jdt2016.hackathon.hr.Employee employee =
                        com.oracle.jdt2016.hackathon.hr.Employee.newBuilder()
                        .setCommissionPct(entity.getCommissionPct())
                        .setDepartmentId(entity.getDepartmentId())
                        .setEmail(entity.getEmail())
                        .setEmployeeId(entity.getEmployeeId())
                        .setFirstName(entity.getFirstName())
                        .setHireDate(entity.getHireDate().getTime())
                        .setJobId(entity.getJobId())
                        .setLastName(entity.getLastName())
                        .setManagerId(entity.getManagerId())
                        .setPhoneNumber(entity.getPhoneNumber())
                        .setSalary(entity.getSalary())
                        .build();
                replyBuilder.addEmployee(employee);
            }
            responseObserver.onNext(replyBuilder.build());
            responseObserver.onCompleted();
        }
    }
}

サーバーの起動はServerBuilderからServerを作成→startという流れ。

ロジックは、"HrGrpc.HrImplBase"をextendsして、Employeesメソッドをオーバーライドして記述します。

クライアントに返却するデータも.protoファイルを元に生成されたクラスを使います。今回はEmployeesReply、Employeeというクラスが作られています。 オーバーライドしたメソッドの中でこれらのオブジェクトを作って、responseObsererに渡して上げればよいようです。

クライアントの実装

クライアントの実装は以下の通りです(こちらも公式のHelloWorldサンプルをベースにしています)。

//importは省略

public class HrClient {
    private final ManagedChannel channel;
    private final HrGrpc.HrBlockingStub blockingStub;

    public HrClient(String host, int port) {
        channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext(true).build();
        blockingStub = HrGrpc.newBlockingStub(channel);
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public void getEmployees() {
        EmployeesReply response;
        long begin = System.currentTimeMillis();
        try {
            response = blockingStub.employees(Empty.newBuilder().build());
        } catch (StatusRuntimeException e) {
            System.out.println("RPC failed");
            return;
        }
        System.out.println("Employees: " + response.getEmployeeCount());
    }

    public static void main(String[] args) throws Exception {
        HrClient client = new HrClient("localhost", 50051);
        try {
            client.getEmployees();
        } finally {
            client.shutdown();
        }
    }
}

スタブオブジェクトから、ローカルのメソッドを呼び出す感覚でemployeeを実行できます。

gRPCは、Protocol Bufferを使って高速な通信を実現しているわけですが、アプリケーションを実装する分には、プロトコル依存の部分を意識する必要はほとんどありません。