Kafka 扩展(php-rdkafka)安装好后就可以编程测试了,以下代码是在 ThinkPHP5 上实现的。
测试kafka生产者
-
~~~
/**
 * Test the Kafka producer: publish one JSON-encoded message to the "test" topic.
 *
 * The message is enqueued with produce() and the method then polls the
 * producer until its outgoing queue is empty (or roughly ten seconds have
 * elapsed), so the message is not dropped when the request finishes.
 */
public function testProducer()
{
    $broker_list = '127.0.0.1:9092';
    $topic_name = 'test';
    // Second argument of produce() is the message-flags slot, not a partition
    // number; 0 means "no flags".
    $msg_flags = 0;

    $rk = new \RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers($broker_list);

    $topic = $rk->newTopic($topic_name);
    $message = 'this is a php test';

    // RD_KAFKA_PARTITION_UA ("unassigned") leaves the partition choice to the
    // configured partitioner.
    $topic->produce(RD_KAFKA_PARTITION_UA, $msg_flags, json_encode($message));

    // produce() is asynchronous: it only places the message on an internal
    // queue. Serve the producer until the queue drains; the attempt cap keeps
    // an unreachable broker from blocking the caller indefinitely.
    $max_polls = 200;
    for ($i = 0; $i < $max_polls && $rk->getOutQLen() > 0; $i++) {
        $rk->poll(50);
    }
}
~~~
测试kafka消费者
-
~~~
/**
 * Test the Kafka consumer: subscribe to the "test" topic and hand every
 * received payload to a caller-supplied method.
 *
 * This method loops forever and never returns.
 *
 * @param object $object   Object whose method receives each message payload.
 * @param string $callback Name of the method on $object to call with the payload.
 */
public function testConsumer($object,$callback)
{
    $broker_list = '127.0.0.1:9092';
    $topic = 'test';

    $conf = new \RdKafka\Conf();
    // Configuration values are strings; pass the group id as one explicitly.
    $conf->set('group.id', '0');
    $conf->set('metadata.broker.list', $broker_list);

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
    $conf->setDefaultTopicConf($topicConf);

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topic]);

    echo 'waiting message....';
    while (true) {
        // Blocks for up to 120 seconds waiting for the next message or event.
        $message = $consumer->consume(120 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo 'msg payload....';
                $object->$callback($message->payload);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                // Nothing to deliver right now; keep waiting.
                break;
            default:
                // Report unexpected errors instead of dropping them silently,
                // and back off briefly so a persistent failure does not spin.
                echo 'consume error: ' . $message->errstr();
                sleep(1);
                break;
        }
    }
}
~~~