diff --git a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java index 003d8ec7..69507c09 100644 --- a/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java +++ b/gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java @@ -18,22 +18,32 @@ package org.apache.gora.hive.store; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Set; import org.apache.avro.util.Utf8; +import org.apache.gora.examples.WebPageDataCreator; import org.apache.gora.examples.generated.Employee; import org.apache.gora.examples.generated.Metadata; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.hive.GoraHiveTestDriver; import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; import org.apache.gora.store.DataStoreTestBase; import org.apache.gora.store.DataStoreTestUtil; import org.apache.gora.util.GoraException; import org.apache.gora.util.StringUtils; +import org.apache.metamodel.query.parser.QueryParserException; import org.junit.Ignore; import org.junit.Test; @@ -56,6 +66,139 @@ public void assertSchemaExists(String schemaName) throws Exception { assertTrue(employeeStore.schemaExists()); } + private void awaitWebPageSchema(String key) throws Exception { + // wait until Hive exposes the schema and, if provided, the specific record + for (int attempt = 0; attempt < 100; attempt++) { + webPageStore.flush(); + if (!webPageStore.schemaExists()) { + Thread.sleep(100L); + continue; + } + if (key == null) return; + try { + // read the key to confirm parser metadata is in sync + webPageStore.get(key, new String[] {"url"}); + return; + } catch (QueryParserException e) { + // recreate the store when Hive's parser cache is stale + webPageStore.close(); + webPageStore = testDriver.createDataStore(String.class, WebPage.class); + } + } + fail("Hive web page schema or record was not visible"); + } + + private void populateWebPages() throws Exception { + // load deterministic WebPage fixtures and ensure their visibility + webPageStore.createSchema(); + awaitWebPageSchema(null); + WebPageDataCreator.createWebPageData(webPageStore); + for (String url : WebPageDataCreator.URLS) { + // block until each inserted record is queryable + awaitWebPageSchema(url); + } + } + + private List sortedWebPageUrls() { + // copy and sort the static URL set to enforce deterministic order + List sorted = new ArrayList<>(Arrays.asList(WebPageDataCreator.URLS)); + Collections.sort(sorted); + return sorted; + } + + private void assertKeyRange(boolean setStartKey, boolean setEndKey) throws Exception { + // verify range queries across all start/end key combinations + populateWebPages(); + List urls = sortedWebPageUrls(); + int n = urls.size(); + + for (int i = 0, iLimit = setStartKey ? n : 1; i < iLimit; i++) { + // derive the set of end indices allowed for the current start index + int jStart = setEndKey ? i : n - 1; + int jLimit = setEndKey ? n : jStart + 1; + for (int j = jStart; j < jLimit; j++) { + Query query = webPageStore.newQuery(); + if (setStartKey) query.setStartKey(urls.get(i)); + if (setEndKey) query.setEndKey(urls.get(j)); + + Result result = query.execute(); + int actual = 0; + while (result.next()) { + WebPage page = result.get(); + // enforce that each returned record matches the fixture expectations + DataStoreTestUtil.assertWebPage( + page, WebPageDataCreator.URL_INDEXES.get(page.getUrl().toString())); + actual++; + } + result.close(); + + // compare the observed result size with the analytical expectation + int expected = (setEndKey ? j + 1 : n) - (setStartKey ? i : 0); + assertEquals(expected, actual); + } + } + } + + private void assertSingleKeyQuery(String[] fields) throws Exception { + // run single-key lookups for each URL and validate the returned page + populateWebPages(); + for (int i = 0; i < WebPageDataCreator.URLS.length; i++) { + Query query = webPageStore.newQuery(); + query.setKey(WebPageDataCreator.URLS[i]); + query.setFields(fields); + Result result = query.execute(); + assertTrue(result.next()); + WebPage page = result.get(); + // the content must match the deterministic fixture by index + DataStoreTestUtil.assertWebPage(page, i); + assertFalse(result.next()); + result.close(); + } + } + + @Override + public void testQuery() throws Exception { + assertKeyRange(false, false); + } + + @Override + public void testQueryEndKey() throws Exception { + assertKeyRange(false, true); + } + + @Override + public void testQueryKeyRange() throws Exception { + assertKeyRange(true, true); + } + + @Override + public void testQueryStartKey() throws Exception { + assertKeyRange(true, false); + } + + @Override + public void testQueryWebPageQueryEmptyResults() throws Exception { + populateWebPages(); + Query query = webPageStore.newQuery(); + query.setStartKey("aa"); + query.setEndKey("ab"); + DataStoreTestUtil.assertEmptyResults(query); + + query = webPageStore.newQuery(); + query.setKey("aa"); + DataStoreTestUtil.assertEmptyResults(query); + } + + @Override + public void testQueryWebPageSingleKey() throws Exception { + assertSingleKeyQuery(((HiveStore) webPageStore).getFields()); + } + + @Override + public void testQueryWebPageSingleKeyDefaultFields() throws Exception { + assertSingleKeyQuery(null); + } + @Override public void assertPut(Employee employee) throws GoraException { employeeStore.put(employee.getSsn().toString(), employee);