From 74bf2944c30edc2d8b3960f14ada16730ddc1fed Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Mon, 3 Nov 2025 08:17:32 +0530 Subject: [PATCH 01/14] use blockentry to hold all data related to block --- .../instructions/ooc/OOCEvictionManager.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 87984da8834..3c0e85b3d85 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -29,6 +29,8 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; /** * Eviction Manager for the Out-Of-Core stream cache @@ -66,11 +68,23 @@ * must ensure serialization finishes before adding to queue or make evict * wait on serialization; careful with native memory leaks. */ -public class OOCEvictionManager { +public class OOCEvictionManager1 { // Configuration: OOC buffer limit as percentage of heap private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + // HOT TIER: In-memory blocks + private static final ConcurrentHashMap _blocks = new ConcurrentHashMap<>(); + + // COLD-TIER: Evicted blocks on the disk + private static final ConcurrentHashMap _spillLocations = new ConcurrentHashMap<>(); + + // Partition management: per-stream partitions + private static final ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); + + // Partition <--> Stream mapping + private static final ConcurrentHashMap > _streamPartitions = new ConcurrentHashMap<>(); + // Memory limit for ByteBuffers private static long _limit; private static long _size; @@ -86,6 +100,25 @@ public enum RPolicy { } private static RPolicy _policy = RPolicy.FIFO; + private enum BlockState { + HOT, // In-memory + EVICTING, // Being written to disk (transition state) + COLD // On disk + } + + // Per-block state container with own lock. + private static class BlockEntry { + private final ReentrantLock lock = new ReentrantLock(); + private BlockState state = BlockState.HOT; + private IndexedMatrixValue value; +// private final long streamId; +// private final int blockId; + + BlockEntry(long streamId, int blockId, IndexedMatrixValue value) { + this.value = value; + } + } + static { _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap _size = 0; @@ -137,7 +170,7 @@ private static void evict(long requiredSize) { try { int pos = 0; while(_size + requiredSize > _limit && pos++ < _cache.size()) { - //System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); +// System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); if( tmp == null || tmp.getValue().getValue() == null ) { if( tmp != null ) From c9ef4ceee95deb143bf75677ca3833c98d4b0d61 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Mon, 3 Nov 2025 08:39:18 +0530 Subject: [PATCH 02/14] initialize blockentry, add block level synchronize --- .../instructions/ooc/OOCEvictionManager.java | 104 ++++++++++-------- 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 3c0e85b3d85..9345bf87690 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -30,6 +30,10 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -68,29 +72,21 @@ * must ensure serialization finishes before adding to queue or make evict * wait on serialization; careful with native memory leaks. */ -public class OOCEvictionManager1 { +public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap - // HOT TIER: In-memory blocks - private static final ConcurrentHashMap _blocks = new ConcurrentHashMap<>(); - - // COLD-TIER: Evicted blocks on the disk - private static final ConcurrentHashMap _spillLocations = new ConcurrentHashMap<>(); - - // Partition management: per-stream partitions - private static final ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); - - // Partition <--> Stream mapping - private static final ConcurrentHashMap > _streamPartitions = new ConcurrentHashMap<>(); - // Memory limit for ByteBuffers private static long _limit; - private static long _size; + private static final AtomicLong _size = new AtomicLong(0); // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) - private static LinkedHashMap _cache = new LinkedHashMap<>(); + private static LinkedHashMap _cache = new LinkedHashMap<>(); + + // Cache level lock + private static final Object _cacheLock = new Object(); +// private static final AtomicLong _hotSize = new AtomicLong(0); // Spill directory for evicted blocks private static String _spillDir; @@ -109,19 +105,25 @@ private enum BlockState { // Per-block state container with own lock. private static class BlockEntry { private final ReentrantLock lock = new ReentrantLock(); + private final Condition stateUpdate = lock.newCondition(); + private BlockState state = BlockState.HOT; private IndexedMatrixValue value; -// private final long streamId; -// private final int blockId; + private final long streamId; + private final int blockId; + private final long size; - BlockEntry(long streamId, int blockId, IndexedMatrixValue value) { + BlockEntry(IndexedMatrixValue value, long streamId, int blockId, long size) { this.value = value; + this.streamId = streamId; + this.blockId = blockId; + this.size = size; } } static { _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap - _size = 0; + _size.set(0); _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); LocalFileUtils.createLocalFileIfNotExist(_spillDir); } @@ -129,37 +131,51 @@ private static class BlockEntry { /** * Store a block in the OOC cache (serialize once) */ - public static synchronized void put(long streamId, int blockId, IndexedMatrixValue value) { + public static void put(long streamId, int blockId, IndexedMatrixValue value) { MatrixBlock mb = (MatrixBlock) value.getValue(); long size = estimateSerializedSize(mb); String key = streamId + "_" + blockId; - IndexedMatrixValue old = _cache.remove(key); // remove old value + BlockEntry newEntry = new BlockEntry(value, streamId, blockId, size); + BlockEntry old; + synchronized (_cacheLock) { + old = _cache.put(key, newEntry); + } +// BlockEntry old = _cache.remove(key); // remove old value if (old != null) { - _size -= estimateSerializedSize((MatrixBlock) old.getValue()); + old.lock.lock(); + try { + if (old.state == BlockState.HOT) { + _size.addAndGet(-old.size); // read and update size in atomic operation + } + } finally { + old.lock.unlock(); + } } + _size.addAndGet(size); //make room if needed evict(size); - - _cache.put(key, value); // put new value last - _size += size; } /** * Get a block from the OOC cache (deserialize on read) */ - public static synchronized IndexedMatrixValue get(long streamId, int blockId) { + public static IndexedMatrixValue get(long streamId, int blockId) { String key = streamId + "_" + blockId; - IndexedMatrixValue imv = _cache.get(key); + BlockEntry imv; + + synchronized (_cacheLock) { + imv = _cache.get(key); - if (imv != null && _policy == RPolicy.LRU) { - _cache.remove(key); - _cache.put(key, imv); //add last semantic + if (imv != null && _policy == RPolicy.LRU) { + _cache.remove(key); + _cache.put(key, imv); //add last semantic + } } //restore if needed - return (imv.getValue() != null) ? imv : + return (imv.value.getValue() != null) ? imv.value : loadFromDisk(streamId, blockId); } @@ -169,10 +185,10 @@ public static synchronized IndexedMatrixValue get(long streamId, int blockId) { private static void evict(long requiredSize) { try { int pos = 0; - while(_size + requiredSize > _limit && pos++ < _cache.size()) { + while(_size.get() + requiredSize > _limit && pos++ < _cache.size()) { // System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); - Map.Entry tmp = removeFirstFromCache(); - if( tmp == null || tmp.getValue().getValue() == null ) { + Map.Entry tmp = removeFirstFromCache(); + if( tmp == null || tmp.getValue().value.getValue() == null ) { if( tmp != null ) _cache.put(tmp.getKey(), tmp.getValue()); continue; @@ -184,13 +200,13 @@ private static void evict(long requiredSize) { if (!spillDirFile.exists()) { spillDirFile.mkdirs(); } - LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue()); + LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().value.getValue()); // Evict from memory - long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue()); - tmp.getValue().setValue(null); + long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); + tmp.getValue().value.setValue(null); _cache.put(tmp.getKey(), tmp.getValue()); // add last semantic - _size -= freedSize; + _size.addAndGet(-freedSize); } } catch(Exception ex) { @@ -213,9 +229,9 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { // Read from disk and put into original indexed matrix value MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - IndexedMatrixValue imv = _cache.get(key); - imv.setValue(mb); - return imv; + BlockEntry imv = _cache.get(key); + imv.value.setValue(mb); + return imv.value; } catch(Exception ex) { throw new DMLRuntimeException(ex); @@ -226,10 +242,10 @@ private static long estimateSerializedSize(MatrixBlock mb) { return mb.getExactSerializedSize(); } - private static Map.Entry removeFirstFromCache() { + private static Map.Entry removeFirstFromCache() { //move iterator to first entry - Iterator> iter = _cache.entrySet().iterator(); - Map.Entry entry = iter.next(); + Iterator> iter = _cache.entrySet().iterator(); + Map.Entry entry = iter.next(); //remove current iterator entry iter.remove(); From 1760e1b10aafed5226d708a84db9c09b6876a6c2 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Mon, 3 Nov 2025 23:51:17 +0530 Subject: [PATCH 03/14] fix size check for eviction --- .../instructions/ooc/OOCEvictionManager.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 9345bf87690..9deb6a9ed2b 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -75,7 +75,7 @@ public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap - private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.15 * 0.01; // 15% of heap // Memory limit for ByteBuffers private static long _limit; @@ -86,7 +86,6 @@ public class OOCEvictionManager { // Cache level lock private static final Object _cacheLock = new Object(); -// private static final AtomicLong _hotSize = new AtomicLong(0); // Spill directory for evicted blocks private static String _spillDir; @@ -139,9 +138,10 @@ public static void put(long streamId, int blockId, IndexedMatrixValue value) { BlockEntry newEntry = new BlockEntry(value, streamId, blockId, size); BlockEntry old; synchronized (_cacheLock) { - old = _cache.put(key, newEntry); + old = _cache.put(key, newEntry); // remove old value, put new value } -// BlockEntry old = _cache.remove(key); // remove old value + + // Handle replacement with a new lock if (old != null) { old.lock.lock(); try { @@ -173,10 +173,31 @@ public static IndexedMatrixValue get(long streamId, int blockId) { _cache.put(key, imv); //add last semantic } } - - //restore if needed - return (imv.value.getValue() != null) ? imv.value : - loadFromDisk(streamId, blockId); + + // use lock and check state + imv.lock.lock(); + try { + // 1. wait for eviction to complete + while (imv.state == BlockState.EVICTING) { + try { + imv.stateUpdate.wait(); + } catch (InterruptedException e) { + + throw new DMLRuntimeException(e); + } + } + + // 2. check if the block is in HOT + if (imv.state == BlockState.HOT) { + return imv.value; + } + + } finally { + imv.lock.unlock(); + } + + // restore, since the block is COLD + return loadFromDisk(streamId, blockId); } /** @@ -185,7 +206,7 @@ public static IndexedMatrixValue get(long streamId, int blockId) { private static void evict(long requiredSize) { try { int pos = 0; - while(_size.get() + requiredSize > _limit && pos++ < _cache.size()) { + while(_size.get() > _limit && pos++ < _cache.size()) { // System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); if( tmp == null || tmp.getValue().value.getValue() == null ) { From 2da813c533b1191df5661c7a56f86d00baccd0dd Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Mon, 3 Nov 2025 23:58:43 +0530 Subject: [PATCH 04/14] update block state after eviction --- .../instructions/ooc/OOCEvictionManager.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 9deb6a9ed2b..d4a09df32ed 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -225,8 +225,20 @@ private static void evict(long requiredSize) { // Evict from memory long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); - tmp.getValue().value.setValue(null); - _cache.put(tmp.getKey(), tmp.getValue()); // add last semantic + + BlockEntry entry = tmp.getValue(); + + entry.lock.lock(); + try { + entry.value.setValue(null); + entry.state = BlockState.COLD; // set state to cold, since writing to disk + + + } finally { + entry.lock.unlock(); + } + + _cache.put(tmp.getKey(), entry); // add last semantic _size.addAndGet(-freedSize); } } From 50958eded2a89401514c548befc02831fb0a7ab0 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 4 Nov 2025 01:00:42 +0530 Subject: [PATCH 05/14] keep all _cache updates synchronized --- .../instructions/ooc/OOCEvictionManager.java | 42 +++++++++++++++---- .../runtime/util/PartitionFileManager.java | 4 ++ 2 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index d4a09df32ed..836b24fc085 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -75,7 +75,9 @@ public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap - private static final double OOC_BUFFER_PERCENTAGE = 0.15 * 0.01; // 15% of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.15 * 0.01 * 2; // 15% of heap + + private static final double PARTITION_EVICTION_SIZE = 64 * 1024 * 1024; // 64 MB // Memory limit for ByteBuffers private static long _limit; @@ -84,6 +86,10 @@ public class OOCEvictionManager { // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) private static LinkedHashMap _cache = new LinkedHashMap<>(); + // Spill related structures + private static ConcurrentHashMap _spillLocations = new ConcurrentHashMap<>(); + // private static ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); + // Cache level lock private static final Object _cacheLock = new Object(); @@ -101,6 +107,20 @@ private enum BlockState { COLD // On disk } + private static class spillLocation { + // structure of spillLocation: file, offset + final int partitionId; + final long offset; + + spillLocation(int partitionId, long offset, int partitionId1, long offset1) { + + this.partitionId = partitionId1; + this.offset = offset1; + } + } + + + // Per-block state container with own lock. private static class BlockEntry { private final ReentrantLock lock = new ReentrantLock(); @@ -207,7 +227,7 @@ private static void evict(long requiredSize) { try { int pos = 0; while(_size.get() > _limit && pos++ < _cache.size()) { -// System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); + System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); if( tmp == null || tmp.getValue().value.getValue() == null ) { if( tmp != null ) @@ -238,7 +258,9 @@ private static void evict(long requiredSize) { entry.lock.unlock(); } - _cache.put(tmp.getKey(), entry); // add last semantic + synchronized (_cacheLock) { + _cache.put(tmp.getKey(), entry); // add last semantic + } _size.addAndGet(-freedSize); } } @@ -276,13 +298,15 @@ private static long estimateSerializedSize(MatrixBlock mb) { } private static Map.Entry removeFirstFromCache() { - //move iterator to first entry - Iterator> iter = _cache.entrySet().iterator(); - Map.Entry entry = iter.next(); + synchronized (_cacheLock) { + //move iterator to first entry + Iterator> iter = _cache.entrySet().iterator(); + Map.Entry entry = iter.next(); - //remove current iterator entry - iter.remove(); + //remove current iterator entry + iter.remove(); - return entry; + return entry; + } } } diff --git a/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java new file mode 100644 index 00000000000..5886598cb64 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java @@ -0,0 +1,4 @@ +package org.apache.sysds.runtime.util; + +public class partitionFileManager { +} From 23673e16209c98b2bb3b2d62d848c16f94946016 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 4 Nov 2025 01:15:55 +0530 Subject: [PATCH 06/14] spillfile and partitionfile structures * also add missing lock for the cacheobject --- .../instructions/ooc/OOCEvictionManager.java | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 836b24fc085..90abcf6ed44 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -88,7 +88,9 @@ public class OOCEvictionManager { // Spill related structures private static ConcurrentHashMap _spillLocations = new ConcurrentHashMap<>(); - // private static ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); + private static ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); + private static final AtomicInteger _partitionCounter = new AtomicInteger(0); + // Cache level lock private static final Object _cacheLock = new Object(); @@ -119,8 +121,17 @@ private static class spillLocation { } } + private static class partitionFile { + final String filePath; + final long streamId; + private partitionFile(String filePath, long streamId) { + this.filePath = filePath; + this.streamId = streamId; + } + } + // Per-block state container with own lock. private static class BlockEntry { private final ReentrantLock lock = new ReentrantLock(); @@ -229,9 +240,15 @@ private static void evict(long requiredSize) { while(_size.get() > _limit && pos++ < _cache.size()) { System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); - if( tmp == null || tmp.getValue().value.getValue() == null ) { - if( tmp != null ) - _cache.put(tmp.getKey(), tmp.getValue()); + + if (tmp == null) { continue; } + + BlockEntry entry = tmp.getValue(); + + if( entry.value.getValue() == null ) { + synchronized (_cacheLock) { + _cache.put(tmp.getKey(), entry); + } continue; } @@ -243,11 +260,21 @@ private static void evict(long requiredSize) { } LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().value.getValue()); + // partition file + // 1. generate a new ID for the present "partition" (file) + int partitionId = _partitionCounter.incrementAndGet(); + + // 2. create the partition file metadata + partitionFile partFile = new partitionFile(filename, entry.streamId); + _partitions.put(partitionId, partFile); + + // 3. create the spillLocation + spillLocation sloc = new spillLocation(partitionId, entry.streamId, entry.blockId, entry.size); + _spillLocations.put(filename, sloc); + // Evict from memory long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); - BlockEntry entry = tmp.getValue(); - entry.lock.lock(); try { entry.value.setValue(null); From 07dd611fb7741021c3dbe010d918cdcf1f35220a Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 4 Nov 2025 01:37:46 +0530 Subject: [PATCH 07/14] use spillLocation and partitionFile for a single block * there's an issue with spillLocation get --- .../instructions/ooc/OOCEvictionManager.java | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 90abcf6ed44..e06425244c3 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -262,7 +262,7 @@ private static void evict(long requiredSize) { // partition file // 1. generate a new ID for the present "partition" (file) - int partitionId = _partitionCounter.incrementAndGet(); + int partitionId = _partitionCounter.getAndIncrement(); // 2. create the partition file metadata partitionFile partFile = new partitionFile(filename, entry.streamId); @@ -301,7 +301,19 @@ private static void evict(long requiredSize) { */ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { String key = streamId + "_" + blockId; - String filename = _spillDir + "/" + key; + + // 1. find the blocks address (spill location) + spillLocation sloc = _spillLocations.get(key); + if (sloc == null) { + throw new DMLRuntimeException("Failed to load spill location for: " + key); + } + + partitionFile partFile = _partitions.get(sloc.partitionId); + if (partFile == null) { + throw new DMLRuntimeException("Failed to load partition for: " + sloc.partitionId); + } + + String filename = partFile.filePath; try { // check if file exists @@ -311,8 +323,20 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { // Read from disk and put into original indexed matrix value MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - BlockEntry imv = _cache.get(key); - imv.value.setValue(mb); + BlockEntry imv; + synchronized (_cacheLock) { + imv = _cache.get(key); + } + + imv.lock.lock(); + try { + if (imv.state == BlockState.COLD) { + imv.value.setValue(mb); + _size.addAndGet(imv.size); + } + } finally { + imv.lock.unlock(); + } return imv.value; } catch(Exception ex) { From d5a2bc0796f0f01563d9c2a792a1b40a7ebba3bc Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 4 Nov 2025 01:43:53 +0530 Subject: [PATCH 08/14] fix spillLocations inputs, use correct key instead of filename --- .../sysds/runtime/instructions/ooc/OOCEvictionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index e06425244c3..9a326dbfc90 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -270,7 +270,7 @@ private static void evict(long requiredSize) { // 3. create the spillLocation spillLocation sloc = new spillLocation(partitionId, entry.streamId, entry.blockId, entry.size); - _spillLocations.put(filename, sloc); + _spillLocations.put(tmp.getKey(), sloc); // Evict from memory long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); From 50214e33399fd5f669f9180877ab139cf9c13d42 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 4 Nov 2025 09:22:11 +0530 Subject: [PATCH 09/14] use filestream to read/write --- .../instructions/ooc/OOCEvictionManager.java | 120 ++++++++++++++---- 1 file changed, 98 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 9a326dbfc90..9f39f419a19 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -19,13 +19,26 @@ package org.apache.sysds.runtime.instructions.ooc; +import org.apache.hadoop.io.Writable; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.io.IOUtilFunctions; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.util.FastBufferedDataInputStream; +import org.apache.sysds.runtime.util.FastBufferedDataOutputStream; import org.apache.sysds.runtime.util.LocalFileUtils; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.Channels; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -258,7 +271,37 @@ private static void evict(long requiredSize) { if (!spillDirFile.exists()) { spillDirFile.mkdirs(); } - LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().value.getValue()); +// LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().value.getValue()); + +// try { +// LocalFileUtils.writeWritableToLocal(filename, entry.value.getValue(), false); +// } catch (IOException e) { +// throw new DMLRuntimeException(e); +// } + +// try (FileOutputStream fos = new FileOutputStream(filename); +// DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(fos))) { +// entry.value.getIndexes().write(dos); // write Indexes +// entry.value.getValue().write(dos); // write MatrixBlock +// +// dos.flush(); +// } + + // use the same way we do in other places in systemds + FileOutputStream fos = null; + FastBufferedDataOutputStream dos = null; + try { + fos = new FileOutputStream(filename); +// dos = new DataOutputStream(new BufferedOutputStream(fos)) + dos = new FastBufferedDataOutputStream(fos); + entry.value.getIndexes().write(dos); // write Indexes + entry.value.getValue().write(dos); // write MatrixBlock + } catch (IOException e) { + throw new DMLRuntimeException(e); + } finally { + IOUtilFunctions.closeSilently(dos); + IOUtilFunctions.closeSilently(fos); + } // partition file // 1. generate a new ID for the present "partition" (file) @@ -315,33 +358,66 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { String filename = partFile.filePath; + MatrixIndexes ix = new MatrixIndexes(); + MatrixBlock mb = new MatrixBlock(); + + // Create an empty object to read data into. +// IndexedMatrixValue imvRead = new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); +// +// try { +// LocalFileUtils.readWritableFromLocal(filename, (Writable) imvRead); +// } catch (IOException e) { +// throw new DMLRuntimeException(e); +// } + + +// try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) { +// // seek to the block specfic offset +// raf.seek(sloc.offset); +// +// try (DataInputStream dis = new DataInputStream(Channels.newInputStream(raf.getChannel()))) { +// ix.readFields(dis); +// mb.readFields(dis); +// } +// } catch (IOException e) { +// throw new DMLRuntimeException(e); +// } + + // inspire from existing utilities in systemds + FileInputStream fis = null; + FastBufferedDataInputStream din = null; try { - // check if file exists - if (!LocalFileUtils.isExisting(filename)) { - throw new IOException("File " + filename + " does not exist"); - } + fis = new FileInputStream(filename); + din = new FastBufferedDataInputStream(fis, 65536); // 64K buffer + ix.readFields(din); // 1. Read Indexes + mb.readFields(din); // 2. Read Block + } + catch (IOException ex) { + throw new DMLRuntimeException("Failed to load block " + key + " from " + filename, ex); + } + finally { + // Use the SystemDS-native close + IOUtilFunctions.closeSilently(din); + IOUtilFunctions.closeSilently(fis); + } // Read from disk and put into original indexed matrix value - MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - BlockEntry imv; - synchronized (_cacheLock) { - imv = _cache.get(key); - } +// MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + BlockEntry imvCacheEntry; + synchronized (_cacheLock) { + imvCacheEntry = _cache.get(key); + } - imv.lock.lock(); - try { - if (imv.state == BlockState.COLD) { - imv.value.setValue(mb); - _size.addAndGet(imv.size); - } - } finally { - imv.lock.unlock(); + imvCacheEntry.lock.lock(); + try { + if (imvCacheEntry.state == BlockState.COLD) { + imvCacheEntry.value = new IndexedMatrixValue(ix, mb); + _size.addAndGet(imvCacheEntry.size); } - return imv.value; - } - catch(Exception ex) { - throw new DMLRuntimeException(ex); + } finally { + imvCacheEntry.lock.unlock(); } + return imvCacheEntry.value; } private static long estimateSerializedSize(MatrixBlock mb) { From f1b95a1fb269ca3f7298cf8235f4bf4f6e1bf6f6 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 5 Nov 2025 13:54:56 +0530 Subject: [PATCH 10/14] add partionfile setup, and offsets --- .../instructions/ooc/OOCEvictionManager.java | 146 +++++++----------- 1 file changed, 58 insertions(+), 88 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 9f39f419a19..56223659c9b 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -19,7 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import org.apache.hadoop.io.Writable; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.IOUtilFunctions; @@ -29,21 +28,17 @@ import org.apache.sysds.runtime.util.FastBufferedDataOutputStream; import org.apache.sysds.runtime.util.LocalFileUtils; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.Channels; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -127,10 +122,10 @@ private static class spillLocation { final int partitionId; final long offset; - spillLocation(int partitionId, long offset, int partitionId1, long offset1) { + spillLocation(int partitionId, long offset) { - this.partitionId = partitionId1; - this.offset = offset1; + this.partitionId = partitionId; + this.offset = offset; } } @@ -199,7 +194,7 @@ public static void put(long streamId, int blockId, IndexedMatrixValue value) { _size.addAndGet(size); //make room if needed - evict(size); + evict(); } /** @@ -247,74 +242,62 @@ public static IndexedMatrixValue get(long streamId, int blockId) { /** * Evict ByteBuffers to disk */ - private static void evict(long requiredSize) { + private static void evict() { + + if (_size.get() <= _limit) { // only trigger eviction, if filled. + return; + } + + // write to partition file + // 1. generate a new ID for the present "partition" (file) + int partitionId = _partitionCounter.getAndIncrement(); + + // Spill to disk + String filename = _spillDir + "/stream_batch_part_" + partitionId; + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } + + // 2. create the partition file metadata + partitionFile partFile = new partitionFile(filename, 0); + _partitions.put(partitionId, partFile); + + FileOutputStream fos = null; + FastBufferedDataOutputStream dos = null; try { + fos = new FileOutputStream(filename); + dos = new FastBufferedDataOutputStream(fos); + int pos = 0; while(_size.get() > _limit && pos++ < _cache.size()) { System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); - - if (tmp == null) { continue; } +// candidates.add(tmp); + if (tmp == null) { continue; } // cache is empty BlockEntry entry = tmp.getValue(); + // Skip if block is null. i.e, COLD if( entry.value.getValue() == null ) { synchronized (_cacheLock) { _cache.put(tmp.getKey(), entry); } continue; } - - // Spill to disk - String filename = _spillDir + "/" + tmp.getKey(); - File spillDirFile = new File(_spillDir); - if (!spillDirFile.exists()) { - spillDirFile.mkdirs(); - } -// LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().value.getValue()); - -// try { -// LocalFileUtils.writeWritableToLocal(filename, entry.value.getValue(), false); -// } catch (IOException e) { -// throw new DMLRuntimeException(e); -// } - -// try (FileOutputStream fos = new FileOutputStream(filename); -// DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(fos))) { -// entry.value.getIndexes().write(dos); // write Indexes -// entry.value.getValue().write(dos); // write MatrixBlock -// -// dos.flush(); -// } - - // use the same way we do in other places in systemds - FileOutputStream fos = null; - FastBufferedDataOutputStream dos = null; - try { - fos = new FileOutputStream(filename); -// dos = new DataOutputStream(new BufferedOutputStream(fos)) - dos = new FastBufferedDataOutputStream(fos); - entry.value.getIndexes().write(dos); // write Indexes - entry.value.getValue().write(dos); // write MatrixBlock - } catch (IOException e) { - throw new DMLRuntimeException(e); - } finally { - IOUtilFunctions.closeSilently(dos); - IOUtilFunctions.closeSilently(fos); - } - - // partition file - // 1. generate a new ID for the present "partition" (file) - int partitionId = _partitionCounter.getAndIncrement(); - // 2. create the partition file metadata - partitionFile partFile = new partitionFile(filename, entry.streamId); - _partitions.put(partitionId, partFile); + // flush any buffered data to the file + dos.flush(); + long offset = fos.getChannel().position(); + + entry.value.getIndexes().write(dos); // write Indexes + entry.value.getValue().write(dos); // 3. create the spillLocation - spillLocation sloc = new spillLocation(partitionId, entry.streamId, entry.blockId, entry.size); + spillLocation sloc = new spillLocation(partitionId, offset); _spillLocations.put(tmp.getKey(), sloc); + // Evict from memory long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); @@ -322,8 +305,7 @@ private static void evict(long requiredSize) { try { entry.value.setValue(null); entry.state = BlockState.COLD; // set state to cold, since writing to disk - - + entry.stateUpdate.signalAll(); } finally { entry.lock.unlock(); } @@ -334,8 +316,11 @@ private static void evict(long requiredSize) { _size.addAndGet(-freedSize); } } - catch(Exception ex) { + catch(IOException ex) { throw new DMLRuntimeException(ex); + } finally { + IOUtilFunctions.closeSilently(dos); + IOUtilFunctions.closeSilently(fos); } } @@ -358,31 +343,10 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { String filename = partFile.filePath; + // Create an empty object to read data into. MatrixIndexes ix = new MatrixIndexes(); MatrixBlock mb = new MatrixBlock(); - // Create an empty object to read data into. -// IndexedMatrixValue imvRead = new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock()); -// -// try { -// LocalFileUtils.readWritableFromLocal(filename, (Writable) imvRead); -// } catch (IOException e) { -// throw new DMLRuntimeException(e); -// } - - -// try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) { -// // seek to the block specfic offset -// raf.seek(sloc.offset); -// -// try (DataInputStream dis = new DataInputStream(Channels.newInputStream(raf.getChannel()))) { -// ix.readFields(dis); -// mb.readFields(dis); -// } -// } catch (IOException e) { -// throw new DMLRuntimeException(e); -// } - // inspire from existing utilities in systemds FileInputStream fis = null; FastBufferedDataInputStream din = null; @@ -401,8 +365,7 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { IOUtilFunctions.closeSilently(fis); } - // Read from disk and put into original indexed matrix value -// MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + // Read from disk and put into original indexed matrix value BlockEntry imvCacheEntry; synchronized (_cacheLock) { imvCacheEntry = _cache.get(key); @@ -412,11 +375,14 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { try { if (imvCacheEntry.state == BlockState.COLD) { imvCacheEntry.value = new IndexedMatrixValue(ix, mb); + imvCacheEntry.state = BlockState.HOT; _size.addAndGet(imvCacheEntry.size); } } finally { imvCacheEntry.lock.unlock(); } + + evict(); // when we add the block, we shall check for limit. return imvCacheEntry.value; } @@ -426,6 +392,10 @@ private static long estimateSerializedSize(MatrixBlock mb) { private static Map.Entry removeFirstFromCache() { synchronized (_cacheLock) { + + if (_cache.isEmpty()) { + return null; + } //move iterator to first entry Iterator> iter = _cache.entrySet().iterator(); Map.Entry entry = iter.next(); From dcd1f0394db6a97e30f1efb856b6b7d7d0f4db29 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 5 Nov 2025 16:12:37 +0530 Subject: [PATCH 11/14] fix file reading --- .../instructions/ooc/OOCEvictionManager.java | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 56223659c9b..a0347138ea3 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -28,16 +28,21 @@ import org.apache.sysds.runtime.util.FastBufferedDataOutputStream; import org.apache.sysds.runtime.util.LocalFileUtils; +import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.Channels; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -99,6 +104,9 @@ public class OOCEvictionManager { private static ConcurrentHashMap _partitions = new ConcurrentHashMap<>(); private static final AtomicInteger _partitionCounter = new AtomicInteger(0); + // Track which partitions belong to which stream (for cleanup) + private static final ConcurrentHashMap> _streamPartitions = new ConcurrentHashMap<>(); + // Cache level lock private static final Object _cacheLock = new Object(); @@ -248,6 +256,7 @@ private static void evict() { return; } + long totalFreedSize = 0; // write to partition file // 1. generate a new ID for the present "partition" (file) int partitionId = _partitionCounter.getAndIncrement(); @@ -274,7 +283,7 @@ private static void evict() { System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); Map.Entry tmp = removeFirstFromCache(); // candidates.add(tmp); - if (tmp == null) { continue; } // cache is empty + if (tmp == null) { break; } // cache is empty BlockEntry entry = tmp.getValue(); @@ -297,10 +306,14 @@ private static void evict() { spillLocation sloc = new spillLocation(partitionId, offset); _spillLocations.put(tmp.getKey(), sloc); + // 4. track file for cleanup + _streamPartitions + .computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet()) + .add(filename); // Evict from memory long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); - + totalFreedSize += freedSize; entry.lock.lock(); try { entry.value.setValue(null); @@ -313,7 +326,6 @@ private static void evict() { synchronized (_cacheLock) { _cache.put(tmp.getKey(), entry); // add last semantic } - _size.addAndGet(-freedSize); } } catch(IOException ex) { @@ -322,6 +334,10 @@ private static void evict() { IOUtilFunctions.closeSilently(dos); IOUtilFunctions.closeSilently(fos); } + + if (totalFreedSize > 0) { // note the size, without evicted blocks + _size.addAndGet(-totalFreedSize); + } } /** @@ -348,23 +364,25 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { MatrixBlock mb = new MatrixBlock(); // inspire from existing utilities in systemds - FileInputStream fis = null; - FastBufferedDataInputStream din = null; - try { - fis = new FileInputStream(filename); - din = new FastBufferedDataInputStream(fis, 65536); // 64K buffer - ix.readFields(din); // 1. Read Indexes - mb.readFields(din); // 2. Read Block - } - catch (IOException ex) { - throw new DMLRuntimeException("Failed to load block " + key + " from " + filename, ex); - } - finally { - // Use the SystemDS-native close - IOUtilFunctions.closeSilently(din); - IOUtilFunctions.closeSilently(fis); +// FileInputStream fis = null; +// FastBufferedDataInputStream din = null; + try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) { + raf.seek(sloc.offset); + + try { + DataInputStream dis = new DataInputStream(Channels.newInputStream(raf.getChannel())); + ix.readFields(dis); // 1. Read Indexes + mb.readFields(dis); // 2. Read Block + } catch (IOException ex) { + throw new DMLRuntimeException("Failed to load block " + key + " from " + filename, ex); +// } finally { +// // Use the SystemDS-native close +// IOUtilFunctions.closeSilently(dis); +// IOUtilFunctions.closeSilently(fis); + } + } catch (IOException e) { + throw new RuntimeException(e); } - // Read from disk and put into original indexed matrix value BlockEntry imvCacheEntry; synchronized (_cacheLock) { From 7d26ad71bb763f936cc2a78052fb570f6af368ae Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 5 Nov 2025 17:28:07 +0530 Subject: [PATCH 12/14] collect a list of blocks to evict at once --- .../instructions/ooc/OOCEvictionManager.java | 134 +++++++++++------- 1 file changed, 85 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index a0347138ea3..7e7f9352ed2 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -214,13 +214,16 @@ public static IndexedMatrixValue get(long streamId, int blockId) { synchronized (_cacheLock) { imv = _cache.get(key); - + System.err.println( "value of imv: " + imv); if (imv != null && _policy == RPolicy.LRU) { _cache.remove(key); _cache.put(key, imv); //add last semantic } } + if (imv == null) { + throw new DMLRuntimeException("Block not found in cache: " + key); + } // use lock and check state imv.lock.lock(); try { @@ -251,12 +254,47 @@ public static IndexedMatrixValue get(long streamId, int blockId) { * Evict ByteBuffers to disk */ private static void evict() { - + long currentSize = _size.get(); if (_size.get() <= _limit) { // only trigger eviction, if filled. + System.err.println("Evicting condition: " + _size.get() + "/" + _limit); return; } + // --- 1. COLLECTION PHASE --- long totalFreedSize = 0; + // list of eviction candidates + List> candidates = new ArrayList<>(); + long targetFreedSize = Math.max(currentSize - _limit, (long) PARTITION_EVICTION_SIZE); + + synchronized (_cacheLock) { + + //move iterator to first entry + Iterator> iter = _cache.entrySet().iterator(); + + while (iter.hasNext() && totalFreedSize < targetFreedSize) { + Map.Entry e = iter.next(); + BlockEntry entry = e.getValue(); + + entry.lock.lock(); + try { + if (entry.state == BlockState.HOT) { + entry.state = BlockState.EVICTING; + candidates.add(e); + totalFreedSize += entry.size; + + //remove current iterator entry + iter.remove(); + } + } finally { + entry.lock.unlock(); + } + } + + } + + if (candidates.isEmpty()) { return; } // no eviction candidates found + + // --- 2. WRITE PHASE --- // write to partition file // 1. generate a new ID for the present "partition" (file) int partitionId = _partitionCounter.getAndIncrement(); @@ -281,50 +319,54 @@ private static void evict() { int pos = 0; while(_size.get() > _limit && pos++ < _cache.size()) { System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); - Map.Entry tmp = removeFirstFromCache(); -// candidates.add(tmp); - if (tmp == null) { break; } // cache is empty - BlockEntry entry = tmp.getValue(); + // loop over the list of blocks we collected + for (Map.Entry tmp : candidates) { + BlockEntry entry = tmp.getValue(); + + // Skip if block is null. i.e, COLD +// if (entry.value.getValue() == null) { +// synchronized (_cacheLock) { +// _cache.put(tmp.getKey(), entry); +// } +// continue; +// } + + // 1. get the current file position. this is the offset. + // flush any buffered data to the file + dos.flush(); + long offset = fos.getChannel().position(); + + // 2. write indexes and block + entry.value.getIndexes().write(dos); // write Indexes + entry.value.getValue().write(dos); + + // 3. create the spillLocation + spillLocation sloc = new spillLocation(partitionId, offset); + _spillLocations.put(tmp.getKey(), sloc); + + // 4. track file for cleanup + _streamPartitions + .computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet()) + .add(filename); + + // account for memory + long freedSize = estimateSerializedSize((MatrixBlock) tmp.getValue().value.getValue()); + totalFreedSize += freedSize; + + // 5. change state to COLD + entry.lock.lock(); + try { + entry.value.setValue(null); + entry.state = BlockState.COLD; // set state to cold, since writing to disk + entry.stateUpdate.signalAll(); // wake up any "get()" threads + } finally { + entry.lock.unlock(); + } - // Skip if block is null. i.e, COLD - if( entry.value.getValue() == null ) { synchronized (_cacheLock) { - _cache.put(tmp.getKey(), entry); + _cache.put(tmp.getKey(), entry); // add last semantic } - continue; - } - - // flush any buffered data to the file - dos.flush(); - long offset = fos.getChannel().position(); - - entry.value.getIndexes().write(dos); // write Indexes - entry.value.getValue().write(dos); - - // 3. create the spillLocation - spillLocation sloc = new spillLocation(partitionId, offset); - _spillLocations.put(tmp.getKey(), sloc); - - // 4. track file for cleanup - _streamPartitions - .computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet()) - .add(filename); - - // Evict from memory - long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().value.getValue()); - totalFreedSize += freedSize; - entry.lock.lock(); - try { - entry.value.setValue(null); - entry.state = BlockState.COLD; // set state to cold, since writing to disk - entry.stateUpdate.signalAll(); - } finally { - entry.lock.unlock(); - } - - synchronized (_cacheLock) { - _cache.put(tmp.getKey(), entry); // add last semantic } } } @@ -335,6 +377,7 @@ private static void evict() { IOUtilFunctions.closeSilently(fos); } + // --- 3. ACCOUNTING PHASE --- if (totalFreedSize > 0) { // note the size, without evicted blocks _size.addAndGet(-totalFreedSize); } @@ -363,9 +406,6 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { MatrixIndexes ix = new MatrixIndexes(); MatrixBlock mb = new MatrixBlock(); - // inspire from existing utilities in systemds -// FileInputStream fis = null; -// FastBufferedDataInputStream din = null; try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) { raf.seek(sloc.offset); @@ -375,10 +415,6 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { mb.readFields(dis); // 2. Read Block } catch (IOException ex) { throw new DMLRuntimeException("Failed to load block " + key + " from " + filename, ex); -// } finally { -// // Use the SystemDS-native close -// IOUtilFunctions.closeSilently(dis); -// IOUtilFunctions.closeSilently(fis); } } catch (IOException e) { throw new RuntimeException(e); From dc40bf02dd75bd6ffe5234cfe18c68ab144844a3 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 5 Nov 2025 17:31:09 +0530 Subject: [PATCH 13/14] add license header for PartitionFileManager --- .../runtime/util/PartitionFileManager.java | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java index 5886598cb64..361e2ea68fa 100644 --- a/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java +++ b/src/main/java/org/apache/sysds/runtime/util/PartitionFileManager.java @@ -1,4 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.sysds.runtime.util; -public class partitionFileManager { +/** + * reference-counted file manager for SystemDS partition files. + * Prevents deletion while concurrent readers are active. + */ +public class PartitionFileManager { } From df7ba43f36ea57226fae9787d2584c3260041f34 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 6 Nov 2025 09:00:20 +0530 Subject: [PATCH 14/14] fix get() null value issue due to mutation * remove while loop to remove two loops --- .../instructions/ooc/OOCEvictionManager.java | 129 +++++++++--------- 1 file changed, 61 insertions(+), 68 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 7e7f9352ed2..1d47ac10dce 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -24,14 +24,11 @@ import org.apache.sysds.runtime.io.IOUtilFunctions; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; -import org.apache.sysds.runtime.util.FastBufferedDataInputStream; import org.apache.sysds.runtime.util.FastBufferedDataOutputStream; import org.apache.sysds.runtime.util.LocalFileUtils; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; @@ -39,7 +36,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -230,7 +226,7 @@ public static IndexedMatrixValue get(long streamId, int blockId) { // 1. wait for eviction to complete while (imv.state == BlockState.EVICTING) { try { - imv.stateUpdate.wait(); + imv.stateUpdate.await(); } catch (InterruptedException e) { throw new DMLRuntimeException(e); @@ -275,19 +271,20 @@ private static void evict() { Map.Entry e = iter.next(); BlockEntry entry = e.getValue(); - entry.lock.lock(); - try { - if (entry.state == BlockState.HOT) { - entry.state = BlockState.EVICTING; - candidates.add(e); - totalFreedSize += entry.size; - - //remove current iterator entry - iter.remove(); + if (entry.lock.tryLock()) { + try { + if (entry.state == BlockState.HOT) { + entry.state = BlockState.EVICTING; + candidates.add(e); + totalFreedSize += entry.size; + + //remove current iterator entry +// iter.remove(); + } + } finally { + entry.lock.unlock(); } - } finally { - entry.lock.unlock(); - } + } // if tryLock() fails, it means a thread is loading/reading this block. we shall skip it. } } @@ -316,57 +313,42 @@ private static void evict() { fos = new FileOutputStream(filename); dos = new FastBufferedDataOutputStream(fos); - int pos = 0; - while(_size.get() > _limit && pos++ < _cache.size()) { - System.err.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); - - // loop over the list of blocks we collected - for (Map.Entry tmp : candidates) { - BlockEntry entry = tmp.getValue(); - - // Skip if block is null. i.e, COLD -// if (entry.value.getValue() == null) { -// synchronized (_cacheLock) { -// _cache.put(tmp.getKey(), entry); -// } -// continue; -// } - - // 1. get the current file position. this is the offset. - // flush any buffered data to the file - dos.flush(); - long offset = fos.getChannel().position(); - - // 2. write indexes and block - entry.value.getIndexes().write(dos); // write Indexes - entry.value.getValue().write(dos); - - // 3. create the spillLocation - spillLocation sloc = new spillLocation(partitionId, offset); - _spillLocations.put(tmp.getKey(), sloc); - - // 4. track file for cleanup - _streamPartitions - .computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet()) - .add(filename); - - // account for memory - long freedSize = estimateSerializedSize((MatrixBlock) tmp.getValue().value.getValue()); - totalFreedSize += freedSize; - - // 5. change state to COLD - entry.lock.lock(); - try { - entry.value.setValue(null); - entry.state = BlockState.COLD; // set state to cold, since writing to disk - entry.stateUpdate.signalAll(); // wake up any "get()" threads - } finally { - entry.lock.unlock(); - } - synchronized (_cacheLock) { - _cache.put(tmp.getKey(), entry); // add last semantic - } + // loop over the list of blocks we collected + for (Map.Entry tmp : candidates) { + BlockEntry entry = tmp.getValue(); + + // 1. get the current file position. this is the offset. + // flush any buffered data to the file + dos.flush(); + long offset = fos.getChannel().position(); + + // 2. write indexes and block + entry.value.getIndexes().write(dos); // write Indexes + entry.value.getValue().write(dos); + System.out.println("written, partition id: " + _partitions.get(partitionId) + ", offset: " + offset); + + // 3. create the spillLocation + spillLocation sloc = new spillLocation(partitionId, offset); + _spillLocations.put(tmp.getKey(), sloc); + + // 4. track file for cleanup + _streamPartitions + .computeIfAbsent(entry.streamId, k -> ConcurrentHashMap.newKeySet()) + .add(filename); + + // 5. change state to COLD + entry.lock.lock(); + try { + entry.value = null; // only release ref, don't mutate object + entry.state = BlockState.COLD; // set state to cold, since writing to disk + entry.stateUpdate.signalAll(); // wake up any "get()" threads + } finally { + entry.lock.unlock(); + } + + synchronized (_cacheLock) { + _cache.put(tmp.getKey(), entry); // add last semantic } } } @@ -425,18 +407,29 @@ private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { imvCacheEntry = _cache.get(key); } + // 2. Check if it's null (the bug you helped fix before) + if(imvCacheEntry == null) { + throw new DMLRuntimeException("Block entry " + key + " was not in cache during load."); + } + imvCacheEntry.lock.lock(); try { if (imvCacheEntry.state == BlockState.COLD) { imvCacheEntry.value = new IndexedMatrixValue(ix, mb); imvCacheEntry.state = BlockState.HOT; _size.addAndGet(imvCacheEntry.size); + + synchronized (_cacheLock) { + _cache.remove(key); + _cache.put(key, imvCacheEntry); + } } + +// evict(); // when we add the block, we shall check for limit. } finally { imvCacheEntry.lock.unlock(); } - evict(); // when we add the block, we shall check for limit. return imvCacheEntry.value; }