Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ public final class OzoneConsts {
public static final String TENANT = "tenant";
public static final String USER_PREFIX = "userPrefix";
public static final String REWRITE_GENERATION = "rewriteGeneration";
/** Sentinel generation used to request atomic create-if-not-exists(put if absent) semantics. */
public static final long EXPECTED_GEN_CREATE_IF_NOT_EXISTS = -1L;
public static final String FROM_SNAPSHOT = "fromSnapshot";
public static final String TO_SNAPSHOT = "toSnapshot";
public static final String TOKEN = "token";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public enum OzoneManagerVersion implements ComponentVersion {

S3_LIST_MULTIPART_UPLOADS_PAGINATION(11,
"OzoneManager version that supports S3 list multipart uploads API with pagination"),

ATOMIC_CREATE_IF_NOT_EXISTS(12,
"OzoneManager version that supports explicit create-if-not-exists key semantics"),

FUTURE_VERSION(-1, "Used internally in the client when the server side is "
+ " newer and an unknown server version has arrived to the client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public OzoneOutputStream createKey(String key, long size,
*
* @param keyName Existing key to rewrite. This must exist in the bucket.
* @param size The size of the new key
* @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
* @param existingKeyGeneration The positive generation of the existing key which is checked for changes at key create
* and commit time.
* @param replicationConfig The replication configuration for the key to be rewritten.
* @param metadata custom key value metadata
Expand Down Expand Up @@ -1047,8 +1047,8 @@ public List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
*
* @param prefix Optional string to filter for the selected keys.
*/
public OzoneMultipartUploadList listMultipartUploads(String prefix,
String keyMarker, String uploadIdMarker, int maxUploads)
public OzoneMultipartUploadList listMultipartUploads(String prefix,
String keyMarker, String uploadIdMarker, int maxUploads)
throws IOException {
return proxy.listMultipartUploads(volumeName, getName(), prefix, keyMarker, uploadIdMarker, maxUploads);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ OzoneOutputStream createKey(String volumeName, String bucketName,
* @param bucketName Name of the Bucket
* @param keyName Existing key to rewrite. This must exist in the bucket.
* @param size The size of the new key
* @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
* @param existingKeyGeneration The positive generation of the existing key which is checked for changes at key create
* and commit time.
* @param replicationConfig The replication configuration for the key to be rewritten.
* @param metadata custom key value metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public void createBucket(
builder.setDefaultReplicationConfig(defaultReplicationConfig);
}

String replicationType = defaultReplicationConfig == null
String replicationType = defaultReplicationConfig == null
? "server-side default replication type"
: defaultReplicationConfig.getType().toString();

Expand Down Expand Up @@ -1317,7 +1317,7 @@ public List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
List<OmBucketInfo> buckets = ozoneManagerClient.listBuckets(
volumeName, prevBucket, bucketPrefix, maxListResult, hasSnapshot);

return buckets.stream().map(bucket ->
return buckets.stream().map(bucket ->
OzoneBucket.newBuilder(conf, this)
.setVolumeName(bucket.getVolumeName())
.setName(bucket.getBucketName())
Expand Down Expand Up @@ -1408,6 +1408,9 @@ public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String
if (omVersion.compareTo(OzoneManagerVersion.ATOMIC_REWRITE_KEY) < 0) {
throw new IOException("OzoneManager does not support atomic key rewrite.");
}
Preconditions.checkArgument(existingKeyGeneration > 0,
"existingKeyGeneration must be positive, but was %s",
existingKeyGeneration);

createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public enum ResultCodes {
USER_MISMATCH, // Error code when requested user name passed is different
// from remote user.

INVALID_PART, // When part name is not found or not matching with partname
INVALID_PART, // When part name is not found or not matching with partname
// in OM MPU partInfo.

INVALID_PART_ORDER, // When list of parts mentioned to complete MPU are not
Expand Down Expand Up @@ -267,7 +267,7 @@ public enum ResultCodes {
UNAUTHORIZED,

S3_SECRET_ALREADY_EXISTS,

INVALID_PATH,
TOO_MANY_BUCKETS,
KEY_UNDER_LEASE_RECOVERY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ public String createSnapshot(String volumeName,
if (!StringUtils.isBlank(snapshotName)) {
requestBuilder.setSnapshotName(snapshotName);
}

final OMRequest omRequest = createOMRequest(Type.CreateSnapshot)
.setCreateSnapshotRequest(requestBuilder)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.DEFAULT_OM_UPDATE_ID;
import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
import static org.apache.hadoop.ozone.OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.apache.hadoop.ozone.OzoneConsts.MD5_HASH;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
Expand Down Expand Up @@ -1435,6 +1436,26 @@ void rewriteFailsDueToOutdatedGenerationAtCommit(BucketLayout layout) throws IOE
assertUnchanged(keyInfo, ozoneManager.lookupKey(keyArgs));
}

@ParameterizedTest
@EnumSource
void rewriteRejectsNonPositiveGeneration(BucketLayout layout)
throws IOException {
checkFeatureEnable(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
OzoneBucket bucket = createBucket(layout);
OzoneKeyDetails key1Details = createTestKey(bucket, "key1", "value".getBytes(UTF_8));
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> {
bucket.rewriteKey("key2",
1024,
EXPECTED_GEN_CREATE_IF_NOT_EXISTS,
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
singletonMap("key", "value"));
});

assertThat(e).hasMessageContaining("existingKeyGeneration must be positive");
assertKeyContent(bucket, key1Details.getName(), "value".getBytes(UTF_8));
}

@ParameterizedTest
@EnumSource
void cannotRewriteDeletedKey(BucketLayout layout) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
KeyArgs keyArgs = commitKeyRequest.getKeyArgs();

if (keyArgs.hasExpectedDataGeneration()) {
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
if (keyArgs.getExpectedDataGeneration()
== OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
ozoneManager.checkFeatureEnabled(
OzoneManagerVersion.ATOMIC_CREATE_IF_NOT_EXISTS);
} else {
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
}
}

if (ozoneManager.getConfig().isKeyNameCharacterCheckEnabled()) {
Expand Down Expand Up @@ -616,14 +622,23 @@ protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map
if (toCommit.getExpectedDataGeneration() != null) {
// These values are not passed in the request keyArgs, so add them into the auditMap if they are present
// in the open key entry.
auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(toCommit.getExpectedDataGeneration()));
if (existing == null) {
throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
}
if (!toCommit.getExpectedDataGeneration().equals(existing.getUpdateID())) {
throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
") does not match the expected generation to rewrite (" + toCommit.getExpectedDataGeneration() + ")",
KEY_NOT_FOUND);
Long expectedGen = toCommit.getExpectedDataGeneration();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the original purpose of this field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original purpose of this field was to support atomic key rewrite/create using optimistic concurrency. The client sends the generation it observed for the existing key, and OM only allows the write to proceed if the key still has that same generation at open and commit time. In practice, this generation is currently the key’s updateID, so the field is used for fencing against concurrent modification, rather than as a generic resource epoch.

auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(expectedGen));

if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
if (existing != null) {
throw new OMException("Key already exists",
OMException.ResultCodes.KEY_ALREADY_EXISTS);
}
Comment on lines +629 to +632
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help check whether it's better to use a new ResultCodes specific to return S3 412 Precondition Failed? Although KEY_ALREADY_EXISTS does not seem to be handled by S3G currently, it might be safer to add a new result code just in case KEY_ALREADY_EXISTS will be used for other S3 operations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use the header to tell the meaning behind the exception

just like how KEY_NOT_FOUND is interpreted in #9815 in the context of the normal put and conditional put.

https://github.com/apache/ozone/pull/9815/changes#diff-bd6abd262ed8e24eaeea5cb0e7c2a1a128411b284f9f27dae1ad47be6717a940R198-R201

} else if (ex.getResult() == ResultCodes.KEY_NOT_FOUND
    && getHeaders().getHeaderString(S3Consts.IF_MATCH_HEADER) != null) {
  // If-Match failed because the key doesn't exist
  throw newError(PRECOND_FAILED, keyPath, ex);

} else {
if (existing == null) {
throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
}
if (expectedGen != existing.getUpdateID()) {
throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
") does not match the expected generation to rewrite (" + expectedGen + ")",
KEY_NOT_FOUND);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.utils.UniqueId;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
Expand Down Expand Up @@ -95,7 +96,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
final OMPerformanceMetrics perfMetrics = ozoneManager.getPerfMetrics();

if (keyArgs.hasExpectedDataGeneration()) {
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
if (keyArgs.getExpectedDataGeneration()
== OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
ozoneManager.checkFeatureEnabled(
OzoneManagerVersion.ATOMIC_CREATE_IF_NOT_EXISTS);
} else {
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
}
}

OmUtils.verifyKeyNameWithSnapshotReservedWord(keyArgs.getKeyName());
Expand Down Expand Up @@ -189,7 +196,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {

KeyArgs.Builder finalNewKeyArgs = newKeyArgs;
KeyArgs resolvedKeyArgs =
captureLatencyNs(perfMetrics.getCreateKeyResolveBucketAndAclCheckLatencyNs(),
captureLatencyNs(perfMetrics.getCreateKeyResolveBucketAndAclCheckLatencyNs(),
() -> resolveBucketAndCheckKeyAcls(finalNewKeyArgs.build(), ozoneManager,
IAccessAuthorizer.ACLType.CREATE));
newCreateKeyRequest =
Expand Down Expand Up @@ -369,7 +376,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
} else {
perfMetrics.addCreateKeyFailureLatencyNs(createKeyLatency);
}

if (acquireLock) {
mergeOmLockDetails(ozoneLockStrategy
.releaseWriteLock(omMetadataManager, volumeName,
Expand Down Expand Up @@ -471,12 +478,22 @@ public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient(
protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs)
throws OMException {
if (keyArgs.hasExpectedDataGeneration()) {
// If a key does not exist, or if it exists but the updateID do not match, then fail this request.
if (dbKeyInfo == null) {
throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
}
if (dbKeyInfo.getUpdateID() != keyArgs.getExpectedDataGeneration()) {
throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
long expectedGen = keyArgs.getExpectedDataGeneration();
// If expectedGen is EXPECTED_GEN_CREATE_IF_NOT_EXISTS, it means the key MUST NOT exist (If-None-Match)
if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
if (dbKeyInfo != null) {
throw new OMException("Key already exists",
OMException.ResultCodes.KEY_ALREADY_EXISTS);
}
} else {
// If a key does not exist, or if it exists but the updateID do not match, then fail this request.
if (dbKeyInfo == null) {
throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
}
if (dbKeyInfo.getUpdateID() != expectedGen) {
throw new OMException("Generation mismatch during expected rewrite",
OMException.ResultCodes.KEY_NOT_FOUND);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.ozone.om.request.key;

import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_ALREADY_EXISTS;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -279,6 +280,76 @@ public void testAtomicRewrite() throws Exception {
assertEquals(acls, committedKey.getAcls());
}

@Test
public void testAtomicCreateIfNotExistsCommitKeyAbsent() throws Exception {
Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(getBucketLayout());
Table<String, OmKeyInfo> closedKeyTable = omMetadataManager.getKeyTable(getBucketLayout());

OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
OMKeyCommitRequest omKeyCommitRequest = getOmKeyCommitRequest(modifiedOmRequest);
KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, omKeyCommitRequest.getBucketLayout());

List<OmKeyLocationInfo> allocatedLocationList =
keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

OmKeyInfo.Builder omKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo(
volumeName, bucketName, keyName, replicationConfig,
new OmKeyLocationInfoGroup(version, new ArrayList<>()));
omKeyInfoBuilder.setExpectedDataGeneration(OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS);

String openKey = addKeyToOpenKeyTable(allocatedLocationList, omKeyInfoBuilder);
assertNotNull(openKeyTable.get(openKey));
assertNull(closedKeyTable.get(getOzonePathKey()));

OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
assertEquals(OK, omClientResponse.getOMResponse().getStatus());

OmKeyInfo committedKey = closedKeyTable.get(getOzonePathKey());
assertNotNull(committedKey);
assertNull(committedKey.getExpectedDataGeneration());
}

@Test
public void testAtomicCreateIfNotExistsCommitKeyAlreadyExists() throws Exception {
Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(getBucketLayout());
Table<String, OmKeyInfo> closedKeyTable = omMetadataManager.getKeyTable(getBucketLayout());

OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest());
OMKeyCommitRequest omKeyCommitRequest = getOmKeyCommitRequest(modifiedOmRequest);
KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, omKeyCommitRequest.getBucketLayout());

List<OmKeyLocationInfo> allocatedLocationList =
keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

OmKeyInfo.Builder omKeyInfoBuilder = OMRequestTestUtils.createOmKeyInfo(
volumeName, bucketName, keyName, replicationConfig,
new OmKeyLocationInfoGroup(version, new ArrayList<>()));
omKeyInfoBuilder.setExpectedDataGeneration(OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS);

String openKey = addKeyToOpenKeyTable(allocatedLocationList, omKeyInfoBuilder);
assertNotNull(openKeyTable.get(openKey));

OmKeyInfo existingClosedKey = OMRequestTestUtils.createOmKeyInfo(
volumeName, bucketName, keyName, replicationConfig,
new OmKeyLocationInfoGroup(version, new ArrayList<>())).build();
closedKeyTable.put(getOzonePathKey(), existingClosedKey);

OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
assertEquals(KEY_ALREADY_EXISTS, omClientResponse.getOMResponse().getStatus());
}

@Test
public void testValidateAndUpdateCacheWithUncommittedBlocks()
throws Exception {
Expand Down Expand Up @@ -456,7 +527,7 @@ private Map<String, RepeatedOmKeyInfo> doKeyCommit(boolean isHSync,
.collect(Collectors.toList());
String openKey = addKeyToOpenKeyTable(allocatedBlockList);
String ozoneKey = getOzonePathKey();

OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L);
assertEquals(OK,
Expand Down
Loading