/** * Subscribe to the given list of topics to get dynamically * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(Collection)}. * * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}. * * <p> * As part of group management, the consumer will keep track of the list of consumers that belong to a particular * group and will trigger a rebalance operation if any one of the following events are triggered: * <ul> * <li>Number of partitions change for any of the subscribed topics * <li>A subscribed topic is created or deleted * <li>An existing member of the consumer group is shutdown or fails * <li>A new member is added to the consumer group * </ul> * <p> * When any of these events are triggered, the provided listener will be invoked first to indicate that * the consumer's assignment has been revoked, and then again when the new assignment has been received. * Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will * also only be invoked during that time. * * The provided listener will immediately override any listener set in a previous call to subscribe. * It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics * subscribed in this call. See {@link ConsumerRebalanceListener} for more details. * * @param topics The list of topics to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called * previously (without a subsequent call to {@link #unsubscribe()}), or if not * configured at-least one partition assignment strategy */ @Override publicvoidsubscribe(Collection<String> topics, ConsumerRebalanceListener listener){ acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); if (topics == null) thrownew IllegalArgumentException("Topic collection to subscribe to cannot be null"); if (topics.isEmpty()) { // treat subscribing to empty topic list as the same as unsubscribing this.unsubscribe(); } else { for (String topic : topics) { if (topic == null || topic.trim().isEmpty()) thrownew IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic"); }
privatebooleanchangeSubscription(Set<String> topicsToSubscribe){ if (subscription.equals(topicsToSubscribe)) returnfalse;
subscription = topicsToSubscribe; if (subscriptionType != SubscriptionType.USER_ASSIGNED) { groupSubscription = new HashSet<>(groupSubscription); groupSubscription.addAll(topicsToSubscribe); } else { groupSubscription = new HashSet<>(topicsToSubscribe); } returntrue; }
请求更新 metadata。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicsynchronizedvoidrequestUpdateForNewTopics(){ // Override the timestamp of last refresh to let immediate update. this.lastRefreshMs = 0; this.requestVersion++; requestUpdate(); }
/** * Request an update of the current cluster metadata info, return the current updateVersion before the update */ publicsynchronizedintrequestUpdate(){ this.needUpdate = true; returnthis.updateVersion; }
/** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against all topics existing at the time of check. * This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering * the max metadata age, the consumer will refresh metadata more often and check for matching topics. * <p> * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there * is a change to the topics matching the provided pattern and when consumer group membership changes. * Group rebalances only take place during an active call to {@link #poll(Duration)}. * * @param pattern Pattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics * @throws IllegalArgumentException If pattern or listener is null * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called * previously (without a subsequent call to {@link #unsubscribe()}), or if not * configured at-least one partition assignment strategy */ @Override publicvoidsubscribe(Pattern pattern, ConsumerRebalanceListener listener){ maybeThrowInvalidGroupIdException(); if (pattern == null) thrownew IllegalArgumentException("Topic pattern to subscribe to cannot be null");
publicvoidupdatePatternSubscription(Cluster cluster){ final Set<String> topicsToSubscribe = cluster.topics().stream() .filter(subscriptions::matchesSubscribedPattern) .collect(Collectors.toSet()); if (subscriptions.subscribeFromPattern(topicsToSubscribe)) metadata.requestUpdateForNewTopics(); }
publicsynchronizedbooleansubscribeFromPattern(Set<String> topics){ if (subscriptionType != SubscriptionType.AUTO_PATTERN) thrownew IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " + subscriptionType);
return changeSubscription(topics); }
privatebooleanchangeSubscription(Set<String> topicsToSubscribe){ if (subscription.equals(topicsToSubscribe)) returnfalse;
subscription = topicsToSubscribe; if (subscriptionType != SubscriptionType.USER_ASSIGNED) { groupSubscription = new HashSet<>(groupSubscription); groupSubscription.addAll(topicsToSubscribe); } else { groupSubscription = new HashSet<>(topicsToSubscribe); } returntrue; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14
publicsynchronizedvoidrequestUpdateForNewTopics(){ // Override the timestamp of last refresh to let immediate update. this.lastRefreshMs = 0; this.requestVersion++; requestUpdate(); }
/** * Request an update of the current cluster metadata info, return the current updateVersion before the update */ publicsynchronizedintrequestUpdate(){ this.needUpdate = true; returnthis.updateVersion; }
/** * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment * and will replace the previous assignment (if there is one). * <p> * If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}. * <p> * Manual topic assignment through this method does not use the consumer's group management * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)} * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}. * <p> * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new * assignment replaces the old one. * * @param partitions The list of partitions to assign this consumer * @throws IllegalArgumentException If partitions is null or contains null or empty topics * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern * (without a subsequent call to {@link #unsubscribe()}) */ @Override publicvoidassign(Collection<TopicPartition> partitions){ acquireAndEnsureOpen(); try { if (partitions == null) { thrownew IllegalArgumentException("Topic partition collection to assign to cannot be null"); } elseif (partitions.isEmpty()) { this.unsubscribe(); } else { for (TopicPartition tp : partitions) { String topic = (tp != null) ? tp.topic() : null; if (topic == null || topic.trim().isEmpty()) thrownew IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// make sure the offsets of topic partitions the consumer is unsubscribing from // are committed since there will be no following rebalance if (coordinator != null) this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.info("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) metadata.requestUpdateForNewTopics(); } } finally { release(); } }