Apache Kafka is a fast, real-time, distributed, fault-tolerant message broker.

With Kafka, you can move continuously generated streaming data into a cluster: website visit history, financial transactions, online shopping orders, application logs, and so on. This data can help you understand what is happening right now, build recommendations, feed machine learning, or aggregate data for further analysis. All of this takes seconds or minutes instead of hours or days.

You can read more in the official documentation.

1. Download the code

First, download and unpack the Kafka release:

wget http://mirror.linux-ia64.org/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

2. Start the server

Kafka uses ZooKeeper, so we first start a single-node ZooKeeper instance with the script packaged with Kafka.

bin/zookeeper-server-start.sh config/zookeeper.properties

Then start the Kafka server in a second terminal, because the ZooKeeper script keeps running in the foreground.

bin/kafka-server-start.sh config/server.properties
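
Both services need a few seconds before they accept connections, so the next commands can fail if they run too early. The following is a minimal sketch of a wait helper, assuming bash is available; `wait_for_port` is a hypothetical name and not part of Kafka.

```shell
# Hypothetical helper (not shipped with Kafka): poll a TCP port until it accepts
# connections or the retry budget is used up. Relies on bash's /dev/tcp device.
wait_for_port() {
  host="$1"
  port="$2"
  retries="${3:-30}"
  i=0
  while [ "$i" -lt "$retries" ]; do
    # The subshell opens the connection on fd 3 and closes it again on exit.
    if (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

For example, `wait_for_port 127.0.0.1 2181 && wait_for_port 127.0.0.1 9092` returns successfully once ZooKeeper and the broker are listening on the ports used in this tutorial.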

Now we can create a topic named “test” with a single partition and one replica.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
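
To check that the topic really exists, the same script can list and describe topics. This is a small sketch that assumes you are still in the unpacked kafka_2.11-2.1.0 directory and that ZooKeeper listens on localhost:2181 as above; the variable names and the guard are only illustrative.

```shell
# Run from the unpacked Kafka directory while ZooKeeper and Kafka are up.
KAFKA_TOPICS="bin/kafka-topics.sh"
ZOOKEEPER="localhost:2181"

if [ -x "$KAFKA_TOPICS" ]; then
  # List all topics known to the cluster; "test" should be among them.
  "$KAFKA_TOPICS" --list --zookeeper "$ZOOKEEPER"
  # Show partitions, replication factor and leader for the "test" topic.
  "$KAFKA_TOPICS" --describe --zookeeper "$ZOOKEEPER" --topic test
else
  echo "kafka-topics.sh not found; change into the Kafka directory first" >&2
fi
```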

3. Install PHP client

There are several PHP clients for Kafka. I recommend https://github.com/weiboad/kafka-php, because I have already used it in production and it performed better than some other PHP clients. Install it with Composer:

composer require nmred/kafka-php

4. Create consumer

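Create the file Consumer.php and set the broker list to 127.0.0.1:9092, since our local Kafka cluster has a single broker (node). The require path below assumes the file sits one directory below the project root; adjust it if your vendor directory is somewhere else.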

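<?php
// Composer autoloader; adjust the path if vendor/ sits next to this file.
require '../vendor/autoload.php';

$config = \Kafka\ConsumerConfig::getInstance();
// Refresh cluster metadata every 10 seconds.
$config->setMetadataRefreshIntervalMs(10000);
// Broker used to bootstrap the connection.
$config->setMetadataBrokerList('127.0.0.1:9092');
// Consumer group this process joins.
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
// Topics to subscribe to.
$config->setTopics(['test']);
$consumer = new \Kafka\Consumer();

// The callback is invoked for every received message.
$consumer->start(function($topic, $part, $message) {
    var_dump($message);
});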

5. Create producer

A producer can work in two modes – asynchronous and synchronous. Let’s try both.

First, create the file Producer.php with the following content for async mode.

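<?php
// Composer autoloader; adjust the path if vendor/ sits next to this file.
require '../vendor/autoload.php';

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('1.0.0');
// Wait for the partition leader to acknowledge each write.
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);

// The callback passed to the constructor supplies the messages to send.
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ],
        ];
    }
);

// Called with the broker response after a successful send.
$producer->success(function($result) {
    var_dump($result);
});
// Called with the error code if sending fails.
$producer->error(function($errorCode) {
    var_dump($errorCode);
});
// Passing true starts the asynchronous send.
$producer->send(true);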

Then create the file ProducerSync.php for sync mode, where we send 100 messages to Kafka in a loop.

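<?php
// Composer autoloader; adjust the path if vendor/ sits next to this file.
require '../vendor/autoload.php';

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setBrokerVersion('1.0.0');
// Wait for the partition leader to acknowledge each write.
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
// No callback here: messages are passed directly to send().
$producer = new \Kafka\Producer();

// Send 100 messages, one send() call per message.
for ($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test',
            'value' => 'test... message #'.$i,
            'key' => '',
        ],
    ]);
}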

6. Let’s send messages

We now have everything we need. Start the consumer in a console:

php Consumer.php

Note that it can take some time before the consumer begins to receive messages.

Now send some messages by running the producers in a new console tab:

php Producer.php

and then:

php ProducerSync.php

As we can see, all messages arrive at our consumer, which is exactly what we want!
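
If the PHP consumer stays silent, it helps to cross-check with the command-line tools that ship with Kafka. This is a sketch under the tutorial's assumptions (broker on localhost:9092, topic and group both named "test", current directory is the unpacked Kafka folder); the variable name and the guard are only illustrative.

```shell
# Run from the unpacked Kafka directory while the broker is up.
BOOTSTRAP="localhost:9092"

if [ -x bin/kafka-console-consumer.sh ]; then
  # Print everything stored in the "test" topic; exit after 10 s without new messages.
  bin/kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" \
    --topic test --from-beginning --timeout-ms 10000
  # Show committed offsets and lag for the consumer group "test".
  bin/kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" \
    --describe --group test
else
  echo "Kafka scripts not found; change into the Kafka directory first" >&2
fi
```

If the console consumer prints the messages but the PHP consumer does not, the problem is more likely in the client configuration than in the broker.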

What’s next?

We have a single-node instance with a single partition and one replica. Starting a few more broker instances does not require many changes; see the multi-broker example in the official documentation.

If you prefer Docker, the following Docker Compose file starts ZooKeeper and a single Kafka broker from the Bitnami images:

version: '2'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2'
    ports:
      - '9092:9092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
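
A possible way to bring this stack up is sketched below. It assumes the YAML above is saved as docker-compose.yml in the current directory and that the `docker-compose` binary is installed (newer Docker installations use `docker compose` instead); the `compose_file` variable is only illustrative.

```shell
# Assumption: the Compose file above is saved under this name in the current directory.
compose_file="docker-compose.yml"

if command -v docker-compose >/dev/null 2>&1 && [ -f "$compose_file" ]; then
  # Start ZooKeeper and Kafka in the background.
  docker-compose -f "$compose_file" up -d
  # Show the state of both containers.
  docker-compose -f "$compose_file" ps
  # Print the last broker log lines to confirm that it started.
  docker-compose -f "$compose_file" logs kafka | tail -n 20
else
  echo "docker-compose or $compose_file not found" >&2
fi
```

The compose file publishes ports 2181 and 9092 on the host, which are the same ports the rest of this tutorial uses.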