C++でProtocol Bufferを試してみた

Protocol Bufferとはシリアライズ、デシリアライズ対象のデータ構造を定義したインタフェース定義言語になります。言語に依存しないのでgRPCなどの通信でも使われていまして、Googleのオリジナルの実装にはC++, Java, Pythonなどがあります。

developers.google.com

言語に依存しないデータ構造等してJsonもありますが、Jsonに比べるとシンプルさとパフォーマンスを目的においているようですが、文字列型が多い場合はJsonの方がパフォーマンスが良かったりするので、どちらのパフォーマンスが良いかはデータ定義にも依存するようです。

medium.com

今回はGoogleのドキュメントにあるC++チュートリアルのデータ定義を用いてBoost::Asioでサーバ、クライアント間でシリアライズ、デシリアライズの確認をしてみます。

インストール

まずインストールですが、以下のページに従いコマンドを実行しました。

github.com

git clone https://github.com/protocolbuffers/protobuf.git
cd protobuf
git submodule update --init --recursive
./autogen.sh

 ./configure
 make
 make check
 sudo make install
 sudo ldconfig # refresh shared library cache.

これで開発用のライブラリとProtocol Bufferのコンパイラ(protoc)がインストールされるかと思います。

データ定義

チュートリアルで使うデータ定義は以下になります。

  • proto/addressbook.proto
syntax = "proto2";

package tutorial;

message Person {
  optional string name = 1;
  optional int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    optional string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phones = 4;
}

message AddressBook {
  repeated Person people = 1;
}

AddressBook はPerson の配列で、Personにはstringのnameとint32のid, stringのemail, それからPhoneNumber の配列になっています。ソースに出力する際のパッケージ(C++の場合はnamespace)はtutorialになります。データ定義自体はシンプルで分かりやすいかと思います。

このprotoファイルをコンパイルすることでJavaC++のソースを吐き出すことが出来るのですが、C++のソースを出力する場合は以下のコマンドを実行します。

protoc --proto_path=proto --cpp_out=src proto/addressbook.proto

このコマンドを実行すると"src/addressbook.pb.h"と"src/addressbook.pb.cc"にファイルが出力されます。

実装

CMakeLists.txt

CmakeListsの定義は以下のようにtarget_link_libraries(try_protobuf PUBLIC ${PROTOBUF_LIBRARY})をリンクするようにしました。

# project name and language
project(try LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_executable(try_protobuf src/try_protobuf.cc)

find_package (Threads)
target_link_libraries(try_protobuf PRIVATE Threads::Threads)

include(FindProtobuf)
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIR})
target_link_libraries(try_protobuf PUBLIC ${PROTOBUF_LIBRARY})

add_library(addressbook STATIC src/addressbook.pb.cc)
target_link_libraries(try_protobuf PUBLIC addressbook)

find_package(Boost 1.71.0 REQUIRED system)
target_include_directories(try_protobuf PUBLIC ${Boost_INCLUDE_DIRS})

protobufのシリアライズ

チュートリアルのままですが、protobufのシリアライズは以下のようになりました。

void PromptForAddress(tutorial::Person *person)
{
    cout << "Enter person ID number: ";
    int id;
    cin >> id;
    person->set_id(id);
    cin.ignore(256, '\n');

    cout << "Enter name: ";
    getline(cin, *person->mutable_name());

    cout << "Enter email address (blank for none): ";
    string email;
    getline(cin, email);
    if (!email.empty())
    {
        person->set_email(email);
    }

    while (true)
    {
        cout << "Enter a phone number (or leave blank to finish): ";
        string number;
        getline(cin, number);
        if (number.empty())
        {
            break;
        }

        tutorial::Person::PhoneNumber *phone_number = person->add_phones();
        phone_number->set_number(number);

        cout << "Is this a mobile, home, or work phone? ";
        string type;
        getline(cin, type);
        if (type == "mobile")
        {
            phone_number->set_type(tutorial::Person::MOBILE);
        }
        else if (type == "home")
        {
            phone_number->set_type(tutorial::Person::HOME);
        }
        else if (type == "work")
        {
            phone_number->set_type(tutorial::Person::WORK);
        }
        else
        {
            cout << "Unknown phone type.  Using default." << endl;
        }
    }
}

