
/**
 * Thin convenience wrapper around the php-rdkafka extension.
 *
 * Typical usage:
 *   (new kafka())->Producer()->SendMsg('payload');
 *   (new kafka())->Consumer()->ReceiveMsg('topic', [$handler, 'method']);
 *
 * Connection settings and batching thresholds are read from the `kafka.*`
 * configuration keys through the global config() helper.
 */
class kafka
{
    /** @var mixed Broker list handed to librdkafka as `bootstrap.servers`. */
    public $broker_list;

    /** @var mixed Topic used whenever a method is called without an explicit topic. */
    public $topic;

    /** @var mixed Consumer group id used when Consumer() is called without one. */
    public $group_id;

    /** @var \RdKafka\Producer|null Created by Producer(), or lazily by SendMsg(). */
    protected $producer = null;

    /** @var \RdKafka\KafkaConsumer|null Created by Consumer(), or lazily by ReceiveMsg(). */
    protected $consumer = null;

    /** @var int|float Longest time, in seconds, a partial batch is held before being handed to the callback. */
    protected $receive_wait_time;

    /** @var int Number of buffered messages that triggers an immediate hand-off to the callback. */
    protected $receive_wait_num;

    /**
     * Load broker, topic, group and batching settings from configuration.
     */
    public function __construct()
    {
        $this->broker_list = config('kafka.broker_list');
        $this->topic = config('kafka.default_topic');
        $this->group_id = config('kafka.default_group_id');
        // The second argument (10 / 100) is passed to config() as the fallback value.
        $this->receive_wait_time = config('kafka.default_receive_wait_time', 10);
        $this->receive_wait_num = config('kafka.default_receive_wait_num', 100);
    }

    /**
     * Create the producer and store it on this instance.
     *
     * @return $this
     */
    public function Producer()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('bootstrap.servers', $this->broker_list);

        // acks:
        //   0   - do not wait for any broker acknowledgement; the message counts as
        //         sent as soon as it has been added to the socket buffer.
        //   1   - the leader writes the record to its local log and acknowledges
        //         without waiting for the followers.
        //   all - the leader waits for all in-sync replicas before acknowledging.
        $conf->set('acks', '0');

        // If you need to produce exactly once and want to keep the original
        // produce order, uncomment the line below.
        //$conf->set('enable.idempotence', 'true');

        $this->producer = new \RdKafka\Producer($conf);

        return $this;
    }

    /**
     * Send one message or a list of messages.
     *
     * A non-array $msg is sent as a single message. An empty array is a no-op.
     *
     * @param string|array $msg   Payload, or list of payloads.
     * @param string       $topic Target topic; falls back to the default topic when empty.
     * @return void
     * @throws \RuntimeException When the producer queue could not be flushed.
     */
    public function SendMsg($msg = '', $topic = '')
    {
        if (empty($topic)) {
            $topic = $this->topic;
        }

        if (!is_array($msg)) {
            $msg = [$msg];
        }

        // Nothing to send. Without this guard the flush loop below would never run
        // and the result check would report a failure for an empty list.
        if (count($msg) === 0) {
            return;
        }

        // Allow SendMsg() to be used without an explicit Producer() call.
        if ($this->producer === null) {
            $this->Producer();
        }
        $producer = $this->producer;
        $kafkaTopic = $producer->newTopic($topic);

        foreach ($msg as $value) {
            $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $value);
            $producer->poll(0);
        }

        // Flush the outgoing queue; at most one 10-second attempt per message.
        $result = null;
        $maxFlushAttempts = count($msg);
        for ($flushRetries = 0; $flushRetries < $maxFlushAttempts; $flushRetries++) {
            $result = $producer->flush(10000);
            if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
                break;
            }
        }

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Kafka消息发送失败');
        }
    }

    /**
     * Create the high-level consumer and store it on this instance.
     *
     * @param string $group_id Consumer group; falls back to the default group when empty.
     * @return $this
     */
    public function Consumer($group_id = '')
    {
        $conf = new \RdKafka\Conf();

        // Rebalance callback: prints the partition assignment changes and applies them.
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(null);
                    break;

                default:
                    throw new \Exception($err);
            }
        });

        if (empty($group_id)) {
            $group_id = $this->group_id;
        }

        // Consumers sharing a group.id split the partitions between them, which
        // prevents one message from being consumed several times. Running more
        // consumer processes than the topic has partitions leaves the extra ones idle.
        $conf->set('group.id', $group_id);

        // Initial list of Kafka brokers.
        $conf->set('bootstrap.servers', $this->broker_list);

        // Where to start when there is no stored offset or it is out of range:
        // 'earliest' starts from the beginning, 'latest' from the newest message.
        $conf->set('auto.offset.reset', 'earliest');

        // Commit consumed offsets automatically once per interval. (Committing
        // manually instead makes it possible to confirm a message was processed.)
        // Conf::set() takes string values, so the settings are passed as strings.
        $conf->set('enable.auto.commit', 'true');
        $conf->set('auto.commit.interval.ms', '1000');

        $this->consumer = new \RdKafka\KafkaConsumer($conf);

        return $this;
    }

    /**
     * Consume messages forever, handing them to $callback in batches.
     *
     * A batch is delivered as soon as it holds `receive_wait_num` payloads, or when
     * `receive_wait_time` seconds have passed since the previous hand-off and at
     * least one payload is buffered. The callback receives one argument: the list
     * of payloads. With an empty $callback, batches are dropped.
     *
     * This method only returns by throwing.
     *
     * @param string|array $topic    Topic name or list of names; default topic when empty.
     * @param array        $callback Array-style callable, e.g. [$object, 'method'].
     * @return void
     * @throws \Exception On any consumer error other than partition EOF or timeout.
     */
    public function ReceiveMsg($topic = '', array $callback = [])
    {
        // Allow ReceiveMsg() to be used without an explicit Consumer() call.
        if ($this->consumer === null) {
            $this->Consumer();
        }
        $consumer = $this->consumer;

        if (empty($topic)) {
            $topic = $this->topic;
        }
        if (!is_array($topic)) {
            $topic = [$topic];
        }
        $consumer->subscribe($topic);

        echo "Waiting for partition assignment... (make take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        $msg_list = [];
        // Wall-clock start of the current batching window. Counting loop iterations
        // instead would flush early whenever consume() returns without blocking.
        $window_started_at = microtime(true);

        while (true) {
            // Block for up to one second waiting for the next message.
            $message = $consumer->consume(1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $msg_list[] = $message->payload;
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // No new message right now; keep waiting.
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }

            $batch_full = count($msg_list) >= $this->receive_wait_num;
            $window_expired = (microtime(true) - $window_started_at) >= $this->receive_wait_time;
            if (!$batch_full && !$window_expired) {
                continue;
            }

            if (!empty($msg_list)) {
                $this->dispatchBatch($callback, $msg_list);
                $msg_list = [];
            }
            $window_started_at = microtime(true);
        }
    }

    /**
     * Hand one batch of payloads to the callback, if a callback was supplied.
     *
     * @param array $callback Array-style callable, or an empty array for "no callback".
     * @param array $batch    Buffered message payloads.
     * @return void
     */
    private function dispatchBatch(array $callback, array $batch)
    {
        if (!empty($callback)) {
            call_user_func_array($callback, [$batch]);
        }
    }
}
|