1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
| class kafka { public $broker_list; public $topic; public $group_id; protected $producer = null; protected $consumer = null; protected $receive_wait_time; protected $receive_wait_num;
public function __construct() { $this->broker_list = config('kafka.broker_list'); $this->topic = config('kafka.default_topic'); $this->group_id = config('kafka.default_group_id'); $this->producer = null; $this->consumer = null; $this->receive_wait_time = config('kafka.default_receive_wait_time', 10); $this->receive_wait_num = config('kafka.default_receive_wait_num', 100); } /** * 获取生产者 */ public function Producer() { $conf = new \RdKafka\Conf(); $conf->set('bootstrap.servers', $this->broker_list); // 0:不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成 // 1:leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功 // all:leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成 $conf->set('acks', '0');
//If you need to produce exactly once and want to keep the original produce order, uncomment the line below //$conf->set('enable.idempotence', 'true');
$producer = new \RdKafka\Producer($conf);
$this->producer = $producer; return $this; }
/** * 发送消息 * * @param string|array $msg * @param string $topic * @return void */ public function SendMsg($msg = '', $topic = '') { if (empty($topic)) { $topic = $this->topic; }
$producer = $this->producer;
$topic = $producer->newTopic($topic);
if (!is_array($msg)) { $msg = [$msg]; } foreach ($msg as $value) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, $value); $producer->poll(0); }
for ($flushRetries = 0; $flushRetries < count($msg); $flushRetries++) { $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { break; } }
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Kafka消息发送失败'); } }
/** * 获取消费者 * * @param string $group_id * @return void */ public function Consumer($group_id = '') { $conf = new \RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional) $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break;
default: throw new \Exception($err); } });
// Configure the group.id. All consumer with the same group.id will consume // different partitions. if (empty($group_id)) { $group_id = $this->group_id; }
// 设置相同的group,防止一次消息被多次消费。 // 消费者启动的进程数应小于等于topic的分区数,否则多余的进程是无用的 $conf->set('group.id', $group_id);
// Initial list of Kafka brokers $conf->set('bootstrap.servers', $this->broker_list);
// Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning //earliest:简单理解为从头开始消费,latest:简单理解为从最新的开始消费 $conf->set('auto.offset.reset', 'earliest'); // 在interval.ms的时间内定期向ZooKeeper提交使用者已获取的消息的偏移量 // 自动提交分区消费的位置,手动可确保消息被消费 $conf->set('enable.auto.commit', true); $conf->set('auto.commit.interval.ms', 1000);
$consumer = new \RdKafka\KafkaConsumer($conf);
$this->consumer = $consumer; return $this; }
/** * 接收消息 * * @param string $topic * @param array $callback * @return void */ public function ReceiveMsg($topic = '', array $callback = []) { $consumer = $this->consumer;
if (empty($topic)) { $topic = $this->topic; } if (!is_array($topic)) { $topic = [$topic]; } // Subscribe to topic 'test' $consumer->subscribe($topic);
echo "Waiting for partition assignment... (make take some time when\n"; echo "quickly re-joining the group after leaving it.)\n";
$i = 0; $msg_list = []; while (true) { $i++; if ($i > $this->receive_wait_time) { $i = 0; if (empty($msg_list)) { continue; } if (!empty($callback)) { call_user_func_array($callback, [$msg_list]); } $msg_list = []; } // 阻塞一秒钟 $message = $consumer->consume(1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $msg_list[] = $message->payload; if (count($msg_list) < $this->receive_wait_num) { continue; } if (!empty($callback)) { call_user_func_array($callback, [$msg_list]); } $i = 0; $msg_list = []; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: // echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: // echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } }
|