void clientThread()
{
    cout << "start client thread. " << endl;
    asio::io_service io_service;
    tcp::socket socket(io_service);

    // 接続
    socket.connect(tcp::endpoint(asio::ip::address::from_string(host), port));

    string send_msg;
    boost::asio::streambuf read_buffer;

    for (;;)
    {
        this_thread::sleep_for(chrono::milliseconds(10));
        tutorial::AddressBook address_book;
        PromptForAddress(address_book.add_people());

        boost::asio::streambuf b;
        ostream os(&b);
        address_book.SerializeToOstream(&os);
        asio::write(socket, b);
    }

    cout << "end client thread. " << endl;
}

PromptForAddress(address_book.add_people());でユーザのデータを追加した後address_book.SerializeToOstream(&os);シリアライズしたデータをストリームに書き込み、asio::write(socket, b);tcp接続のソケットに書き込んでいます。 SerializeToOstreamなど共通で使えるものはMessageLiteで定義されているようでした。

  • addressbook.pb.h
class AddressBook final :
    public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:tutorial.AddressBook) */ {
  • message.h
// Users must not derive from this class. Only the protocol compiler and
// the internal library are allowed to create subclasses.
class PROTOBUF_EXPORT Message : public MessageLite {
 public:
  constexpr Message() {}
  • message_lite.h
// Users must not derive from this class. Only the protocol compiler and
// the internal library are allowed to create subclasses.
class PROTOBUF_EXPORT MessageLite {
 public:
  constexpr MessageLite() {}
  virtual ~MessageLite() = default;
~省略~
  // Serialize the message and write it to the given C++ ostream.  All
  // required fields must be set.
  bool SerializeToOstream(std::ostream* output) const;

シリアライズ

次にデシリアライズは以下のようになりました。

void ListPeople(const tutorial::AddressBook &address_book)
{
    for (int i = 0; i < address_book.people_size(); i++)
    {
        const tutorial::Person &person = address_book.people(i);

        cout << "show protobuf:" << endl;
        cout << "Person ID: " << person.id() << endl;
        cout << "  Name: " << person.name() << endl;
        if (person.has_email())
        {
            cout << "  E-mail address: " << person.email() << endl;
        }

        for (int j = 0; j < person.phones_size(); j++)
        {
            const tutorial::Person::PhoneNumber &phone_number = person.phones(j);

            switch (phone_number.type())
            {
            case tutorial::Person::MOBILE:
                cout << "  Mobile phone #: ";
                break;
            case tutorial::Person::HOME:
                cout << "  Home phone #: ";
                break;
            case tutorial::Person::WORK:
                cout << "  Work phone #: ";
                break;
            }
            cout << phone_number.number() << endl;
        }
    }
}

void show_hex(const char *data)
{
    cout << "show hex:" << endl;
    const char *hex_pointer = data;
    const char *str_pointer = data;
    int current = 0;

    auto print = [](const char **from, const char **to)
    {
        cout << ":";
        while (*from != *to)
        {
            cout << **from;
            (*from)++;
        }
        cout << endl;
    };

    while (*hex_pointer)
    {
        cout << (int)*hex_pointer << " ";
        hex_pointer++;
        current++;
        if (current > 1 && current % 16 == 0)
        {

            print(&str_pointer, &hex_pointer);
        }
    }
    print(&str_pointer, &hex_pointer);
    cout << endl;
}

void serverThread()
{
    cout << "start server thread. " << endl;
    asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port));
    tcp::socket socket(io_service);

    // 接続待機
    acceptor.accept(socket);
    cout << "server accept" << endl;

    // メッセージ受信
    string buffer;
    for (;;)
    {
        asio::streambuf receive_buffer;
        boost::system::error_code error;
        asio::read(socket, receive_buffer, asio::transfer_at_least(1), error);
        if (error && error != asio::error::eof)
        {
            cout << "receive failed: " << error.message() << endl;
        }
        else
        {
            const char *data = asio::buffer_cast<const char *>(receive_buffer.data());
            cout << "server receive(text): " << data << endl;
            show_hex(data);

            tutorial::AddressBook address_book;
            stringbuf strBuf(data);
            istream istream(&strBuf);

            if (address_book.ParseFromIstream(&istream))
            {
                ListPeople(address_book);
            }
            else
            {
                cout << "parse failed." << endl;
            }

            receive_buffer.consume(receive_buffer.size());
        }
    }

    cout << "end server thread. " << endl;
}

address_book.ParseFromIstream(&istream)でistreamからprotobufのデータにデシリアライズしています。

全体

実装全体は以下のようになりました。

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

#include "addressbook.pb.h"

using namespace std;

namespace asio = boost::asio;
using asio::ip::tcp;

string host = "127.0.0.1";
int port = 8080;

void show_hex(const char *data)
{
    cout << "show hex:" << endl;
    const char *hex_pointer = data;
    const char *str_pointer = data;
    int current = 0;

    auto print = [](const char **from, const char **to)
    {
        cout << ":";
        while (*from != *to)
        {
            cout << **from;
            (*from)++;
        }
        cout << endl;
    };

    while (*hex_pointer)
    {
        cout << (int)*hex_pointer << " ";
        hex_pointer++;
        current++;
        if (current > 1 && current % 16 == 0)
        {

            print(&str_pointer, &hex_pointer);
        }
    }
    print(&str_pointer, &hex_pointer);
    cout << endl;
}

void PromptForAddress(tutorial::Person *person)
{
    cout << "Enter person ID number: ";
    int id;
    cin >> id;
    person->set_id(id);
    cin.ignore(256, '\n');

    cout << "Enter name: ";
    getline(cin, *person->mutable_name());

    cout << "Enter email address (blank for none): ";
    string email;
    getline(cin, email);
    if (!email.empty())
    {
        person->set_email(email);
    }

    while (true)
    {
        cout << "Enter a phone number (or leave blank to finish): ";
        string number;
        getline(cin, number);
        if (number.empty())
        {
            break;
        }

        tutorial::Person::PhoneNumber *phone_number = person->add_phones();
        phone_number->set_number(number);

        cout << "Is this a mobile, home, or work phone? ";
        string type;
        getline(cin, type);
        if (type == "mobile")
        {
            phone_number->set_type(tutorial::Person::MOBILE);
        }
        else if (type == "home")
        {
            phone_number->set_type(tutorial::Person::HOME);
        }
        else if (type == "work")
        {
            phone_number->set_type(tutorial::Person::WORK);
        }
        else
        {
            cout << "Unknown phone type.  Using default." << endl;
        }
    }
}

// Iterates though all people in the AddressBook and prints info about them.
void ListPeople(const tutorial::AddressBook &address_book)
{
    for (int i = 0; i < address_book.people_size(); i++)
    {
        const tutorial::Person &person = address_book.people(i);

        cout << "show protobuf:" << endl;
        cout << "Person ID: " << person.id() << endl;
        cout << "  Name: " << person.name() << endl;
        if (person.has_email())
        {
            cout << "  E-mail address: " << person.email() << endl;
        }

        for (int j = 0; j < person.phones_size(); j++)
        {
            const tutorial::Person::PhoneNumber &phone_number = person.phones(j);

            switch (phone_number.type())
            {
            case tutorial::Person::MOBILE:
                cout << "  Mobile phone #: ";
                break;
            case tutorial::Person::HOME:
                cout << "  Home phone #: ";
                break;
            case tutorial::Person::WORK:
                cout << "  Work phone #: ";
                break;
            }
            cout << phone_number.number() << endl;
        }
    }
}

void serverThread()
{
    cout << "start server thread. " << endl;
    asio::io_service io_service;

    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port));
    tcp::socket socket(io_service);

    // 接続待機
    acceptor.accept(socket);
    cout << "server accept" << endl;

    // メッセージ受信
    string buffer;
    for (;;)
    {
        asio::streambuf receive_buffer;
        boost::system::error_code error;
        asio::read(socket, receive_buffer, asio::transfer_at_least(1), error);
        if (error && error != asio::error::eof)
        {
            cout << "receive failed: " << error.message() << endl;
        }
        else
        {
            const char *data = asio::buffer_cast<const char *>(receive_buffer.data());
            cout << "server receive(text): " << data << endl;
            show_hex(data);

            tutorial::AddressBook address_book;
            stringbuf strBuf(data);
            istream istream(&strBuf);

            if (address_book.ParseFromIstream(&istream))
            {
                ListPeople(address_book);
            }
            else
            {
                cout << "parse failed." << endl;
            }

            receive_buffer.consume(receive_buffer.size());
        }
    }

    cout << "end server thread. " << endl;
}

