Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -274,7 +275,11 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
}
taskCheckpoints.put(taskName, taskCheckpoint);

// Only insert non-null checkpoints
if (taskCheckpoint != null) {
taskCheckpoints.put(taskName, taskCheckpoint);
}

Map<String, Set<String>> backendFactoryToStoreNames =
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
Expand Down Expand Up @@ -308,6 +313,15 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});

// If we have received no input checkpoints, it can only be for one of two reasons:
// a) The Samza job is new, so it has no previous checkpoints.
// b) The checkpoints were cleared.
// In either case, it should be safe to clear the local logged stores.
if (taskCheckpoints.isEmpty()) {
LOG.info("No checkpoints read. Attempting to clear logged stores.");
clearLoggedStores(loggedStoreBaseDirectory);
}

// Init all taskRestores and if successful, restores all the task stores concurrently
LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
Expand Down Expand Up @@ -357,6 +371,19 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
return taskCheckpoints;
}

/**
 * Removes every entry found directly under the given logged store base directory.
 * Logs and returns without deleting anything when the directory listing is
 * {@code null} or empty.
 *
 * @param loggedStoreBaseDir base directory whose direct children are removed
 */
private static void clearLoggedStores(File loggedStoreBaseDir) {
  final FileUtil remover = new FileUtil();
  final File[] entries = loggedStoreBaseDir.listFiles();
  final boolean hasEntries = entries != null && entries.length > 0;
  if (!hasEntries) {
    LOG.info("No stores to delete");
    return;
  }
  for (int i = 0; i < entries.length; i++) {
    final File entry = entries[i];
    LOG.info("Clearing store dir {} from logged stores.", entry);
    remover.rm(entry);
  }
}

/**
* Get the {@link StorageEngine} instance with a given name for a given task.
* @param taskName the task name for which the storage engine is desired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -102,7 +103,6 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class})
public class TestContainerStorageManager {

private static final String STORE_NAME = "store";
private static final String SYSTEM_NAME = "kafka";
private static final String STREAM_NAME = "store-stream";
Expand All @@ -116,6 +116,7 @@ public class TestContainerStorageManager {
private SamzaContainerMetrics samzaContainerMetrics;
private Map<TaskName, TaskModel> tasks;
private StandbyTestContext testContext;
private CheckpointManager checkpointManager;

private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
Expand Down Expand Up @@ -143,7 +144,7 @@ private void addMockedTask(String taskname, int changelogPartition) {
* Method to create a containerStorageManager with mocked dependencies
*/
@Before
public void setUp() throws InterruptedException {
public void setUp() throws InterruptedException, IOException {
taskRestoreMetricGauges = new HashMap<>();
this.tasks = new HashMap<>();
this.taskInstanceMetrics = new HashMap<>();
Expand Down Expand Up @@ -248,7 +249,7 @@ public Void answer(InvocationOnMock invocation) {
.thenReturn(
new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));

CheckpointManager checkpointManager = mock(CheckpointManager.class);
this.checkpointManager = mock(CheckpointManager.class);
when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));

SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
Expand Down Expand Up @@ -320,6 +321,40 @@ public void testParallelismAndMetrics() throws InterruptedException {
Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
}

/**
 * Verifies that logged stores on disk are deleted when the checkpoint manager
 * returns no checkpoint for any task.
 *
 * <p>The test creates a store directory with one file under the default logged store
 * base directory, starts and shuts down the container storage manager, and then asserts
 * that both the directory and the file are gone.
 *
 * @throws Exception if file setup or the container storage manager fails; the exception
 *     is propagated so the test runner reports the real cause
 */
@Test
@SuppressWarnings("ResultOfMethodCallIgnored")
public void testDeleteLoggedStoreOnNoCheckpoints() throws Exception {
  // reset the mock to drop the stubs installed in the setup method
  reset(this.checkpointManager);
  // redo stubbing to return null checkpoints
  when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null);

  // create store under logged stores to demonstrate deletion
  final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME);
  // add contents to store
  final File storeFilePartition = new File(storeFile, "Partition_0");

  // Exit-time deletion runs in reverse registration order, so register the directory
  // first: its child is then removed before the directory itself.
  storeFile.deleteOnExit();
  storeFilePartition.deleteOnExit();

  try {
    storeFile.mkdirs();
    storeFilePartition.createNewFile();
    Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists());
    Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists());

    this.containerStorageManager.start();
    this.containerStorageManager.shutdown();

    Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists());
    Assert.assertFalse("Assert that store files are deleted after the test.", storeFilePartition.exists());
  } finally {
    // Best-effort cleanup so a failed run does not leave files behind for other tests;
    // delete the child before its parent directory.
    storeFilePartition.delete();
    storeFile.delete();
  }
}

@Test
public void testNoConfiguredDurableStores() throws InterruptedException {
taskRestoreMetricGauges = new HashMap<>();
Expand Down
Loading