Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 70 additions & 76 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,6 @@ object Directory {
}
}

// Builds a chain of empty Directory nodes from `path` up to the filesystem root,
// nesting the accumulated subtree one level deeper on each step.
// @tailrec guarantees constant stack usage for arbitrarily deep paths.
@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
if (path.isRoot) {
// The root node keeps the full string form of the path; inner nodes use only their name.
Directory(path.toString, subDirs = subDirs)
} else {
createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
}
}

/**
* Create a Content object from a specified list of leaf files. Any files not listed here will
* NOT be part of the returned object.
Expand Down Expand Up @@ -291,6 +282,15 @@ object Directory {
pathToDirectory(getRoot(files.head.getPath))
}

// Creates a root-anchored Directory tree that contains only empty directories,
// walking parent-by-parent from `path` to the root; tail-recursive so deep
// paths cannot overflow the stack.
@tailrec
private def createEmptyDirectory(path: Path, subDirs: Seq[Directory] = Seq()): Directory = {
if (path.isRoot) {
// Root uses the complete path string; descendants are stored by name only.
Directory(path.toString, subDirs = subDirs)
} else {
createEmptyDirectory(path.getParent, Seq(Directory(path.getName, subDirs = subDirs)))
}
}

// Return file system root path from any path. E.g. "file:/C:/a/b/c" will have root "file:/C:/".
// For linux systems, this root will be "file:/". Other hdfs compatible file systems will have
// corresponding roots.
Expand Down Expand Up @@ -444,47 +444,55 @@ case class IndexLogEntry(
properties: Map[String, String])
extends LogEntry(IndexLogEntry.VERSION) {

// Index schema, reconstructed from the JSON schema string persisted in the
// derived dataset's properties.
def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

// An index is considered created once its log state is ACTIVE.
def created: Boolean = state.equals(Constants.States.ACTIVE)

// Source relations this index was built over; the entry format currently
// restricts this to exactly one relation (asserted below).
def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

// FileInfo's 'name' contains the full path to the file.
// All source data files the index was built from, taken from the single
// supported relation's content.
@JsonIgnore
lazy val sourceFileInfoSet: Set[FileInfo] = {
relations.head.data.properties.content.fileInfos
}

// Total size in bytes of all source data files recorded for this entry.
// Iterates the set directly; `.toLong` keeps the accumulation in Long space,
// matching the original 0L fold.
@JsonIgnore
lazy val sourceFilesSizeInBytes: Long = {
  sourceFileInfoSet.iterator.map(_.size.toLong).sum
}

// Pending source-data change (appended/deleted files) attached to the
// relation, if any.
def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

// True only when an update record exists AND it actually lists appended or
// deleted files; an empty Update does not count as a source change.
def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

// FileInfo's 'name' contains the full path to the file.
// Files appended to the source since the index was built; empty when no
// update (or no appended-file list) is recorded.
@JsonIgnore
lazy val appendedFiles: Set[FileInfo] = {
sourceUpdate.flatMap(_.appendedFiles).map(_.fileInfos).getOrElse(Set())
}

// FileInfo's 'name' contains the full path to the file.
// Files deleted from the source since the index was built; empty when no
// update (or no deleted-file list) is recorded.
@JsonIgnore
lazy val deletedFiles: Set[FileInfo] = {
sourceUpdate.flatMap(_.deletedFiles).map(_.fileInfos).getOrElse(Set())
}
// Lazily-built tracker that assigns ids to every file known to this entry:
// both the source data files and the index content files are registered.
@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
}
/**
 * A mutable map for holding auxiliary information of this index log entry while applying rules.
 * Keyed by (plan, tag); a null plan key is used for plan-independent tags.
 * Excluded from JSON serialization via @JsonIgnore.
 */
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty

// Index schema parsed from the persisted JSON schema string.
def schema: StructType =
DataType.fromJson(derivedDataset.properties.schemaString).asInstanceOf[StructType]

// Index is created (usable) once the log state is ACTIVE.
def created: Boolean = state.equals(Constants.States.ACTIVE)

// An update counts only if it carries appended or deleted files.
def hasSourceUpdate: Boolean = {
sourceUpdate.isDefined && (appendedFiles.nonEmpty || deletedFiles.nonEmpty)
}

// Pending source update recorded on the single supported relation, if any.
def sourceUpdate: Option[Update] = {
relations.head.data.properties.update
}

// Source relations for this index; exactly one is supported today.
def relations: Seq[Relation] = {
// Only one relation is currently supported.
assert(source.plan.properties.relations.size == 1)
source.plan.properties.relations
}

def copyWithUpdate(
latestFingerprint: LogicalPlanFingerprint,
Expand Down Expand Up @@ -528,20 +536,6 @@ case class IndexLogEntry(
case _ => false
}

// Bucket count recorded in the derived dataset's properties.
def numBuckets: Int = derivedDataset.properties.numBuckets

// Reconstructs the IndexConfig (name + indexed/included columns) for this entry.
def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns)

// Columns the index is keyed on, from the derived dataset's properties.
def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed

// Non-key columns stored alongside the indexed columns.
def includedColumns: Seq[String] = derivedDataset.properties.columns.included

// Source plan's fingerprint signature; the format records exactly one
// signature per plan (asserted below).
def signature: Signature = {
val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures
assert(sourcePlanSignatures.length == 1)
sourcePlanSignatures.head
}

