From 0af501c7bfc2938445c27207b86a0b9edc98a6ea Mon Sep 17 00:00:00 2001
From: Olga Kirillova
Date: Thu, 9 Nov 2017 22:42:51 -0800
Subject: [PATCH 1/5] Uniform Random Sampling
---
.../pig/sampling/UniformRandomSample.java | 241 ++++++++++++++++++
.../pig/sampling/UniformRandomSampleTest.java | 214 ++++++++++++++++
2 files changed, 455 insertions(+)
create mode 100644 datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
create mode 100644 datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
new file mode 100644
index 00000000..a31f378c
--- /dev/null
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.pig.sampling;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+/**
+ * Scalable uniform random sampling.
+ *
+ *
+ * This UDF implements a uniform random sampling algorithm
+ *
+ *
+ *
+ * It takes a bag of n items and either a fraction (p) or an exact number (k) of
+ * the items to be selected as the input, and returns a bag of k or ceil(p*n)
+ * items uniformly sampled.
+ *
+ *
+ *
+ * DEFINE URS datafu.pig.sampling.URandomSample();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * sampled = FOREACH (GROUP items ALL) GENERATE FLATTEN(URS(items, p||k));
+ *
+ *
+ */
+
+public class UniformRandomSample extends AlgebraicEvalFunc {
+ /**
+ * Prefix for the output bag name.
+ */
+ public static final String OUTPUT_BAG_NAME_PREFIX = "URS";
+
+ private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
+ private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
+
+ protected static double p = 0d; // a fraction of the set came as input parameter
+ public UniformRandomSample(double p) {
+ // not sure how to implement the following:
+ // on p = 0, no further calcs are needed, return empty set
+ // on p = 1, return the input
+ this.p = p;
+ }
+
+ protected static long k; // an exact expected number came as input parameter
+ public UniformRandomSample(long k, long total_size) {
+ // on k = input.size, return the input, no further calcs
+ this.k = k;
+ this.p = (double) k / total_size;
+ }
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermed.class.getName();
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ try {
+ Schema.FieldSchema inputFieldSchema = input.getField(0);
+
+ if (inputFieldSchema.type != DataType.BAG) {
+ throw new RuntimeException("Expected a BAG as input");
+ }
+
+ return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX, input),
+ inputFieldSchema.schema, DataType.BAG));
+ } catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * this Initial supposed to be executed in the mapper and each it's thread is
+ * independent on others, for cases when k % number_of_threads =/= 0 we will
+ * get more items in the final than has been requested
+ *
+ */
+
+ static public class Initial extends EvalFunc {
+
+ private static RandomDataImpl _RNG = new RandomDataImpl();
+ private static int nextInt(int n) {
+ return _RNG.nextInt(0, n);
+ }
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ int numArgs = input.size();
+ int k_local;
+
+ if (numArgs != 2) {
+ throw new IllegalArgumentException("The input tuple should have two fields: "
+ + "a bag of items and the sampling.");
+ }
+
+ DataBag items = (DataBag) input.get(0);
+ // the set should not exceed int, if initial set is bigger than max_int,
+ // split into sub-sets
+ if (items.size() > 2147483647){
+ throw new IndexOutOfBoundsException("bag size is above int maximum");
+ }
+ int numItems = (int) items.size();
+ int in = (int) input.get(1);
+ if (in > 1){
+ // use global p, it doesn't change anyway
+ k_local = (int) Math.ceil(p * numItems);
+ } else {
+ k_local = (int) Math.ceil(in * numItems);
+ }
+
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+
+ int x;
+ SortedSet nums = new TreeSet();
+ while (nums.size() < k_local){
+ x = nextInt(numItems);
+ nums.add(x);
+ }
+ int i=0;
+ int j;
+ Iterator it = nums.iterator();
+ Iterator it2 = items.iterator();
+
+ while (nums.size() > 0){
+ for ( j=i; j
+ * number of processed items in this tuple (int),
+ * a bag of selected items (bag).
+ */
+ Tuple output = _TUPLE_FACTORY.newTuple();
+
+ //output.append(k_local);
+ output.append(numItems);
+ output.append(selected);
+
+ return output;
+ }
+ }
+
+ // this intermediate is just for consistency
+ static public class Intermed extends EvalFunc {
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return input;
+ }
+ }
+
+ // this final should be executed as reducer
+ // merges all selected bags into the output
+ // remove excess in case more than requested items
+ // were selected
+ static public class Final extends EvalFunc {
+
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ DataBag bag = (DataBag) input.get(0);
+
+ long n_total = 0L; // number of overall items
+
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+
+ Iterator it = bag.iterator();
+ Tuple tuple = it.next();
+ while(it.hasNext()) {
+ n_total += (long) tuple.get(0);
+ selected.addAll((DataBag) tuple.get(1));
+ tuple = it.next();
+ }
+ n_total += (long) tuple.get(0);
+ DataBag lastBag = (DataBag) tuple.get(1);
+
+ // k || p come from the parent
+ long s; // final requested sample size
+ if (p > 0){
+ s = (long) Math.ceil(p * n_total);
+ } else {
+ s = k;
+ }
+
+ it = lastBag.iterator();
+ while(it.hasNext() && selected.size() < s ) {
+ selected.add(it.next());
+ }
+
+ return selected;
+ }
+ }
+
+}
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
new file mode 100644
index 00000000..e2d08828
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.SimpleRandomSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link UniformRandomSample}.
+ *
+ */
+public class UniformRandomSampleTest extends PigTests {
+ /**
+ *
+ *
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $p) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String uniformRandomSampleFractionTest;
+
+ /**
+ *
+ *
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $k, $n) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String uniformRandomSampleIntTest;
+
+ /**
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $p) as sample_1, URS(data, $k, $n)
+ * AS sample_2;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
+ * AS sample_count_2;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String uniformRandomSampleWithTwoCallsTest;
+
+ @Test
+ public void testUniformRandomSample() throws Exception {
+ writeLinesToFile("input",
+ "A1\tB1\t1",
+ "A1\tB1\t4",
+ "A1\tB3\t4",
+ "A1\tB4\t4",
+ "A2\tB1\t4",
+ "A2\tB2\t4",
+ "A3\tB1\t3",
+ "A3\tB1\t1",
+ "A3\tB3\t77",
+ "A4\tB1\t3",
+ "A4\tB2\t3",
+ "A4\tB3\t59",
+ "A4\tB4\t29",
+ "A5\tB1\t4",
+ "A6\tB2\t3",
+ "A6\tB2\t55",
+ "A6\tB3\t1",
+ "A7\tB1\t39",
+ "A7\tB2\t27",
+ "A7\tB3\t85",
+ "A8\tB1\t4",
+ "A8\tB2\t45",
+ "A9\tB3\t92",
+ "A9\tB3\t0",
+ "A9\tB6\t42",
+ "A9\tB5\t1",
+ "A10\tB1\t7",
+ "A10\tB2\t23",
+ "A10\tB2\t1",
+ "A10\tB2\t31",
+ "A10\tB6\t41",
+ "A10\tB7\t52");
+
+ int n = 32;
+ double p = 0.3;
+ int s = (int) Math.ceil(p * n);
+
+ PigTest test = createPigTestFromString(uniformRandomSampleFractionTest, "p=" + p);
+ test.runScript();
+ assertOutput(test, "sampled", "(" + s + ")");
+
+ int k = 10;
+ PigTest test2 =
+ createPigTestFromString(uniformRandomSampleIntTest, "k=" + k, "n=" + n);
+ test2.runScript();
+ assertOutput(test2, "sampled", "(" + k + ")");
+
+ double p = 0.05;
+ double k = 15;
+ int s = (int) Math.ceil(p * n);
+
+ PigTest testWithTwoCalls =
+ createPigTestFromString(uniformRandomSampleWithTwoCallsTest, "p=" + p, "k=" + k, "n=" + n);
+ testWithTwoCalls.runScript();
+ assertOutput(testWithTwoCalls, "sampled", "(" + s + "," + k + ")");
+
+ }
+
+ /**
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data BY A_id) GENERATE group, URS(data,
+ * $SAMPLING_FRACTION) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
+ *
+ * sampled = ORDER sampled BY group;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String stratifiedSampleTest;
+
+ @Test
+ public void testStratifiedSample() throws Exception {
+ writeLinesToFile("input",
+ "A1\tB1\t1",
+ "A1\tB1\t4",
+ "A1\tB3\t4",
+ "A1\tB4\t4",
+ "A2\tB1\t4",
+ "A2\tB2\t4",
+ "A3\tB1\t3",
+ "A3\tB1\t1",
+ "A3\tB3\t77",
+ "A4\tB1\t3",
+ "A4\tB2\t3",
+ "A4\tB3\t59",
+ "A4\tB4\t29",
+ "A5\tB1\t4",
+ "A6\tB2\t3",
+ "A6\tB2\t55",
+ "A6\tB3\t1",
+ "A7\tB1\t39",
+ "A7\tB2\t27",
+ "A7\tB3\t85",
+ "A8\tB1\t4",
+ "A8\tB2\t45",
+ "A9\tB3\t92",
+ "A9\tB3\t0",
+ "A9\tB6\t42",
+ "A9\tB5\t1",
+ "A10\tB1\t7",
+ "A10\tB2\t23",
+ "A10\tB2\t1",
+ "A10\tB2\t31",
+ "A10\tB6\t41",
+ "A10\tB7\t52");
+
+ double p = 0.5;
+
+ PigTest test =
+ createPigTestFromString(stratifiedSampleTest, "SAMPLING_FRACTION=" + p);
+ test.runScript();
+ assertOutput(test,
+ "sampled",
+ "(A1,2)",
+ "(A10,3)",
+ "(A2,1)",
+ "(A3,2)",
+ "(A4,2)",
+ "(A5,1)",
+ "(A6,2)",
+ "(A7,2)",
+ "(A8,1)",
+ "(A9,2)");
+ }
+}
From 5d170699d31eb500e6788a9cac676868ca08deef Mon Sep 17 00:00:00 2001
From: Olga Kirillova
Date: Wed, 29 Nov 2017 19:30:07 -0800
Subject: [PATCH 2/5] small fixes
---
.../pig/sampling/UniformRandomSample.java | 19 +++++++++++++++++++
.../pig/sampling/UniformRandomSampleTest.java | 6 +++---
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
index a31f378c..1592de1c 100644
--- a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
@@ -67,7 +67,19 @@ public class UniformRandomSample extends AlgebraicEvalFunc {
private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
+ // required for tests
+ // when absent, tests fail with InstantiationException
+ // by logic there should be either p or k and n parameters
+ public UniformRandomSample() {}
+
protected static double p = 0d; // a fraction of the set came as input parameter
+ public UniformRandomSample(String ps) {
+ this.p = Double.parseDouble(ps);
+ if (this.p > 1) {
+ throw new RuntimeException("p should not exceed 1.0");
+ }
+ }
+
public UniformRandomSample(double p) {
// not sure how to implement the following:
// on p = 0, no further calcs are needed, return empty set
@@ -122,6 +134,8 @@ public Schema outputSchema(Schema input) {
static public class Initial extends EvalFunc {
+ public Initial(){}
+
private static RandomDataImpl _RNG = new RandomDataImpl();
private static int nextInt(int n) {
return _RNG.nextInt(0, n);
@@ -191,6 +205,9 @@ public Tuple exec(Tuple input) throws IOException {
// this intermediate is just for consistency
static public class Intermed extends EvalFunc {
+
+ public Intermed() {}
+
@Override
public Tuple exec(Tuple input) throws IOException {
return input;
@@ -203,6 +220,8 @@ public Tuple exec(Tuple input) throws IOException {
// were selected
static public class Final extends EvalFunc {
+ public Final(){}
+
@Override
public DataBag exec(Tuple input) throws IOException {
DataBag bag = (DataBag) input.get(0);
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
index e2d08828..a23e7c41 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
@@ -129,9 +129,9 @@ public void testUniformRandomSample() throws Exception {
test2.runScript();
assertOutput(test2, "sampled", "(" + k + ")");
- double p = 0.05;
- double k = 15;
- int s = (int) Math.ceil(p * n);
+ p = 0.05;
+ k = 15;
+ s = (int) Math.ceil(p * n);
PigTest testWithTwoCalls =
createPigTestFromString(uniformRandomSampleWithTwoCallsTest, "p=" + p, "k=" + k, "n=" + n);
From 1f921d562971cacac3bd900cf6ef5c33ca82ef0a Mon Sep 17 00:00:00 2001
From: Olga Kirillova
Date: Sat, 16 Dec 2017 16:19:20 -0800
Subject: [PATCH 3/5] DATAFU-XX: Uniform Random Sampling able to handle both
fractional and int/long sample requests
---
.../pig/sampling/UniformRandomSample.java | 201 +++++++++++-------
.../pig/sampling/UniformRandomSampleTest.java | 82 +++----
2 files changed, 169 insertions(+), 114 deletions(-)
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
index 1592de1c..8ddde3ab 100644
--- a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
@@ -50,10 +50,10 @@
*
*
*
- * DEFINE URS datafu.pig.sampling.URandomSample();
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample('p | k & n');
*
- * item = LOAD 'input' AS (x:double);
- * sampled = FOREACH (GROUP items ALL) GENERATE FLATTEN(URS(items, p||k));
+ * item = LOAD 'input' AS (x:);
+ * sampled = FOREACH (GROUP items ALL) GENERATE FLATTEN(URS(items));
*
*
*/
@@ -67,31 +67,14 @@ public class UniformRandomSample extends AlgebraicEvalFunc {
private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
- // required for tests
- // when absent, tests fail with InstantiationException
- // by logic there should be either p or k and n parameters
- public UniformRandomSample() {}
-
- protected static double p = 0d; // a fraction of the set came as input parameter
public UniformRandomSample(String ps) {
- this.p = Double.parseDouble(ps);
- if (this.p > 1) {
- throw new RuntimeException("p should not exceed 1.0");
- }
- }
-
- public UniformRandomSample(double p) {
- // not sure how to implement the following:
- // on p = 0, no further calcs are needed, return empty set
- // on p = 1, return the input
- this.p = p;
- }
-
- protected static long k; // an exact expected number came as input parameter
- public UniformRandomSample(long k, long total_size) {
- // on k = input.size, return the input, no further calcs
- this.k = k;
- this.p = (double) k / total_size;
+ super(ps);
+ // all parameters calculations should be done in nested classes constructors
+ // to allow multiple instantiations through calls like:
+ // DEFINE URS1 datafu.pig.sampling.UniformRandomSample('$p');
+ // DEFINE URS2 datafu.pig.sampling.UniformRandomSample('$k, $n');
+ // data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ // sampled = FOREACH (GROUP data ALL) GENERATE URS1(data) as sample_1, URS2(data) AS sample_2;
}
@Override
@@ -117,7 +100,6 @@ public Schema outputSchema(Schema input) {
if (inputFieldSchema.type != DataType.BAG) {
throw new RuntimeException("Expected a BAG as input");
}
-
return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX, input),
inputFieldSchema.schema, DataType.BAG));
} catch (FrontendException e) {
@@ -125,10 +107,11 @@ public Schema outputSchema(Schema input) {
}
}
- /**
- * this Initial supposed to be executed in the mapper and each it's thread is
- * independent on others, for cases when k % number_of_threads =/= 0 we will
- * get more items in the final than has been requested
+ /*
+ * this Initial vectorizes input data, making 1 cal per element,
+ * for sampling 1 element processing doesn't make much sense,
+ * therefore, lets skip initial, just passing data through it
+ * and make actual calc in Intermed
*
*/
@@ -136,6 +119,47 @@ static public class Initial extends EvalFunc {
public Initial(){}
+ public Initial(String pi){}
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return input;
+ }
+ }
+
+ /*
+ * this intermediate is where a subset selection is done
+ */
+ static public class Intermed extends EvalFunc {
+
+ public Intermed() {}
+
+ private double p;
+ private long k = 0L;
+
+ public Intermed(String pi){
+ if (pi.contains(",")){
+ try {
+ String[] ss = pi.split(",");
+ k = Long.parseLong(ss[0].trim(), 10);
+ long ts = Long.parseLong(ss[1].trim(), 10);
+ p = (double) k / ts;
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("k and n should be numbers, got NumberFormatException:"+pi);
+ }
+ } else {
+ try {
+ p = Double.parseDouble(pi);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("p should be a number, got NumberFormatException:"+pi);
+ }
+ }
+ if (p > 1.0d) {
+ throw new RuntimeException("p should not exceed 1");
+ }
+
+ }
+
private static RandomDataImpl _RNG = new RandomDataImpl();
private static int nextInt(int n) {
return _RNG.nextInt(0, n);
@@ -143,77 +167,81 @@ private static int nextInt(int n) {
@Override
public Tuple exec(Tuple input) throws IOException {
- int numArgs = input.size();
- int k_local;
+ DataBag items = (DataBag) input.get(0);
- if (numArgs != 2) {
- throw new IllegalArgumentException("The input tuple should have two fields: "
- + "a bag of items and the sampling.");
- }
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+ Tuple tu;
+ Tuple output = _TUPLE_FACTORY.newTuple();
+
+ if (items.size() == 0 || p == 0) {
+ return _TUPLE_FACTORY.newTuple();
+ } else if (items.size() == 1) {
+ tu = items.iterator().next();
+ selected.add(tu);
+ output.append(1);
+ output.append(selected);
+
+ return output;
+ } else if ( p == 1.0d){
+ selected.addAll(items);
+ output.append(items.size());
+ output.append(selected);
+
+ return output;
+ }
- DataBag items = (DataBag) input.get(0);
// the set should not exceed int, if initial set is bigger than max_int,
// split into sub-sets
- if (items.size() > 2147483647){
+ if (items.size() > Integer.MAX_VALUE){
throw new IndexOutOfBoundsException("bag size is above int maximum");
}
int numItems = (int) items.size();
- int in = (int) input.get(1);
- if (in > 1){
- // use global p, it doesn't change anyway
- k_local = (int) Math.ceil(p * numItems);
- } else {
- k_local = (int) Math.ceil(in * numItems);
- }
- DataBag selected = _BAG_FACTORY.newDefaultBag();
+ int k_local = (int) Math.ceil(p * numItems);
+ if (k_local == 0) return _TUPLE_FACTORY.newTuple();
int x;
SortedSet nums = new TreeSet();
+ numItems--;
while (nums.size() < k_local){
x = nextInt(numItems);
nums.add(x);
}
+
int i=0;
int j;
Iterator it = nums.iterator();
Iterator it2 = items.iterator();
-
- while (nums.size() > 0){
- for ( j=i; j
* number of processed items in this tuple (int),
* a bag of selected items (bag).
*/
- Tuple output = _TUPLE_FACTORY.newTuple();
- //output.append(k_local);
- output.append(numItems);
+ output.append(numItems+1);
output.append(selected);
return output;
}
}
- // this intermediate is just for consistency
- static public class Intermed extends EvalFunc {
-
- public Intermed() {}
-
- @Override
- public Tuple exec(Tuple input) throws IOException {
- return input;
- }
- }
-
// this final should be executed as reducer
// merges all selected bags into the output
// remove excess in case more than requested items
@@ -222,27 +250,54 @@ static public class Final extends EvalFunc {
public Final(){}
+ private double p;
+ private long k = 0L;
+
+ public Final(String pi){
+
+ if (pi.contains(",")){
+ try {
+ String[] ss = pi.split(",");
+ k = Long.parseLong(ss[0].trim(), 10);
+ long ts = Long.parseLong(ss[1].trim(), 10);
+ p = (double) k / ts;
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("k and n should be numbers, got NumberFormatException:"+pi);
+ }
+ } else {
+ try {
+ p = Double.parseDouble(pi);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("p should be a number, got NumberFormatException:"+pi);
+ }
+ }
+ if (p > 1.0d) {
+ throw new RuntimeException("p should not exceed 1");
+ }
+ }
+
@Override
public DataBag exec(Tuple input) throws IOException {
DataBag bag = (DataBag) input.get(0);
- long n_total = 0L; // number of overall items
+ if (bag.size() == 0) { return _BAG_FACTORY.newDefaultBag(); }
+
+ long n_total = 0L;
DataBag selected = _BAG_FACTORY.newDefaultBag();
Iterator it = bag.iterator();
Tuple tuple = it.next();
while(it.hasNext()) {
- n_total += (long) tuple.get(0);
+ n_total += ((Number) tuple.get(0)).longValue();
selected.addAll((DataBag) tuple.get(1));
tuple = it.next();
}
- n_total += (long) tuple.get(0);
+ n_total += ((Number) tuple.get(0)).longValue();
DataBag lastBag = (DataBag) tuple.get(1);
- // k || p come from the parent
long s; // final requested sample size
- if (p > 0){
+ if (k == 0){
s = (long) Math.ceil(p * n_total);
} else {
s = k;
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
index a23e7c41..51e82b05 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/UniformRandomSampleTest.java
@@ -23,7 +23,7 @@
import org.apache.pig.pigunit.PigTest;
import org.testng.annotations.Test;
-import datafu.pig.sampling.SimpleRandomSample;
+import datafu.pig.sampling.UniformRandomSample;
import datafu.test.pig.PigTests;
/**
@@ -34,11 +34,11 @@ public class UniformRandomSampleTest extends PigTests {
/**
*
*
- * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample('$p');
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $p) as sample_data;
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS(data) as sample_data;
*
* sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
*
@@ -50,11 +50,11 @@ public class UniformRandomSampleTest extends PigTests {
/**
*
*
- * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample('$k, $n');
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $k, $n) as sample_data;
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS(data) as sample_data;
*
* sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
*
@@ -64,11 +64,13 @@ public class UniformRandomSampleTest extends PigTests {
private String uniformRandomSampleIntTest;
/**
- * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ * DEFINE URS1 datafu.pig.sampling.UniformRandomSample('$p');
+ *
+ * DEFINE URS2 datafu.pig.sampling.UniformRandomSample('$k, $n');
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data ALL) GENERATE URS(data, $p) as sample_1, URS(data, $k, $n)
+ * sampled = FOREACH (GROUP data ALL) GENERATE URS1(data) as sample_1, URS2(data)
* AS sample_2;
*
* sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
@@ -83,42 +85,41 @@ public class UniformRandomSampleTest extends PigTests {
public void testUniformRandomSample() throws Exception {
writeLinesToFile("input",
"A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
"A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
+ "A3\tB3\t4",
+ "A4\tB4\t4",
"A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
+ "A6\tB2\t4",
+ "A7\tB1\t3",
+ "A8\tB1\t1",
+ "A9\tB3\t77",
+ "A10\tB1\t3",
+ "A11\tB2\t3",
+ "A12\tB3\t59",
+ "A13\tB4\t29",
+ "A14\tB1\t4",
+ "A15\tB2\t3",
+ "A16\tB2\t55",
+ "A17\tB3\t1",
+ "A18\tB1\t39",
+ "A19\tB2\t27",
+ "A20\tB3\t85",
+ "A21\tB1\t4",
+ "A22\tB2\t45",
+ "A23\tB3\t92",
+ "A24\tB3\t0",
+ "A25\tB6\t42",
+ "A26\tB5\t1",
+ "A27\tB1\t7",
+ "A28\tB2\t23",
+ "A29\tB2\t1",
+ "A30\tB2\t31",
+ "A31\tB6\t41",
+ "A32\tB7\t52");
int n = 32;
double p = 0.3;
int s = (int) Math.ceil(p * n);
-
PigTest test = createPigTestFromString(uniformRandomSampleFractionTest, "p=" + p);
test.runScript();
assertOutput(test, "sampled", "(" + s + ")");
@@ -137,16 +138,14 @@ public void testUniformRandomSample() throws Exception {
createPigTestFromString(uniformRandomSampleWithTwoCallsTest, "p=" + p, "k=" + k, "n=" + n);
testWithTwoCalls.runScript();
assertOutput(testWithTwoCalls, "sampled", "(" + s + "," + k + ")");
-
}
/**
- * DEFINE URS datafu.pig.sampling.UniformRandomSample();
+ * DEFINE URS datafu.pig.sampling.UniformRandomSample('$SAMPLING_FRACTION');
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data BY A_id) GENERATE group, URS(data,
- * $SAMPLING_FRACTION) as sample_data;
+ * sampled = FOREACH (GROUP data BY A_id) GENERATE group, URS(data) as sample_data;
*
* sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
*
@@ -211,4 +210,5 @@ public void testStratifiedSample() throws Exception {
"(A8,1)",
"(A9,2)");
}
+
}
From 8476c0125d40dfeb72ab2b5fab5210c5ef387ba3 Mon Sep 17 00:00:00 2001
From: Olga Kirillova
Date: Thu, 18 Jan 2018 10:22:44 -0800
Subject: [PATCH 4/5] redesigned with Initial use
---
.../pig/sampling/UniformRandomSample.java | 276 ++++++++++++------
1 file changed, 181 insertions(+), 95 deletions(-)
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
index 8ddde3ab..eb5be5ee 100644
--- a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.commons.math.random.RandomDataImpl;
import org.apache.pig.AlgebraicEvalFunc;
@@ -34,8 +36,6 @@
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import java.util.SortedSet;
-import java.util.TreeSet;
/**
* Scalable uniform random sampling.
*
@@ -107,61 +107,52 @@ public Schema outputSchema(Schema input) {
}
}
- /*
- * this Initial vectorizes input data, making 1 cal per element,
- * for sampling 1 element processing doesn't make much sense,
- * therefore, lets skip initial, just passing data through it
- * and make actual calc in Intermed
- *
- */
-
- static public class Initial extends EvalFunc {
-
- public Initial(){}
-
- public Initial(String pi){}
-
- @Override
- public Tuple exec(Tuple input) throws IOException {
- return input;
- }
- }
-
- /*
- * this intermediate is where a subset selection is done
- */
- static public class Intermed extends EvalFunc {
-
- public Intermed() {}
-
- private double p;
- private long k = 0L;
-
- public Intermed(String pi){
- if (pi.contains(",")){
+ protected static Tuple getPNK(String pi){
+ Tuple pnk = _TUPLE_FACTORY.newTuple();
+ long n, k;
+ double p;
+ String[] ss = pi.split(",");
+ if (pi.startsWith("0.") || pi.startsWith(".")){
+ try {
+ p = Double.parseDouble(ss[0].trim());
+ pnk.append(p);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("p should be a number, got NumberFormatException:"+pi);
+ }
+ } else {
+ if (ss.length == 2){
try {
- String[] ss = pi.split(",");
k = Long.parseLong(ss[0].trim(), 10);
- long ts = Long.parseLong(ss[1].trim(), 10);
- p = (double) k / ts;
+ n = Long.parseLong(ss[1].trim(), 10);
+ p = (double) k / n;
+ pnk.append(p);
+ pnk.append(k);
} catch (NumberFormatException e) {
throw new RuntimeException("k and n should be numbers, got NumberFormatException:"+pi);
}
} else {
- try {
- p = Double.parseDouble(pi);
- } catch (NumberFormatException e) {
- throw new RuntimeException("p should be a number, got NumberFormatException:"+pi);
- }
- }
- if (p > 1.0d) {
- throw new RuntimeException("p should not exceed 1");
+ throw new RuntimeException("2 parameters are required k and n, got:"+pi);
}
+ }
+ return pnk;
+ }
+
+ /**
+ * 1st mapped data processing step, can't be skipped
+ *
+ */
+ static public class Initial extends EvalFunc {
+ public Initial() {}
+
+ private Tuple pnk;
+
+ public Initial(String pi){
+ pnk = getPNK(pi);
}
private static RandomDataImpl _RNG = new RandomDataImpl();
- private static int nextInt(int n) {
+ synchronized private static int nextInt(int n) {
return _RNG.nextInt(0, n);
}
@@ -170,25 +161,22 @@ public Tuple exec(Tuple input) throws IOException {
DataBag items = (DataBag) input.get(0);
DataBag selected = _BAG_FACTORY.newDefaultBag();
+ Tuple extra = _TUPLE_FACTORY.newTuple();
Tuple tu;
Tuple output = _TUPLE_FACTORY.newTuple();
+ Double p = (Double) pnk.get(0);
- if (items.size() == 0 || p == 0) {
+ if (items.size() == 0 || p == 0d) {
return _TUPLE_FACTORY.newTuple();
} else if (items.size() == 1) {
tu = items.iterator().next();
- selected.add(tu);
+ extra = tu;
output.append(1);
output.append(selected);
+ output.append(extra);
return output;
- } else if ( p == 1.0d){
- selected.addAll(items);
- output.append(items.size());
- output.append(selected);
-
- return output;
- }
+ }
// the set should not exceed int, if initial set is bigger than max_int,
// split into sub-sets
@@ -197,13 +185,32 @@ public Tuple exec(Tuple input) throws IOException {
}
int numItems = (int) items.size();
- int k_local = (int) Math.ceil(p * numItems);
- if (k_local == 0) return _TUPLE_FACTORY.newTuple();
+ int k_down = (int) Math.floor(p * numItems);
+ int k_up = (int) Math.ceil(p * numItems);
+ if (k_up == 0) return _TUPLE_FACTORY.newTuple();
int x;
- SortedSet nums = new TreeSet();
numItems--;
- while (nums.size() < k_local){
+ SortedSet nums = new TreeSet();
+ int kk = k_up;
+
+ // if we need to return more than a half of input elements
+ // insteaed of addition it make sense to make exclusion
+ // I mean
+ // p <= 0.5
+ // add selected randomly elements to output set
+ // p > 0.5
+ // add all elements except randomly selected
+ // for exclusion if we need an extra element
+ // take the 1st, since at the end eventually no
+ // elements to take from have left
+
+ if (p <= 0.5){
+ k_down = numItems - k_down + 1;
+ k_up = numItems - k_up + 1;
+ kk = k_down;
+ }
+ while (nums.size() < kk){
x = nextInt(numItems);
nums.add(x);
}
@@ -214,34 +221,132 @@ public Tuple exec(Tuple input) throws IOException {
Iterator it2 = items.iterator();
tu = it2.next();
int ii = it.next();
+
+ if (p > 0.5){
+ while(i == ii){
+ i++;
+ ii = it.next();
+ tu = it2.next();
+ }
+ // add the 1st valid element
+ if (k_down == k_up){
+ selected.add(tu);
+ } else {
+ extra=tu;
+ }
+ tu = it2.next();
+ i++;
+ }
+
while (it.hasNext()){
- for ( j=i; j
* number of processed items in this tuple (int),
- * a bag of selected items (bag).
+ * a bag of selected items (bag),
+ * an extra tuple from the original set for cases
+ * when the exact number/fraction can't be obtained
+ * (tuple).
*/
output.append(numItems+1);
output.append(selected);
+ output.append(extra);
return output;
}
}
+ /**
+ * this intermediate is where a subset selection is done
+ */
+ static public class Intermed extends EvalFunc {
+
+ public Intermed(){}
+
+ private Tuple pnk;
+
+ public Intermed(String pi){
+ pnk = getPNK(pi);
+ }
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ DataBag bag = (DataBag) input.get(0);
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+ DataBag extra = _BAG_FACTORY.newDefaultBag();
+ long numItems = 0L;
+ long gotItems = 0L;
+ long required;
+
+ for (Tuple tuple : bag){
+ numItems += ((Number) tuple.get(0)).longValue();
+ gotItems += ((Number)((DataBag) tuple.get(1)).size()).longValue();
+ selected.addAll((DataBag) tuple.get(1));
+ extra.add((Tuple) tuple.get(2));
+ }
+
+ Double p = (Double) pnk.get(0);
+ long required_up = (long) Math.ceil(p * numItems);
+
+ if (extra.size() > 0) {
+ Iterator it = extra.iterator();
+ Tuple tu = it.next();
+ if (extra.size() == 1 && gotItems < required_up) {
+ selected.add(tu);
+ } else {
+ while (gotItems < required_up && it.hasNext()){
+ selected.add(tu);
+ gotItems++;
+ tu = it.next();
+ }
+ }
+ }
+
+ Tuple output = _TUPLE_FACTORY.newTuple();
+ output.append(numItems);
+ output.append(selected);
+ return output;
+ }
+ }
+
+
// this final should be executed as reducer
// merges all selected bags into the output
// remove excess in case more than requested items
@@ -250,30 +355,10 @@ static public class Final extends EvalFunc {
public Final(){}
- private double p;
- private long k = 0L;
+ private Tuple pnk;
public Final(String pi){
-
- if (pi.contains(",")){
- try {
- String[] ss = pi.split(",");
- k = Long.parseLong(ss[0].trim(), 10);
- long ts = Long.parseLong(ss[1].trim(), 10);
- p = (double) k / ts;
- } catch (NumberFormatException e) {
- throw new RuntimeException("k and n should be numbers, got NumberFormatException:"+pi);
- }
- } else {
- try {
- p = Double.parseDouble(pi);
- } catch (NumberFormatException e) {
- throw new RuntimeException("p should be a number, got NumberFormatException:"+pi);
- }
- }
- if (p > 1.0d) {
- throw new RuntimeException("p should not exceed 1");
- }
+ pnk = getPNK(pi);
}
@Override
@@ -297,10 +382,11 @@ public DataBag exec(Tuple input) throws IOException {
DataBag lastBag = (DataBag) tuple.get(1);
long s; // final requested sample size
- if (k == 0){
- s = (long) Math.ceil(p * n_total);
+ if (pnk.size() > 1){
+ s = ((Number) pnk.get(1)).longValue();
} else {
- s = k;
+ Double p = (Double) pnk.get(0);
+ s = (long) Math.ceil(p * n_total);
}
it = lastBag.iterator();
From 9daa836aeb7f3e82c7b9529a76b2c629cd25dc68 Mon Sep 17 00:00:00 2001
From: Olga Kirillova
Date: Sun, 21 Jan 2018 18:51:19 -0800
Subject: [PATCH 5/5] Move to lower bound selection in Intermed
---
.../pig/sampling/UniformRandomSample.java | 94 ++++++++++---------
1 file changed, 51 insertions(+), 43 deletions(-)
diff --git a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
index eb5be5ee..d4a0dbd3 100644
--- a/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
+++ b/datafu-pig/src/main/java/datafu/pig/sampling/UniformRandomSample.java
@@ -1,20 +1,20 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2018 IICOLL
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of this software
+ * and associated documentation files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * The above copyright notice and this permission notice shall be included in all copies or
+ * substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
+ * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package datafu.pig.sampling;
@@ -133,20 +133,20 @@ protected static Tuple getPNK(String pi){
} else {
throw new RuntimeException("2 parameters are required k and n, got:"+pi);
}
- }
+ }
return pnk;
}
/**
- * 1st mapped data processing step, can't be skipped
+ * 1st mapped data processing step, can't be skipped
*
*/
static public class Initial extends EvalFunc {
public Initial() {}
- private Tuple pnk;
-
+ private Tuple pnk;
+
public Initial(String pi){
pnk = getPNK(pi);
}
@@ -176,7 +176,7 @@ public Tuple exec(Tuple input) throws IOException {
output.append(extra);
return output;
- }
+ }
// the set should not exceed int, if initial set is bigger than max_int,
// split into sub-sets
@@ -196,14 +196,14 @@ public Tuple exec(Tuple input) throws IOException {
// if we need to return more than a half of input elements
// insteaed of addition it make sense to make exclusion
- // I mean
+ // I mean
// p <= 0.5
// add selected randomly elements to output set
// p > 0.5
// add all elements except randomly selected
// for exclusion if we need an extra element
- // take the 1st, since at the end eventually no
- // elements to take from have left
+ // take the 1st, since at the end eventually no
+ // elements to take from have left
if (p <= 0.5){
k_down = numItems - k_down + 1;
@@ -280,8 +280,8 @@ public Tuple exec(Tuple input) throws IOException {
* The output tuple contains the following fields:
* number of processed items in this tuple (int),
* a bag of selected items (bag),
- * an extra tuple from the original set for cases
- * when the exact number/fraction can't be obtained
+ * an extra tuple from the original set for cases
+ * when the exact number/fraction can't be obtained
* (tuple).
*/
@@ -294,23 +294,24 @@ public Tuple exec(Tuple input) throws IOException {
}
/**
- * this intermediate is where a subset selection is done
+ * this intermediate joins the initial selections into bigger chunks
*/
static public class Intermed extends EvalFunc {
public Intermed(){}
- private Tuple pnk;
+ private Tuple pnk;
public Intermed(String pi){
- pnk = getPNK(pi);
+ pnk = getPNK(pi);
}
@Override
public Tuple exec(Tuple input) throws IOException {
- DataBag bag = (DataBag) input.get(0);
+ DataBag bag = (DataBag) input.get(0);
DataBag selected = _BAG_FACTORY.newDefaultBag();
- DataBag extra = _BAG_FACTORY.newDefaultBag();
+ DataBag in_extra = _BAG_FACTORY.newDefaultBag();
+ Tuple out_extra = _TUPLE_FACTORY.newTuple();
long numItems = 0L;
long gotItems = 0L;
long required;
@@ -319,46 +320,50 @@ public Tuple exec(Tuple input) throws IOException {
numItems += ((Number) tuple.get(0)).longValue();
gotItems += ((Number)((DataBag) tuple.get(1)).size()).longValue();
selected.addAll((DataBag) tuple.get(1));
- extra.add((Tuple) tuple.get(2));
+ in_extra.add((Tuple) tuple.get(2));
}
Double p = (Double) pnk.get(0);
long required_up = (long) Math.ceil(p * numItems);
-
- if (extra.size() > 0) {
- Iterator it = extra.iterator();
+ long required_down = (long) Math.floor(p * numItems);
+
+ if (in_extra.size() > 0) {
+ Iterator it = in_extra.iterator();
Tuple tu = it.next();
- if (extra.size() == 1 && gotItems < required_up) {
+ if (in_extra.size() == 1 && gotItems < required_down) {
selected.add(tu);
} else {
- while (gotItems < required_up && it.hasNext()){
+ while (gotItems < required_down && it.hasNext()){
selected.add(tu);
gotItems++;
tu = it.next();
}
+ if (tu != null && required_down < required_up) { out_extra = tu; }
}
}
Tuple output = _TUPLE_FACTORY.newTuple();
output.append(numItems);
output.append(selected);
+ output.append(out_extra);
return output;
}
}
- // this final should be executed as reducer
- // merges all selected bags into the output
- // remove excess in case more than requested items
- // were selected
+ /**
+ * this final should be executed as a reducer;
+ * it merges all selected bags into the output,
+ * adding extra elements in case more are needed
+ */
static public class Final extends EvalFunc {
public Final(){}
- private Tuple pnk;
+ private Tuple pnk;
public Final(String pi){
- pnk = getPNK(pi);
+ pnk = getPNK(pi);
}
@Override
@@ -370,16 +375,19 @@ public DataBag exec(Tuple input) throws IOException {
long n_total = 0L;
DataBag selected = _BAG_FACTORY.newDefaultBag();
+ DataBag extra = _BAG_FACTORY.newDefaultBag();
Iterator it = bag.iterator();
Tuple tuple = it.next();
while(it.hasNext()) {
n_total += ((Number) tuple.get(0)).longValue();
selected.addAll((DataBag) tuple.get(1));
+ extra.add((Tuple) tuple.get(2));
tuple = it.next();
}
n_total += ((Number) tuple.get(0)).longValue();
- DataBag lastBag = (DataBag) tuple.get(1);
+ selected.addAll((DataBag) tuple.get(1));
+ extra.add((Tuple) tuple.get(2));
long s; // final requested sample size
if (pnk.size() > 1){
@@ -389,7 +397,7 @@ public DataBag exec(Tuple input) throws IOException {
s = (long) Math.ceil(p * n_total);
}
- it = lastBag.iterator();
+ it = extra.iterator();
while(it.hasNext() && selected.size() < s ) {
selected.add(it.next());
}