Skip to content

Commit 60e2de2

Browse files
committed
Adding support for Kerberos Authentication
1 parent 0c84fc5 commit 60e2de2

6 files changed

Lines changed: 67 additions & 38 deletions

File tree

pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
1212
<groupId>parallelai</groupId>
1313
<artifactId>parallelai.spyglass</artifactId>
14-
<version>2.10_0.10_4.4</version>
14+
<version>2.10_0.10_CDH5_4.4</version>
1515
<packaging>jar</packaging>
1616

1717
<properties>
@@ -161,6 +161,11 @@
161161
<artifactId>hbase-client</artifactId>
162162
<version>${hbase.version}</version>
163163
</dependency>
164+
<dependency>
165+
<groupId>org.apache.hbase</groupId>
166+
<artifactId>hbase-protocol</artifactId>
167+
<version>${hbase.version}</version>
168+
</dependency>
164169

165170
<dependency>
166171
<groupId>org.slf4j</groupId>

src/main/java/parallelai/spyglass/hbase/HBaseInputFormatBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void configure(JobConf job) {
7878
sourceMode = HBaseConstants.SourceMode.valueOf(job.get(String.format(
7979
HBaseConstants.SOURCE_MODE, getTableName(job))));
8080

81-
LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally", String
81+
LOG.info(String.format("GOT SOURCE MODE (%s) as (%s) and finally (%s)", String
8282
.format(HBaseConstants.SOURCE_MODE, getTableName(job)), job
8383
.get(String.format(HBaseConstants.SOURCE_MODE, getTableName(job))),
8484
sourceMode));

src/main/java/parallelai/spyglass/hbase/HBaseInputFormatGranular.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,5 @@
11
package parallelai.spyglass.hbase;
22

3-
import java.io.IOException;
4-
import java.net.InetAddress;
5-
import java.util.ArrayList;
6-
import java.util.HashMap;
7-
import java.util.List;
8-
import java.util.Set;
9-
import java.util.TreeSet;
10-
11-
import javax.naming.NamingException;
12-
133
import org.apache.commons.logging.Log;
144
import org.apache.commons.logging.LogFactory;
155
import org.apache.hadoop.hbase.HConstants;
@@ -25,9 +15,17 @@
2515
import org.apache.hadoop.mapred.RecordReader;
2616
import org.apache.hadoop.mapred.Reporter;
2717
import org.apache.hadoop.net.DNS;
28-
2918
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
3019

20+
import javax.naming.NamingException;
21+
import java.io.IOException;
22+
import java.net.InetAddress;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Set;
27+
import java.util.TreeSet;
28+
3129
/**
3230
* See HBaseInputFormatRegional first (!)
3331
*
@@ -188,20 +186,6 @@ public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IO
188186
byte[] rStart = cRegion.getRegionInfo().getStartKey();
189187
byte[] rStop = cRegion.getRegionInfo().getEndKey();
190188

191-
// HServerAddress regionServerAddress = cRegion
192-
// .getServerAddress();
193-
// InetAddress regionAddress = regionServerAddress
194-
// .getInetSocketAddress().getAddress();
195-
// String regionLocation;
196-
// try {
197-
// regionLocation = reverseDNS(regionAddress);
198-
// regionLocation = cRegion.
199-
// } catch (NamingException e) {
200-
// LOG.error("Cannot resolve the host name for "
201-
// + regionAddress + " because of " + e);
202-
// regionLocation = cRegion.getHostname();
203-
// }
204-
205189
String regionName = cRegion.getRegionInfo().getRegionNameAsString();
206190

207191
byte[] sStart = (startRow == HConstants.EMPTY_START_ROW

src/main/java/parallelai/spyglass/hbase/HBaseTap.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,36 @@
1212

1313
package parallelai.spyglass.hbase;
1414

15-
import parallelai.spyglass.hbase.HBaseConstants.SplitType;
16-
17-
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
1815
import cascading.flow.FlowProcess;
1916
import cascading.tap.SinkMode;
2017
import cascading.tap.Tap;
2118
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
2219
import cascading.tuple.TupleEntryCollector;
2320
import cascading.tuple.TupleEntryIterator;
24-
2521
import org.apache.hadoop.conf.Configuration;
2622
import org.apache.hadoop.fs.Path;
2723
import org.apache.hadoop.hbase.HBaseConfiguration;
2824
import org.apache.hadoop.hbase.HColumnDescriptor;
2925
import org.apache.hadoop.hbase.HTableDescriptor;
30-
import org.apache.hadoop.hbase.MasterNotRunningException;
31-
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
3226
import org.apache.hadoop.hbase.client.HBaseAdmin;
27+
import org.apache.hadoop.hbase.security.User;
3328
import org.apache.hadoop.mapred.FileInputFormat;
3429
import org.apache.hadoop.mapred.JobConf;
3530
import org.apache.hadoop.mapred.OutputCollector;
3631
import org.apache.hadoop.mapred.RecordReader;
32+
import org.apache.hadoop.security.Credentials;
33+
import org.apache.hadoop.security.UserGroupInformation;
34+
import org.apache.hadoop.security.token.Token;
3735
import org.slf4j.Logger;
3836
import org.slf4j.LoggerFactory;
37+
import parallelai.spyglass.hbase.HBaseConstants.SourceMode;
38+
import parallelai.spyglass.hbase.HBaseConstants.SplitType;
3939

4040
import java.io.IOException;
4141
import java.io.Serializable;
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
4444
import java.util.UUID;
45-
4645
/**
4746
* The HBaseTap class is a {@link Tap} subclass. It is used in conjunction with
4847
* the {@link HBaseScheme} to allow for the reading and writing
@@ -125,6 +124,28 @@ public HBaseTap(String quorumNames, String tableName, HBaseScheme HBaseFullSchem
125124
this.tableName = tableName;
126125
}
127126

127+
128+
private void obtainToken(JobConf conf) {
129+
if (User.isHBaseSecurityEnabled(conf)) {
130+
String user = conf.getUser();
131+
LOG.info("obtaining HBase token for: {}", user);
132+
try {
133+
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
134+
user = currentUser.getUserName();
135+
Credentials credentials = conf.getCredentials();
136+
for (Token t : currentUser.getTokens()) {
137+
LOG.debug("Token {} is available", t);
138+
// An HBASE_AUTH_TOKEN must already exist at this point; it has to be generated at job submission, otherwise bad things will happen.
139+
if ("HBASE_AUTH_TOKEN".equalsIgnoreCase(t.getKind().toString())) {
140+
credentials.addToken(t.getKind(), t);
141+
}
142+
}
143+
} catch (IOException e) {
144+
throw new RuntimeException("Unable to obtain HBase auth token for " + user, e);
145+
}
146+
}
147+
}
148+
128149
/**
129150
* Method getTableName returns the tableName of this HBaseTap object.
130151
*
@@ -153,6 +174,8 @@ public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
153174
conf.set("hbase.zookeeper.quorum", quorumNames);
154175
}
155176

177+
178+
156179
LOG.debug("sinking to table: {}", tableName);
157180

158181
if (isReplace() && conf.get("mapred.task.partition") == null) {
@@ -179,7 +202,9 @@ else if (isUpdate()) {
179202
for( SinkConfig sc : sinkConfigList) {
180203
sc.configure(conf);
181204
}
182-
205+
206+
obtainToken(conf);
207+
183208
super.sinkConfInit(process, conf);
184209
}
185210

@@ -277,7 +302,9 @@ public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
277302
for( SourceConfig sc : sourceConfigList) {
278303
sc.configure(conf);
279304
}
280-
305+
306+
obtainToken(conf);
307+
281308
super.sourceConfInit(process, conf);
282309
}
283310

src/main/resources/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<description>Cascading and Scalding wrapper for HBase with advanced features</description>
99
<groupId>parallelai</groupId>
1010
<artifactId>parallelai.spyglass</artifactId>
11-
<version>2.10_0.10_4.4</version>
11+
<version>2.10_0.10_CDH5_4.4</version>
1212
<packaging>jar</packaging>
1313

1414
<organization>
@@ -99,6 +99,11 @@
9999
<artifactId>hbase-client</artifactId>
100100
<version>0.98.1-cdh5.1.0</version>
101101
</dependency>
102+
<dependency>
103+
<groupId>org.apache.hbase</groupId>
104+
<artifactId>hbase-protocol</artifactId>
105+
<version>0.98.1-cdh5.1.0</version>
106+
</dependency>
102107

103108
<dependency>
104109
<groupId>org.slf4j</groupId>

src/main/scala/parallelai/spyglass/base/JobRunner.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package parallelai.spyglass.base
33
import org.apache.hadoop.conf.Configuration
44
import com.twitter.scalding.Tool
55
import org.apache.hadoop
6-
6+
import org.apache.hadoop.hbase.security.token.TokenUtil
7+
import org.apache.hadoop.security.UserGroupInformation
8+
import org.apache.hadoop.hbase.security.User
9+
710
object JobRunner {
811
def main(args : Array[String]) {
912
val conf: Configuration = new Configuration
@@ -17,6 +20,11 @@ object JobRunner {
1720
}
1821

1922
AppConfig.jobConfig = conf
23+
24+
if (User.isHBaseSecurityEnabled(conf)) {
25+
println("Obtaining token for HBase security.");
26+
TokenUtil.obtainAndCacheToken(conf, UserGroupInformation.getCurrentUser());
27+
}
2028

2129
hadoop.util.ToolRunner.run(conf, new Tool, args);
2230
}

0 commit comments

Comments
 (0)