
V3.3.2 #1

Open

sixtus wants to merge 179 commits into trunk from v3.3.2

Conversation


@sixtus sixtus commented Feb 3, 2023

Apply apache#12358 to release 3.3.2.

Eugene Tolbakov and others added 30 commits July 12, 2022 09:32
…pache#12388)

KIP-800 added the `reason` field to the JoinGroupRequest and the LeaveGroupRequest as a means to provide more information to the group coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we discovered that the size of the field is limited to 32767 chars by our serialisation mechanism. At the moment, the field, whether provided directly by the user or constructed internally, is set directly regardless of its length.

This patch sends only the first 255 chars of the user-provided or internally generated reason on the wire. Given the purpose of this field, that seems acceptable and should still provide enough information to operators to understand the cause of a rebalance.
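
A minimal sketch of the truncation described above (the 255-character cap comes from this patch; the helper name is illustrative):

```
// Illustrative helper: cap the user-provided or internally generated reason
// at 255 characters before it goes on the wire.
static String truncatedReason(String reason) {
    if (reason == null || reason.length() <= 255) {
        return reason;
    }
    return reason.substring(0, 255);
}
```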

Reviewers: David Jacot <djacot@confluent.io>
…pache#12398)

- Different objects should be considered unique even with the same content, to support logout (see the sketch below)
- Added comments for SaslExtensions regarding the removal of equals and hashCode
- Also swapped out the use of mocks in exchange for *real* SaslExtensions so that we exercise the default equals() and hashCode() methods
- Updates to implement equals and hashCode and added tests in SaslExtensionsTest to confirm
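
A small illustration of identity-based equality, which is what falling back to the default equals()/hashCode() gives you (the class below is a hypothetical stand-in, not the actual SaslExtensions API):

```
import java.util.Map;

// Hypothetical stand-in: no equals()/hashCode() overrides, so Object's identity
// semantics apply and two holders with identical content stay distinct.
final class ExtensionsHolder {
    private final Map<String, String> values;

    ExtensionsHolder(Map<String, String> values) {
        this.values = Map.copyOf(values);
    }

    public static void main(String[] args) {
        ExtensionsHolder a = new ExtensionsHolder(Map.of("traceId", "1"));
        ExtensionsHolder b = new ExtensionsHolder(Map.of("traceId", "1"));
        System.out.println(a.equals(b)); // false: each login/logout cycle is tracked as its own instance
    }
}
```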

Co-authored-by: Purshotam Chauhan <pchauhan@confluent.io>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
)

There are 3 bugs when a broker generates a snapshot.

1. Broker should not generate snapshots until it starts publishing.
    Before a broker starts publishing, BrokerMetadataListener._publisher = None, so _publisher.foreach(publish) does nothing. As a result, featuresDelta.metadataVersionChange().isPresent is always true and we generate a snapshot on every commit, since we believe the metadata version has changed. Here are the logs (note that offset 1 is a LeaderChangeMessage, so there is no snapshot for it):

[2022-06-08 13:07:43,010] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 0... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,222] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 2... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,727] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 3... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:44,228] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 4... (kafka.server.metadata.BrokerMetadataSnapshotter:66)

2. We should compute metadataVersionChanged before _publisher.foreach(publish)
    After _publisher.foreach(publish), BrokerMetadataListener._delta is always empty, so metadataVersionChanged is always false. This means we will never trigger snapshot generation even if the metadata version has changed.

3. We should try to generate a snapshot when starting publishing
    When we start publishing, there may have been a metadata version change, so we should try to generate a snapshot before the first publish.

Reviewers: Jason Gustafson <jason@confluent.io>, Divij Vaidya <diviv@amazon.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
KRaft mode will not support writing messages with an older message format (2.8) since the min supported IBP is 3.0 for KRaft. Testing support for reading older message formats will be covered by https://issues.apache.org/jira/browse/KAFKA-14056.

Reviewers: David Jacot <djacot@confluent.io>
…P-846 (apache#12377)

Reviewers: Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
)

This implements the AdminAPI portion of KIP-709: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The request/response protocol changes were implemented in 3.0.0. A new batched API has been introduced to list consumer offsets for different groups. For brokers older than 3.0.0, separate requests are sent for each group.

Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: David Jacot <djacot@confluent.io>

Reviewers: David Jacot <djacot@confluent.io>,  Rajini Sivaram <rajinisivaram@googlemail.com>
…pache#12405)


Currently, the preferredReplicaImbalanceCount calculation has a race condition that can drive the metric negative when topic deletions are initiated simultaneously. This PR addresses the problem by ensuring cleanPreferredReplicaImbalanceMetric is called only once per topic-deletion procedure.

Reviewers: Luke Chen <showuon@gmail.com>
…lier (apache#12420)

This commit changes the version check from != to > as the process method
works correctly on both version 1 and 2. != incorrectly throws on v1
records.

Reviewers: Matthias J. Sax <matthias@confluent.io>
apache#11228)" (apache#12421)

This reverts commit 4835c64

Reviewers: Matthias J. Sax <matthias@confluent.io>
…tive rebalance (apache#12349)

In KAFKA-13310, we tried to fix an issue where consumer#poll(duration) could return later than the provided duration. That happens because, if a rebalance is needed, we try to commit the current offsets synchronously before rebalancing, and if the offset commit takes too long, consumer#poll spends more time than the provided duration. To fix that, we changed the synchronous commit to an asynchronous commit before the rebalance (i.e. in onPrepareJoin).

However, in this ticket, we found that the async commit keeps sending a new commit request on each Consumer#poll because the offset commit never completes in time. The impact is that the existing consumer is kicked out of the group after the rebalance timeout without rejoining. That is, suppose we have consumer A in group G and consumer B joins the group; after the rebalance, only consumer B remains in the group.

Besides, another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with the rebalance timeout, retrying on retriable errors until the timeout. After KAFKA-13310, we thought we still had retries, but we actually retry only after the partitions have been revoked. That is, even if the retried offset commit eventually succeeds, some partition offsets remain uncommitted, and after the rebalance other consumers will consume overlapping records.

Reviewers: RivenSun <riven.sun@zoom.us>, Luke Chen <showuon@gmail.com>
As part of the KAFKA-10888 work, a couple of regressions were introduced:

A call to time.milliseconds() got moved under the queue lock. The call may be expensive and cause lock contention, so it is now moved back outside of the lock.

The reference to the ProducerRecord was held in the batch completion callback, so it was kept alive as long as the batch was alive, which may increase memory usage in certain scenarios and cause excessive GC work. Now the reference is reset early, so the ProducerRecord's lifetime isn't bound to the batch's lifetime.
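
A sketch of both fixes in isolation (the class and field names are illustrative, not the actual RecordAccumulator code): the clock read stays outside the lock, and the completion callback captures only small values instead of the whole record.

```
import java.util.ArrayDeque;

// Illustrative only: not the actual producer accumulator.
final class AppendSketch {
    private final ArrayDeque<byte[]> queue = new ArrayDeque<>();
    private Runnable completionCallback;

    void append(byte[] record) {
        long nowMs = System.currentTimeMillis();   // (1) possibly expensive call, kept outside the lock
        synchronized (queue) {
            queue.add(record);                     // only cheap work happens under the lock
        }
        final int size = record.length;            // (2) capture the size, not the record itself,
        completionCallback = () ->                 //     so the record can be GC'd before the batch completes
                System.out.println("appended " + size + " bytes at " + nowMs);
    }
}
```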

Tested via manually crafted benchmark, lock profile shows ~15% lock contention on the ArrayQueue lock without the fix and ~5% lock contention with the fix (which is also consistent with pre-KAFKA-10888 profile).

Alloc profile shows ~10% spent in ProducerBatch.completeFutureAndFireCallbacks without the fix vs. ~0.25% with the fix (which is also consistent with pre-KAFKA-10888 profile).

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
- the static memberId used was incorrect
- need to remove all threads/members from the group
- need to use the admin client correctly

Add test to verify fixes.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Implementation for KIP-831.
1. Add a remainingLogsToRecover metric for the number of remaining logs to be recovered in each log.dir.
2. Add a remainingSegmentsToRecover metric for the number of remaining segments of the current log assigned to the recovery thread.
3. Remove these metrics after the log is loaded completely.
4. Add tests.

Reviewers: Jun Rao <jun@confluent.io>, Tom Bentley <tbentley@redhat.com>
…ader recovery (apache#12433)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Tom Bentley <tbentley@redhat.com>
…valid replica (apache#12411)

After the fix for apache#12150, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader epoch validation first, before we check whether we have a valid replica.

Reviewers: David Jacot <djacot@confluent.io>
Only pass configs from the request to the AlterConfigPolicy. This changes the KRaft usage of the AlterConfigPolicy to match the usage in ZK mode.

Reviewers: Jason Gustafson <jason@confluent.io>
…ance is ALL (apache#12415)

Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak.

(cherry picked and slightly modified from commit 63e06aa)

Reviewers: Chris Egerton <fearthecellos@gmail.com>, Randall Hauch <rhauch@gmail.com>
…once support in Connect integration test (apache#12429)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Tom Bentley <tbentley@redhat.com>
Read and write access to the TreeMap in snapshots needs to be synchronized.
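
A generic way to express that synchronization in plain Java (a sketch, not the actual snapshot code):

```
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

class SnapshotMapSketch {
    // Route every read and write through a single lock via the synchronized wrapper.
    private final NavigableMap<Long, String> snapshotsByOffset =
            Collections.synchronizedNavigableMap(new TreeMap<>());

    void add(long offset, String path) {
        snapshotsByOffset.put(offset, path);       // individual calls are synchronized by the wrapper
    }

    void printAll() {
        synchronized (snapshotsByOffset) {         // iteration still needs an explicit lock
            snapshotsByOffset.forEach((offset, path) -> System.out.println(offset + " -> " + path));
        }
    }
}
```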

Reviewers: David Arthur <mumrah@gmail.com>
Enable some of the dynamic broker reconfiguration tests in KRaft mode
)

The code used BrokerRegistrationFencingChange.FENCE when unfencing a broker and BrokerRegistrationFencingChange.UNFENCE when fencing a broker, which is confusing. This commit flips the values of the two enums and changes their usage at all of the call sites.

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
…istributedHerder

Reviewers: Chris Egerton <fearthecellos@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Aaron Freeland <afreeland@gmail.com>
…rds (apache#12457)

This commit adds a check to ensure the RecordBatch CRC is valid when
iterating over a Batch of Records using the RecordsIterator. The
RecordsIterator is used by both Snapshot reads and Log Records reads in
KRaft. The check can be turned off by a class parameter and is on by default.
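
A generic illustration of this kind of check, computing a CRC-32C over the batch payload and comparing it with the stored value (this is not the exact Kafka record-batch layout; the boolean mirrors the class parameter mentioned above):

```
import java.util.zip.CRC32C;

class BatchCrcSketch {
    static void ensureValid(boolean doCrcValidation, long storedCrc, byte[] payload) {
        if (!doCrcValidation) {
            return;                                 // the check can be disabled, but is on by default
        }
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        if (crc.getValue() != storedCrc) {
            throw new IllegalStateException("CRC mismatch while iterating record batch");
        }
    }
}
```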

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
…ition epoch (apache#12489)

This PR fixes an AlterPartition regression introduced in apache#12032

When an AlterPartition request succeeds, the partition epoch gets bumped. In ZK controller mode the sender also relies on the AlterPartition response to be informed of the new partition epoch.
If the sender times out the request before a response is sent, the sender will have a stale partition epoch compared to the ZK controller state and will be fenced on subsequent AlterPartition request attempts. The sender will not receive an updated partition epoch until it receives a LeaderAndIsr request for controller-initiated ISR changes.

Reviewers: Jason Gustafson <jason@confluent.io>
…che#12486)

BrokerMetadataSnapshotter should split up record lists that exceed the batch size.

Reviewers: David Arthur <mumrah@gmail.com>
Before trying to commit a batch of records to the __cluster_metadata log, the active controller
should try to apply them to its current in-memory state. If this application process fails, the
active controller process should exit, allowing another node to take leadership. This will prevent
most bad metadata records from ending up in the log and help to surface errors during testing.

Similarly, if the active controller attempts to renounce leadership, and the renunciation process
itself fails, the process should exit. This will help avoid bugs where the active controller
continues in an undefined state.

In contrast, standby controllers that experience metadata application errors should continue on, in
order to avoid a scenario where a bad record brings down the whole controller cluster.  The
intended effect of these changes is to make it harder to commit a bad record to the metadata log,
but to continue to ride out the bad record as well as possible if such a record does get committed.

This PR introduces the FaultHandler interface to implement these concepts. In junit tests, we use a
FaultHandler implementation which does not exit the process. This allows us to avoid terminating
the gradle test runner, which would be very disruptive. It also allows us to ensure that the test
surfaces these exceptions, which we previously were not doing (the mock fault handler stores the
exception).
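
A hedged sketch of what that split can look like; the FaultHandler name comes from the description above, while the method and implementation names here are illustrative:

```
// Illustrative sketch of the fault-handling split described above.
interface FaultHandler {
    RuntimeException handleFault(String failureMessage, Throwable cause);
}

// Production flavor: a metadata application failure on the active controller is fatal.
class ProcessExitingFaultHandler implements FaultHandler {
    @Override
    public RuntimeException handleFault(String failureMessage, Throwable cause) {
        System.err.println("Fatal fault: " + failureMessage);
        Runtime.getRuntime().halt(1);
        return null;                                // unreachable
    }
}

// Test flavor: record the fault so the test can surface it, but keep the JVM alive.
class MockFaultHandler implements FaultHandler {
    volatile Throwable firstFault;

    @Override
    public RuntimeException handleFault(String failureMessage, Throwable cause) {
        if (firstFault == null) {
            firstFault = cause;
        }
        return new RuntimeException(failureMessage, cause);
    }
}
```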

In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
renounce() function. This bug could have resulted in the raft layer not being informed of an active
controller resigning.

Reviewers: David Arthur <mumrah@gmail.com>
jeffkbkim and others added 30 commits November 16, 2022 14:55
When a consumer makes a fetch request to a follower (KIP-392), the fetch request will sit in the purgatory until `fetch.max.wait.ms` is reached because the purgatory is not completed after replication. This patch aims to complete the delayed fetch purgatory after successfully replicating from the leader.

Reviewers: Artem Livshits <alivshits@confluent.io>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
… DSL (apache#12861)

With the addition of the new Processor API, the newly added FixedKeyProcessorNodeFactory extends the ProcessorNodeFactory class. ProcessorNodeFactory had a private field Set<String> stateStoreNames initialized to an empty set. FixedKeyProcessorNodeFactory also had a private field Set<String> stateStoreNames.

When InternalTopologyBuilder.build executes the buildProcessorNode method, it handles every node factory as a ProcessorNodeFactory, so when the method references the stateStoreNames field it reads the superclass field, which is empty. The corresponding StoreBuilder(s) are therefore never added, causing an NPE in the topology.

This PR makes the field protected on the ProcessorNodeFactory class so FixedKeyProcessorNodeFactory inherits it.

The added test fails without this change.
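
The underlying Java pitfall, reduced to a minimal sketch (the class names are placeholders, not the Streams internals): a private field redeclared in the subclass shadows the superclass field, so superclass code keeps seeing an empty set; making the field protected leaves a single inherited field.

```
import java.util.HashSet;
import java.util.Set;

// Before: the subclass declares its own set, so superclass code sees only its own (empty) field.
class NodeFactoryBefore {
    private final Set<String> stateStoreNames = new HashSet<>();   // stays empty
}

class FixedKeyNodeFactoryBefore extends NodeFactoryBefore {
    private final Set<String> stateStoreNames = new HashSet<>();   // shadows; never visible to the superclass
}

// After: one protected field that the subclass inherits and populates.
class NodeFactoryAfter {
    protected final Set<String> stateStoreNames = new HashSet<>();
}

class FixedKeyNodeFactoryAfter extends NodeFactoryAfter {
    void connectStore(String name) {
        stateStoreNames.add(name);                                  // writes to the single inherited field
    }
}
```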

Reviewers: Matthias J. Sax <mjsax@apache.org>,  Sophie Blee-Goldman <sophie@confluent.io>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
…to infinite loop (apache#12752) (apache#12872)

This fixes a bug that causes a call to producer.send(record), for a record without a key and with the producer configured with batch.size=0, to never return.

Without specifying a key or a custom partitioner, the new BuiltInPartitioner, as described in KIP-749, kicks in.

BuiltInPartitioner seems to have been designed with the reasonable assumption that the batch size will never be lower than one.

However, the documentation for the producer configuration lists batch.size=0 as a valid value and even recommends its use directly. [1]

[1] clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:87
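
A minimal reproduction sketch of the configuration that triggered the hang (the bootstrap server and topic name are placeholders):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchSizeZeroRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0); // documented as valid: effectively disables batching

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key and no custom partitioner, so the built-in partitioner is used;
            // before the fix this send() could hang forever.
            producer.send(new ProducerRecord<>("test-topic", "value")).get();
        }
    }
}
```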

Reviewers: Artem Livshits <alivshits@confluent.io>, Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
…ashing distributed herder (apache#12295)

Reviewers: Kvicii <kvicii.yu@gmail.com>, Chris Egerton <chrise@aiven.io>
 (apache#12840)

* Updated Jackson to version 2.13.4 to fix CVE-2022-42004 and CVE-2020-36518
* Updated jackson-databind to version 2.13.4.2 to fix CVE-2022-42004

Co-authored-by: Pratim SC <pratim.sunilkumar.chaudhuri@mercer.com>

 Reviewers: Luke Chen <showuon@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
… followup (apache#12869)

Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.

Reviewers: John Roesler <vvcephei@apache.org>
Handle null processor supplier

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
…ge (apache#12651)

Currently the client-side HttpAccessTokenRetriever class does not retrieve the error response from the token endpoint. As a result, seemingly trivial config issues can take a lot of time to diagnose and fix; for example, the client could be sending an invalid client secret, id, or scope.
This PR remedies that by retrieving the error response, if present, logging it, and appending it to any exceptions thrown.
New unit tests have also been added.
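
A generic sketch of pulling the error body off the connection so it can be logged and attached to the thrown exception (plain HttpURLConnection, not the actual HttpAccessTokenRetriever code):

```
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

class TokenEndpointErrorSketch {
    // On a non-2xx response, read the error stream so the provider's explanation
    // (invalid client id, secret, scope, ...) surfaces in logs and exceptions.
    static String errorBody(HttpURLConnection connection) throws IOException {
        InputStream errorStream = connection.getErrorStream();
        if (errorStream == null) {
            return "";
        }
        try (errorStream) {
            return new String(errorStream.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```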

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
…apache#12877)

The default replica selector chooses a replica based on whether the broker.rack matches the client.rack in the fetch request and whether the offset exists on the follower. If the follower is not in the ISR, we know it's lagging behind, which will also cause the consumer to lag behind. There are two cases:
1. The follower recovers and rejoins the ISR; the consumer will no longer fall behind.
2. The follower continues to lag behind. After 5 minutes, the consumer will refresh its preferred read replica, and the leader will return the same lagging follower, since the offset the consumer has fetched up to is capped by the follower's HWM. This can go on indefinitely.

If the replica selector chooses a broker in the ISR, then we can ensure that at least every 5 minutes the consumer will consume from an up-to-date replica.
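
A hedged sketch of that selection rule; the ReplicaView record and its fields here are illustrative placeholders rather than the actual replica-selector plug-in API:

```
import java.util.Optional;
import java.util.Set;

// Illustrative placeholder type, not Kafka's replica selector interface.
record ReplicaView(int brokerId, String rack) { }

class RackAwareIsrSelectionSketch {
    // Prefer a rack-matching replica, but only one that is in the ISR, so a
    // persistently lagging follower is never handed back to the consumer.
    static Optional<ReplicaView> select(String clientRack, Set<ReplicaView> replicas, Set<Integer> isr) {
        return replicas.stream()
                .filter(r -> isr.contains(r.brokerId()))
                .filter(r -> r.rack() != null && r.rack().equals(clientRack))
                .findFirst();
    }
}
```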

Reviewers: David Jacot <djacot@confluent.io>
…ejoins

When consumers use the static membership protocol, they cannot update the rebalance timeout because the group coordinator simply ignores any new value. This patch fixes that.

Reviewers: David Jacot <djacot@confluent.io>
… group with members not supporting static members (apache#12909)

When a consumer group on a version prior to 2.3 is upgraded to a newer version and static membership is enabled in the meantime, the consumer group remains stuck, iff the leader is still on the old version.

The issue is that setting `GroupInstanceId` in the response to the leader is only supported from JoinGroup version >= 5, and `GroupInstanceId` is neither ignorable nor handled anywhere else. Hence, if there is at least one static member in the group, sending the JoinGroup response to the leader fails with a serialization error.

```
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default groupInstanceId at version 2
```

When this happens, the member stays around until the group coordinator is bounced because a member with a non-null `awaitingJoinCallback` is never expired.

This patch fixes the issue by making `GroupInstanceId` ignorable. A unit test has been modified to cover this.

Reviewers: Jason Gustafson <jason@confluent.io>
…che#12910)

We recently had a bug causing the JoinGroup callback to throw an exception (apache#12909). When that happens, the exception is propagated to the caller and the JoinGroup callback is never completed. To make it worse, the member whose callback failed becomes a zombie, because the group coordinator does not expire members with a pending callback.

This patch catches exceptions from both the JoinGroup and SyncGroup callback invocations and retries completing them with an `UNKNOWN_SERVER_ERROR` error if they failed.

Reviewers: Jason Gustafson <jason@confluent.io>
…n trying offsetWriter flush (apache#12920)

Reviewers: Chris Egerton <chrise@aiven.io>
apache#12898)

Reviewers: Andriy Redko <andriy.redko@aiven.io>, Chris Egerton <chrise@aiven.io>
With KRaft, the cluster metadata topic (__cluster_metadata) has a different implementation compared to regular topics. The user should not be allowed to create this topic; doing so can cause issues if the metadata log dir is the same as one of the log dirs.

This change returns an authorization error if the user tries to create the cluster metadata topic.

Reviewers: David Arthur <mumrah@gmail.com>
…ache#12935)

RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized.

I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Christo Lolov <lolovc@amazon.com>
…erIdRequest, treats as fatal error (apache#12915)

The broker may return the `REQUEST_TIMED_OUT` error in `InitProducerId` responses when allocating the ID using the `AllocateProducerIds` request. The client currently does not handle this. Instead of retrying as we would expect, the client raises a fatal exception to the application. 

In this patch, we address this problem by modifying the producer to handle `REQUEST_TIMED_OUT` and any other retriable errors by re-enqueuing the request. 

Reviewers: Jason Gustafson <jason@confluent.io>
… StandardAuthorizer

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Colin Patrick McCabe <cmccabe@apache.org>
Adds docs for KIP-761.

Reviewers: Anurag Bandyopadhyay (@Anuragkillswitch), Matthias J. Sax <matthias@confluent.io>
… metadata (apache#12956)

The consumer (fetcher) used to refresh the preferred read replica on three conditions:

1. the consumer receives an OFFSET_OUT_OF_RANGE error
2. the follower does not exist in the client's metadata (i.e., offline)
3. after metadata.max.age.ms (5 min default)

For other errors, it will continue to reach out to the possibly unavailable follower, and only after 5 minutes will it refresh the preferred read replica and go back to the leader.

Another problem is that the client might have stale metadata and not send fetches to the preferred replica, even after the leader redirects to the preferred replica.

A specific example is when a partition is reassigned. The consumer will get NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the preferred read replica will not be refreshed as the follower is still online. It will continue to reach out to the old follower until the preferred read replica expires.

The consumer can instead refresh its preferred read replica whenever it makes a metadata update request, so when the consumer receives e.g. NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without waiting for the expiration.

Generally, we will rely on the leader to choose the correct preferred read replica and have the consumer fail fast (clear the preferred read replica cache) on errors and reach out to the leader.

Co-authored-by: Jeff Kim <jeff.kim@confluent.io>

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
…annelManager (apache#12856)

In BrokerToControllerChannelManager, set the request timeout to the minimum of the retry timeout
and the controller socket timeout. This fixes some cases where we were unintentionally setting an
overly long request timeout. 

Also, the channel manager used by the BrokerLifecycleManager should set a retry timeout equal to
half of the broker session timeout, rather than the entire broker session timeout, to allow for a
retransmission if the initial attempt fails.

These two fixes should address some cases where broker heartbeat requests were not being resent in a timely fashion after a network glitch.
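
Written out as arithmetic (a sketch with illustrative names):

```
class ChannelTimeoutSketch {
    // Request timeout: never longer than either the retry budget or the controller socket timeout.
    static int requestTimeoutMs(int retryTimeoutMs, int controllerSocketTimeoutMs) {
        return Math.min(retryTimeoutMs, controllerSocketTimeoutMs);
    }

    // BrokerLifecycleManager channel: half the session timeout, leaving room for one retransmission.
    static long lifecycleRetryTimeoutMs(long brokerSessionTimeoutMs) {
        return brokerSessionTimeoutMs / 2;
    }
}
```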

Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
…Roll (apache#12987)

The test was added with a fix for KAFKA-14379; the problem was that the replication factor for the offsets topic was 1, and the consumer group coordinator became unavailable when one of the brokers was shut down.

Reviewers: David Jacot <djacot@confluent.io>
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
…ckHandler (apache#13000)

The OAuth code that generates the Authorization header was incorrectly using the URL-safe base64 encoder. Client IDs and/or secrets containing dashes and/or plus signs would not be encoded correctly, leading the OAuth server to reject the credentials.

This change uses the correct base64 encoder, per RFC-7617.
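
The distinction in plain Java; RFC 7617 Basic credentials use the standard base64 alphabet ('+' and '/'), not the URL-safe variant ('-' and '_'):

```
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthHeaderSketch {
    static String authorizationHeader(String clientId, String clientSecret) {
        String credentials = clientId + ":" + clientSecret;
        // Correct per RFC 7617: the standard alphabet.
        String encoded = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        // Base64.getUrlEncoder() would emit '-' and '_' instead, which the
        // authorization server may reject when decoding the Basic credentials.
        return "Basic " + encoded;
    }
}
```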

Co-authored-by: Endre Vig <vendre@gmail.com>
…`InitProducerId` (apache#12968)

Older clients can not handle the `REQUEST_TIMED_OUT` error that is returned from `InitProducerId` when the next producerId block cannot be fetched from the controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead which is retriable.

Reviewers: Jason Gustafson <jason@confluent.io>
…apache#13023)

The failure handling code for fetches could run into an IllegalStateException if a fetch response came back with a failure after the corresponding topic partition had already been removed from the assignment.

Reviewers: David Jacot <djacot@confluent.io>
…ache#12994)

The controller metrics in the controllers have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when the uncommitted data gets aborted; 3) the controller doesn't update the metrics when the entire state gets reset.

We fix these issues by only updating the metrics when processing committed metadata records and by resetting the metrics when the metadata state is reset.

This change adds a new type `ControllerMetricsManager` which processes committed metadata records and updates the metrics accordingly. This change also removes metrics updating responsibilities from the rest of the controller managers.

Reviewers: Ron Dagostino <rdagostino@confluent.io>