def hasLineageColumn: Boolean = {
derivedDataset.properties.properties.getOrElse(
IndexConstants.LINEAGE_PROPERTY, IndexConstants.INDEX_LINEAGE_ENABLED_DEFAULT).toBoolean
Expand All @@ -553,45 +547,28 @@ case class IndexLogEntry(
IndexConstants.HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY, "false").toBoolean
}

// Tracker seeded with every file this entry references (source data files
// plus index content files) so each gets a stable id.
@JsonIgnore
lazy val fileIdTracker: FileIdTracker = {
val tracker = new FileIdTracker
tracker.addFileInfo(sourceFileInfoSet ++ content.fileInfos)
tracker
}

// Hash built from the entry's logical identity: index config, source plan
// signature, bucket count, and content. Components are combined by plain
// addition of their individual hash codes.
override def hashCode(): Int = {
config.hashCode + signature.hashCode + numBuckets.hashCode + content.hashCode
}

/**
 * A mutable map for holding auxiliary information of this index log entry while applying rules.
 * Keys are (plan, tag) pairs; a null plan denotes a plan-independent tag.
 */
@JsonIgnore
private val tags: mutable.Map[(LogicalPlan, IndexLogEntryTag[_]), Any] = mutable.Map.empty
def numBuckets: Int = derivedDataset.properties.numBuckets

// Stores (or overwrites) the tag value for the given plan.
def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
}
def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns)

def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed

def includedColumns: Seq[String] = derivedDataset.properties.columns.included

// The single fingerprint signature of the source plan.
def signature: Signature = {
val sourcePlanSignatures = source.plan.properties.fingerprint.properties.signatures
assert(sourcePlanSignatures.length == 1)
sourcePlanSignatures.head
}

// Removes the tag for the given plan, if present; no-op otherwise.
def unsetTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Unit = {
tags.remove((plan, tag))
}

// Memoized computation keyed by (plan, tag): returns the cached value when
// one exists, otherwise evaluates `f` exactly once, caches the result, and
// returns it. `f` is by-name, so it is not evaluated on a cache hit.
def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
  getTagValue(plan, tag).getOrElse {
    val computed = f
    setTagValue(plan, tag, computed)
    computed
  }
}

// Plan-independent variant: stores the tag under a null plan key.
def setTagValue[T](tag: IndexLogEntryTag[T], value: T): Unit = {
tags((null, tag)) = value
}
Expand All @@ -607,6 +584,24 @@ case class IndexLogEntry(
// Plan-independent cached computation; delegates using a null plan key.
def withCachedTag[T](tag: IndexLogEntryTag[T])(f: => T): T = {
withCachedTag(null, tag)(f)
}

// Returns the cached value for (plan, tag) if present; otherwise computes
// `f` once, caches it, and returns the result.
def withCachedTag[T](plan: LogicalPlan, tag: IndexLogEntryTag[T])(f: => T): T = {
getTagValue(plan, tag) match {
case Some(v) => v
case None =>
// Cache miss: evaluate the by-name argument once and remember the result.
val ret = f
setTagValue(plan, tag, ret)
ret
}
}

// Associates `value` with (plan, tag), replacing any previous value.
def setTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T], value: T): Unit = {
tags((plan, tag)) = value
}

// Looks up the tag for (plan, tag). The cast is unchecked but safe as long
// as writes go through setTagValue with the same typed tag key.
def getTagValue[T](plan: LogicalPlan, tag: IndexLogEntryTag[T]): Option[T] = {
tags.get((plan, tag)).map(_.asInstanceOf[T])
}
}

// A tag of a `IndexLogEntry`, which defines name and type.
Expand Down Expand Up @@ -646,8 +641,6 @@ object IndexLogEntry {
* Provides functionality to generate unique file ids for files.
*/
class FileIdTracker {
private var maxId: Long = -1L

// Combination of file properties, used as key, to identify a
// unique file for which an id is generated.
type key = (
Expand All @@ -656,6 +649,7 @@ class FileIdTracker {
Long // Modified time.
)
private val fileToIdMap: mutable.HashMap[key, Long] = mutable.HashMap()
private var maxId: Long = -1L

// Largest file id generated so far; -1 when no ids have been assigned yet.
def getMaxFileId: Long = maxId

Expand All @@ -664,8 +658,6 @@ class FileIdTracker {
// Returns the id for the file identified by (path, size, modifiedTime),
// or None if no id has been generated for that combination.
def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
fileToIdMap.get((path, size, modifiedTime))

// Pre-sizes the underlying map when the expected number of files is known.
def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Add a set of FileInfos to the fileToIdMap. The assumption is
* that the each FileInfo already has a valid file id if an entry
Expand Down Expand Up @@ -698,6 +690,8 @@ class FileIdTracker {
}
}

// Hints the expected map size to avoid rehashing during bulk additions.
def setSizeHint(size: Int): Unit = fileToIdMap.sizeHint(size)

/**
* Try to add file properties to fileToIdMap. If the file is already in
* the map then return its current id. Otherwise, generate a new id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
fileIdTracker = new FileIdTracker
}

// Converts a java.nio file.Path to an absolute Path via PathUtils.
private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString)

// Fetches the FileStatus for the given path from `fs` (the test file system).
private def toFileStatus(path: file.Path): FileStatus = fs.getFileStatus(toPath(path))

// Absolute Path for a java.nio file.Path, via PathUtils.makeAbsolute.
private def toPath(path: file.Path): Path = PathUtils.makeAbsolute(path.toString)

test("IndexLogEntry spec example") {
val schemaString =
"""{\"type\":\"struct\",
Expand Down