From b0370eaf8658ba8f5b0ccadcdaa72a7c3ae7005a Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 13 Aug 2025 08:46:08 +0530 Subject: [PATCH 01/10] setup basic code and test setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OOC°r'°_mVar0·MATRIX·FP64°_mVar1·MATRIX·FP64 --- .../ooc/TransposeSelfMMOOCInstruction.java | 142 ++++++++++++++++++ .../functions/ooc/TransposeSelfMMTest.java | 124 +++++++++++++++ .../scripts/functions/ooc/TransposeSelfMM.dml | 28 ++++ 3 files changed, 294 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java create mode 100644 src/test/scripts/functions/ooc/TransposeSelfMM.dml diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java new file mode 100644 index 00000000000..0346c1e86f2 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.instructions.ooc; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; +import org.apache.sysds.runtime.matrix.operators.AggregateOperator; +import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.runtime.matrix.operators.Operator; +import org.apache.sysds.runtime.util.CommonThreadPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class TransposeSelfMMOOCInstruction extends ComputationOOCInstruction { + + + protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) { + super(type, op, in1, in2, out, opcode, istr); + } + + public static TransposeSelfMMOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + InstructionUtils.checkNumFields(parts, 4); + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); // the larget matrix (streamed) + CPOperand in2 = new CPOperand(parts[2]); // the small vector (in-memory) + CPOperand out = new CPOperand(parts[3]); + + AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject()); + AggregateBinaryOperator ba = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg); + + return new TransposeSelfMMOOCInstruction(OOCType.MAPMM, ba, in1, in2, out, opcode, str); + } + + @Override + public void processInstruction( ExecutionContext ec ) { + // 1. Identify the inputs + MatrixObject min = ec.getMatrixObject(input1); // big matrix + MatrixBlock vin = ec.getMatrixObject(input2) + .acquireReadAndRelease(); // in-memory vector + + // 2. Pre-partition the in-memory vector into a hashmap + HashMap partitionedVector = new HashMap<>(); + int blksize = vin.getDataCharacteristics().getBlocksize(); + if (blksize < 0) + blksize = ConfigurationManager.getBlocksize(); + for (int i=0; i qIn = min.getStreamHandle(); + LocalTaskQueue qOut = new LocalTaskQueue<>(); + BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); + ec.getMatrixObject(output).setStreamHandle(qOut); + + ExecutorService pool = CommonThreadPool.get(); + try { + // Core logic: background thread + pool.submit(() -> { + IndexedMatrixValue tmp = null; + try { + HashMap partialResults = new HashMap<>(); + while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); + long rowIndex = tmp.getIndexes().getRowIndex(); + long colIndex = tmp.getIndexes().getColumnIndex(); + MatrixBlock vectorSlice = partitionedVector.get(colIndex); + + // Now, call the operation with the correct, specific operator. + MatrixBlock partialResult = matrixBlock.aggregateBinaryOperations( + matrixBlock, vectorSlice, new MatrixBlock(), (AggregateBinaryOperator) _optr); + + // for single column block, no aggregation neeeded + if( min.getNumColumns() <= min.getBlocksize() ) { + qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); + } + else { + MatrixBlock currAgg = partialResults.get(rowIndex); + if (currAgg == null) + partialResults.put(rowIndex, partialResult); + else + currAgg.binaryOperationsInPlace(plus, partialResult); + } + } + + // emit aggregated blocks + if( min.getNumColumns() > min.getBlocksize() ) { + for (Map.Entry entry : partialResults.entrySet()) { + MatrixIndexes outIndexes = new MatrixIndexes(entry.getKey(), 1L); + qOut.enqueueTask(new IndexedMatrixValue(outIndexes, entry.getValue())); + } + } + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + finally { + qOut.closeInput(); + } + }); + } catch (Exception e) { + throw new DMLRuntimeException(e); + } + finally { + pool.shutdown(); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java new file mode 100644 index 00000000000..803e6364b47 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class TransposeSelfMMTest extends AutomatedTestBase { + private final static String TEST_NAME1 = "TransposeSelfMM"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TransposeSelfMMTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + private static final String INPUT_NAME = "X"; + private static final String OUTPUT_NAME = "res"; + + private final static int rows = 5000; + private final static int cols_wide = 2000; + private final static int cols_skinny = 500; + + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.1; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); + addTestConfiguration(TEST_NAME1, config); + } + + @Test + public void testTransposeSelfMatrixMultiplication1() { + runMatrixVectorMultiplicationTest(cols_wide, false); + } + + @Test + public void testTransposeSelfMatrixMultiplication2() { + runMatrixVectorMultiplicationTest(cols_skinny, false); + } + + private void runMatrixVectorMultiplicationTest(int cols, boolean sparse ) + { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try + { + getAndLoadTestConfiguration(TEST_NAME1); + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-explain", "-stats", "-ooc", + "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + + // 1. Generate the data in-memory as MatrixBlock objects + double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10); + + // 2. Convert the double arrays to MatrixBlock objects + MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); + + // 3. Create a binary matrix writer + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + + // 4. Write matrix A to a binary SequenceFile + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros()); + HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY); + + boolean exceptionExpected = false; + runTest(true, exceptionExpected, null, -1); + +// double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); +// double result = 0.0; +// for(int i = 0; i < rows; i++) { // verify the results with Java +// double expected = 0.0; +// for(int j = 0; j < cols; j++) { +// expected += A_mb.get(i, j) * x_mb.get(j,0); +// } +// result = C1[i][0]; +// Assert.assertEquals(expected, result, eps); +// } + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + resetExecMode(platformOld); + } + } + + private static double[][] readMatrix(String fname, Types.FileFormat fmt, long rows, long cols, int brows, int bcols ) + throws IOException + { + MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols); + double[][] C = DataConverter.convertToDoubleMatrix(mb); + return C; + } +} diff --git a/src/test/scripts/functions/ooc/TransposeSelfMM.dml b/src/test/scripts/functions/ooc/TransposeSelfMM.dml new file mode 100644 index 00000000000..432d2d9daab --- /dev/null +++ b/src/test/scripts/functions/ooc/TransposeSelfMM.dml @@ -0,0 +1,28 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Read input matrix and operator from command line args +X = read($1); + +# Operation under test +res = t(X) %*% X; + +write(res, $2, format="binary") From f51fad6a6210f8c0e42e1f03564426fd22482b63 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 13 Aug 2025 08:58:25 +0530 Subject: [PATCH 02/10] use ooc r' in the oocparser --- .../runtime/instructions/OOCInstructionParser.java | 3 +++ .../runtime/instructions/ooc/OOCInstruction.java | 2 +- .../ooc/TransposeSelfMMOOCInstruction.java | 11 +++++------ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 9b1165b819b..f6a48b39912 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -29,6 +29,7 @@ import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.MatrixVectorBinaryOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.TransposeSelfMMOOCInstruction; public class OOCInstructionParser extends InstructionParser { protected static final Log LOG = LogFactory.getLog(OOCInstructionParser.class.getName()); @@ -60,6 +61,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str case AggregateBinary: case MAPMM: return MatrixVectorBinaryOOCInstruction.parseInstruction(str); + case Reorg: + return TransposeSelfMMOOCInstruction.parseInstruction(str); default: throw new DMLRuntimeException("Invalid OOC Instruction Type: " + ooctype); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index d3c2dfcbd77..8ac8ca1625d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction { protected static final Log LOG = LogFactory.getLog(OOCInstruction.class.getName()); public enum OOCType { - Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary + Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary, Reorg } protected final OOCInstruction.OOCType _ooctype; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 0346c1e86f2..a01537951fb 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -45,22 +45,21 @@ public class TransposeSelfMMOOCInstruction extends ComputationOOCInstruction { - protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) { - super(type, op, in1, in2, out, opcode, istr); + protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand out, String opcode, String istr) { + super(type, op, in1, out, opcode, istr); } public static TransposeSelfMMOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields(parts, 4); + InstructionUtils.checkNumFields(parts, 3); String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); // the larget matrix (streamed) - CPOperand in2 = new CPOperand(parts[2]); // the small vector (in-memory) - CPOperand out = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[2]); AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject()); AggregateBinaryOperator ba = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg); - return new TransposeSelfMMOOCInstruction(OOCType.MAPMM, ba, in1, in2, out, opcode, str); + return new TransposeSelfMMOOCInstruction(OOCType.Reorg, ba, in1, out, opcode, str); } @Override From 91333057a02839210dbf8242dab0f673bafd6959 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 13 Aug 2025 20:36:57 +0530 Subject: [PATCH 03/10] identify tsmm ooc op --- .../org/apache/sysds/hops/AggBinaryOp.java | 26 ++++++++++++++----- .../ooc/TransposeSelfMMOOCInstruction.java | 2 +- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java index fb20cc41d01..63c6db1b8e6 100644 --- a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java @@ -241,13 +241,25 @@ public Lop constructLops() { throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing SPARK lops."); } } else if (et == ExecType.OOC) { - Lop in1 = getInput().get(0).constructLops(); - Lop in2 = getInput().get(1).constructLops(); - MatMultCP matmult = new MatMultCP(in1, in2, getDataType(), getValueType(), - et, OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); - setOutputDimensions(matmult); - setLineNumbers(matmult); - setLops(matmult); + _method = optFindMMultMethodCP(input1.getDim1(), input1.getDim2(), + input2.getDim1(), input2.getDim2(), mmtsj, chain, _hasLeftPMInput); + + switch (_method) { + case TSMM: + constructCPLopsTSMM(mmtsj, et); + break; + case MM: + Lop in1 = getInput().get(0).constructLops(); + Lop in2 = getInput().get(1).constructLops(); + MatMultCP matmult = new MatMultCP(in1, in2, getDataType(), getValueType(), + et, OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); + setOutputDimensions(matmult); + setLineNumbers(matmult); + setLops(matmult); + break; + default: + throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing CP lops."); + } } } else throw new HopsException(this.printErrorLocation() + "Invalid operation in AggBinary Hop, aggBin(" + innerOp + "," + outerOp + ") while constructing lops."); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index a01537951fb..66bc6a9cc32 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -51,7 +51,7 @@ protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1 public static TransposeSelfMMOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields(parts, 3); + InstructionUtils.checkNumFields(parts, 2); String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); // the larget matrix (streamed) CPOperand out = new CPOperand(parts[2]); From d53e1de825732c78642dc89bcc2cef7358aa6eb8 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 13 Aug 2025 22:01:07 +0530 Subject: [PATCH 04/10] use mmtsj --- .../runtime/instructions/OOCInstructionParser.java | 3 ++- .../runtime/instructions/ooc/OOCInstruction.java | 2 +- .../ooc/TransposeSelfMMOOCInstruction.java | 12 +++++++++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index f6a48b39912..1d12b076b0d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -61,7 +61,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str case AggregateBinary: case MAPMM: return MatrixVectorBinaryOOCInstruction.parseInstruction(str); - case Reorg: + case TSMM: + case MMTSJ: return TransposeSelfMMOOCInstruction.parseInstruction(str); default: diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index 8ac8ca1625d..869fff4492a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction { protected static final Log LOG = LogFactory.getLog(OOCInstruction.class.getName()); public enum OOCType { - Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary, Reorg + Reblock, AggregateUnary, Binary, Unary, MAPMM, AggregateBinary, TSMM, MMTSJ } protected final OOCInstruction.OOCType _ooctype; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 66bc6a9cc32..9a46ee71d49 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -21,6 +21,8 @@ import org.apache.sysds.common.Opcodes; import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.lops.MMTSJ; +import org.apache.sysds.lops.MMTSJ.MMTSJType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; @@ -29,6 +31,7 @@ import org.apache.sysds.runtime.functionobjects.Plus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.MMTSJCPInstruction; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; @@ -44,22 +47,25 @@ public class TransposeSelfMMOOCInstruction extends ComputationOOCInstruction { + private final MMTSJType _tstype; - protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand out, String opcode, String istr) { + protected TransposeSelfMMOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand out, MMTSJType tstype, String opcode, String istr) { super(type, op, in1, out, opcode, istr); + _tstype = tstype; } public static TransposeSelfMMOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); - InstructionUtils.checkNumFields(parts, 2); + InstructionUtils.checkNumFields(parts, 3); String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); // the larget matrix (streamed) CPOperand out = new CPOperand(parts[2]); + MMTSJType tstype = MMTSJType.valueOf(parts[3]); AggregateOperator agg = new AggregateOperator(0, Plus.getPlusFnObject()); AggregateBinaryOperator ba = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg); - return new TransposeSelfMMOOCInstruction(OOCType.Reorg, ba, in1, out, opcode, str); + return new TransposeSelfMMOOCInstruction(OOCType.TSMM, ba, in1, out, tstype, opcode, str); } @Override From c38dc91a7f8c0cd966369c8af7e7ac79bc8d1a72 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 14 Aug 2025 07:31:35 +0530 Subject: [PATCH 05/10] plan created with tsmm Heavy hitter instructions: # Instruction Time(s) Count 1 write 0.231 1 2 createvar 0.038 3 3 ooc_rblk 0.002 1 4 rmvar 0.001 2 5 ooc_tsmm 0.000 1 --- .../ooc/TransposeSelfMMOOCInstruction.java | 46 ++----------------- 1 file changed, 3 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 9a46ee71d49..a1a0a45abe5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -71,25 +71,10 @@ public static TransposeSelfMMOOCInstruction parseInstruction(String str) { @Override public void processInstruction( ExecutionContext ec ) { // 1. Identify the inputs - MatrixObject min = ec.getMatrixObject(input1); // big matrix - MatrixBlock vin = ec.getMatrixObject(input2) - .acquireReadAndRelease(); // in-memory vector - - // 2. Pre-partition the in-memory vector into a hashmap - HashMap partitionedVector = new HashMap<>(); - int blksize = vin.getDataCharacteristics().getBlocksize(); - if (blksize < 0) - blksize = ConfigurationManager.getBlocksize(); - for (int i=0; i qIn = min.getStreamHandle(); LocalTaskQueue qOut = new LocalTaskQueue<>(); - BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); ec.getMatrixObject(output).setStreamHandle(qOut); ExecutorService pool = CommonThreadPool.get(); @@ -98,36 +83,11 @@ public void processInstruction( ExecutionContext ec ) { pool.submit(() -> { IndexedMatrixValue tmp = null; try { - HashMap partialResults = new HashMap<>(); - while((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + HashMap partialResults = new HashMap<>(); + while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); long rowIndex = tmp.getIndexes().getRowIndex(); long colIndex = tmp.getIndexes().getColumnIndex(); - MatrixBlock vectorSlice = partitionedVector.get(colIndex); - - // Now, call the operation with the correct, specific operator. - MatrixBlock partialResult = matrixBlock.aggregateBinaryOperations( - matrixBlock, vectorSlice, new MatrixBlock(), (AggregateBinaryOperator) _optr); - - // for single column block, no aggregation neeeded - if( min.getNumColumns() <= min.getBlocksize() ) { - qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); - } - else { - MatrixBlock currAgg = partialResults.get(rowIndex); - if (currAgg == null) - partialResults.put(rowIndex, partialResult); - else - currAgg.binaryOperationsInPlace(plus, partialResult); - } - } - - // emit aggregated blocks - if( min.getNumColumns() > min.getBlocksize() ) { - for (Map.Entry entry : partialResults.entrySet()) { - MatrixIndexes outIndexes = new MatrixIndexes(entry.getKey(), 1L); - qOut.enqueueTask(new IndexedMatrixValue(outIndexes, entry.getValue())); - } } } catch(Exception ex) { From 8d5d331952317ff37a2b52b15fbd99b21e709198 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 14 Aug 2025 07:44:37 +0530 Subject: [PATCH 06/10] do calculation and write to disk --- .../ooc/TransposeSelfMMOOCInstruction.java | 12 ++++++++++-- .../test/functions/ooc/TransposeSelfMMTest.java | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index a1a0a45abe5..848568d5fd8 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -72,11 +72,15 @@ public static TransposeSelfMMOOCInstruction parseInstruction(String str) { public void processInstruction( ExecutionContext ec ) { // 1. Identify the inputs MatrixObject min = ec.getMatrixObject(input1); + long cols = min.getNumColumns(); LocalTaskQueue qIn = min.getStreamHandle(); LocalTaskQueue qOut = new LocalTaskQueue<>(); + BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); ec.getMatrixObject(output).setStreamHandle(qOut); + MatrixBlock result = new MatrixBlock((int)cols, (int)cols, false); + ExecutorService pool = CommonThreadPool.get(); try { // Core logic: background thread @@ -86,8 +90,12 @@ public void processInstruction( ExecutionContext ec ) { HashMap partialResults = new HashMap<>(); while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); - long rowIndex = tmp.getIndexes().getRowIndex(); - long colIndex = tmp.getIndexes().getColumnIndex(); +// long rowIndex = tmp.getIndexes().getRowIndex(); +// long colIndex = tmp.getIndexes().getColumnIndex(); + + MatrixBlock partialResult = matrixBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _tstype); +// result.binaryOperationsInPlace(plus, partialResult); + qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); } } catch(Exception ex) { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java index 803e6364b47..4978d33bbeb 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -95,7 +95,7 @@ private void runMatrixVectorMultiplicationTest(int cols, boolean sparse ) boolean exceptionExpected = false; runTest(true, exceptionExpected, null, -1); -// double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); + double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); // double result = 0.0; // for(int i = 0; i < rows; i++) { // verify the results with Java // double expected = 0.0; From e2fcaf471346718e5660d237918597f142c921dd Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 14 Aug 2025 07:56:52 +0530 Subject: [PATCH 07/10] add test for values * also check for OOC instruction prefix --- .../functions/ooc/TransposeSelfMMTest.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java index 4978d33bbeb..488ab43b22a 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -19,7 +19,9 @@ package org.apache.sysds.test.functions.ooc; +import org.apache.sysds.common.Opcodes; import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.io.MatrixWriterFactory; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -96,15 +98,23 @@ private void runMatrixVectorMultiplicationTest(int cols, boolean sparse ) runTest(true, exceptionExpected, null, -1); double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); -// double result = 0.0; -// for(int i = 0; i < rows; i++) { // verify the results with Java -// double expected = 0.0; -// for(int j = 0; j < cols; j++) { -// expected += A_mb.get(i, j) * x_mb.get(j,0); -// } -// result = C1[i][0]; -// Assert.assertEquals(expected, result, eps); -// } + double result = 0.0; + for(int i = 0; i < cols; i++) { // verify the results with Java + double expected = 0.0; + for(int j = 0; j < cols; j++) { + for (int k = 0; k < rows; k++) { + expected += A_mb.get(k, i) * A_mb.get(k, j); + } + result = C1[i][j]; + Assert.assertEquals(expected, result, eps); + } + } + + String prefix = Instruction.OOC_INST_PREFIX; + Assert.assertTrue("OOC wasn't used for RBLK", + heavyHittersContainsString(prefix + Opcodes.RBLK)); + Assert.assertTrue("OOC wasn't used for TSMM", + heavyHittersContainsString(prefix + Opcodes.TSMM)); } catch (IOException e) { throw new RuntimeException(e); From 61b3ce2b081ae8810cf20176a5fb98b90d43a5eb Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 14 Aug 2025 08:20:39 +0530 Subject: [PATCH 08/10] results are incorrect --- .../ooc/TransposeSelfMMOOCInstruction.java | 29 ++++++++++++++++--- .../functions/ooc/TransposeSelfMMTest.java | 4 +-- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 848568d5fd8..11537e1be91 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -79,7 +79,7 @@ public void processInstruction( ExecutionContext ec ) { BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); ec.getMatrixObject(output).setStreamHandle(qOut); - MatrixBlock result = new MatrixBlock((int)cols, (int)cols, false); +// MatrixBlock result = new MatrixBlock((int)cols, (int)cols, false); ExecutorService pool = CommonThreadPool.get(); try { @@ -90,12 +90,33 @@ public void processInstruction( ExecutionContext ec ) { HashMap partialResults = new HashMap<>(); while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); -// long rowIndex = tmp.getIndexes().getRowIndex(); + long rowIndex = tmp.getIndexes().getRowIndex(); // long colIndex = tmp.getIndexes().getColumnIndex(); MatrixBlock partialResult = matrixBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _tstype); -// result.binaryOperationsInPlace(plus, partialResult); - qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); + + + // for single column block + if ( min.getNumColumns() > min.getBlocksize() ) { + qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); + } else { + MatrixBlock currAgg = partialResults.get(rowIndex); + if (currAgg == null) { + partialResults.put(rowIndex, partialResult); + } else { + currAgg.binaryOperationsInPlace(plus, partialResult); + } + + } + + // emit aggregated blocks + if( min.getNumColumns() > min.getBlocksize() ) { + for (Map.Entry entry : partialResults.entrySet()) { + MatrixIndexes outIndexes = new MatrixIndexes(entry.getKey(), 1L); + qOut.enqueueTask(new IndexedMatrixValue(outIndexes, entry.getValue())); + } + } + } } catch(Exception ex) { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java index 488ab43b22a..cf70586eab8 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -100,13 +100,13 @@ private void runMatrixVectorMultiplicationTest(int cols, boolean sparse ) double[][] C1 = readMatrix(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000, 1000); double result = 0.0; for(int i = 0; i < cols; i++) { // verify the results with Java - double expected = 0.0; for(int j = 0; j < cols; j++) { + double expected = 0.0; for (int k = 0; k < rows; k++) { expected += A_mb.get(k, i) * A_mb.get(k, j); } result = C1[i][j]; - Assert.assertEquals(expected, result, eps); + Assert.assertEquals( "value mismatch at cell ("+i+","+j+")",expected, result, eps); } } From 56fb0b2024ef9e2432b53103373c68327d91e673 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 14 Aug 2025 08:46:07 +0530 Subject: [PATCH 09/10] tsmm is a terminal operation --- .../ooc/TransposeSelfMMOOCInstruction.java | 71 ++++++------------- 1 file changed, 21 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 11537e1be91..76b6a9c9123 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -75,62 +75,33 @@ public void processInstruction( ExecutionContext ec ) { long cols = min.getNumColumns(); LocalTaskQueue qIn = min.getStreamHandle(); - LocalTaskQueue qOut = new LocalTaskQueue<>(); BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); - ec.getMatrixObject(output).setStreamHandle(qOut); -// MatrixBlock result = new MatrixBlock((int)cols, (int)cols, false); + // 1. create an empty accumulator for the result + MatrixBlock result = new MatrixBlock((int)cols, (int)cols, false); - ExecutorService pool = CommonThreadPool.get(); + IndexedMatrixValue tmp = null; try { - // Core logic: background thread - pool.submit(() -> { - IndexedMatrixValue tmp = null; - try { - HashMap partialResults = new HashMap<>(); - while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { - MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); - long rowIndex = tmp.getIndexes().getRowIndex(); -// long colIndex = tmp.getIndexes().getColumnIndex(); - - MatrixBlock partialResult = matrixBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _tstype); - - - // for single column block - if ( min.getNumColumns() > min.getBlocksize() ) { - qOut.enqueueTask(new IndexedMatrixValue(tmp.getIndexes(), partialResult)); - } else { - MatrixBlock currAgg = partialResults.get(rowIndex); - if (currAgg == null) { - partialResults.put(rowIndex, partialResult); - } else { - currAgg.binaryOperationsInPlace(plus, partialResult); - } - - } - - // emit aggregated blocks - if( min.getNumColumns() > min.getBlocksize() ) { - for (Map.Entry entry : partialResults.entrySet()) { - MatrixIndexes outIndexes = new MatrixIndexes(entry.getKey(), 1L); - qOut.enqueueTask(new IndexedMatrixValue(outIndexes, entry.getValue())); - } - } - - } - } - catch(Exception ex) { - throw new DMLRuntimeException(ex); - } - finally { - qOut.closeInput(); - } - }); - } catch (Exception e) { - throw new DMLRuntimeException(e); + // 2. consume a stream of X blocks synchronously on main thread + while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock matrixBlock = (MatrixBlock) tmp.getValue(); + + // 3. compute the local transpose self: t(block) %*% block + MatrixBlock partialResult = matrixBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _tstype); + + // 4. aggregate the partial result into final accumulator block + result.binaryOperationsInPlace(plus, partialResult); + + } + // 5. once the stream is exhausted, set the final, aggregated block as the output + ec.setMatrixOutput(output.getName(), result); // single in-memory matrix block + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); } finally { - pool.shutdown(); + ec.releaseMatrixInput(input1.getName()); } + } } From 817d511e6100727f9fdbb3590a7b334d521090cb Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Fri, 15 Aug 2025 10:51:41 +0530 Subject: [PATCH 10/10] tsmm is mathematically impossible to calculate block/block --- .../runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java index 76b6a9c9123..e6a37649cb0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeSelfMMOOCInstruction.java @@ -100,7 +100,7 @@ public void processInstruction( ExecutionContext ec ) { throw new DMLRuntimeException(ex); } finally { - ec.releaseMatrixInput(input1.getName()); +// ec.releaseMatrixInput(input1.getName()); } }