From 9f7432e54dc5f8a4193729c1d42f0fd7009200c3 Mon Sep 17 00:00:00 2001
From: ravipesala
Date: Wed, 28 Jan 2015 17:49:18 +0530
Subject: [PATCH] Added split file support to the CREATE TABLE command

---
 .../spark/sql/hbase/HBaseRelation.scala      | 22 ++++++++++++++++++-
 .../spark/sql/hbase/HBaseSQLParser.scala     | 21 +++++++++++++-----
 .../spark/sql/hbase/util/HBaseKVHelper.scala | 15 +++++++++++++
 3 files changed, 51 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
index 93c4bb9..d74ba01 100644
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
@@ -54,6 +54,7 @@ class HBaseSource extends RelationProvider {
     val nonKeyCols = parameters("nonKeyCols").split(";")
       .filterNot(_ == "")
       .map { case c => val cols = c.split(","); (cols(0), cols(1), cols(2), cols(3))}
+    val splitFile = parameters("splitFileName")
 
     val keyMap: Map[String, String] = keyCols.toMap
     val allColumns = colsSeq.map {
@@ -73,8 +74,27 @@ class HBaseSource extends RelationProvider {
         )
       }
     }
-    catalog.createTable(tableName, rawNamespace, hbaseTable, allColumns, null)
+    val keyColumns = allColumns.filter(_.isInstanceOf[KeyColumn])
+    val splits = createSplitKeys(sqlContext, keyColumns, splitFile)
+    catalog.createTable(tableName, rawNamespace, hbaseTable, allColumns, splits)
   }
+
+  private[hbase] def createSplitKeys(
+      sqlContext: SQLContext,
+      keyColumns: Seq[AbstractColumn],
+      splitFile: String): Array[Array[Byte]] = {
+    val rdd = sqlContext.sparkContext.textFile(splitFile)
+    rdd.map { line =>
+      val buffer = keyColumns.map { x =>
+        BytesUtils.create(x.dataType)
+      }
+      val keyBytes = new Array[(Array[Byte], DataType)](keyColumns.size)
+      // TODO: make the delimiter configurable
+      HBaseKVHelper.string2Key(line.split(","), buffer.toArray, keyColumns, keyBytes)
+      HBaseKVHelper.encodingRawKeyColumns(keyBytes).asInstanceOf[Array[Byte]]
+    }.collect
+  }
+
 }
 
 /**
diff --git a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
index 95b5a49..127b1d2 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala
@@ -61,6 +61,7 @@ class HBaseSQLParser extends SqlParser {
   protected val PARALL = Keyword("PARALL")
   protected val SHORT = Keyword("SHORT")
   protected val SHOW = Keyword("SHOW")
+  protected val SPLITSFILE = Keyword("SPLITSFILE")
   protected val TABLES = Keyword("TABLES")
   protected val VALUES = Keyword("VALUES")
   protected val TERMINATED = Keyword("TERMINATED")
@@ -96,9 +97,11 @@
       (PRIMARY ~> KEY ~> "(" ~> keys <~ ")" <~ ")") ~
       (MAPPED ~> BY ~> "(" ~> opt(nameSpace)) ~
       (ident <~ ",") ~
-      (COLS ~> "=" ~> "[" ~> expressions <~ "]" <~ ")") <~ opt(";") ^^ {
+      (COLS ~> "=" ~> "[" ~> expressions <~ "]" <~ ")") ~
+      opt(SPLITSFILE ~> "=" ~> literal) <~ opt(";") ^^ {
-      case tableName ~ tableColumns ~ keySeq ~ tableNameSpace ~ hbaseTableName ~ mappingInfo =>
+      case tableName ~ tableColumns ~ keySeq ~ tableNameSpace ~
+        hbaseTableName ~ mappingInfo ~ splitFileName =>
         // Since the lexical can not recognize the symbol "=" as we expected, we compose it
         // to expression first and then translate it into Map[String, (String, String)].
         // TODO: Now get the info by hacking, need to change it into normal way if possible
@@ -157,16 +160,22 @@
           .reduceLeft(_ + ";" + _)
         }
 
-        val opts: Map[String, String] = Seq(
+        val opts: Seq[(String, String)] = Seq(
          ("tableName", tableName),
          ("namespace", customizedNameSpace),
          ("hbaseTableName", hbaseTableName),
          ("colsSeq", colsSeqString),
          ("keyCols", keyColsString),
          ("nonKeyCols", nonkeyColsString)
-        ).toMap
-
-        CreateTable(tableName, "org.apache.spark.sql.hbase.HBaseSource", opts)
+        )
+        val addedOpts = {
+          if (splitFileName.isDefined) {
+            opts ++ Seq(("splitFileName", splitFileName.get.value.toString))
+          } else {
+            opts
+          }
+        }
+        CreateTable(tableName, "org.apache.spark.sql.hbase.HBaseSource", addedOpts.toMap)
     }
 
   private[hbase] case class CreateTable(
diff --git a/src/main/scala/org/apache/spark/sql/hbase/util/HBaseKVHelper.scala b/src/main/scala/org/apache/spark/sql/hbase/util/HBaseKVHelper.scala
index 2436a03..39610b6 100755
--- a/src/main/scala/org/apache/spark/sql/hbase/util/HBaseKVHelper.scala
+++ b/src/main/scala/org/apache/spark/sql/hbase/util/HBaseKVHelper.scala
@@ -127,6 +127,21 @@ object HBaseKVHelper {
     }
   }
 
+  /**
+   * Takes a record and translates it into HBase row key columns according to the metadata
+   * @param values the record as a sequence of strings
+   * @param keyBytes output parameter, an array of (key column bytes, data type) pairs
+   */
+  def string2Key(values: Seq[String],
+      lineBuffer: Array[BytesUtils],
+      keyColumns: Seq[AbstractColumn],
+      keyBytes: Array[(Array[Byte], DataType)]) = {
+    values.zipWithIndex.foreach(kc => {
+      keyBytes(kc._2) = (string2Bytes(values(kc._2), lineBuffer(kc._2)),
+        keyColumns(kc._2).dataType)
+    })
+  }
+
   /**
    * create a array of buffer that to be used for creating HBase Put object
    * @param schema the schema of the line buffer
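
Usage sketch (not part of the patch): the snippet below shows how the new SPLITSFILE option is meant to be exercised, following the parser rule added above. HBaseSQLContext, the table and column names, and the file path are assumptions for illustration only. The split file is plain text with one comma-separated key per line, one line per desired region boundary, matching the hard-coded "," delimiter in createSplitKeys.

import org.apache.spark.{SparkConf, SparkContext}

object SplitsFileUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("splitsfile-sketch"))
    // Assumed entry point of this project; it wires HBaseSQLParser into SQLContext.
    val hbaseContext = new org.apache.spark.sql.hbase.HBaseSQLContext(sc)

    // /tmp/splits.txt (hypothetical) holds one value per key column, in
    // PRIMARY KEY order, comma-separated, e.g.:
    //   1024,abc
    //   2048,def
    hbaseContext.sql(
      """CREATE TABLE t1(c1 INT, c2 STRING, c3 FLOAT, PRIMARY KEY (c1, c2))
        |MAPPED BY (hbase_t1, COLS=[c3=cf1.cq1])
        |SPLITSFILE = '/tmp/splits.txt'""".stripMargin)
  }
}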
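For context on what the Array[Array[Byte]] returned by createSplitKeys is for: HBaseCatalog.createTable is not shown in this patch, but the splits have exactly the shape the HBase client API expects when pre-splitting a table. The sketch below is an assumption about the downstream call, written against the HBase 0.98 client API; table and family names are illustrative.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

object PreSplitSketch {
  def createPreSplit(splitKeys: Array[Array[Byte]]): Unit = {
    val admin = new HBaseAdmin(HBaseConfiguration.create())
    try {
      val desc = new HTableDescriptor(TableName.valueOf("hbase_t1"))
      desc.addFamily(new HColumnDescriptor("cf1"))
      // N split keys yield N + 1 initial regions, so writes spread across
      // region servers instead of hot-spotting a single region.
      admin.createTable(desc, splitKeys)
    } finally {
      admin.close()
    }
  }
}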