diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java index 57c609ebb038..6fbc61ad6c99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java @@ -40,15 +40,32 @@ public interface AsyncBufferedMutatorBuilder { AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit); /** - * Set a rpc request attribute. + * Sets a request attribute. Ignored if a factory is set via + * {@link #setRequestAttributesFactory(RequestAttributesFactory)}. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead. */ + @Deprecated AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value); /** - * Set multiple rpc request attributes. + * Sets multiple request attributes. Ignored if a factory is set via + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead. */ + @Deprecated AsyncBufferedMutatorBuilder setRequestAttributes(Map requestAttributes); + /** + * Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for + * attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic + * attributes. + */ + default AsyncBufferedMutatorBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java index 7fa860dc3d4e..21ae30467d32 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java @@ -96,6 +96,13 @@ public AsyncBufferedMutatorBuilder setRequestAttributes(Map requ return this; } + @Override + public AsyncBufferedMutatorBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + tableBuilder.setRequestAttributesFactory(requestAttributesFactory); + return this; + } + @Override public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) { Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0", diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 259183a41414..a2de4d7eac83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -114,8 +114,10 @@ public interface AsyncTable { long getScanTimeout(TimeUnit unit); /** - * Get the map of request attributes - * @return a map of request attributes supplied by the client + * Returns the request attributes for this call. The attributes are generated by the configured + * {@link RequestAttributesFactory}, so each invocation may return different values if a dynamic + * factory is in use. + * @return the request attributes */ default Map getRequestAttributes() { return Collections.emptyMap(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 007f7ad48685..8b7300daec9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -138,10 +138,28 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); /** - * Set a request attribute + * Sets a request attribute. Ignored if a factory is set via + * {@link #setRequestAttributesFactory(RequestAttributesFactory)}. + * @param key the attribute key + * @param value the attribute value + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead. */ + @Deprecated AsyncTableBuilder setRequestAttribute(String key, byte[] value); + /** + * Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for + * attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic + * attributes. + * @param requestAttributesFactory the factory to use + * @return this builder + */ + default AsyncTableBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Create the {@link AsyncTable} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 428e7358195e..3e629a4dfd80 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -19,9 +19,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -53,7 +50,9 @@ abstract class AsyncTableBuilderBase protected int startLogErrorsCnt; - protected Map requestAttributes = Collections.emptyMap(); + protected FixedRequestAttributesFactory.Builder fixedRequestAttributesFactoryBuilder = null; + + protected RequestAttributesFactory requestAttributesFactory = null; AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; @@ -129,10 +128,27 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { @Override public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { - if (requestAttributes.isEmpty()) { - requestAttributes = new HashMap<>(); + if (fixedRequestAttributesFactoryBuilder == null) { + fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder(); } - requestAttributes.put(key, value); + fixedRequestAttributesFactoryBuilder.setAttribute(key, value); + return this; + } + + @Override + public AsyncTableBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + this.requestAttributesFactory = requestAttributesFactory; return this; } + + RequestAttributesFactory getRequestAttributesFactory() { + if (requestAttributesFactory != null) { + return requestAttributesFactory; + } else if (fixedRequestAttributesFactoryBuilder != null) { + return fixedRequestAttributesFactoryBuilder.build(); + } else { + return FixedRequestAttributesFactory.EMPTY; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index 550d13033cb2..261bb84fec14 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -17,15 +17,11 @@ */ package org.apache.hadoop.hbase.client; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; - /** * Parameters for instantiating a {@link BufferedMutator}. */ @@ -44,7 +40,8 @@ public class BufferedMutatorParams implements Cloneable { private int rpcTimeout = UNSET; private int operationTimeout = UNSET; private int maxMutations = UNSET; - protected Map requestAttributes = Collections.emptyMap(); + protected FixedRequestAttributesFactory.Builder fixedRequestAttributesFactoryBuilder = null; + protected RequestAttributesFactory requestAttributesFactory = null; private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override public void onException(RetriesExhaustedWithDetailsException exception, @@ -109,16 +106,55 @@ public int getMaxMutations() { return maxMutations; } + /** + * Sets a request attribute. Ignored if a factory is set via + * {@link #setRequestAttributesFactory(RequestAttributesFactory)}. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead. + */ + @Deprecated public BufferedMutatorParams setRequestAttribute(String key, byte[] value) { - if (requestAttributes.isEmpty()) { - requestAttributes = new HashMap<>(); + if (fixedRequestAttributesFactoryBuilder == null) { + fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder(); } - requestAttributes.put(key, value); + fixedRequestAttributesFactoryBuilder.setAttribute(key, value); return this; } + /** + * Returns *only* the request attributes added by {@link #setRequestAttribute(String, byte[])}. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #getRequestAttributesFactory()} instead. + */ public Map getRequestAttributes() { - return requestAttributes; + if (fixedRequestAttributesFactoryBuilder == null) { + return null; + } + return fixedRequestAttributesFactoryBuilder.getAttributes(); + } + + /** + * Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for + * attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic + * attributes. + */ + public BufferedMutatorParams + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + this.requestAttributesFactory = requestAttributesFactory; + return this; + } + + /** + * Returns the request attributes factory. + */ + public RequestAttributesFactory getRequestAttributesFactory() { + if (requestAttributesFactory != null) { + return requestAttributesFactory; + } else if (fixedRequestAttributesFactoryBuilder != null) { + return fixedRequestAttributesFactoryBuilder.build(); + } else { + return FixedRequestAttributesFactory.EMPTY; + } } /** @@ -243,7 +279,12 @@ public BufferedMutatorParams clone() { clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; clone.maxKeyValueSize = this.maxKeyValueSize; clone.maxMutations = this.maxMutations; - clone.requestAttributes = Maps.newHashMap(this.requestAttributes); + clone.requestAttributesFactory = this.requestAttributesFactory; + if (fixedRequestAttributesFactoryBuilder != null) { + clone.fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder(); + fixedRequestAttributesFactoryBuilder.getAttributes() + .forEach(clone.fixedRequestAttributesFactoryBuilder::setAttribute); + } clone.pool = this.pool; clone.listener = this.listener; clone.implementationClassName = this.implementationClassName; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 471cfa874458..c3858b0468cf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -110,10 +110,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I if (params.getMaxMutations() != BufferedMutatorParams.UNSET) { builder.setMaxMutations(params.getMaxMutations()); } - if (!params.getRequestAttributes().isEmpty()) { - - builder.setRequestAttributes(params.getRequestAttributes()); - } + builder.setRequestAttributesFactory(params.getRequestAttributesFactory()); return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener()); } @@ -200,8 +197,8 @@ public Table build() { conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) - .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS); - requestAttributes.forEach(tableBuilder::setRequestAttribute); + .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS) + .setRequestAttributesFactory(getRequestAttributesFactory()); return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FixedRequestAttributesFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FixedRequestAttributesFactory.java new file mode 100644 index 000000000000..59690eb7ab38 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FixedRequestAttributesFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A {@link RequestAttributesFactory} that returns a fixed set of attributes for every call. Use + * this when attributes are fixed and do not change. + * @see AsyncTableBuilder#setRequestAttributesFactory(RequestAttributesFactory) + */ +@InterfaceAudience.Public +public final class FixedRequestAttributesFactory implements RequestAttributesFactory { + + /** + * A factory that always returns an empty map. + */ + public static final RequestAttributesFactory EMPTY = Collections::emptyMap; + + /** + * Builder for creating {@link FixedRequestAttributesFactory} instances. + */ + public static final class Builder { + private final Map requestAttributes = new LinkedHashMap<>(); + + /** + * Sets a request attribute. If value is null, the attribute is removed. + * @param key the attribute key + * @param value the attribute value, or null to remove + * @return this builder + */ + public Builder setAttribute(String key, byte[] value) { + if (value == null) { + requestAttributes.remove(key); + } else { + requestAttributes.put(key, value); + } + return this; + } + + /** + * Gets the accumulated request attributes. + */ + public Map getAttributes() { + return Collections.unmodifiableMap(requestAttributes); + } + + /** + * Builds a {@link FixedRequestAttributesFactory} with the configured attributes. + * @return the factory + */ + public FixedRequestAttributesFactory build() { + return new FixedRequestAttributesFactory(new LinkedHashMap<>(requestAttributes)); + } + } + + /** + * Returns a new builder. + * @return a new builder instance + */ + public static Builder newBuilder() { + return new Builder(); + } + + private final Map requestAttributes; + + private FixedRequestAttributesFactory(Map requestAttributes) { + this.requestAttributes = Collections.unmodifiableMap(requestAttributes); + } + + @Override + public Map create() { + return requestAttributes; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 553b4afa55ea..5cddf5131d5a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -121,7 +121,7 @@ class RawAsyncTableImpl implements AsyncTable { private final int startLogErrorsCnt; - private final Map requestAttributes; + private final RequestAttributesFactory requestAttributesFactory; RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase builder) { this.conn = conn; @@ -149,7 +149,7 @@ class RawAsyncTableImpl implements AsyncTable { ? conn.connConf.getMetaScannerCaching() : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); - this.requestAttributes = builder.requestAttributes; + this.requestAttributesFactory = builder.getRequestAttributesFactory(); } @Override @@ -216,7 +216,7 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .setRequestAttributes(requestAttributes); + .setRequestAttributes(getRequestAttributes()); } private SingleRequestCallerBuilder @@ -616,7 +616,7 @@ private Scan setDefaultScanConfig(Scan scan) { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, - startLogErrorsCnt, requestAttributes).start(); + startLogErrorsCnt, getRequestAttributes()).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -713,7 +713,7 @@ private List> batch(List actions, long r .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .setRequestAttributes(requestAttributes).call(); + .setRequestAttributes(getRequestAttributes()).call(); } @Override @@ -743,7 +743,10 @@ public long getScanTimeout(TimeUnit unit) { @Override public Map getRequestAttributes() { - return requestAttributes; + Map attributes = requestAttributesFactory.create(); + Preconditions.checkState(attributes != null, + "RequestAttributesFactory.create() must not return null"); + return attributes; } private CompletableFuture coprocessorService(Function stubMaker, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java new file mode 100644 index 000000000000..90277bee9d5d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Map; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Factory for creating request attributes. Called each time a client call is started, allowing + * dynamic attributes per call. Useful for propagating {@link ThreadLocal} context as request + * attributes. + *

+ * For a fixed set of attributes that does not change, use {@link FixedRequestAttributesFactory}. + * @see AsyncTableBuilder#setRequestAttributesFactory(RequestAttributesFactory) + */ +@InterfaceAudience.Public +public interface RequestAttributesFactory { + + /** + * Creates request attributes for a client call (e.g., {@link AsyncTable#get}, + * {@link AsyncTable#put}, {@link AsyncTable#scan}). + *

+ * Guaranteed to be called on the same thread that initiates the client call. + * @return the request attributes, must not be null + */ + Map create(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index eee985555b34..50cb550b5ef2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -56,10 +56,24 @@ public interface TableBuilder { TableBuilder setWriteRpcTimeout(int timeout); /** - * Set a request attribute + * Sets a request attribute. Ignored if a factory is set via + * {@link #setRequestAttributesFactory(RequestAttributesFactory)}. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please use + * {@link #setRequestAttributesFactory(RequestAttributesFactory)} instead. */ + @Deprecated TableBuilder setRequestAttribute(String key, byte[] value); + /** + * Sets the factory for creating request attributes. Use {@link FixedRequestAttributesFactory} for + * attributes that do not change, or implement {@link RequestAttributesFactory} for dynamic + * attributes. + */ + default TableBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Create the {@link Table} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index dc3111b0c79d..4e3100ac9b3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -39,7 +36,9 @@ abstract class TableBuilderBase implements TableBuilder { protected int writeRpcTimeout; - protected Map requestAttributes = Collections.emptyMap(); + protected FixedRequestAttributesFactory.Builder fixedRequestAttributesFactoryBuilder = null; + + protected RequestAttributesFactory requestAttributesFactory = null; TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { @@ -81,10 +80,27 @@ public TableBuilderBase setWriteRpcTimeout(int timeout) { @Override public TableBuilderBase setRequestAttribute(String key, byte[] value) { - if (this.requestAttributes.isEmpty()) { - this.requestAttributes = new HashMap<>(); + if (fixedRequestAttributesFactoryBuilder == null) { + fixedRequestAttributesFactoryBuilder = FixedRequestAttributesFactory.newBuilder(); } - this.requestAttributes.put(key, value); + fixedRequestAttributesFactoryBuilder.setAttribute(key, value); + return this; + } + + @Override + public TableBuilderBase + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + this.requestAttributesFactory = requestAttributesFactory; return this; } + + RequestAttributesFactory getRequestAttributesFactory() { + if (requestAttributesFactory != null) { + return requestAttributesFactory; + } else if (fixedRequestAttributesFactoryBuilder != null) { + return fixedRequestAttributesFactoryBuilder.build(); + } else { + return FixedRequestAttributesFactory.EMPTY; + } + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java index b6c52a0cd0d6..9600bcd3cc6c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java @@ -182,7 +182,8 @@ private void cloneTest(BufferedMutatorParams some, BufferedMutatorParams clone) clone.getWriteBufferPeriodicFlushTimerTickMs()); assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize()); assertTrue(some.getMaxMutations() == clone.getMaxMutations()); - assertEquals(some.requestAttributes, clone.requestAttributes); + assertEquals(some.getRequestAttributesFactory().create(), + clone.getRequestAttributesFactory().create()); assertTrue(some.getListener() == clone.getListener()); assertTrue(some.getPool() == clone.getPool()); assertEquals(some.getImplementationClassName(), clone.getImplementationClassName()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFixedRequestAttributesFactory.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFixedRequestAttributesFactory.java new file mode 100644 index 000000000000..90c2b3d3b3fe --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFixedRequestAttributesFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.Map; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestFixedRequestAttributesFactory { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFixedRequestAttributesFactory.class); + + @Test + public void testEmptyFactory() { + Map attrs = FixedRequestAttributesFactory.EMPTY.create(); + assertTrue(attrs.isEmpty()); + } + + @Test + public void testBuilderSetAttribute() { + byte[] value = Bytes.toBytes("value1"); + FixedRequestAttributesFactory factory = + FixedRequestAttributesFactory.newBuilder().setAttribute("key1", value).build(); + + Map attrs = factory.create(); + assertEquals(1, attrs.size()); + assertArrayEquals(value, attrs.get("key1")); + } + + @Test + public void testBuilderMultipleAttributes() { + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + FixedRequestAttributesFactory factory = FixedRequestAttributesFactory.newBuilder() + .setAttribute("key1", value1).setAttribute("key2", value2).build(); + + Map attrs = factory.create(); + assertEquals(2, attrs.size()); + assertArrayEquals(value1, attrs.get("key1")); + assertArrayEquals(value2, attrs.get("key2")); + } + + @Test + public void testBuilderOverrideAttribute() { + byte[] value1 = Bytes.toBytes("value1"); + byte[] value2 = Bytes.toBytes("value2"); + FixedRequestAttributesFactory factory = FixedRequestAttributesFactory.newBuilder() + .setAttribute("key1", value1).setAttribute("key1", value2).build(); + + Map attrs = factory.create(); + assertEquals(1, attrs.size()); + assertArrayEquals(value2, attrs.get("key1")); + } + + @Test + public void testBuilderNullValueRemovesAttribute() { + byte[] value1 = Bytes.toBytes("value1"); + FixedRequestAttributesFactory factory = FixedRequestAttributesFactory.newBuilder() + .setAttribute("key1", value1).setAttribute("key1", null).build(); + + Map attrs = factory.create(); + assertTrue(attrs.isEmpty()); + } + + @Test + public void testBuilderNullValueOnNonExistentKey() { + FixedRequestAttributesFactory factory = + FixedRequestAttributesFactory.newBuilder().setAttribute("key1", null).build(); + + Map attrs = factory.create(); + assertTrue(attrs.isEmpty()); + } + + @Test + public void testCreateReturnsSameInstance() { + FixedRequestAttributesFactory factory = FixedRequestAttributesFactory.newBuilder() + .setAttribute("key1", Bytes.toBytes("value1")).build(); + + Map attrs1 = factory.create(); + Map attrs2 = factory.create(); + assertSame(attrs1, attrs2); + } + + @Test(expected = UnsupportedOperationException.class) + public void testReturnedMapIsUnmodifiable() { + FixedRequestAttributesFactory factory = FixedRequestAttributesFactory.newBuilder() + .setAttribute("key1", Bytes.toBytes("value1")).build(); + + Map attrs = factory.create(); + attrs.put("key2", Bytes.toBytes("value2")); + } + + @Test + public void testBuilderIsolation() { + FixedRequestAttributesFactory.Builder builder = FixedRequestAttributesFactory.newBuilder(); + builder.setAttribute("key1", Bytes.toBytes("value1")); + FixedRequestAttributesFactory factory = builder.build(); + + builder.setAttribute("key2", Bytes.toBytes("value2")); + + Map attrs = factory.create(); + assertEquals(1, attrs.size()); + } + + @Test + public void testNewBuilderReturnsFreshInstance() { + FixedRequestAttributesFactory.Builder builder1 = FixedRequestAttributesFactory.newBuilder(); + FixedRequestAttributesFactory.Builder builder2 = FixedRequestAttributesFactory.newBuilder(); + assertNotSame(builder1, builder2); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java index 9d6dc33a46a3..5c2452e314be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -24,8 +27,12 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.Cell; @@ -72,8 +79,23 @@ public class TestRequestAttributes { private static final byte[] ROW_KEY6 = Bytes.toBytes("6"); private static final byte[] ROW_KEY7 = Bytes.toBytes("7"); private static final byte[] ROW_KEY8 = Bytes.toBytes("8"); + private static final byte[] ROW_KEY_FACTORY_GET = Bytes.toBytes("F1"); + private static final byte[] ROW_KEY_FACTORY_SCAN = Bytes.toBytes("F2"); + private static final byte[] ROW_KEY_FACTORY_PUT = Bytes.toBytes("F3"); + private static final byte[] ROW_KEY_FACTORY_PER_REQUEST = Bytes.toBytes("F5"); + private static final byte[] ROW_KEY_TABLE_FACTORY_GET = Bytes.toBytes("TF1"); + private static final byte[] ROW_KEY_TABLE_FACTORY_SCAN = Bytes.toBytes("TF2"); + private static final byte[] ROW_KEY_TABLE_FACTORY_PUT = Bytes.toBytes("TF3"); + private static final byte[] ROW_KEY_BM_FACTORY = Bytes.toBytes("BM1"); + private static final byte[] ROW_KEY_ASYNC_BM_FACTORY = Bytes.toBytes("ABM1"); + private static final String FACTORY_KEY = "factoryKey"; + private static final byte[] FACTORY_VALUE = Bytes.toBytes("factoryValue"); + private static final String IGNORED_KEY = "ignoredKey"; + private static final byte[] IGNORED_VALUE = Bytes.toBytes("ignoredValue"); private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); private static final Map REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes(); + private static final Map REQUEST_ATTRIBUTES_FACTORY_SCAN = new HashMap<>(); + private static final Map REQUEST_ATTRIBUTES_TABLE_FACTORY_SCAN = new HashMap<>(); private static final Map> ROW_KEY_TO_REQUEST_ATTRIBUTES = new HashMap<>(); static { @@ -88,6 +110,38 @@ public class TestRequestAttributes { ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes()); ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes()); ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap()); + + Map factoryGetAttrs = new HashMap<>(); + factoryGetAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_GET, factoryGetAttrs); + + REQUEST_ATTRIBUTES_FACTORY_SCAN.put(FACTORY_KEY, FACTORY_VALUE); + + Map factoryPutAttrs = new HashMap<>(); + factoryPutAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PUT, factoryPutAttrs); + + Map factoryPerRequestAttrs = new HashMap<>(); + factoryPerRequestAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PER_REQUEST, factoryPerRequestAttrs); + + Map tableFactoryGetAttrs = new HashMap<>(); + tableFactoryGetAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_TABLE_FACTORY_GET, tableFactoryGetAttrs); + + REQUEST_ATTRIBUTES_TABLE_FACTORY_SCAN.put(FACTORY_KEY, FACTORY_VALUE); + + Map tableFactoryPutAttrs = new HashMap<>(); + tableFactoryPutAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_TABLE_FACTORY_PUT, tableFactoryPutAttrs); + + Map bmFactoryAttrs = new HashMap<>(); + bmFactoryAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_BM_FACTORY, bmFactoryAttrs); + + Map asyncBmFactoryAttrs = new HashMap<>(); + asyncBmFactoryAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_ASYNC_BM_FACTORY, asyncBmFactoryAttrs); } private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); private static final byte[] FAMILY = Bytes.toBytes("0"); @@ -237,6 +291,227 @@ public void testNoRequestAttributes() throws IOException { } } + @Test + public void testAsyncRequestAttributesFactoryGet() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_GET)).get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryScan() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table + .scanAll( + new Scan().withStartRow(ROW_KEY_FACTORY_SCAN).withStopRow(ROW_KEY_FACTORY_SCAN, true)) + .get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryPut() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + Put put = new Put(ROW_KEY_FACTORY_PUT); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put).get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryCalledPerRequest() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + AtomicInteger callCount = new AtomicInteger(0); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + callCount.incrementAndGet(); + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + } + assertTrue("Factory should be called at least 3 times", callCount.get() >= 3); + } + + @Test + public void testAsyncRequestAttributesFactoryCalledOnInitiatingThread() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + Thread testThread = Thread.currentThread(); + AtomicReference factoryThread = new AtomicReference<>(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + factoryThread.set(Thread.currentThread()); + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_GET)).get(); + } + assertEquals("Factory should be called on the initiating thread", testThread, + factoryThread.get()); + } + + @Test + public void testAsyncFixedRequestAttributesFactory() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory( + FixedRequestAttributesFactory.newBuilder().setAttribute(FACTORY_KEY, FACTORY_VALUE).build()) + .build(); + table.get(new Get(ROW_KEY_FACTORY_GET)).get(); + } + } + + @Test + public void testTableRequestAttributesFactoryGet() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Table table = conn.getTableBuilder(TABLE_NAME, null).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table.get(new Get(ROW_KEY_TABLE_FACTORY_GET)); + } + } + + @Test + public void testTableRequestAttributesFactoryScan() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Table table = conn.getTableBuilder(TABLE_NAME, null).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + ResultScanner scanner = table.getScanner(new Scan().withStartRow(ROW_KEY_TABLE_FACTORY_SCAN) + .withStopRow(ROW_KEY_TABLE_FACTORY_SCAN, true)); + scanner.next(); + } + } + + @Test + public void testTableRequestAttributesFactoryPut() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Table table = conn.getTableBuilder(TABLE_NAME, null).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + Put put = new Put(ROW_KEY_TABLE_FACTORY_PUT); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); + } + } + + @Test + public void testTableRequestAttributesFactoryOverridesStaticAttributes() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Table table = conn.getTableBuilder(TABLE_NAME, null) + .setRequestAttribute(IGNORED_KEY, IGNORED_VALUE).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + table.get(new Get(ROW_KEY_TABLE_FACTORY_GET)); + } + } + + @Test + public void testTableFixedRequestAttributesFactory() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + Table table = conn.getTableBuilder(TABLE_NAME, null).setRequestAttributesFactory( + FixedRequestAttributesFactory.newBuilder().setAttribute(FACTORY_KEY, FACTORY_VALUE).build()) + .build(); + table.get(new Get(ROW_KEY_TABLE_FACTORY_GET)); + } + } + + @Test + public void testBufferedMutatorRequestAttributesFactory() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + BufferedMutatorParams params = + new BufferedMutatorParams(TABLE_NAME).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }); + BufferedMutator bufferedMutator = conn.getBufferedMutator(params); + Put put = new Put(ROW_KEY_BM_FACTORY); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + bufferedMutator.mutate(put); + bufferedMutator.flush(); + } + } + + @Test + public void testBufferedMutatorRequestAttributesFactoryOverridesStaticAttributes() + throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf)) { + BufferedMutatorParams params = new BufferedMutatorParams(TABLE_NAME) + .setRequestAttribute(IGNORED_KEY, IGNORED_VALUE).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }); + BufferedMutator bufferedMutator = conn.getBufferedMutator(params); + Put put = new Put(ROW_KEY_BM_FACTORY); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + bufferedMutator.mutate(put); + bufferedMutator.flush(); + } + } + + @Test + public void testAsyncBufferedMutatorRequestAttributesFactory() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncBufferedMutator bufferedMutator = + conn.getBufferedMutatorBuilder(TABLE_NAME).setRequestAttributesFactory(() -> { + Map attrs = new HashMap<>(); + attrs.put(FACTORY_KEY, FACTORY_VALUE); + return attrs; + }).build(); + Put put = new Put(ROW_KEY_ASYNC_BM_FACTORY); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + CompletableFuture future = bufferedMutator.mutate(put); + bufferedMutator.flush(); + future.get(); + } + } + private static Map addRandomRequestAttributes() { Map requestAttributes = new HashMap<>(); int j = Math.max(2, (int) (10 * Math.random())); @@ -324,7 +599,11 @@ public void preGetOp(ObserverContext c, @Override public boolean preScannerNext(ObserverContext c, InternalScanner s, List result, int limit, boolean hasNext) throws IOException { - if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) { + if ( + !isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN) + && !isValidRequestAttributes(REQUEST_ATTRIBUTES_FACTORY_SCAN) + && !isValidRequestAttributes(REQUEST_ATTRIBUTES_TABLE_FACTORY_SCAN) + ) { throw new IOException("Incorrect request attributes"); } return hasNext;