From adf64df9f3b4c5b55acfdc4bbd08328aac3303e4 Mon Sep 17 00:00:00 2001 From: ddrag Date: Tue, 30 Mar 2021 12:27:26 +0200 Subject: [PATCH 1/9] What is the context for this pull request? Tracking Issue: #400 Parent Issue: N/A Dependencies: N/A What changes were proposed in this pull request? Refactor Content.rec(..) function to support tail-optimisation Does this PR introduce any user-facing change? No How was this patch tested? Existing unit tests should be enough, because it is just refactoring without changing functionality. --- .../hyperspace/index/IndexLogEntry.scala | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index bc6cbc39f..73dcaf675 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -58,13 +58,32 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri } private def rec[T]( - prefixPath: Path, - directory: Directory, - func: (FileInfo, Path) => T): Seq[T] = { - val files = directory.files.map(f => func(f, prefixPath)) - files ++ directory.subDirs.flatMap { dir => - rec(new Path(prefixPath, dir.name), dir, func) + prefixPath: Path, + directory: Directory, + func: (FileInfo, Path) => T): Seq[T] = { + @tailrec + def recAcc[T]( + dirMap: List[(Path, Seq[Directory])], + func: (FileInfo, Path) => T, + acc: Seq[T] = Seq.empty): Seq[T] = { + dirMap match { + case Nil => acc + case (curPrefixPath, curDirs) :: xs => + + val curAcc = for { + dir <- curDirs + file <- dir.files + } yield func(file, new Path(curPrefixPath, dir.name)) + + val newLevels = curDirs + .filter(_.subDirs.nonEmpty) + .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) + + recAcc(xs ++ newLevels, func, curAcc ++ acc) + } } + + recAcc(List(prefixPath -> Seq(directory)), func) } } From c36ed37d244f5996436dce4ffdb9b865582a6e84 Mon Sep 17 00:00:00 2001 From: ddrag Date: Tue, 30 Mar 2021 16:52:54 +0200 Subject: [PATCH 2/9] Refactoring: - Move rec function outside Content case class. - Rename it to 'recFilesApply' - fixed formatting Cover recFilesApply with tests --- .../hyperspace/index/IndexLogEntry.scala | 71 +++++++++++-------- .../hyperspace/index/IndexLogEntryTest.scala | 41 +++++++++++ 2 files changed, 81 insertions(+), 31 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 73dcaf675..b683aecd9 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants +import com.microsoft.hyperspace.index.Content.recFilesApply import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. @@ -45,46 +46,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri @JsonIgnore lazy val files: Seq[Path] = { // Recursively find files from directory tree. 
- rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) + recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name)) } @JsonIgnore lazy val fileInfos: Set[FileInfo] = { - rec( + recFilesApply( new Path(root.name), root, (f, prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet } - - private def rec[T]( - prefixPath: Path, - directory: Directory, - func: (FileInfo, Path) => T): Seq[T] = { - @tailrec - def recAcc[T]( - dirMap: List[(Path, Seq[Directory])], - func: (FileInfo, Path) => T, - acc: Seq[T] = Seq.empty): Seq[T] = { - dirMap match { - case Nil => acc - case (curPrefixPath, curDirs) :: xs => - - val curAcc = for { - dir <- curDirs - file <- dir.files - } yield func(file, new Path(curPrefixPath, dir.name)) - - val newLevels = curDirs - .filter(_.subDirs.nonEmpty) - .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) - - recAcc(xs ++ newLevels, func, curAcc ++ acc) - } - } - - recAcc(List(prefixPath -> Seq(directory)), func) - } } object Content { @@ -130,6 +102,43 @@ object Content { None } } + + /** + * Apply `func` to each file in directory recursively. + * @param prefixPath Root prefix + * @param directory Root directory + * @param func function which would apply to current prefix and file + * @tparam T + * @return + */ + def recFilesApply[T]( + prefixPath: Path, + directory: Directory, + func: (FileInfo, Path) => T): Seq[T] = { + @tailrec + def recAcc[A]( + dirMap: List[(Path, Seq[Directory])], + func: (FileInfo, Path) => A, + acc: Seq[A] = Seq.empty): Seq[A] = { + dirMap match { + case Nil => acc + case (curPrefixPath, curDirs) :: xs => + + val curAcc = for { + dir <- curDirs + file <- dir.files + } yield func(file, new Path(curPrefixPath, dir.name)) + + val newLevels = curDirs + .filter(_.subDirs.nonEmpty) + .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) + + recAcc(xs ++ newLevels, func, curAcc ++ acc) + } + } + + recAcc(List(prefixPath -> Seq(directory)), func) + } } /** diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index ebe4a5eec..a95aa5418 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -240,6 +240,47 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(actual.sourceFilesSizeInBytes == 200L) } + test("Content.recFilesApply apply function to all files ") { + val directory = Directory("file:/", + files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq( + Directory("a", + files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq( + Directory("b", + files = + Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq(Directory("c")) + ), + Directory("d")) + ))) + + val res = Content.recFilesApply( + new Path("file:/"), + directory, + (f, prefix) => new Path(prefix, f.name) + ) + + val expected = + Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4") + .map(new Path(_)).toSet + + val actual = res.toSet + assert(actual.equals(expected)) + } + + test("Content.recFilesApply return empty list for directories without files ") { + val directory = Directory("file:/") + + val res = Content.recFilesApply( + new Path("file:/"), + directory, + (f, prefix) => new Path(prefix, f.name) + ) + + assert(res.isEmpty) 
+ } + test("Content.files api lists all files from Content object.") { val content = Content(Directory("file:/", subDirs = Seq( From bce31e2adab7f8ca4a1063683973d41c78c6b5ff Mon Sep 17 00:00:00 2001 From: Dmytro Dragan <5161047+dmytroDragan@users.noreply.github.com> Date: Fri, 2 Apr 2021 09:36:04 +0200 Subject: [PATCH 3/9] Update src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com> --- .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index b683aecd9..43433c2f2 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -105,6 +105,7 @@ object Content { /** * Apply `func` to each file in directory recursively. + * @param prefixPath Root prefix * @param directory Root directory * @param func function which would apply to current prefix and file From a342ae1dd5cf43030fcc29a114abd9e164445c8e Mon Sep 17 00:00:00 2001 From: Dmytro Dragan <5161047+dmytroDragan@users.noreply.github.com> Date: Fri, 2 Apr 2021 09:36:43 +0200 Subject: [PATCH 4/9] Update src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala Co-authored-by: EJ Song <51077614+sezruby@users.noreply.github.com> --- .../com/microsoft/hyperspace/index/IndexLogEntryTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index a95aa5418..d568aa606 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -269,7 +269,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(actual.equals(expected)) } - test("Content.recFilesApply return empty list for directories without files ") { + test("Content.recFilesApply returns empty list for directories without files.") { val directory = Directory("file:/") val res = Content.recFilesApply( From add47fd57117ed050e61434651e17c4ccbc089ad Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 2 Apr 2021 10:25:25 +0200 Subject: [PATCH 5/9] improve readability of recAcc function --- .../hyperspace/index/IndexLogEntry.scala | 102 +++--- .../hyperspace/index/IndexLogEntryTest.scala | 302 ++++++++---------- 2 files changed, 187 insertions(+), 217 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 43433c2f2..1873281e8 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -54,7 +54,9 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri recFilesApply( new Path(root.name), root, - (f, prefix) => + ( + f, + prefix) => FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet } } @@ -82,8 +84,9 @@ object Content { hadoopConfiguration: Configuration, pathFilter: PathFilter = PathUtils.DataPathFilter, throwIfNotExists: Boolean = false): Content = - Content(Directory.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, - throwIfNotExists)) + Content( + Directory + 
.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, throwIfNotExists)) /** * Create a Content object from a specified list of leaf files. Any files not listed here will @@ -93,9 +96,7 @@ object Content { * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * @return Content object with Directory tree from leaf files. */ - def fromLeafFiles( - files: Seq[FileStatus], - fileIdTracker: FileIdTracker): Option[Content] = { + def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = { if (files.nonEmpty) { Some(Content(Directory.fromLeafFiles(files, fileIdTracker))) } else { @@ -105,26 +106,25 @@ object Content { /** * Apply `func` to each file in directory recursively. - + * * @param prefixPath Root prefix * @param directory Root directory * @param func function which would apply to current prefix and file * @tparam T - * @return + * @return result list of applying function to all files */ def recFilesApply[T]( - prefixPath: Path, - directory: Directory, - func: (FileInfo, Path) => T): Seq[T] = { + prefixPath: Path, + directory: Directory, + func: (FileInfo, Path) => T): Seq[T] = { @tailrec def recAcc[A]( - dirMap: List[(Path, Seq[Directory])], - func: (FileInfo, Path) => A, - acc: Seq[A] = Seq.empty): Seq[A] = { + dirMap: List[(Path, Seq[Directory])], + func: (FileInfo, Path) => A, + acc: Seq[A] = Seq.empty): Seq[A] = { dirMap match { case Nil => acc - case (curPrefixPath, curDirs) :: xs => - + case (curPrefixPath, curDirs) :: otherDirs => val curAcc = for { dir <- curDirs file <- dir.files @@ -134,7 +134,7 @@ object Content { .filter(_.subDirs.nonEmpty) .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs)) - recAcc(xs ++ newLevels, func, curAcc ++ acc) + recAcc(otherDirs ++ newLevels, func, curAcc ++ acc) } } @@ -259,12 +259,10 @@ object Directory { * @param files List of leaf files. * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids. * Note: If a new leaf file is discovered, the input fileIdTracker gets - * updated by adding it to the files it is tracking. + * updated by adding it to the files it is tracking. * @return Content object with Directory tree from leaf files. */ - def fromLeafFiles( - files: Seq[FileStatus], - fileIdTracker: FileIdTracker): Directory = { + def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = { require( files.nonEmpty, s"Empty files list found while creating a ${Directory.getClass.getName}.") @@ -352,8 +350,8 @@ case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) { override def equals(o: Any): Boolean = o match { case that: FileInfo => name.equals(that.name) && - size.equals(that.size) && - modifiedTime.equals(that.modifiedTime) + size.equals(that.size) && + modifiedTime.equals(that.modifiedTime) case _ => false } @@ -379,10 +377,11 @@ case class CoveringIndex(properties: CoveringIndex.Properties) { val kindAbbr = "CI" } object CoveringIndex { - case class Properties(columns: Properties.Columns, - schemaString: String, - numBuckets: Int, - properties: Map[String, String]) + case class Properties( + columns: Properties.Columns, + schemaString: String, + numBuckets: Int, + properties: Map[String, String]) object Properties { case class Columns(indexed: Seq[String], included: Seq[String]) @@ -406,9 +405,7 @@ object LogicalPlanFingerprint { * @param appendedFiles Appended files. * @param deletedFiles Deleted files. 
*/ -case class Update( - appendedFiles: Option[Content] = None, - deletedFiles: Option[Content] = None) +case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None) // IndexLogEntry-specific Hdfs that represents the source data. case class Hdfs(properties: Hdfs.Properties) { @@ -517,21 +514,14 @@ case class IndexLogEntry( def toFileStatus(f: FileInfo) = { new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name)) } - copy( - source = source.copy( - plan = source.plan.copy( - properties = source.plan.properties.copy( - fingerprint = latestFingerprint, - relations = Seq( - relations.head.copy( - data = relations.head.data.copy( - properties = relations.head.data.properties.copy( - update = Some( - Update( - appendedFiles = - Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker), - deletedFiles = - Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) + copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy( + fingerprint = latestFingerprint, + relations = Seq( + relations.head.copy(data = relations.head.data.copy(properties = + relations.head.data.properties.copy(update = Some(Update( + appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker), + deletedFiles = + Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) } def bucketSpec: BucketSpec = @@ -566,14 +556,16 @@ case class IndexLogEntry( } def hasLineageColumn: Boolean = { - derivedDataset.properties.properties.getOrElse( - IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean + derivedDataset.properties.properties + .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT) + .toBoolean } def hasParquetAsSourceFormat: Boolean = { relations.head.fileFormat.equals("parquet") || - derivedDataset.properties.properties.getOrElse( - IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean + derivedDataset.properties.properties + .getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false") + .toBoolean } @JsonIgnore @@ -650,10 +642,10 @@ class FileIdTracker { // Combination of file properties, used as key, to identify a // unique file for which an id is generated. type key = ( - String, // Full path. + String, // Full path. Long, // Size. - Long // Modified time. - ) + Long // Modified time. + ) private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap() def getMaxFileId: Long = maxId @@ -678,8 +670,7 @@ class FileIdTracker { setSizeHint(files.size) files.foreach { f => if (f.id == IndexConstants.UNKNOWN_FILE_ID) { - throw HyperspaceException( - s"Cannot add file info with unknown id. (file: ${f.name}).") + throw HyperspaceException(s"Cannot add file info with unknown id. 
(file: ${f.name}).") } val key = (f.name, f.size, f.modifiedTime) @@ -707,8 +698,7 @@ class FileIdTracker { */ def addFile(file: FileStatus): Long = { fileToIdMap.getOrElseUpdate( - (file.getPath.toString, file.getLen, file.getModificationTime), - { + (file.getPath.toString, file.getLen, file.getModificationTime), { maxId += 1 maxId }) diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index d568aa606..c9a73d1fc 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -191,34 +191,20 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter Seq( Relation( Seq("rootpath"), - Hdfs( - Hdfs.Properties( - Content( - Directory( - "test", - Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)), - Seq() - ) - ), - Some( - Update( - None, - Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))) - ) - ) - ) - ), + Hdfs(Hdfs.Properties( + Content(Directory( + "test", + Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)), + Seq())), + Some(Update(None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))))))), "schema", "type", - Map() - ) - ), + Map())), null, null, LogicalPlanFingerprint( LogicalPlanFingerprint - .Properties(Seq(Signature("provider", "signatureValue"))) - )) + .Properties(Seq(Signature("provider", "signatureValue"))))) val expected = IndexLogEntry( "indexName", @@ -240,30 +226,31 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(actual.sourceFilesSizeInBytes == 200L) } - test("Content.recFilesApply apply function to all files ") { - val directory = Directory("file:/", + test("Content.recFilesApply returns a result list of applying function to all files.") { + val directory = Directory( + "file:/", files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)), subDirs = Seq( - Directory("a", - files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + Directory( + "a", + files = + Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), subDirs = Seq( - Directory("b", + Directory( + "b", files = Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)), - subDirs = Seq(Directory("c")) - ), - Directory("d")) - ))) + subDirs = Seq(Directory("c"))), + Directory("d"))))) - val res = Content.recFilesApply( - new Path("file:/"), - directory, - (f, prefix) => new Path(prefix, f.name) - ) + def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name) + + val res = Content.recFilesApply(new Path("file:/"), directory, theFunction) val expected = Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4") - .map(new Path(_)).toSet + .map(new Path(_)) + .toSet val actual = res.toSet assert(actual.equals(expected)) @@ -275,22 +262,25 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val res = Content.recFilesApply( new Path("file:/"), directory, - (f, prefix) => new Path(prefix, f.name) - ) + (f, prefix) => new Path(prefix, f.name)) assert(res.isEmpty) } test("Content.files api lists all files from Content object.") { - val content = Content(Directory("file:/", subDirs = - Seq( - Directory("a", - files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + val content = Content( + Directory( + "file:/", + subDirs 
= Seq(Directory( + "a", + files = + Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), subDirs = Seq( - Directory("b", - files = - Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID))))) - ))) + Directory( + "b", + files = Seq( + FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), + FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)))))))) val expected = Seq("file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4").map(new Path(_)).toSet @@ -302,8 +292,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) Content(rootDirectory, NoOpFingerprint()) @@ -317,8 +308,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory) Content(rootDirectory, NoOpFingerprint()) @@ -332,8 +324,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val nestedDirPath = toPath(nestedDir) val expected = { - val fileInfos = Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val fileInfos = Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirDirectory = Directory("nested", fileInfos) createDirectory(nestedDirPath, nestedDirDirectory) } @@ -342,17 +335,21 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromDirectory api creates the correct Directory objects, " + - "recursively listing all leaf files.") { + test( + "Directory.fromDirectory api creates the correct Directory objects, " + + "recursively listing all leaf files.") { val testDirPath = toPath(testDir) val testDirLeafFiles = - Seq(f1, f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + Seq(f1, f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) val expected = createDirectory(testDirPath, testDirDirectory) @@ -366,12 +363,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val testDirPath = toPath(testDir) val testDirLeafFiles = - Seq(f1, f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + Seq(f1, f2) + 
.map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f3, f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f3, f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) @@ -385,12 +385,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter test("Directory.fromLeafFiles api does not include other files in the directory.") { val testDirPath = toPath(testDir) - val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirLeafFiles = Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val nestedDirLeafFiles = - Seq(f4).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) - val testDirDirectory = Directory(name = "testDir", + Seq(f4) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirDirectory = Directory( + name = "testDir", files = testDirLeafFiles, subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles))) @@ -401,8 +404,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent" + - "directory, otherwise works as expected.") { + test( + "Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent" + + "directory, otherwise works as expected.") { val testDirPath = toPath(testDir) val nonExistentDir = new Path(testDirPath, "nonexistent") @@ -443,8 +447,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter override def accept(path: Path): Boolean = path.getName.startsWith("f1") } - val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) + val testDirLeafFiles = Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)) val testDirDirectory = Directory(name = "testDir", files = testDirLeafFiles) val expected = createDirectory(testDirPath, testDirDirectory) @@ -454,8 +459,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter assert(directoryEquals(actual, expected)) } - test("Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + - "dirs.") { + test( + "Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + + "dirs.") { // File Structure // testDir/temp/a/f1 // testDir/temp/b/f2 @@ -467,11 +473,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f2 = Files.createFile(Paths.get(b + "/f2")) val aDirectory = - Directory("a", Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "a", + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val bDirectory = - Directory("b", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "b", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, 
fileIdTracker.addFile(f), asFullPath = false))) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -499,12 +511,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f2 = Files.createFile(Paths.get(c + "/f2")) val cDirectory = - Directory("c", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "c", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val bDirectory = Directory("b", subDirs = Seq(cDirectory)) val aDirectory = - Directory("a", Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "a", + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -519,8 +537,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter FileUtils.deleteDirectory(tempDir.toFile) } - test("Directory.fromDirectory and fromLeafFiles where files belong to multiple" + - "subdirectories.") { + test( + "Directory.fromDirectory and fromLeafFiles where files belong to multiple" + + "subdirectories.") { // File Structure // testDir/temp/a/f1 // testDir/temp/a/b/f2 @@ -535,17 +554,23 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter val f3 = Files.createFile(Paths.get(c + "/f3")) val bDirectory = - Directory("b", Seq(f2).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "b", + Seq(f2) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val cDirectory = - Directory("c", Seq(f3).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) + Directory( + "c", + Seq(f3) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))) val aDirectory = Directory( "a", - Seq(f1).map(toFileStatus).map(f => - FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)), - Seq(bDirectory, cDirectory) - ) + Seq(f1) + .map(toFileStatus) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)), + Seq(bDirectory, cDirectory)) val tempDirectory = Directory("temp", subDirs = Seq(aDirectory)) val tempDirectoryPath = toPath(tempDir) @@ -565,11 +590,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/f2 val directory1 = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L))) // directory2: // a/b/f3 @@ -579,13 +600,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter subDirs = Seq( Directory( name = "b", - files = Seq( - FileInfo("f3", 100L, 100L, 3L), - FileInfo("f4", 100L, 100L, 4L) - ) - ) - ) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))))) // Expected result of merging directory1 and directory2: // a/f1 @@ -594,20 +609,11 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/b/f4 val expected = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ), + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)), subDirs = Seq( 
Directory( name = "b", - files = Seq( - FileInfo("f3", 100L, 100L, 3L), - FileInfo("f4", 100L, 100L, 4L) - ) - ) - ) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))))) val actual1 = directory1.merge(directory2) val actual2 = directory2.merge(directory1) @@ -623,14 +629,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/b/f3 val directory1 = Directory( name = "a", - files = Seq( - FileInfo("f1", 100L, 100L, 1L), - FileInfo("f2", 100L, 100L, 2L) - ), - subDirs = Seq( - Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L))) - ) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)), + subDirs = Seq(Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L))))) // directory2: // a/f4 @@ -643,17 +643,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter subDirs = Seq( Directory( name = "b", - files = Seq( - FileInfo("f5", 100L, 100L, 5L), - FileInfo("f6", 100L, 100L, 6L) - ), - subDirs = Seq(Directory( - name = "c", - files = Seq(FileInfo("f7", 100L, 100L, 7L)) - )) - ) - ) - ) + files = Seq(FileInfo("f5", 100L, 100L, 5L), FileInfo("f6", 100L, 100L, 6L)), + subDirs = Seq(Directory(name = "c", files = Seq(FileInfo("f7", 100L, 100L, 7L))))))) // Expected result of merging directory1 and directory2: // directory1: @@ -669,23 +660,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter files = Seq( FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L), - FileInfo("f4", 100L, 100L, 4L) - ), + FileInfo("f4", 100L, 100L, 4L)), subDirs = Seq( Directory( name = "b", files = Seq( FileInfo("f3", 100L, 100L, 3L), FileInfo("f5", 100L, 100L, 5L), - FileInfo("f6", 100L, 100L, 6L) - ), - subDirs = Seq( - Directory("c", - files = Seq(FileInfo("f7", 100L, 100L, 7L))) - ) - ) - ) - ) + FileInfo("f6", 100L, 100L, 6L)), + subDirs = Seq(Directory("c", files = Seq(FileInfo("f7", 100L, 100L, 7L))))))) val actual1 = directory1.merge(directory2) val actual2 = directory2.merge(directory1) @@ -700,19 +683,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter // a/f2 val directory1 = Directory( name = "a", - files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)) - ) + files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L))) // directory2: // b/f3 // b/f4 val directory2 = Directory( name = "b", - files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)) - ) + files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))) - val ex1 = intercept[HyperspaceException] (directory1.merge(directory2)) - val ex2 = intercept[HyperspaceException] (directory2.merge(directory1)) + val ex1 = intercept[HyperspaceException](directory1.merge(directory2)) + val ex2 = intercept[HyperspaceException](directory2.merge(directory1)) assert(ex1.msg.contains("Merging directories with names a and b failed.")) assert(ex2.msg.contains("Merging directories with names b and a failed.")) @@ -724,19 +705,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter private def directoryEquals(dir1: Directory, dir2: Directory): Boolean = { dir1.name.equals(dir2.name) && - dir1.files.toSet.equals(dir2.files.toSet) && - dir1.subDirs.size.equals(dir2.subDirs.size) && - dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall{ - case (d1, d2) => directoryEquals(d1, d2) - } + dir1.files.toSet.equals(dir2.files.toSet) && + 
dir1.subDirs.size.equals(dir2.subDirs.size) && + dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall { case (d1, d2) => + directoryEquals(d1, d2) + } } // Using `directoryPath`, create a Directory tree starting from root and ending at // `leafDirectory`. private def createDirectory(directoryPath: Path, leafDirectory: Directory): Directory = { - TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { - (accum, name) => - Directory(name, Seq(), Seq(accum)) + TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { (accum, name) => + Directory(name, Seq(), Seq(accum)) } } } From 1226937198d253c9a59f196c86106af40adf04dd Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 2 Apr 2021 12:58:01 +0200 Subject: [PATCH 6/9] Align with description format --- .../hyperspace/index/IndexLogEntry.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 1873281e8..38195202c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{HashMap, ListBuffer} import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonIgnore + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -31,7 +32,7 @@ import org.apache.spark.sql.types.{DataType, StructType} import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions.Constants -import com.microsoft.hyperspace.index.Content.recFilesApply +import com.microsoft.hyperspace.index.Content.{createFileInfoWithPrefix, recFilesApply} import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils} // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined. @@ -51,18 +52,15 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri @JsonIgnore lazy val fileInfos: Set[FileInfo] = { - recFilesApply( - new Path(root.name), - root, - ( - f, - prefix) => - FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet + recFilesApply(new Path(root.name), root, createFileInfoWithPrefix).toSet } } object Content { + val createFileInfoWithPrefix: (FileInfo, Path) => FileInfo = (f, prefix) => + FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id) + /** * Create a Content object from a directory path by recursively listing its leaf files. All * files from the directory tree will be part of the Directory. 
@@ -109,9 +107,9 @@ object Content { * * @param prefixPath Root prefix * @param directory Root directory - * @param func function which would apply to current prefix and file + * @param func Function which would apply to current prefix and file * @tparam T - * @return result list of applying function to all files + * @return Result list of applying function to all files */ def recFilesApply[T]( prefixPath: Path, From 3a073e71b4b98e749b7ef1285613b8267af602c7 Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 2 Apr 2021 13:45:14 +0200 Subject: [PATCH 7/9] Removed unneeded space --- .../scala/com/microsoft/hyperspace/index/IndexLogEntry.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 38195202c..9bf2ce7bf 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -23,7 +23,6 @@ import scala.collection.mutable.{HashMap, ListBuffer} import scala.collection.mutable import com.fasterxml.jackson.annotation.JsonIgnore - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.spark.sql.catalyst.catalog.BucketSpec From 23fe8f17eaf6f3b8dddd8f6a44f7f9c278c3f9c8 Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 9 Apr 2021 14:27:27 +0200 Subject: [PATCH 8/9] reformat with latest master version --- .../hyperspace/index/IndexLogEntry.scala | 135 +++++++++--------- 1 file changed, 64 insertions(+), 71 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index ff8696d20..62a7f5997 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -236,15 +236,6 @@ object Directory { } } - @tailrec - private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { - if (path.isRoot) { - Directory(path.toString, subDirs = subDirs) - } else { - createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) - } - } - /** * Create a Content object from a specified list of leaf files. Any files not listed here will * NOT be part of the returned object. @@ -315,6 +306,15 @@ object Directory { pathToDirectory(getRoot(files.head.getPath)) } + @tailrec + private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { + if (path.isRoot) { + Directory(path.toString, subDirs = subDirs) + } else { + createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) + } + } + // Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/". // For linux systems, this root will be "file:/". Other hdfs compatible file systems will have // corresponding roots. @@ -467,47 +467,49 @@ case class IndexLogEntry( properties: Map[String, String]) extends LogEntry(IndexLogEntry.VERSION) { - def schema: StructType = - DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] - - def created: Boolean = state.equals(Constants.States.ACTIVE) - - def relations: Seq[Relation] = { - // Only one relation is currently supported. - assert(source.plan.properties.relations.size == 1) - source.plan.properties.relations - } - // FileInfo's 'name' contains the full path to the file. 
@JsonIgnore lazy val sourceFileInfoSet: Set[FileInfo] = { relations.head.data.properties.content.fileInfos } - @JsonIgnore lazy val sourceFilesSizeInBytes: Long = { sourceFileInfoSet.foldLeft(0L)(_ + _.size) } - - def sourceUpdate: Option[Update] = { - relations.head.data.properties.update - } - - def hasSourceUpdate: Boolean = { - sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) - } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val appendedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set()) } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val deletedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set()) } + @JsonIgnore + lazy val fileIdTracker: FileIdTracker = { + val tracker = new FileIdTracker + tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) + tracker + } + /** + * A mutable map for holding auxiliary information of this index log entry while applying rules. + */ + @JsonIgnore + private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + + def schema: StructType = + DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] + + def created: Boolean = state.equals(Constants.States.ACTIVE) + + def hasSourceUpdate: Boolean = { + sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) + } + + def sourceUpdate: Option[Update] = { + relations.head.data.properties.update + } def copyWithUpdate( latestFingerprint: LogicalPlanFingerprint, @@ -526,12 +528,22 @@ case class IndexLogEntry( Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) } + def relations: Seq[Relation] = { + // Only one relation is currently supported. + assert(source.plan.properties.relations.size == 1) + source.plan.properties.relations + } + def bucketSpec: BucketSpec = BucketSpec( numBuckets = numBuckets, bucketColumnNames = indexedColumns, sortColumnNames = indexedColumns) + def numBuckets: Int = derivedDataset.properties.numBuckets + + def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed + override def equals(o: Any): Boolean = o match { case that: IndexLogEntry => config.equals(that.config) && @@ -544,12 +556,8 @@ case class IndexLogEntry( case _ => false } - def numBuckets: Int = derivedDataset.properties.numBuckets - def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) - def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed - def includedColumns: Seq[String] = derivedDataset.properties.columns.included def signature: Signature = { @@ -571,33 +579,28 @@ case class IndexLogEntry( .toBoolean } - @JsonIgnore - lazy val fileIdTracker: FileIdTracker = { - val tracker = new FileIdTracker - tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) - tracker - } - override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } - /** - * A mutable map for holding auxiliary information of this index log entry while applying rules. 
- */ - @JsonIgnore - private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { + tags.remove((plan, tag)) + } - def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { - tags((plan, tag)) = value + def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { + tags((null, tag)) = value } - def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { - tags.get((plan, tag)).map(_.asInstanceOf[T]) + def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((null, tag)).map(_.asInstanceOf[T]) } - def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { - tags.remove((plan, tag)) + def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { + tags.remove((null, tag)) + } + + def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = { + withCachedTag(null, tag)(f) } def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { @@ -610,20 +613,12 @@ case class IndexLogEntry( } } - def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { - tags((null, tag)) = value - } - - def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = { - tags.get((null, tag)).map(_.asInstanceOf[T]) - } - - def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = { - tags.remove((null, tag)) + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { + tags((plan, tag)) = value } - def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = { - withCachedTag(null, tag)(f) + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((plan, tag)).map(_.asInstanceOf[T]) } } @@ -655,8 +650,7 @@ object IndexLogEntry { derivedDataset, content, source, - properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version)) - ) + properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version))) } } @@ -664,8 +658,6 @@ object IndexLogEntry { * Provides functionality to generate unique file ids for files. */ class FileIdTracker { - private var maxId: Long = -1L - // Combination of file properties, used as key, to identify a // unique file for which an id is generated. type key = ( @@ -674,6 +666,7 @@ class FileIdTracker { Long // Modified time. ) private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap() + private var maxId: Long = -1L def getMaxFileId: Long = maxId @@ -682,8 +675,6 @@ class FileIdTracker { def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] = fileToIdMap.get((path, size, modifiedTime)) - def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) - /** * Add a set of FileInfos to the fileToIdMap. The assumption is * that the each FileInfo already has a valid file id if an entry @@ -715,6 +706,8 @@ class FileIdTracker { } } + def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) + /** * Try to add file properties to fileToIdMap. If the file is already in * the map then return its current id. 
Otherwise, generate a new id, From 1948e0ca3c9a7c88ed725d255612843415f9e0c6 Mon Sep 17 00:00:00 2001 From: ddrag Date: Fri, 9 Apr 2021 14:48:35 +0200 Subject: [PATCH 9/9] reformat with latest master version after checking scalafmt options --- .../hyperspace/index/IndexLogEntry.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 62a7f5997..094fb958c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -492,6 +492,7 @@ case class IndexLogEntry( tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) tracker } + /** * A mutable map for holding auxiliary information of this index log entry while applying rules. */ @@ -528,22 +529,12 @@ case class IndexLogEntry( Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker))))))))))) } - def relations: Seq[Relation] = { - // Only one relation is currently supported. - assert(source.plan.properties.relations.size == 1) - source.plan.properties.relations - } - def bucketSpec: BucketSpec = BucketSpec( numBuckets = numBuckets, bucketColumnNames = indexedColumns, sortColumnNames = indexedColumns) - def numBuckets: Int = derivedDataset.properties.numBuckets - - def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed - override def equals(o: Any): Boolean = o match { case that: IndexLogEntry => config.equals(that.config) && @@ -556,8 +547,12 @@ case class IndexLogEntry( case _ => false } + def numBuckets: Int = derivedDataset.properties.numBuckets + def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) + def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed + def includedColumns: Seq[String] = derivedDataset.properties.columns.included def signature: Signature = { @@ -579,6 +574,12 @@ case class IndexLogEntry( .toBoolean } + def relations: Seq[Relation] = { + // Only one relation is currently supported. + assert(source.plan.properties.relations.size == 1) + source.plan.properties.relations + } + override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode }
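
The patches above replace the directly recursive Content.rec (which called itself once per subdirectory inside flatMap, so the self-call was never in tail position) with Content.recFilesApply, whose inner recAcc helper carries an explicit worklist of (prefix path, directories at that level) pairs plus an accumulator and is annotated with @tailrec. The sketch below mirrors that pattern in a self-contained form, assuming simplified Dir/File case classes and plain String prefixes instead of Hadoop's Path; it illustrates the technique and is not the project code itself.

import scala.annotation.tailrec

// Simplified stand-ins for the project's Directory/FileInfo (illustration only).
case class File(name: String)
case class Dir(name: String, files: Seq[File] = Seq.empty, subDirs: Seq[Dir] = Seq.empty)

object TailRecTraversalSketch {

  // Mirrors recAcc: a worklist plus an accumulator keeps the single self-call in
  // tail position, so @tailrec is accepted and deep trees cannot overflow the stack.
  def filesApply[T](prefix: String, root: Dir, func: (File, String) => T): Seq[T] = {
    @tailrec
    def recAcc(worklist: List[(String, Seq[Dir])], acc: Seq[T]): Seq[T] =
      worklist match {
        case Nil => acc
        case (curPrefix, dirs) :: rest =>
          // Apply `func` to every file directly under the directories at this level.
          val level = for {
            dir <- dirs
            file <- dir.files
          } yield func(file, s"$curPrefix/${dir.name}")

          // Queue the next level of non-empty subdirectories behind the remaining work.
          val next = dirs
            .filter(_.subDirs.nonEmpty)
            .map(dir => (s"$curPrefix/${dir.name}", dir.subDirs))

          recAcc(rest ++ next, level ++ acc) // tail call: nothing happens after it returns
      }

    recAcc(List(prefix -> Seq(root)), Seq.empty)
  }

  def main(args: Array[String]): Unit = {
    val tree = Dir(
      "root",
      files = Seq(File("f0")),
      subDirs = Seq(Dir("a", Seq(File("f1")), Seq(Dir("b", Seq(File("f2")))))))

    // Collect full paths, analogous to Content.files in the patch.
    filesApply("file:", tree, (f, p) => s"$p/${f.name}").sorted.foreach(println)
    // file:/root/a/b/f2
    // file:/root/a/f1
    // file:/root/f0
  }
}

The original implementation recursed before concatenating each subdirectory's results, so every level of nesting consumed a stack frame; in the worklist form the compiler rewrites recAcc into a loop, which is the point of the refactoring tracked in #400.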
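
Patches 8 and 9 only reorder members and re-run scalafmt against the latest master; among the blocks they move is FileIdTracker, which keys every file by (full path, size, modified time) and hands out monotonically increasing ids through getOrElseUpdate. The following is a minimal self-contained model of that id-assignment behaviour, assuming a plain String path instead of Hadoop's FileStatus; the class and method names below are illustrative, not the project API.

import scala.collection.mutable

// Minimal model of FileIdTracker's id assignment (illustrative only).
// A file is identified by (full path, size, modified time); the first time a key
// is seen it receives the next id, and the same key always maps back to that id.
class SimpleFileIdTracker {
  private val fileToIdMap = mutable.HashMap.empty[(String, Long, Long), Long]
  private var maxId: Long = -1L

  def getMaxFileId: Long = maxId

  def addFile(path: String, size: Long, modifiedTime: Long): Long =
    fileToIdMap.getOrElseUpdate((path, size, modifiedTime), { maxId += 1; maxId })

  def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
    fileToIdMap.get((path, size, modifiedTime))
}

object SimpleFileIdTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new SimpleFileIdTracker
    println(tracker.addFile("file:/a/f1", 100L, 10L))   // 0  (new key => new id)
    println(tracker.addFile("file:/a/f2", 100L, 10L))   // 1  (different path => new id)
    println(tracker.addFile("file:/a/f1", 100L, 10L))   // 0  (same key => same id)
    println(tracker.getFileId("file:/a/f1", 999L, 10L)) // None (size changed => unknown file)
  }
}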