void clientThread()
{
    cout << "start client thread. " << endl;
    asio::io_service io_service;
    tcp::socket socket(io_service);

    // 接続
    socket.connect(tcp::endpoint(asio::ip::address::from_string(host), port));

    string send_msg;
    boost::asio::streambuf read_buffer;

    for (;;)
    {
        this_thread::sleep_for(chrono::milliseconds(10));
        tutorial::AddressBook address_book;
        PromptForAddress(address_book.add_people());

        boost::asio::streambuf b;
        ostream os(&b);
        address_book.SerializeToOstream(&os);
        asio::write(socket, b);
    }

    cout << "end client thread. " << endl;
}

int main(int argc, char *argv[])
{
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    cout << "start main" << endl;

    thread server_thread(serverThread);
    cout << "server thread id = " << server_thread.get_id() << endl;

    thread client_thread(clientThread);
    cout << "client thread id = " << client_thread.get_id() << endl;

    server_thread.join();
    client_thread.join();
    cout << "end main" << endl;

    // Optional:  Delete all global objects allocated by libprotobuf.
    google::protobuf::ShutdownProtobufLibrary();
    return 0;
}

動作確認

動作を確認すると以下のようにシリアライズしたデータをTCPで送って受け取ったサーバ側でデシリアライズできていることが確認できました。

