読者です 読者をやめる 読者になる 読者になる

Kafka + Vagrant でメッセージ送受信(PHP + Golang)

職場にKafkaが導入されました。

今後触れる機会がどんどん増えてくるので、実際にVagrant環境にKafkaを入れて使ってみました。

Vagrant起動

$ vagrant up
The provider 'virtualbox' that was requested to back the machine
'default' is reporting that it isn't usable on this system. The
reason is shown below:

Vagrant has detected that you have a version of VirtualBox installed
that is not supported. Please install one of the supported versions
listed below to use Vagrant:

4.0, 4.1, 4.2, 4.3

VirtualBoxのVersionが5.0にあがったようだが、インストールされているVagrantバージョンが対応していない模様。

バージョンを確認してみる

$ vagrant version
Installed Version: 1.7.2
Latest Version: 1.8.1
 
To upgrade to the latest version, visit the downloads page and
download and install the latest version of Vagrant from the URL
below:

  http://www.vagrantup.com/downloads.html

If you're curious what changed in the latest release, view the
CHANGELOG below:

  https://github.com/mitchellh/vagrant/blob/v1.8.1/CHANGELOG.md

最新が1.8.1という事で、

上記に記載されているダウンロードURLから最新版を落としてダウンロード&インストール

 

インストール後のバージョン確認

$ vagrant version
Installed Version: 1.8.1
Latest Version: 1.8.1
 
You're running an up-to-date version of Vagrant!

バージョンアップに成功したので、気を取り直して改めて。

$ vagrant up
$ vagrant ssh
Last login: Tue Feb 23 05:11:42 2016 from 10.0.2.2
Welcome to your Vagrant-built virtual machine.
[vagrant@localhost ~]$

Kafka入手

下記サイトから最新版をダウンロード(2016.2.24時点の最新バージョン:9.0.1)

Apache Download Mirrors

[vagrant@localhost ~]$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
[vagrant@localhost ~]$ tar xzvf kafka_2.11-0.9.0.1.tgz

ZooKeeper起動

[vagrant@localhost ~]$ cd kafka_2.11-0.9.0.1
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties 
/home/vagrant/kafka_2.11-0.9.0.1/bin/kafka-run-class.sh: line 167: exec: java: not found

と、ここでJavaのRuntimeエラー。

インストール可能なリスト確認

[vagrant@localhost kafka_2.11-0.9.0.1]$ yum search openjdk
読み込んだプラグイン:fastestmirror
Determining fastest mirrors
 * base: www.ftp.ne.jp
 * extras: www.ftp.ne.jp
 * updates: www.ftp.ne.jp
base                                                                                                                    | 3.7 kB     00:00     
base/primary_db                                                                                                         | 4.6 MB     00:00     
extras                                                                                                                  | 3.4 kB     00:00     
extras/primary_db                                                                                                       |  34 kB     00:00     
updates                                                                                                                 | 3.4 kB     00:00     
updates/primary_db                                                                                                      | 3.9 MB     00:00     
============================================================ N/S Matched: openjdk =============================================================
java-1.6.0-openjdk.x86_64 : OpenJDK Runtime Environment
java-1.6.0-openjdk-demo.x86_64 : OpenJDK Demos
java-1.6.0-openjdk-devel.x86_64 : OpenJDK Development Environment
・
・
・
java-1.8.0-openjdk-src.x86_64 : OpenJDK Source Bundle
java-1.8.0-openjdk-src-debug.x86_64 : OpenJDK Source Bundle for packages with debug on
icedtea-web.x86_64 : Additional Java components for OpenJDK - Java browser plug-in and Web Start implementation

  Name and summary matches only, use "search all" for everything.

SDKは不要でRuntimeだけ欲しいので、

最新版の java-1.8.0-openjdk.x86_64 をインストール

[vagrant@localhost kafka_2.11-0.9.0.1]$ sudo yum -y install java-1.8.0-openjdk.x86_64

再度ZooKeeper起動

[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-02-23 05:23:36,296] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-02-23 05:23:36,299] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
・
・
・

起動しました

Kafka起動

[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-server-start.sh config/server.properties
[2016-02-23 05:29:17,557] INFO KafkaConfig values:
・
・
・

Kafkaも問題なく起動

Topicの作成

Topicは送信側と受信側がやり取りするメッセージボックス名称。今回は「sample」としました。

[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
Created topic "sample".

topicが作成され、送受信の為の必要最低限の準備が整いました。

送受信

受信用のターミナルを開きながら、別ターミナルで送信コマンドを叩いてみます

送信側(Producer)
[vagrant@localhost kafka_2.11-0.9.0.1]bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sample
send sample message

「send sample message」と送ると

受信側(Consumer)
[vagrant@localhost kafka_2.11-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sample --from-beginning
send sample message

受信側にも即時に反映されました!(テンションあがるところ)

PHP(送信)+ GO(受信)で実装してみる

今の職場の環境がこれに近いので、実際にPHP側でメッセージを送り、

GO側でメッセージを受け取る処理を実装してみました。

PHP extension:

PHPからKafkaを扱えるようにする為にextensionのインストール

こちらのマニュアルに沿ってインストールしました

https://github.com/EVODelavega/phpkafka

[vagrant@localhost]$ git clone https://github.com/EVODelavega/phpkafka.git
[vagrant@localhost phpkafka]$ cd phpkafka
[vagrant@localhost phpkafka]$ phpize
[vagrant@localhost phpkafka]$ ./configure --enable-kafka
[vagrant@localhost phpkafka]$ make
[vagrant@localhost phpkafka]$ sudo make install
[vagrant@localhost phpkafka]$ sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'
[vagrant@localhost phpkafka]$ sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'
[vagrant@localhost phpkafka]$ php -m | grep kafka
kafka
[vagrant@localhost phpkafka]$

Kafkaのモジュールが追加されました。

サンプルプログラム

メッセージを送信する超シンプルなスクリプト

10回メッセージを送信しています

produce.php
<?php
$kafka = new Kafka("localhost:9092");
for ($i=0; $i<10; $i++) {
    $kafka->produce("sample", "message send test.".$i);
}
GOクライアントライブラリ

いくつかあるようですが、今回はこちらを使ってみました。

https://github.com/optiopay/kafka

[vagrant@localhost kafka_2.11-0.9.0.1]$ go get github.com/optiopay/kafka
consume.go
package main

import (
    "log"

    "github.com/optiopay/kafka"
)

const (
    topic     = "sample"
    partition = 0 
)

var kafkaAddrs = []string{"localhost:9092"}

func printConsumed(broker kafka.Client) {
    conf := kafka.NewConsumerConf(topic, partition)
    conf.StartOffset = kafka.StartOffsetNewest
    consumer, err := broker.Consumer(conf)
    if err != nil {
        log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
    }   

    for {
        msg, err := consumer.Consume()
        if err != nil {
            if err != kafka.ErrNoData {
                log.Printf("cannot consume %q topic message: %s", topic, err)
            }   
            break
        }   
        log.Printf("message %d: %s", msg.Offset, msg.Value)
    }   
    log.Print("consumer quit")
}

func main() {
    broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client"))
    if err != nil {
        log.Fatalf("cannot connect to kafka cluster: %s", err)
    }   
    defer broker.Close()
    printConsumed(broker)
}

試してみよう

まずはGO側でメッセージ待機

f:id:kotakotaking:20160224181856p:plain

PHPを動かしてメッセージ送信

f:id:kotakotaking:20160224181900p:plain

結果

f:id:kotakotaking:20160224181911p:plain

だんヾ(*ΦωΦ)ノ ヒャッホゥ