diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 4d9227dc9..65a96eb47 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -210,15 +210,6 @@ object Directory { } } - @tailrec - private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { - if (path.isRoot) { - Directory(path.toString, subDirs = subDirs) - } else { - createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) - } - } - /** * Create a Content object from a specified list of leaf files. Any files not listed here will * NOT be part of the returned object. @@ -291,6 +282,15 @@ object Directory { pathToDirectory(getRoot(files.head.getPath)) } + @tailrec + private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = { + if (path.isRoot) { + Directory(path.toString, subDirs = subDirs) + } else { + createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs))) + } + } + // Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/". // For linux systems, this root will be "file:/". Other hdfs compatible file systems will have // corresponding roots. @@ -444,47 +444,55 @@ case class IndexLogEntry( properties: Map[String, String]) extends LogEntry(IndexLogEntry.VERSION) { - def schema: StructType = - DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] - - def created: Boolean = state.equals(Constants.States.ACTIVE) - - def relations: Seq[Relation] = { - // Only one relation is currently supported. - assert(source.plan.properties.relations.size == 1) - source.plan.properties.relations - } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val sourceFileInfoSet: Set[FileInfo] = { relations.head.data.properties.content.fileInfos } - @JsonIgnore lazy val sourceFilesSizeInBytes: Long = { sourceFileInfoSet.foldLeft(0L)(_ + _.size) } - - def sourceUpdate: Option[Update] = { - relations.head.data.properties.update - } - - def hasSourceUpdate: Boolean = { - sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) - } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val appendedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set()) } - // FileInfo's 'name' contains the full path to the file. @JsonIgnore lazy val deletedFiles: Set[FileInfo] = { sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set()) } + @JsonIgnore + lazy val fileIdTracker: FileIdTracker = { + val tracker = new FileIdTracker + tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) + tracker + } + /** + * A mutable map for holding auxiliary information of this index log entry while applying rules. + */ + @JsonIgnore + private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + + def schema: StructType = + DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType] + + def created: Boolean = state.equals(Constants.States.ACTIVE) + + def hasSourceUpdate: Boolean = { + sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty) + } + + def sourceUpdate: Option[Update] = { + relations.head.data.properties.update + } + + def relations: Seq[Relation] = { + // Only one relation is currently supported. + assert(source.plan.properties.relations.size == 1) + source.plan.properties.relations + } def copyWithUpdate( latestFingerprint: LogicalPlanFingerprint, @@ -528,20 +536,6 @@ case class IndexLogEntry( case _ => false } - def numBuckets: Int = derivedDataset.properties.numBuckets - - def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) - - def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed - - def includedColumns: Seq[String] = derivedDataset.properties.columns.included - - def signature: Signature = { - val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures - assert(sourcePlanSignatures.length == 1) - sourcePlanSignatures.head - } - def hasLineageColumn: Boolean = { derivedDataset.properties.properties.getOrElse( IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean @@ -553,45 +547,28 @@ case class IndexLogEntry( IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean } - @JsonIgnore - lazy val fileIdTracker: FileIdTracker = { - val tracker = new FileIdTracker - tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos) - tracker - } - override def hashCode(): Int = { config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode } - /** - * A mutable map for holding auxiliary information of this index log entry while applying rules. - */ - @JsonIgnore - private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty + def numBuckets: Int = derivedDataset.properties.numBuckets - def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { - tags((plan, tag)) = value - } + def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns) - def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { - tags.get((plan, tag)).map(_.asInstanceOf[T]) + def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed + + def includedColumns: Seq[String] = derivedDataset.properties.columns.included + + def signature: Signature = { + val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures + assert(sourcePlanSignatures.length == 1) + sourcePlanSignatures.head } def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = { tags.remove((plan, tag)) } - def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { - getTagValue(plan, tag) match { - case Some(v) => v - case None => - val ret = f - setTagValue(plan, tag, ret) - ret - } - } - def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = { tags((null, tag)) = value } @@ -607,6 +584,24 @@ case class IndexLogEntry( def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = { withCachedTag(null, tag)(f) } + + def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = { + getTagValue(plan, tag) match { + case Some(v) => v + case None => + val ret = f + setTagValue(plan, tag, ret) + ret + } + } + + def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = { + tags((plan, tag)) = value + } + + def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = { + tags.get((plan, tag)).map(_.asInstanceOf[T]) + } } // A tag of a `IndexLogEntry`, which defines name and type. @@ -646,8 +641,6 @@ object IndexLogEntry { * Provides functionality to generate unique file ids for files. */ class FileIdTracker { - private var maxId: Long = -1L - // Combination of file properties, used as key, to identify a // unique file for which an id is generated. type key = ( @@ -656,6 +649,7 @@ class FileIdTracker { Long // Modified time. ) private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap() + private var maxId: Long = -1L def getMaxFileId: Long = maxId @@ -664,8 +658,6 @@ class FileIdTracker { def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] = fileToIdMap.get((path, size, modifiedTime)) - def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) - /** * Add a set of FileInfos to the fileToIdMap. The assumption is * that the each FileInfo already has a valid file id if an entry @@ -698,6 +690,8 @@ class FileIdTracker { } } + def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size) + /** * Try to add file properties to fileToIdMap. If the file is already in * the map then return its current id. Otherwise, generate a new id, diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index c4936d88f..d62e42c46 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -68,10 +68,10 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter fileIdTracker = new FileIdTracker } - private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString) - private def toFileStatus(path: file.Path): FileStatus = fs.getFileStatus(toPath(path)) + private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString) + test("IndexLogEntry spec example") { val schemaString = """{\"type\":\"struct\",