diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
index 4d9227dc9..094fb958c 100644
--- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
+++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
 
 import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
 import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index.Content.{createFileInfoWithPrefix, recFilesApply}
 import com.microsoft.hyperspace.util.PathUtils
 
 // IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
@@ -45,31 +46,20 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerpri
   @JsonIgnore
   lazy val files: Seq[Path] = {
     // Recursively find files from directory tree.
-    rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
+    recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
   }
 
   @JsonIgnore
   lazy val fileInfos: Set[FileInfo] = {
-    rec(
-      new Path(root.name),
-      root,
-      (f, prefix) =>
-        FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
-  }
-
-  private def rec[T](
-      prefixPath: Path,
-      directory: Directory,
-      func: (FileInfo, Path) => T): Seq[T] = {
-    val files = directory.files.map(f => func(f, prefixPath))
-    files ++ directory.subDirs.flatMap { dir =>
-      rec(new Path(prefixPath, dir.name), dir, func)
-    }
+    recFilesApply(new Path(root.name), root, createFileInfoWithPrefix).toSet
   }
 }
 
 object Content {
+  val createFileInfoWithPrefix: (FileInfo, Path) => FileInfo = (f, prefix) =>
+    FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)
+
   /**
    * Create a Content object from a directory path by recursively listing its leaf files. All
    * files from the directory tree will be part of the Directory.
@@ -91,8 +81,9 @@ object Content {
       hadoopConfiguration: Configuration,
       pathFilter: PathFilter = PathUtils.DataPathFilter,
       throwIfNotExists: Boolean = false): Content =
-    Content(Directory.fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration,
-      throwIfNotExists))
+    Content(
+      Directory
+        .fromDirectory(path, fileIdTracker, pathFilter, hadoopConfiguration, throwIfNotExists))
 
   /**
    * Create a Content object from a specified list of leaf files. Any files not listed here will
@@ -102,15 +93,50 @@ object Content {
    * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
    * @return Content object with Directory tree from leaf files.
    */
-  def fromLeafFiles(
-      files: Seq[FileStatus],
-      fileIdTracker: FileIdTracker): Option[Content] = {
+  def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Option[Content] = {
     if (files.nonEmpty) {
       Some(Content(Directory.fromLeafFiles(files, fileIdTracker)))
     } else {
       None
     }
   }
+
+  /**
+   * Apply `func` to each file in the directory tree, recursively.
+   *
+   * @param prefixPath Root prefix path.
+   * @param directory Root directory.
+   * @param func Function applied to each file, given the file's current prefix path.
+   * @tparam T Result type of `func`.
+   * @return Results of applying `func` to all files in the tree.
+   */
+  def recFilesApply[T](
+      prefixPath: Path,
+      directory: Directory,
+      func: (FileInfo, Path) => T): Seq[T] = {
+    @tailrec
+    def recAcc[A](
+        dirMap: List[(Path, Seq[Directory])],
+        func: (FileInfo, Path) => A,
+        acc: Seq[A] = Seq.empty): Seq[A] = {
+      dirMap match {
+        case Nil => acc
+        case (curPrefixPath, curDirs) :: otherDirs =>
+          val curAcc = for {
+            dir <- curDirs
+            file <- dir.files
+          } yield func(file, new Path(curPrefixPath, dir.name))
+
+          val newLevels = curDirs
+            .filter(_.subDirs.nonEmpty)
+            .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs))
+
+          recAcc(otherDirs ++ newLevels, func, curAcc ++ acc)
+      }
+    }
+
+    recAcc(List(prefixPath -> Seq(directory)), func)
+  }
 }
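Reviewer note: `recFilesApply` replaces the former private `rec` helper with a `@tailrec` loop over an explicit work list of `(prefix, directories)` pairs, so traversal no longer grows the call stack on deep trees (result order is not guaranteed, which is why callers compare sets). A minimal usage sketch, mirroring the new unit tests further down; the tree layout and object name are illustrative only:

```scala
import org.apache.hadoop.fs.Path

import com.microsoft.hyperspace.index.{Content, Directory, FileInfo}
import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID

object RecFilesApplySketch extends App {
  // A small tree: one file at the root, one in subdirectory "a".
  val tree = Directory(
    "file:/",
    files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
    subDirs = Seq(Directory("a", files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID)))))

  // The same function Content.files passes in: resolve each file against its prefix.
  val paths =
    Content.recFilesApply(new Path("file:/"), tree, (f, prefix) => new Path(prefix, f.name))

  assert(paths.toSet == Set(new Path("file:/f0"), new Path("file:/a/f1")))
}
```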
 
 /**
@@ -210,15 +236,6 @@ object Directory {
     }
   }
 
-  @tailrec
-  private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
-    if (path.isRoot) {
-      Directory(path.toString, subDirs = subDirs)
-    } else {
-      createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
-    }
-  }
-
   /**
    * Create a Content object from a specified list of leaf files. Any files not listed here will
    * NOT be part of the returned object.
@@ -230,12 +247,10 @@ object Directory {
    * @param files List of leaf files.
    * @param fileIdTracker FileIdTracker to keep mapping of file properties to assigned file ids.
    *                      Note: If a new leaf file is discovered, the input fileIdTracker gets
-   *                      updated by adding it to the files it is tracking.
+   *                            updated by adding it to the files it is tracking.
    * @return Content object with Directory tree from leaf files.
    */
-  def fromLeafFiles(
-      files: Seq[FileStatus],
-      fileIdTracker: FileIdTracker): Directory = {
+  def fromLeafFiles(files: Seq[FileStatus], fileIdTracker: FileIdTracker): Directory = {
     require(
       files.nonEmpty,
       s"Empty files list found while creating a ${Directory.getClass.getName}.")
@@ -291,6 +306,15 @@ object Directory {
     pathToDirectory(getRoot(files.head.getPath))
   }
 
+  @tailrec
+  private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
+    if (path.isRoot) {
+      Directory(path.toString, subDirs = subDirs)
+    } else {
+      createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
+    }
+  }
+
   // Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/".
   // For linux systems, this root will be "file:/". Other hdfs compatible file systems will have
   // corresponding roots.
@@ -323,8 +347,8 @@ case class FileInfo(name: String, size: Long, modifiedTime: Long, id: Long) {
   override def equals(o: Any): Boolean = o match {
     case that: FileInfo =>
       name.equals(that.name) &&
-      size.equals(that.size) &&
-      modifiedTime.equals(that.modifiedTime)
+        size.equals(that.size) &&
+        modifiedTime.equals(that.modifiedTime)
     case _ => false
   }
 
@@ -350,10 +374,11 @@ case class CoveringIndex(properties: CoveringIndex.Properties) {
   val kindAbbr = "CI"
 }
 object CoveringIndex {
-  case class Properties(columns: Properties.Columns,
-      schemaString: String,
-      numBuckets: Int,
-      properties: Map[String, String])
+  case class Properties(
+      columns: Properties.Columns,
+      schemaString: String,
+      numBuckets: Int,
+      properties: Map[String, String])
 
   object Properties {
     case class Columns(indexed: Seq[String], included: Seq[String])
@@ -377,9 +402,7 @@ object LogicalPlanFingerprint {
  * @param appendedFiles Appended files.
  * @param deletedFiles Deleted files.
  */
-case class Update(
-    appendedFiles: Option[Content] = None,
-    deletedFiles: Option[Content] = None)
+case class Update(appendedFiles: Option[Content] = None, deletedFiles: Option[Content] = None)
 
 // IndexLogEntry-specific Hdfs that represents the source data.
 case class Hdfs(properties: Hdfs.Properties) {
@@ -444,47 +467,50 @@ case class IndexLogEntry(
     properties: Map[String, String])
     extends LogEntry(IndexLogEntry.VERSION) {
 
-  def schema: StructType =
-    DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]
-
-  def created: Boolean = state.equals(Constants.States.ACTIVE)
-
-  def relations: Seq[Relation] = {
-    // Only one relation is currently supported.
-    assert(source.plan.properties.relations.size == 1)
-    source.plan.properties.relations
-  }
-
   // FileInfo's 'name' contains the full path to the file.
   @JsonIgnore
   lazy val sourceFileInfoSet: Set[FileInfo] = {
     relations.head.data.properties.content.fileInfos
   }
 
-  @JsonIgnore
   lazy val sourceFilesSizeInBytes: Long = {
     sourceFileInfoSet.foldLeft(0L)(_ + _.size)
   }
 
-  def sourceUpdate: Option[Update] = {
-    relations.head.data.properties.update
-  }
-
-  def hasSourceUpdate: Boolean = {
-    sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
-  }
-
   // FileInfo's 'name' contains the full path to the file.
   @JsonIgnore
   lazy val appendedFiles: Set[FileInfo] = {
     sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set())
   }
 
   // FileInfo's 'name' contains the full path to the file.
   @JsonIgnore
   lazy val deletedFiles: Set[FileInfo] = {
     sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set())
   }
+
+  @JsonIgnore
+  lazy val fileIdTracker: FileIdTracker = {
+    val tracker = new FileIdTracker
+    tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
+    tracker
+  }
+
+  /**
+   * A mutable map for holding auxiliary information of this index log entry while applying rules.
+   */
+  @JsonIgnore
+  private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty
+
+  def schema: StructType =
+    DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]
+
+  def created: Boolean = state.equals(Constants.States.ACTIVE)
+
+  def hasSourceUpdate: Boolean = {
+    sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
+  }
+
+  def sourceUpdate: Option[Update] = {
+    relations.head.data.properties.update
+  }
 
   def copyWithUpdate(
       latestFingerprint: LogicalPlanFingerprint,
@@ -493,21 +519,14 @@ case class IndexLogEntry(
       appended: Seq[FileInfo],
       deleted: Seq[FileInfo]): IndexLogEntry = {
     def toFileStatus(f: FileInfo) = {
       new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name))
     }
-    copy(
-      source = source.copy(
-        plan = source.plan.copy(
-          properties = source.plan.properties.copy(
-            fingerprint = latestFingerprint,
-            relations = Seq(
-              relations.head.copy(
-                data = relations.head.data.copy(
-                  properties = relations.head.data.properties.copy(
-                    update = Some(
-                      Update(
-                        appendedFiles =
-                          Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
-                        deletedFiles =
-                          Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
+    copy(source = source.copy(plan = source.plan.copy(properties = source.plan.properties.copy(
+      fingerprint = latestFingerprint,
+      relations = Seq(
+        relations.head.copy(data = relations.head.data.copy(properties =
+          relations.head.data.properties.copy(update = Some(Update(
+            appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), fileIdTracker),
+            deletedFiles =
+              Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
   }
 
   def bucketSpec: BucketSpec =
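For context on `copyWithUpdate`: appended and deleted files are converted back to `FileStatus` and recorded on the source relation as an `Update`, via `Content.fromLeafFiles`, which yields `None` for an empty list. A hedged sketch of that shape (the file path below is hypothetical):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

import com.microsoft.hyperspace.index.{Content, FileIdTracker, FileInfo, Update}

object CopyWithUpdateSketch extends App {
  def toFileStatus(f: FileInfo) =
    new FileStatus(f.size, false, 0, 1, f.modifiedTime, new Path(f.name))

  val tracker = new FileIdTracker
  val appended = Seq(FileInfo("file:/data/f5", 100L, 300L, 5L))

  val update = Update(
    appendedFiles = Content.fromLeafFiles(appended.map(toFileStatus), tracker),
    deletedFiles = Content.fromLeafFiles(Seq(), tracker)) // Empty list => None.

  assert(update.appendedFiles.isDefined && update.deletedFiles.isEmpty)
}
```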
@@ -543,43 +562,46 @@ case class IndexLogEntry(
   }
 
   def hasLineageColumn: Boolean = {
-    derivedDataset.properties.properties.getOrElse(
-      IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean
+    derivedDataset.properties.properties
+      .getOrElse(IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT)
+      .toBoolean
   }
 
   def hasParquetAsSourceFormat: Boolean = {
     relations.head.fileFormat.equals("parquet") ||
-      derivedDataset.properties.properties.getOrElse(
-        IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean
+      derivedDataset.properties.properties
+        .getOrElse(IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false")
+        .toBoolean
   }
 
-  @JsonIgnore
-  lazy val fileIdTracker: FileIdTracker = {
-    val tracker = new FileIdTracker
-    tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
-    tracker
+  def relations: Seq[Relation] = {
+    // Only one relation is currently supported.
+    assert(source.plan.properties.relations.size == 1)
+    source.plan.properties.relations
   }
 
   override def hashCode(): Int = {
     config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
   }
 
-  /**
-   * A mutable map for holding auxiliary information of this index log entry while applying rules.
-   */
-  @JsonIgnore
-  private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty
+  def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
+    tags.remove((plan, tag))
+  }
 
-  def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
-    tags((plan, tag)) = value
+  def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
+    tags((null, tag)) = value
   }
 
-  def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
-    tags.get((plan, tag)).map(_.asInstanceOf[T])
+  def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
+    tags.get((null, tag)).map(_.asInstanceOf[T])
   }
 
-  def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
-    tags.remove((plan, tag))
+  def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
+    tags.remove((null, tag))
+  }
+
+  def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
+    withCachedTag(null, tag)(f)
   }
 
   def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
@@ -592,20 +614,12 @@ case class IndexLogEntry(
     }
   }
 
-  def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
-    tags((null, tag)) = value
-  }
-
-  def getTagValue[T](tag: IndexLogEntryTag[T]): Option[T] = {
-    tags.get((null, tag)).map(_.asInstanceOf[T])
-  }
-
-  def unsetTagValue[T](tag: IndexLogEntryTag[T]): Unit = {
-    tags.remove((null, tag))
+  def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
+    tags((plan, tag)) = value
   }
 
-  def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
-    withCachedTag(null, tag)(f)
+  def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
+    tags.get((plan, tag)).map(_.asInstanceOf[T])
   }
 }
@@ -637,8 +651,7 @@ object IndexLogEntry {
       derivedDataset,
       content,
       source,
-      properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version))
-    )
+      properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version)))
   }
 }
 
@@ -646,16 +659,15 @@ object IndexLogEntry {
  * Provides functionality to generate unique file ids for files.
  */
 class FileIdTracker {
-  private var maxId: Long = -1L
-
   // Combination of file properties, used as key, to identify a
   // unique file for which an id is generated.
   type key = (
-    String, // Full path.
+    String, // Full path.
     Long, // Size.
-    Long // Modified time.
-  )
+    Long // Modified time.
+  )
   private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap()
+  private var maxId: Long = -1L
 
   def getMaxFileId: Long = maxId
@@ -664,8 +676,6 @@ class FileIdTracker {
   def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
     fileToIdMap.get((path, size, modifiedTime))
 
-  def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)
-
   /**
    * Add a set of FileInfos to the fileToIdMap. The assumption is
    * that the each FileInfo already has a valid file id if an entry
@@ -679,8 +689,7 @@ class FileIdTracker {
     setSizeHint(files.size)
     files.foreach { f =>
       if (f.id == IndexConstants.UNKNOWN_FILE_ID) {
-        throw HyperspaceException(
-          s"Cannot add file info with unknown id. (file: ${f.name}).")
+        throw HyperspaceException(s"Cannot add file info with unknown id. (file: ${f.name}).")
       }
 
       val key = (f.name, f.size, f.modifiedTime)
@@ -698,6 +707,8 @@ class FileIdTracker {
     }
   }
 
+  def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)
+
   /**
    * Try to add file properties to fileToIdMap. If the file is already in
    * the map then return its current id. Otherwise, generate a new id,
@@ -708,8 +719,7 @@ class FileIdTracker {
    */
   def addFile(file: FileStatus): Long = {
     fileToIdMap.getOrElseUpdate(
-      (file.getPath.toString, file.getLen, file.getModificationTime),
-      {
+      (file.getPath.toString, file.getLen, file.getModificationTime), {
         maxId += 1
         maxId
       })
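A quick sketch of the `FileIdTracker` contract after the member reordering above: ids are minted sequentially per unique `(path, size, modifiedTime)` key, and re-adding the same key returns the existing id (the path and sizes below are made up):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

import com.microsoft.hyperspace.index.FileIdTracker

object FileIdTrackerSketch extends App {
  val tracker = new FileIdTracker
  val f1 = new FileStatus(100L, false, 0, 1, 200L, new Path("file:/data/f1"))

  val id = tracker.addFile(f1) // maxId starts at -1L, so the first id is 0.
  assert(tracker.addFile(f1) == id) // Same key => same id; maxId is unchanged.
  assert(tracker.getFileId("file:/data/f1", 100L, 200L).contains(id))
  assert(tracker.getMaxFileId == id)
}
```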
diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
index c4936d88f..0a64edb72 100644
--- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
+++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala
@@ -193,34 +193,20 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     Seq(
       Relation(
         Seq("rootpath"),
-        Hdfs(
-          Hdfs.Properties(
-            Content(
-              Directory(
-                "test",
-                Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)),
-                Seq()
-              )
-            ),
-            Some(
-              Update(
-                None,
-                Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2)))))
-              )
-            )
-          )
-        ),
+        Hdfs(Hdfs.Properties(
+          Content(Directory(
+            "test",
+            Seq(FileInfo("f1", 100L, 100L, 0), FileInfo("f2", 100L, 200L, 1)),
+            Seq())),
+          Some(Update(None, Some(Content(Directory("", Seq(FileInfo("f1", 10, 10, 2))))))))),
         "schema",
         "type",
-        Map()
-      )
-    ),
+        Map())),
     null,
     null,
     LogicalPlanFingerprint(
       LogicalPlanFingerprint
-        .Properties(Seq(Signature("provider", "signatureValue")))
-      ))
+        .Properties(Seq(Signature("provider", "signatureValue")))))
 
     val expected = IndexLogEntry.create(
       "indexName",
@@ -242,16 +228,61 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     assert(actual.sourceFilesSizeInBytes == 200L)
   }
 
-  test("Content.files api lists all files from Content object.") {
-    val content = Content(Directory("file:/", subDirs =
-      Seq(
-        Directory("a",
-          files = Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
+  test("Content.recFilesApply returns a result list of applying function to all files.") {
+    val directory = Directory(
+      "file:/",
+      files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
+      subDirs = Seq(
+        Directory(
+          "a",
+          files =
+            Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
           subDirs = Seq(
-            Directory("b",
+            Directory(
+              "b",
               files =
-                Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)))))
-    )))
+                Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
+              subDirs = Seq(Directory("c"))),
+            Directory("d")))))
+
+    def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name)
+
+    val res = Content.recFilesApply(new Path("file:/"), directory, theFunction)
+
+    val expected =
+      Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4")
+        .map(new Path(_))
+        .toSet
+
+    val actual = res.toSet
+    assert(actual.equals(expected))
+  }
+
+  test("Content.recFilesApply returns empty list for directories without files.") {
+    val directory = Directory("file:/")
+
+    val res = Content.recFilesApply(
+      new Path("file:/"),
+      directory,
+      (f, prefix) => new Path(prefix, f.name))
+
+    assert(res.isEmpty)
+  }
+
+  test("Content.files api lists all files from Content object.") {
+    val content = Content(
+      Directory(
+        "file:/",
+        subDirs = Seq(Directory(
+          "a",
+          files =
+            Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
+          subDirs = Seq(
+            Directory(
+              "b",
+              files = Seq(
+                FileInfo("f3", 0, 0, UNKNOWN_FILE_ID),
+                FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)))))))))
 
     val expected =
       Seq("file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4").map(new Path(_)).toSet
@@ -263,8 +294,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val nestedDirPath = toPath(nestedDir)
 
     val expected = {
-      val fileInfos = Seq(f3, f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+      val fileInfos = Seq(f3, f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
       val nestedDirDirectory = Directory("nested", fileInfos)
       val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory)
       Content(rootDirectory, NoOpFingerprint())
@@ -278,8 +310,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val nestedDirPath = toPath(nestedDir)
 
     val expected = {
-      val fileInfos = Seq(f3, f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+      val fileInfos = Seq(f3, f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
       val nestedDirDirectory = Directory("nested", fileInfos)
       val rootDirectory = createDirectory(nestedDirPath, nestedDirDirectory)
       Content(rootDirectory, NoOpFingerprint())
@@ -293,8 +326,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val nestedDirPath = toPath(nestedDir)
 
     val expected = {
-      val fileInfos = Seq(f3, f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+      val fileInfos = Seq(f3, f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
       val nestedDirDirectory = Directory("nested", fileInfos)
       createDirectory(nestedDirPath, nestedDirDirectory)
     }
@@ -303,17 +337,21 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     assert(directoryEquals(actual, expected))
   }
 
-  test("Directory.fromDirectory api creates the correct Directory objects, " +
-    "recursively listing all leaf files.") {
+  test(
+    "Directory.fromDirectory api creates the correct Directory objects, " +
+      "recursively listing all leaf files.") {
     val testDirPath = toPath(testDir)
     val testDirLeafFiles =
-      Seq(f1, f2).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+      Seq(f1, f2)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
     val nestedDirLeafFiles =
-      Seq(f3, f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
-    val testDirDirectory = Directory(name = "testDir",
+      Seq(f3, f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+    val testDirDirectory = Directory(
+      name = "testDir",
       files = testDirLeafFiles,
       subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
     val expected = createDirectory(testDirPath, testDirDirectory)
@@ -327,12 +365,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val testDirPath = toPath(testDir)
 
     val testDirLeafFiles =
-      Seq(f1, f2).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+      Seq(f1, f2)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
     val nestedDirLeafFiles =
-      Seq(f3, f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
-    val testDirDirectory = Directory(name = "testDir",
+      Seq(f3, f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+    val testDirDirectory = Directory(
+      name = "testDir",
       files = testDirLeafFiles,
       subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
 
@@ -346,12 +387,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
   test("Directory.fromLeafFiles api does not include other files in the directory.") {
     val testDirPath = toPath(testDir)
 
-    val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f =>
-      FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+    val testDirLeafFiles = Seq(f1)
+      .map(toFileStatus)
+      .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
     val nestedDirLeafFiles =
-      Seq(f4).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
-    val testDirDirectory = Directory(name = "testDir",
+      Seq(f4)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+    val testDirDirectory = Directory(
+      name = "testDir",
       files = testDirLeafFiles,
       subDirs = Seq(Directory(name = "nested", files = nestedDirLeafFiles)))
 
@@ -362,8 +406,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     assert(directoryEquals(actual, expected))
   }
 
-  test("Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent" +
-    "directory, otherwise works as expected.") {
+  test(
+    "Directory.fromLeafFiles: throwIfNotExist flag throws exception for non-existent " +
+      "directory, otherwise works as expected.") {
     val testDirPath = toPath(testDir)
     val nonExistentDir = new Path(testDirPath, "nonexistent")
 
@@ -404,8 +449,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
       override def accept(path: Path): Boolean = path.getName.startsWith("f1")
     }
 
-    val testDirLeafFiles = Seq(f1).map(toFileStatus).map(f =>
-      FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
+    val testDirLeafFiles = Seq(f1)
+      .map(toFileStatus)
+      .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false))
     val testDirDirectory = Directory(name = "testDir", files = testDirLeafFiles)
     val expected = createDirectory(testDirPath, testDirDirectory)
 
@@ -415,8 +461,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     assert(directoryEquals(actual, expected))
   }
 
-  test("Directory.fromDirectory and fromLeafFileswhere files are at same level but different" +
-    "dirs.") {
+  test(
+    "Directory.fromDirectory and fromLeafFiles where files are at same level but different " +
+      "dirs.") {
     // File Structure
     // testDir/temp/a/f1
     // testDir/temp/b/f2
@@ -428,11 +475,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val f2 = Files.createFile(Paths.get(b + "/f2"))
 
     val aDirectory =
-      Directory("a", Seq(f1).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "a",
+        Seq(f1)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val bDirectory =
-      Directory("b", Seq(f2).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "b",
+        Seq(f2)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory))
     val tempDirectoryPath = toPath(tempDir)
 
@@ -460,12 +513,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val f2 = Files.createFile(Paths.get(c + "/f2"))
 
     val cDirectory =
-      Directory("c", Seq(f2).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "c",
+        Seq(f2)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val bDirectory = Directory("b", subDirs = Seq(cDirectory))
     val aDirectory =
-      Directory("a", Seq(f1).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "a",
+        Seq(f1)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val tempDirectory = Directory("temp", subDirs = Seq(aDirectory, bDirectory))
     val tempDirectoryPath = toPath(tempDir)
 
@@ -480,8 +539,9 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     FileUtils.deleteDirectory(tempDir.toFile)
   }
 
-  test("Directory.fromDirectory and fromLeafFiles where files belong to multiple" +
-    "subdirectories.") {
+  test(
+    "Directory.fromDirectory and fromLeafFiles where files belong to multiple " +
+      "subdirectories.") {
     // File Structure
     // testDir/temp/a/f1
     // testDir/temp/a/b/f2
@@ -496,17 +556,23 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     val f3 = Files.createFile(Paths.get(c + "/f3"))
 
     val bDirectory =
-      Directory("b", Seq(f2).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "b",
+        Seq(f2)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val cDirectory =
-      Directory("c", Seq(f3).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
+      Directory(
+        "c",
+        Seq(f3)
+          .map(toFileStatus)
+          .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)))
     val aDirectory = Directory(
       "a",
-      Seq(f1).map(toFileStatus).map(f =>
-        FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)),
-      Seq(bDirectory, cDirectory)
-    )
+      Seq(f1)
+        .map(toFileStatus)
+        .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = false)),
+      Seq(bDirectory, cDirectory))
     val tempDirectory = Directory("temp", subDirs = Seq(aDirectory))
     val tempDirectoryPath = toPath(tempDir)
 
@@ -526,11 +592,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     // a/f2
     val directory1 = Directory(
       name = "a",
-      files = Seq(
-        FileInfo("f1", 100L, 100L, 1L),
-        FileInfo("f2", 100L, 100L, 2L)
-      )
-    )
+      files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)))
 
     // directory2:
     // a/b/f3
@@ -540,13 +602,7 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
       subDirs = Seq(
         Directory(
           name = "b",
-          files = Seq(
-            FileInfo("f3", 100L, 100L, 3L),
-            FileInfo("f4", 100L, 100L, 4L)
-          )
-        )
-      )
-    )
+          files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))))
 
     // Expected result of merging directory1 and directory2:
     // a/f1
@@ -555,20 +611,11 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     // a/b/f4
     val expected = Directory(
       name = "a",
-      files = Seq(
-        FileInfo("f1", 100L, 100L, 1L),
-        FileInfo("f2", 100L, 100L, 2L)
-      ),
+      files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)),
       subDirs = Seq(
         Directory(
           name = "b",
-          files = Seq(
-            FileInfo("f3", 100L, 100L, 3L),
-            FileInfo("f4", 100L, 100L, 4L)
-          )
-        )
-      )
-    )
+          files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))))
 
     val actual1 = directory1.merge(directory2)
     val actual2 = directory2.merge(directory1)
@@ -584,14 +631,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     // a/b/f3
     val directory1 = Directory(
       name = "a",
-      files = Seq(
-        FileInfo("f1", 100L, 100L, 1L),
-        FileInfo("f2", 100L, 100L, 2L)
-      ),
-      subDirs = Seq(
-        Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L)))
-      )
-    )
+      files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)),
+      subDirs = Seq(Directory(name = "b", files = Seq(FileInfo("f3", 100L, 100L, 3L)))))
 
     // directory2:
     // a/f4
@@ -604,17 +645,8 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
       subDirs = Seq(
         Directory(
           name = "b",
-          files = Seq(
-            FileInfo("f5", 100L, 100L, 5L),
-            FileInfo("f6", 100L, 100L, 6L)
-          ),
-          subDirs = Seq(Directory(
-            name = "c",
-            files = Seq(FileInfo("f7", 100L, 100L, 7L))
-          ))
-        )
-      )
-    )
+          files = Seq(FileInfo("f5", 100L, 100L, 5L), FileInfo("f6", 100L, 100L, 6L)),
+          subDirs = Seq(Directory(name = "c", files = Seq(FileInfo("f7", 100L, 100L, 7L)))))))
 
     // Expected result of merging directory1 and directory2:
     // directory1:
@@ -630,23 +662,15 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
       files = Seq(
         FileInfo("f1", 100L, 100L, 1L),
         FileInfo("f2", 100L, 100L, 2L),
-        FileInfo("f4", 100L, 100L, 4L)
-      ),
+        FileInfo("f4", 100L, 100L, 4L)),
       subDirs = Seq(
        Directory(
           name = "b",
           files = Seq(
             FileInfo("f3", 100L, 100L, 3L),
             FileInfo("f5", 100L, 100L, 5L),
-            FileInfo("f6", 100L, 100L, 6L)
-          ),
-          subDirs = Seq(
-            Directory("c",
-              files = Seq(FileInfo("f7", 100L, 100L, 7L)))
-          )
-        )
-      )
-    )
+            FileInfo("f6", 100L, 100L, 6L)),
+          subDirs = Seq(Directory("c", files = Seq(FileInfo("f7", 100L, 100L, 7L)))))))
 
     val actual1 = directory1.merge(directory2)
     val actual2 = directory2.merge(directory1)
@@ -661,19 +685,17 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     // a/f2
     val directory1 = Directory(
       name = "a",
-      files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L))
-    )
+      files = Seq(FileInfo("f1", 100L, 100L, 1L), FileInfo("f2", 100L, 100L, 2L)))
 
     // directory2:
     // b/f3
     // b/f4
     val directory2 = Directory(
       name = "b",
-      files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L))
-    )
+      files = Seq(FileInfo("f3", 100L, 100L, 3L), FileInfo("f4", 100L, 100L, 4L)))
 
-    val ex1 = intercept[HyperspaceException] (directory1.merge(directory2))
-    val ex2 = intercept[HyperspaceException] (directory2.merge(directory1))
+    val ex1 = intercept[HyperspaceException](directory1.merge(directory2))
+    val ex2 = intercept[HyperspaceException](directory2.merge(directory1))
 
     assert(ex1.msg.contains("Merging directories with names a and b failed."))
     assert(ex2.msg.contains("Merging directories with names b and a failed."))
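The merge tests above pin down the contract; a compact sketch of both cases (names illustrative), assuming the `merge` semantics these tests exercise:

```scala
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Directory, FileInfo}

object DirectoryMergeSketch extends App {
  val left = Directory("a", files = Seq(FileInfo("f1", 100L, 100L, 1L)))
  val right = Directory(
    "a",
    subDirs = Seq(Directory("b", files = Seq(FileInfo("f2", 100L, 100L, 2L)))))

  // Same root name: files and subdirectories are combined recursively.
  val merged = left.merge(right)
  assert(merged.files.nonEmpty && merged.subDirs.nonEmpty)

  // Different root names: merge fails with HyperspaceException.
  try Directory("a").merge(Directory("b"))
  catch { case e: HyperspaceException => println(e.msg) }
}
```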
@@ -685,19 +707,18 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
 
   private def directoryEquals(dir1: Directory, dir2: Directory): Boolean = {
     dir1.name.equals(dir2.name) &&
-      dir1.files.toSet.equals(dir2.files.toSet) &&
-      dir1.subDirs.size.equals(dir2.subDirs.size) &&
-      dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall{
-        case (d1, d2) => directoryEquals(d1, d2)
-      }
+    dir1.files.toSet.equals(dir2.files.toSet) &&
+    dir1.subDirs.size.equals(dir2.subDirs.size) &&
+    dir1.subDirs.sortBy(_.name).zip(dir2.subDirs.sortBy(_.name)).forall { case (d1, d2) =>
+      directoryEquals(d1, d2)
+    }
   }
 
   // Using `directoryPath`, create a Directory tree starting from root and ending at
   // `leafDirectory`.
   private def createDirectory(directoryPath: Path, leafDirectory: Directory): Directory = {
-    TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) {
-      (accum, name) =>
-        Directory(name, Seq(), Seq(accum))
+    TestUtils.splitPath(directoryPath.getParent).foldLeft(leafDirectory) { (accum, name) =>
+      Directory(name, Seq(), Seq(accum))
     }
   }
 }
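Finally, a worked illustration of the fold inside `createDirectory`, assuming `TestUtils.splitPath` returns path segments in leaf-to-root order (that ordering is an assumption here): each step wraps the accumulated subtree in its parent directory.

```scala
import com.microsoft.hyperspace.index.{Directory, FileInfo}

object CreateDirectorySketch extends App {
  val leaf = Directory("nested", files = Seq(FileInfo("f1", 0L, 0L, 1L)))

  // Hypothetical segments for a path like "file:/testDir/nested".
  val root = Seq("testDir", "file:/").foldLeft(leaf) { (accum, name) =>
    Directory(name, Seq(), Seq(accum))
  }

  assert(root.name == "file:/")
  assert(root.subDirs.head.name == "testDir")
  assert(root.subDirs.head.subDirs.head eq leaf)
}
```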