Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HBaseSource extends RelationProvider {
val nonKeyCols = parameters("nonKeyCols").split(";")
.filterNot(_ == "")
.map { case c => val cols = c.split(","); (cols(0), cols(1), cols(2), cols(3))}
val splitFile = parameters("splitFileName")

val keyMap: Map[String, String] = keyCols.toMap
val allColumns = colsSeq.map {
Expand All @@ -73,8 +74,27 @@ class HBaseSource extends RelationProvider {
)
}
}
catalog.createTable(tableName, rawNamespace, hbaseTable, allColumns, null)
val keyColumns = allColumns.filter(_.isInstanceOf[KeyColumn])
val splits = createSplitKeys(sqlContext, keyColumns, splitFile)
catalog.createTable(tableName, rawNamespace, hbaseTable, allColumns, splits)
}

/**
 * Builds the HBase region split keys from a user-provided split file.
 *
 * Each line of the file is a comma-separated record of raw key-column
 * values; every non-empty record is encoded into a composite row key
 * using the table's key-column metadata.
 *
 * @param sqlContext context whose SparkContext is used to read the file
 * @param keyColumns the table's key columns, in key order
 * @param splitFile  path of the text file holding one split point per line
 * @return the encoded split keys, one per non-empty line of the file
 */
private[hbase] def createSplitKeys(
    sqlContext: SQLContext,
    keyColumns: Seq[AbstractColumn],
    splitFile: String): Array[Array[Byte]] = {
  val lines = sqlContext.sparkContext.textFile(splitFile)
  // Skip blank lines: they carry no key values and would otherwise be
  // passed to string2Key and produce a bogus split key.
  lines.filter(_.trim.nonEmpty).map { line =>
    // A fresh buffer per record: BytesUtils instances carry per-value state.
    val buffer = keyColumns.map(col => BytesUtils.create(col.dataType))
    val keyBytes = new Array[(Array[Byte], DataType)](keyColumns.size)
    // TODO: the delimiter needs to be made configurable
    HBaseKVHelper.string2Key(line.split(","), buffer.toArray, keyColumns, keyBytes)
    HBaseKVHelper.encodingRawKeyColumns(keyBytes).asInstanceOf[Array[Byte]]
  }.collect()
}

}

/**
Expand Down
21 changes: 15 additions & 6 deletions src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class HBaseSQLParser extends SqlParser {
protected val PARALL = Keyword("PARALL")
protected val SHORT = Keyword("SHORT")
protected val SHOW = Keyword("SHOW")
protected val SPLITSFILE = Keyword("SPLITSFILE")
protected val TABLES = Keyword("TABLES")
protected val VALUES = Keyword("VALUES")
protected val TERMINATED = Keyword("TERMINATED")
Expand Down Expand Up @@ -96,9 +97,11 @@ class HBaseSQLParser extends SqlParser {
(PRIMARY ~> KEY ~> "(" ~> keys <~ ")" <~ ")") ~
(MAPPED ~> BY ~> "(" ~> opt(nameSpace)) ~
(ident <~ ",") ~
(COLS ~> "=" ~> "[" ~> expressions <~ "]" <~ ")") <~ opt(";") ^^ {
(COLS ~> "=" ~> "[" ~> expressions <~ "]" <~ ")") ~
opt(SPLITSFILE ~> "=" ~> literal) <~ opt(";") ^^ {

case tableName ~ tableColumns ~ keySeq ~ tableNameSpace ~ hbaseTableName ~ mappingInfo =>
case tableName ~ tableColumns ~ keySeq ~ tableNameSpace ~
hbaseTableName ~ mappingInfo ~ splitFileName =>
// Since the lexer cannot recognize the symbol "=" as we expected, we compose it
// to expression first and then translate it into Map[String, (String, String)].
// TODO: Now get the info by hacking, need to change it into normal way if possible
Expand Down Expand Up @@ -157,16 +160,22 @@ class HBaseSQLParser extends SqlParser {
.reduceLeft(_ + ";" + _)
}

val opts: Map[String, String] = Seq(
val opts: Seq[(String, String)] = Seq(
("tableName", tableName),
("namespace", customizedNameSpace),
("hbaseTableName", hbaseTableName),
("colsSeq", colsSeqString),
("keyCols", keyColsString),
("nonKeyCols", nonkeyColsString)
).toMap

CreateTable(tableName, "org.apache.spark.sql.hbase.HBaseSource", opts)
)
val addedOpts = {
if(splitFileName != None) {
opts ++ Seq(("splitFileName", splitFileName.get.value.toString))
} else {
opts
}
}
CreateTable(tableName, "org.apache.spark.sql.hbase.HBaseSource", addedOpts.toMap)
}

private[hbase] case class CreateTable(
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/org/apache/spark/sql/hbase/util/HBaseKVHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ object HBaseKVHelper {
}
}

/**
 * Translates one text record into the raw bytes of its HBase row key
 * columns, matching each value against the key-column metadata.
 *
 * @param values     the record as a sequence of strings, one per key column
 * @param lineBuffer conversion buffers, one per key column, matching
 *                   each column's data type
 * @param keyColumns the table's key columns, in key order
 * @param keyBytes   output parameter: for each key column, the encoded
 *                   bytes paired with the column's data type
 */
def string2Key(values: Seq[String],
    lineBuffer: Array[BytesUtils],
    keyColumns: Seq[AbstractColumn],
    keyBytes: Array[(Array[Byte], DataType)]): Unit = {
  // Side-effecting loop writing into keyBytes, so foreach (not map,
  // whose result would be built only to be discarded).
  keyColumns.indices.foreach { i =>
    keyBytes(i) = (string2Bytes(values(i), lineBuffer(i)), keyColumns(i).dataType)
  }
}

/**
* create a array of buffer that to be used for creating HBase Put object
* @param schema the schema of the line buffer
Expand Down