Kafka + Vagrant でメッセージ送受信(PHP + Golang)
職場にKafkaが導入されました。
今後触れる機会がどんどん増えてくるので、実際にVagrant環境にKafkaを入れて使ってみました。
Vagrant起動
$ vagrant up The provider 'virtualbox' that was requested to back the machine 'default' is reporting that it isn't usable on this system. The reason is shown below: Vagrant has detected that you have a version of VirtualBox installed that is not supported. Please install one of the supported versions listed below to use Vagrant: 4.0, 4.1, 4.2, 4.3
VirtualBoxのVersionが5.0にあがったようだが、インストールされているVagrantバージョンが対応していない模様。
バージョンを確認してみる
$ vagrant version Installed Version: 1.7.2 Latest Version: 1.8.1 To upgrade to the latest version, visit the downloads page and download and install the latest version of Vagrant from the URL below: http://www.vagrantup.com/downloads.html If you're curious what changed in the latest release, view the CHANGELOG below: https://github.com/mitchellh/vagrant/blob/v1.8.1/CHANGELOG.md
最新が1.8.1という事で、
上記に記載されているダウンロードURLから最新版を落としてダウンロード&インストール
インストール後のバージョン確認
$ vagrant version Installed Version: 1.8.1 Latest Version: 1.8.1 You're running an up-to-date version of Vagrant!
バージョンアップに成功したので、気を取り直して改めて。
$ vagrant up
$ vagrant ssh Last login: Tue Feb 23 05:11:42 2016 from 10.0.2.2 Welcome to your Vagrant-built virtual machine. [vagrant@localhost ~]$
Kafka入手
下記サイトから最新版をダウンロード(2016.2.24時点の最新バージョン:9.0.1)
[vagrant@localhost ~]$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz [vagrant@localhost ~]$ tar xzvf kafka_2.11-0.9.0.1.tgz
ZooKeeper起動
[vagrant@localhost ~]$ cd kafka_2.11-0.9.0.1 [vagrant@localhost kafka_2.11-0.9.0.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties /home/vagrant/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh: line 167: exec: java: not found
と、ここでJavaのRuntimeエラー。
インストール可能なリスト確認
[vagrant@localhost kafka_2.11-0.9.0.1]$ yum search openjdk 読み込んだプラグイン:fastestmirror Determining fastest mirrors * base: www.ftp.ne.jp * extras: www.ftp.ne.jp * updates: www.ftp.ne.jp base | 3.7 kB 00:00 base/primary_db | 4.6 MB 00:00 extras | 3.4 kB 00:00 extras/primary_db | 34 kB 00:00 updates | 3.4 kB 00:00 updates/primary_db | 3.9 MB 00:00 ============================================================ N/S Matched: openjdk ============================================================= java-1.6.0-openjdk.x86_64 : OpenJDK Runtime Environment java-1.6.0-openjdk-demo.x86_64 : OpenJDK Demos java-1.6.0-openjdk-devel.x86_64 : OpenJDK Development Environment ・ ・ ・ java-1.8.0-openjdk-src.x86_64 : OpenJDK Source Bundle java-1.8.0-openjdk-src-debug.x86_64 : OpenJDK Source Bundle for packages with debug on icedtea-web.x86_64 : Additional Java components for OpenJDK - Java browser plug-in and Web Start implementation Name and summary matches only, use "search all" for everything.
SDKは不要でRuntimeだけ欲しいので、
最新版の java-1.8.0-openjdk.x86_64 をインストール
[vagrant@localhost kafka_2.11-0.9.0.1]$ sudo yum -y install java-1.8.0-openjdk.x86_64
再度ZooKeeper起動
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties [2016-02-23 05:23:36,296] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-02-23 05:23:36,299] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) ・ ・ ・
起動しました
Kafka起動
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-server-start.sh config/server.properties [2016-02-23 05:29:17,557] INFO KafkaConfig values: ・ ・ ・
Kafkaも問題なく起動
Topicの作成
Topicは送信側と受信側がやり取りするメッセージボックス名称。今回は「sample」としました。
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample Created topic "sample".
topicが作成され、送受信の為の必要最低限の準備が整いました。
送受信
受信用のターミナルを開きながら、別ターミナルで送信コマンドを叩いてみます
送信側(Producer)
[vagrant@localhost kafka_2.11-0.9.0.1]bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample send sample message
「send sample message」と送ると
受信側(Consumer)
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sample --from-beginning send sample message
受信側にも即時に反映されました!(テンションあがるところ)
PHP(送信)+ GO(受信)で実装してみる
今の職場の環境がこれに近いので、実際にPHP側でメッセージを送り、
GO側でメッセージを受け取る処理を実装してみました。
PHP extension:
PHPからKafkaを扱えるようにする為にextensionのインストール
こちらのマニュアルに沿ってインストールしました
https://github.com/EVODelavega/phpkafka
[vagrant@localhost]$ git clone https://github.com/EVODelavega/phpkafka.git [vagrant@localhost phpkafka]$ cd phpkafka [vagrant@localhost phpkafka]$ phpize [vagrant@localhost phpkafka]$ ./configure --enable-kafka [vagrant@localhost phpkafka]$ make [vagrant@localhost phpkafka]$ sudo make install [vagrant@localhost phpkafka]$ sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini' [vagrant@localhost phpkafka]$ sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini' [vagrant@localhost phpkafka]$ php -m | grep kafka kafka [vagrant@localhost phpkafka]$
Kafkaのモジュールが追加されました。
サンプルプログラム
メッセージを送信する超シンプルなスクリプト
10回メッセージを送信しています
produce.php
<?php $kafka = new Kafka("localhost:9092"); for ($i=0; $i<10; $i++) { $kafka->produce("sample", "message send test.".$i); }
GOクライアントライブラリ
いくつかあるようですが、今回はこちらを使ってみました。
https://github.com/optiopay/kafka
[vagrant@localhost kafka_2.11-0.9.0.1]$ go get github.com/optiopay/kafka
consume.go
package main import ( "log" "github.com/optiopay/kafka" ) const ( topic = "sample" partition = 0 ) var kafkaAddrs = []string{"localhost:9092"} func printConsumed(broker kafka.Client) { conf := kafka.NewConsumerConf(topic, partition) conf.StartOffset = kafka.StartOffsetNewest consumer, err := broker.Consumer(conf) if err != nil { log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) } for { msg, err := consumer.Consume() if err != nil { if err != kafka.ErrNoData { log.Printf("cannot consume %q topic message: %s", topic, err) } break } log.Printf("message %d: %s", msg.Offset, msg.Value) } log.Print("consumer quit") } func main() { broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client")) if err != nil { log.Fatalf("cannot connect to kafka cluster: %s", err) } defer broker.Close() printConsumed(broker) }
試してみよう
まずはGO側でメッセージ待機
PHPを動かしてメッセージ送信
結果