Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions gora-hive/src/test/java/org/apache/gora/hive/store/TestHiveStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,32 @@

package org.apache.gora.hive.store;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.gora.examples.WebPageDataCreator;
import org.apache.gora.examples.generated.Employee;
import org.apache.gora.examples.generated.Metadata;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.hive.GoraHiveTestDriver;
import org.apache.gora.persistency.impl.BeanFactoryImpl;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStoreTestBase;
import org.apache.gora.store.DataStoreTestUtil;
import org.apache.gora.util.GoraException;
import org.apache.gora.util.StringUtils;
import org.apache.metamodel.query.parser.QueryParserException;
import org.junit.Ignore;
import org.junit.Test;

Expand All @@ -56,6 +66,139 @@ public void assertSchemaExists(String schemaName) throws Exception {
assertTrue(employeeStore.schemaExists());
}

private void awaitWebPageSchema(String key) throws Exception {
// wait until Hive exposes the schema and, if provided, the specific record
for (int attempt = 0; attempt < 100; attempt++) {
webPageStore.flush();
if (!webPageStore.schemaExists()) {
Thread.sleep(100L);
continue;
}
if (key == null) return;
try {
// read the key to confirm parser metadata is in sync
webPageStore.get(key, new String[] {"url"});
return;
} catch (QueryParserException e) {
// recreate the store when Hive's parser cache is stale
webPageStore.close();
webPageStore = testDriver.createDataStore(String.class, WebPage.class);
}
}
fail("Hive web page schema or record was not visible");
}

private void populateWebPages() throws Exception {
// load deterministic WebPage fixtures and ensure their visibility
webPageStore.createSchema();
awaitWebPageSchema(null);
WebPageDataCreator.createWebPageData(webPageStore);
for (String url : WebPageDataCreator.URLS) {
// block until each inserted record is queryable
awaitWebPageSchema(url);
}
}

private List<String> sortedWebPageUrls() {
// copy and sort the static URL set to enforce deterministic order
List<String> sorted = new ArrayList<>(Arrays.asList(WebPageDataCreator.URLS));
Collections.sort(sorted);
return sorted;
}

private void assertKeyRange(boolean setStartKey, boolean setEndKey) throws Exception {
// verify range queries across all start/end key combinations
populateWebPages();
List<String> urls = sortedWebPageUrls();
int n = urls.size();

for (int i = 0, iLimit = setStartKey ? n : 1; i < iLimit; i++) {
// derive the set of end indices allowed for the current start index
int jStart = setEndKey ? i : n - 1;
int jLimit = setEndKey ? n : jStart + 1;
for (int j = jStart; j < jLimit; j++) {
Query<String, WebPage> query = webPageStore.newQuery();
if (setStartKey) query.setStartKey(urls.get(i));
if (setEndKey) query.setEndKey(urls.get(j));

Result<String, WebPage> result = query.execute();
int actual = 0;
while (result.next()) {
WebPage page = result.get();
// enforce that each returned record matches the fixture expectations
DataStoreTestUtil.assertWebPage(
page, WebPageDataCreator.URL_INDEXES.get(page.getUrl().toString()));
actual++;
}
result.close();

// compare the observed result size with the analytical expectation
int expected = (setEndKey ? j + 1 : n) - (setStartKey ? i : 0);
assertEquals(expected, actual);
}
}
}

private void assertSingleKeyQuery(String[] fields) throws Exception {
// run single-key lookups for each URL and validate the returned page
populateWebPages();
for (int i = 0; i < WebPageDataCreator.URLS.length; i++) {
Query<String, WebPage> query = webPageStore.newQuery();
query.setKey(WebPageDataCreator.URLS[i]);
query.setFields(fields);
Result<String, WebPage> result = query.execute();
assertTrue(result.next());
WebPage page = result.get();
// the content must match the deterministic fixture by index
DataStoreTestUtil.assertWebPage(page, i);
assertFalse(result.next());
result.close();
}
}

@Override
public void testQuery() throws Exception {
assertKeyRange(false, false);
}

@Override
public void testQueryEndKey() throws Exception {
assertKeyRange(false, true);
}

@Override
public void testQueryKeyRange() throws Exception {
assertKeyRange(true, true);
}

@Override
public void testQueryStartKey() throws Exception {
assertKeyRange(true, false);
}

@Override
public void testQueryWebPageQueryEmptyResults() throws Exception {
populateWebPages();
Query<String, WebPage> query = webPageStore.newQuery();
query.setStartKey("aa");
query.setEndKey("ab");
DataStoreTestUtil.assertEmptyResults(query);

query = webPageStore.newQuery();
query.setKey("aa");
DataStoreTestUtil.assertEmptyResults(query);
}

@Override
public void testQueryWebPageSingleKey() throws Exception {
assertSingleKeyQuery(((HiveStore<String, WebPage>) webPageStore).getFields());
}

@Override
public void testQueryWebPageSingleKeyDefaultFields() throws Exception {
assertSingleKeyQuery(null);
}

@Override
public void assertPut(Employee employee) throws GoraException {
employeeStore.put(employee.getSsn().toString(), employee);
Expand Down
Loading