Kafka学习笔记4--Kafka生产者的客户端(PHP)开发 - 鹿呦呦 - 博客园


本站和网页 https://www.cnblogs.com/sunshineliulu/p/12129610.html 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Kafka学习笔记4--Kafka生产者的客户端(PHP)开发 - 鹿呦呦 - 博客园
首页
新闻
博问
专区
闪存
班级
我的博客
我的园子
账号设置
简洁模式 ...
退出登录
注册
登录
呦呦鹿鸣
博客园
首页
联系
管理
Kafka学习笔记4--Kafka生产者的客户端(PHP)开发
PHP操作Kafka之生产者的实现
一、准备工作
虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。
PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。
1.安装 librdkafka 库
git clone https://github.com/edenhill/librdkafka.git
./configure
make
sudo make install
2.安装 php-kafka 扩展
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd librdkafka/
$ phpize
$ ./configure
$ make
$ sudo make install
#在php.ini 文件中配置 rdkafka扩展
extension=rdkafka.so
#查看扩展是否生效
php -m | grep kafka
二、代码实现
demo 来源于 https://github.com/arnaud-lb/php-rdkafka#examples
正常的生产逻辑如下:
1.配置生产者客户端参数及创建相应的生产者实例;
/**
* Create a producer
*/
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");
2.构建主题;
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
3.发送消息;
/**
* Producing messages
* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
* The message payload can be anything.
* 消息可以是任何内容。
*/
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
4.关闭生产者实例。
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
* Not calling flush can lead to message loss!
* 不调用flush会导致消息丢失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
检验消息是否发送成功
终端开启一个消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
在另一个窗口执行 php producer.php,
可看到消费者终端接收到消息。
完整代码如下:
<?php
/**
* Created by PhpStorm.
* User: liulu
* Date: 2020/1/1
* Time: 18:38
*/
/**
* Create a producer
*/
$conf = new RdKafka\Conf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("127.0.0.1");
/**
* Create a topic instance from the producer
*/
$topic = $rk->newTopic("test");
/**
* Producing messages
* The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
* 第一个参数是分区,RD_KAFKA_PARTITION_UA 表示未分配,并且由 librdkafka 选择分区。
* The second argument are message flags and should be either 0 or RD_KAFKA_MSG_F_BLOCK to block produce on full queue.
* 第二个参数是消息标志,为 0 或 RD_KAFKA_MSG_F_BLOCK,当队列满了时阻止生产消息。
* The message payload can be anything.
* 消息可以是任何内容。
*/
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
/**
* Proper shutdown
* This should be done prior to destroying a producer instance
* to make sure all queued and in-flight produce requests are completed before terminating.
* 关闭生产者实例前需确保所有在队列中和正在生产的生产请求都已完成。
* Not calling flush can lead to message loss!
* 不调用flush会导致消息丢失!
*/
$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);
echo 'finished';exit;
posted @
2020-01-01 19:38
鹿呦呦
阅读(1752)
评论(0)
编辑
收藏
举报
刷新评论刷新页面返回顶部
Copyright 2022 鹿呦呦
Powered by .NET 7.0 on Kubernetes