// no coordinator will be constructed for the default (null) group id this.coordinator = groupId == null ? null : new ConsumerCoordinator(logContext, this.client, groupId, this.groupInstanceId, maxPollIntervalMs, sessionTimeoutMs, new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs), assignors, this.metadata, this.subscriptions, metrics, metricGrpPrefix, this.time, retryBackoffMs, enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
/** * Subscribe to the given list of topics to get dynamically assigned partitions. * <b>Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one).</b> It is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(Collection)}. * * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. * * <p> * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets * to be reset. You should also provide your own listener if you are doing your own offset * management since the listener gives you an opportunity to commit offsets before a rebalance finishes. * * @param topics The list of topics to subscribe to * @throws IllegalArgumentException If topics is null or contains null or empty elements * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called * previously (without a subsequent call to {@link #unsubscribe()}), or if not * configured at-least one partition assignment strategy */ @Override publicvoidsubscribe(Collection<String> topics){ subscribe(topics, new NoOpConsumerRebalanceListener()); }
/** * Subscribe to the given list of topics to get dynamically * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(Collection)}. * * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. * * <p> * As part of group management, the consumer will keep track of the list of consumers that belong to a particular * group and will trigger a rebalance operation if any one of the following events are triggered: * <ul> * <li>Number of partitions change for any of the subscribed topics * <li>A subscribed topic is created or deleted * <li>An existing member of the consumer group is shutdown or fails * <li>A new member is added to the consumer group * </ul> * <p> * When any of these events are triggered, the provided listener will be invoked first to indicate that * the consumer's assignment has been revoked, and then again when the new assignment has been received. * Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will * also only be invoked during that time. * * The provided listener will immediately override any listener set in a previous call to subscribe. * It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics * subscribed in this call. See {@link ConsumerRebalanceListener} for more details. 
* * @param topics The list of topics to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called * previously (without a subsequent call to {@link #unsubscribe()}), or if not * configured at-least one partition assignment strategy */ @Override publicvoidsubscribe(Collection<String> topics, ConsumerRebalanceListener listener){ acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); if (topics == null) thrownew IllegalArgumentException("Topic collection to subscribe to cannot be null"); if (topics.isEmpty()) { // treat subscribing to empty topic list as the same as unsubscribing this.unsubscribe(); } else { for (String topic : topics) { if (topic == null || topic.trim().isEmpty()) thrownew IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic"); }
/** * This method sets the subscription type if it is not already set (i.e. when it is NONE), * or verifies that the subscription type is equal to the give type when it is set (i.e. * when it is not NONE) * @param type The given subscription type */ privatevoidsetSubscriptionType(SubscriptionType type){ if (this.subscriptionType == SubscriptionType.NONE) this.subscriptionType = type; elseif (this.subscriptionType != type) thrownew IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); }
private ConsumerRecords<K, V> poll(final Timer timer, finalboolean includeMetadataInTimeout){ // Step1:确认KafkaConsumer实例是单线程运行,以及没有被关闭 acquireAndEnsureOpen(); try { if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { thrownew IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); }
// poll for new data until the timeout expires do { client.maybeTriggerWakeup();
if (includeMetadataInTimeout) { // Step2:更新metadata信息,获取GroupCoordinator的ip以及接口,并连接、 join-group、sync-group,期间group会进行rebalance。在此步骤,consumer会先加入group,然后获取需要消费的topic partition的offset信息 if (!updateAssignmentMetadataIfNeeded(timer)) { return ConsumerRecords.empty(); } } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { log.warn("Still waiting for metadata"); } }
// Step3:拉取数据 final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); }
returnthis.interceptors.onConsume(new ConsumerRecords<>(records)); } } while (timer.notExpired());
/** * Acquire the light lock and ensure that the consumer hasn't been closed. * @throws IllegalStateException If the consumer has been closed */ privatevoidacquireAndEnsureOpen(){ acquire(); if (this.closed) { release(); thrownew IllegalStateException("This consumer has already been closed."); } }
1 2 3 4 5 6 7 8 9 10 11 12
/** * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not * supported). * @throws ConcurrentModificationException if another thread already has the lock */ privatevoidacquire(){ long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) thrownew ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet(); }
KafkaConsumer 如何加入 consumer group
一个 KafkaConsumer 实例能够消费数据的前提是:成功加入一个 consumer group,并获取分配给它的 tp(topic-partition)列表。因此,首先要做的就是找到 GroupCoordinator 并与之建立连接,然后加入该 group。
/** * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits * if they are enabled. * (确保group的coordinator是已知的,并且这个consumer是已经加入到了group中,也用于offset周期性的commit) * <p> * Returns early if the timeout expires * * @param timer Timer bounding how long this method can block * @return true iff the operation succeeded */ publicbooleanpoll(Timer timer){ maybeUpdateSubscriptionMetadata();
invokeCompletedOffsetCommitCallbacks();
// 如果是subscribe方式订阅的topic if (subscriptions.partitionsAutoAssigned()) { // Always update the heartbeat last poll time so that the heartbeat thread does not leave the // group proactively due to application inactivity even if (say) the coordinator cannot be found. // 1.检查心跳线程运行是否正常,如果心跳线程失败则抛出异常,反之则更新poll调用的时间 pollHeartbeat(timer.currentTimeMs()); // 2.如果coordinator未知,则初始化ConsumeCoordinator if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { returnfalse; }
// 判断是否需要重新加入group,如果订阅的partition变化或者分配的partition变化,都可能需要重新加入group if (rejoinNeededOrPending()) { // due to a race condition between the initial metadata fetch and the initial rebalance, // we need to ensure that the metadata is fresh before joining initially. This ensures // that we have matched the pattern against the cluster's topics at least once before joining. if (subscriptions.hasPatternSubscription()) { // For consumer group that uses pattern-based subscription, after a topic is created, // any consumer that discovers the topic after metadata refresh can trigger rebalance // across the entire consumer group. Multiple rebalances can be triggered after one topic // creation if consumers refresh metadata at vastly different times. We can significantly // reduce the number of rebalances caused by single topic creation by asking consumer to // refresh metadata before re-joining the group as long as the refresh backoff time has // passed. if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) { this.metadata.requestUpdate(); }
if (!client.ensureFreshMetadata(timer)) { returnfalse; }
maybeUpdateSubscriptionMetadata(); }
// 3.确保group是active的,重新加入group,分配订阅的partition if (!ensureActiveGroup(timer)) { returnfalse; } } } else { // For manually assigned partitions, if there are no ready nodes, await metadata. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. // When group management is used, metadata wait is already performed for this scenario as // coordinator is unknown, hence this check is not required. if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { client.awaitMetadataUpdate(timer); } }
/** * Visible for testing. * * Ensure that the coordinator is ready to receive requests. * * @param timer Timer bounding how long this method can block * @return true If coordinator discovery and initial connection succeeded, false otherwise */ protectedsynchronizedbooleanensureCoordinatorReady(final Timer timer){ if (!coordinatorUnknown()) returntrue;
do { // 找到GroupCoordinator,并建立连接 final RequestFuture<Void> future = lookupCoordinator(); client.poll(future, timer);
if (!future.isDone()) { // ran out of time break; }
if (future.failed()) { if (future.isRetriable()) { log.debug("Coordinator discovery failed, refreshing metadata"); client.awaitMetadataUpdate(timer); } else throw future.exception(); } elseif (coordinator != null && client.isUnavailable(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery markCoordinatorUnknown(); timer.sleep(retryBackoffMs); } } while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown(); }
1 2 3 4 5 6 7 8 9 10 11 12 13
protectedsynchronized RequestFuture<Void> lookupCoordinator(){ if (findCoordinatorFuture == null) { // find a node to ask about the coordinator(找一个最少连接的broker,此处对应的应该就是文章开头处确定GroupCoordinator节点的发发) Node node = this.client.leastLoadedNode(); if (node == null) { log.debug("No broker available to send FindCoordinator request"); return RequestFuture.noBrokersAvailable(); } else // 对找到的broker发送FindCoordinator请求,并对response进行处理 findCoordinatorFuture = sendFindCoordinatorRequest(node); } return findCoordinatorFuture; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Discover the current coordinator for the group. Sends a GroupMetadata request to * one of the brokers. The returned future should be polled to get the result of the request. * @return A request future which indicates the completion of the metadata request */ private RequestFuture<Void> sendFindCoordinatorRequest(Node node){ // initiate the group metadata request log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) .setKey(this.groupId)); // 发送请求,并将response转换为RequestFuture // compose的作用是将FindCoordinatorResponseHandler类转换为RequestFuture // 实际上就是为返回的Future类重置onSuccess()和onFailure()方法 return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); }
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody(); Errors error = findCoordinatorResponse.error(); if (error == Errors.NONE) { // 如果正确获取broker上的GroupCoordinator,建立连接,并更新心跳时间 synchronized (AbstractCoordinator.this) { // use MAX_VALUE - node.id as the coordinator id to allow separate connections // for the coordinator in the underlying network client layer int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
/** * Ensure the group is active (i.e., joined and synced) * * @param timer Timer bounding how long this method can block * @return true iff the group is active */ booleanensureActiveGroup(final Timer timer){ // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. // 1.确保GroupCoordinator已经连接 if (!ensureCoordinatorReady(timer)) { returnfalse; }
/** * Joins the group without starting the heartbeat thread. * * Visible for testing. * * @param timer Timer bounding how long this method can block * @return true iff the operation succeeded */ booleanjoinGroupIfNeeded(final Timer timer){ while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(timer)) { returnfalse; }
// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second // time if the client is woken up before a pending rebalance completes. This must be called // on each iteration of the loop because an event requiring a rebalance (such as a metadata // refresh which changes the matched subscription set) can occur while another rebalance is // still in progress. // 触发onJoinPrepare,包括offset commit和rebalance listener if (needsJoinPrepare) { // 如果是自动提交,则要开始提交offset以及在join group之前回调reblance listener接口 onJoinPrepare(generation.generationId, generation.memberId); needsJoinPrepare = false; }
// 初始化joinGroup请求,并发送joinGroup请求,核心步骤 final RequestFuture<ByteBuffer> future = initiateJoinGroup(); client.poll(future, timer); if (!future.isDone()) { // we ran out of time returnfalse; }
// join succeed,这一步时,时间上sync-group已经成功了 if (future.succeeded()) { // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried. ByteBuffer memberAssignment = future.value().duplicate(); // 发送完成,consumer加入group成功,触发onJoinComplete()方法 onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
// We reset the join group future only after the completion callback returns. This ensures // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded. // 重置joinFuture为空 resetJoinGroupFuture(); needsJoinPrepare = true; } else { resetJoinGroupFuture(); final RuntimeException exception = future.exception(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException) continue; elseif (!future.isRetriable()) throw exception;
privatesynchronized RequestFuture<ByteBuffer> initiateJoinGroup(){ // we store the join future in case we are woken up by the user after beginning the // rebalance in the call to poll below. This ensures that we do not mistakenly attempt // to rejoin before the pending rebalance has completed. if (joinFuture == null) { // fence off the heartbeat thread explicitly so that it cannot interfere with the join group. // Note that this must come after the call to onJoinPrepare since we must be able to continue // sending heartbeats if that callback takes some time. // Step1:rebalance期间,心跳线程停止运行 disableHeartbeatThread();
// 设置当前状态为rebalance state = MemberState.REBALANCING; // Step2:发送joinGroup请求,核心步骤 joinFuture = sendJoinGroupRequest(); // Step3:为joinGroup请求添加监听器,监听joinGroup请求的结果并做相应的处理 joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { @Override publicvoidonSuccess(ByteBuffer value){ // handle join completion in the callback so that the callback will be invoked // even if the consumer is woken up before finishing the rebalance synchronized (AbstractCoordinator.this) { log.info("Successfully joined group with generation {}", generation.generationId); // 如果joinGroup成功,设置状态为stable state = MemberState.STABLE; rejoinNeeded = false;
if (heartbeatThread != null) // Step4:允许心跳线程继续运行 heartbeatThread.enable(); } }
@Override publicvoidonFailure(RuntimeException e){ // we handle failures below after the request finishes. if the join completes // after having been woken up, the exception is ignored and we will rejoin synchronized (AbstractCoordinator.this) { // 如果joinGroup失败,设置状态为unjoined state = MemberState.UNJOINED; } } }); } return joinFuture; }
/** * Join the group and return the assignment for the next generation. This function handles both * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, List)} if * elected leader by the coordinator. * * NOTE: This is visible only for testing * * @return A request future which wraps the assignment returned from the group leader */ // 发送joinGroup请求 RequestFuture<ByteBuffer> sendJoinGroupRequest(){ if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator log.info("(Re-)joining group"); JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder( new JoinGroupRequestData() .setGroupId(groupId) .setSessionTimeoutMs(this.sessionTimeoutMs) .setMemberId(this.generation.memberId) .setGroupInstanceId(this.groupInstanceId.orElse(null)) .setProtocolType(protocolType()) .setProtocols(metadata()) .setRebalanceTimeoutMs(this.rebalanceTimeoutMs) );
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
// Note that we override the request timeout using the rebalance timeout since that is the // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
synchronized (AbstractCoordinator.this) { if (state != MemberState.REBALANCING) { // if the consumer was woken up before a rebalance completes, we may have already left // the group. In this case, we do not want to continue with the sync group. future.raise(new UnjoinedGroupException()); } else { AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName()); // Step6:joinGroup成功,下面需要进行sync-group,获取分配的tp列表 if (joinResponse.isLeader()) { // 当前consumer是leader onJoinLeader(joinResponse).chain(future); } else { // 当前consumer是follower onJoinFollower().chain(future); } } } } elseif (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator()); // backoff and retry future.raise(error); } elseif (error == Errors.UNKNOWN_MEMBER_ID) { // reset the member id and retry immediately resetGeneration(); log.debug("Attempt to join group failed due to unknown member id."); future.raise(Errors.UNKNOWN_MEMBER_ID); } elseif (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff markCoordinatorUnknown(); log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); future.raise(error); } elseif (error == Errors.FENCED_INSTANCE_ID) { log.error("Received fatal exception: group.instance.id gets fenced"); future.raise(error); } elseif (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID || error == Errors.GROUP_AUTHORIZATION_FAILED || error == Errors.GROUP_MAX_SIZE_REACHED) { // log the error and re-throw the exception log.error("Attempt to join group failed due to fatal error: {}", error.message()); if (error == Errors.GROUP_MAX_SIZE_REACHED) { future.raise(new GroupMaxSizeReachedException(groupId)); } elseif (error == 
Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { future.raise(error); } } elseif (error == Errors.UNSUPPORTED_VERSION) { log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves"); future.raise(error); } elseif (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. Update member id // and send another join group request in next cycle. synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null); AbstractCoordinator.this.rejoinNeeded = true; AbstractCoordinator.this.state = MemberState.UNJOINED; } future.raise(Errors.MEMBER_ID_REQUIRED); } else { // unexpected error, throw the exception log.error("Attempt to join group failed due to unexpected error: {}", error.message()); future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); } } }
上面代码的主要过程如下:
如果 group.id 对应的是一个新的 group,那么该 group 的初始状态为 Empty;
当 GroupCoordinator 接收到 consumer 的 join-group 请求后,由于此时这个 group 的 member 列表还是空 (group 是新建的,每个 consumer 实例被称为这个 group 的一个 member),第一个加入的 member 将被选为 leader,也就是说,对于一个新的 consumer group 而言,当第一个 consumer 实例加入后将会被选为 leader。如果后面 leader 挂了,会从其他 member 里面随机选择一个 member 成为新的 leader;
// 当consumer为follower时,从GroupCoordinator拉取分配结果 private RequestFuture<ByteBuffer> onJoinFollower(){ // send follower's sync group with an empty assignment SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( new SyncGroupRequestData() .setGroupId(groupId) .setMemberId(generation.memberId) .setGroupInstanceId(this.groupInstanceId.orElse(null)) .setGenerationId(generation.generationId) .setAssignments(Collections.emptyList()) ); log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder); // 发送sync-group请求 return sendSyncGroupRequest(requestBuilder); }
// 当consumer客户端为leader时,对group下的所有实例进行分配,将assign的结果发送到GroupCoordinator private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse){ try { // perform the leader synchronization and send back the assignment for the group(assign 操作) Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>(); for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) { groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment() .setMemberId(assignment.getKey()) .setAssignment(Utils.toArray(assignment.getValue())) ); }
分区策略并非由 leader 消费者决定,而是由各个消费者投票决定:得票最多的分区策略会被采用。这里的分区策略通过 partition.assignment.strategy 参数设置,可以配置多个。如果选出的策略是某个消费者不支持的,就会抛出异常 IllegalArgumentException: Member does not support protocol。
经过上面的步骤,一个 consumer 实例就成功加入了 group。加入成功后,会触发 ConsumerCoordinator 的 onJoinComplete() 方法,其作用是:更新订阅的 tp 列表、更新对应的 metadata,并触发注册的 listener。
// 加入group成功 @Override protectedvoidonJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer){ // only the leader is responsible for monitoring for metadata changes (i.e. partition changes) if (!isLeader) assignmentSnapshot = null;
// The leader may have assigned partitions which match our subscription pattern, but which // were not explicitly requested, so we update the joined subscription here. maybeUpdateJoinedSubscription(assignedPartitions);
// give the assignor a chance to update internal state based on the received assignment assignor.onAssignment(assignment, generation);
// reschedule the auto commit starting from now if (autoCommitEnabled) this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);