$ ./try_protobuf 
start main
server thread id = 140231180019456
start server thread. 
client thread id = 140231171626752
start client thread. 
server accept
Enter person ID number: 1
Enter name: jon doh
Enter email address (blank for none): hancock@gmail.com
Enter a phone number (or leave blank to finish): 0120-0000-0000
Is this a mobile, home, or work phone? home
Enter a phone number (or leave blank to finish): 080-1234-5678
Is this a mobile, home, or work phone? mobile
Enter a phone number (or leave blank to finish): 
server receive(text): 
E
jon dohhancock@gmail.com"
0120-0000-0000"
080-1234-5678
show hex:
10 69 10 7 106 111 110 32 100 111 104 16 1 26 17 104 :
E
jon dohh
97 110 99 111 99 107 64 103 109 97 105 108 46 99 111 109 :ancock@gmail.com
34 18 10 14 48 49 50 48 45 48 48 48 48 45 48 48 :"
0120-0000-00
48 48 16 1 34 17 10 13 48 56 48 45 49 50 51 52 :00"
080-1234
45 53 54 55 56 16 1 :-5678

show protobuf:
Person ID: 1
  Name: jon doh
  E-mail address: hancock@gmail.com
  Home phone #: 0120-0000-0000
  Mobile phone #: 080-1234-5678
Enter person ID number: 

簡単に動作確認をしましたが、プロトコルバッファの動きは問題なさそうです。

今回はサーバー側は受け取ったバイナリをそのまま変換するだけにしていますが、Protocol Buffer自体には区切り文字がないので終端まで受け取ったかどうか判断できるようにしておく必要はある気がします、また複数のメッセージが同時に送られてくる場合もあるので、受け取ったメッセージを区切る必要はありそうです。