PHP開源:Kafka-php-使用 PHP 編寫的 Kafka 客戶端

WayneMacias 7年前發布 | 46K 次閱讀 PHP PHP開發 Apache Kafka

Kafka-php

English Document

Kafka-php 使用純粹的PHP 編寫的 kafka 客戶端,目前支持 0.8.x 以上版本的 Kafka,該項目 v0.2.x 和 v0.1.x 不兼容,如果使用原有的 v0.1.x 的可以參照文檔 Kafka PHP v0.1.x Document , 不過建議切換到 v0.2.x 上。v0.2.x 使用 PHP 異步執行的方式來和kafka broker 交互,較 v0.1.x 更加穩定高效, 由于使用 PHP 語言編寫所以不用編譯任何的擴展就可以使用,降低了接入與維護成本

安裝環境要求

  • PHP 版本大于 5.5
  • Kafka Server 版本大于 0.8.0
  • 消費模塊 Kafka Server 版本需要大于 0.9.0

Installation

使用 Composer 安裝

添加 composer 依賴 nmred/kafka-php 到項目的 composer.json 文件中即可,如:

{
    "require": {
        "nmred/kafka-php": "0.2.*"
    }
}

Produce

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

// 設置生產相關配置,具體配置參數見 Configuration $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode, $context) { var_dump($errorCode); }); $producer->send();</code></pre>

Consumer

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });</code></pre>

Basic Protocol

基礎協議 API 調用方式見 Example

 

項目主頁:http://www.baiduhome.net/lib/view/home/1493705358573

 

 本文由用戶 WayneMacias 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!