From a9e1d3726b44146bfeecd3bcec3f4ee2d1f2f4fe Mon Sep 17 00:00:00 2001 From: ChaoKong Date: Thu, 31 Dec 2015 16:30:49 -0500 Subject: [PATCH] add intermediate results for TCPThroughput, ping, and tracerouteTask --- .../com/mobilyzer/MeasurementScheduler.java | 15 ++- .../src/com/mobilyzer/MeasurementTask.java | 14 ++- .../com/mobilyzer/ServerMeasurementTask.java | 1 + Mobilyzer/src/com/mobilyzer/UpdateIntent.java | 4 + .../com/mobilyzer/UserMeasurementTask.java | 5 +- .../mobilyzer/measurements/ParallelTask.java | 3 + .../com/mobilyzer/measurements/PingTask.java | 98 +++++++++++++++++-- .../measurements/SequentialTask.java | 1 + .../measurements/TCPThroughputTask.java | 50 ++++++++++ .../measurements/TracerouteTask.java | 58 ++++++++++- 10 files changed, 238 insertions(+), 11 deletions(-) diff --git a/Mobilyzer/src/com/mobilyzer/MeasurementScheduler.java b/Mobilyzer/src/com/mobilyzer/MeasurementScheduler.java index 948c0ab..be87425 100644 --- a/Mobilyzer/src/com/mobilyzer/MeasurementScheduler.java +++ b/Mobilyzer/src/com/mobilyzer/MeasurementScheduler.java @@ -185,7 +185,7 @@ public void onCreate() { filter.addAction(UpdateIntent.MEASUREMENT_PROGRESS_UPDATE_ACTION); filter.addAction(UpdateIntent.GCM_MEASUREMENT_ACTION); filter.addAction(UpdateIntent.PLT_MEASUREMENT_ACTION); - + filter.addAction(UpdateIntent.MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION); broadcastReceiver = new BroadcastReceiver() { @Override @@ -324,6 +324,19 @@ public void onReceive(Context context, Intent intent) { Config.TASK_RESUMED)) { tasksStatus.put(taskid, TaskStatus.RUNNING); } + } else if (intent.getAction().equals(UpdateIntent.MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION)){ + String IM_taskKey = intent.getStringExtra(UpdateIntent.CLIENTKEY_PAYLOAD); + String IM_taskid = intent.getStringExtra(UpdateIntent.TASKID_PAYLOAD); + int IM_priority = + intent.getIntExtra(UpdateIntent.TASK_PRIORITY_PAYLOAD, + MeasurementTask.INVALID_PRIORITY); + Parcelable[] IM_Results = intent.getParcelableArrayExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD); + if (IM_Results != null && IM_Results.length!=0) { + sendResultToClient(IM_Results, IM_priority, IM_taskKey, IM_taskid); + Logger.i("Sending Intermediate results to client..."); + System.out.println("receive the intermediate results"); + } + } else if (intent.getAction().equals(UpdateIntent.CHECKIN_ACTION) || intent.getAction().equals(UpdateIntent.CHECKIN_RETRY_ACTION)) { Logger.d("Checkin intent received"); diff --git a/Mobilyzer/src/com/mobilyzer/MeasurementTask.java b/Mobilyzer/src/com/mobilyzer/MeasurementTask.java index eb90244..d85c666 100644 --- a/Mobilyzer/src/com/mobilyzer/MeasurementTask.java +++ b/Mobilyzer/src/com/mobilyzer/MeasurementTask.java @@ -6,7 +6,7 @@ import java.util.HashMap; import java.util.Set; import java.util.concurrent.Callable; - +import android.content.Context; import com.mobilyzer.exceptions.MeasurementError; import com.mobilyzer.measurements.DnsLookupTask; import com.mobilyzer.measurements.HttpTask; @@ -29,6 +29,9 @@ public abstract class MeasurementTask Parcelable { protected MeasurementDesc measurementDesc; protected String taskId; + + //private MeasurementScheduler scheduler; + private Context context = null; public static final int USER_PRIORITY = Integer.MIN_VALUE; @@ -114,6 +117,15 @@ public String getKey() { public void setKey(String key) { this.measurementDesc.key = key; } + + // pass scheduler to every specific task + public void setContext(Context context) { + this.context = context; + } + + public Context getContext() { + return this.context; + } public MeasurementDesc getDescription() { diff --git a/Mobilyzer/src/com/mobilyzer/ServerMeasurementTask.java b/Mobilyzer/src/com/mobilyzer/ServerMeasurementTask.java index ce5a231..fbf16a0 100644 --- a/Mobilyzer/src/com/mobilyzer/ServerMeasurementTask.java +++ b/Mobilyzer/src/com/mobilyzer/ServerMeasurementTask.java @@ -38,6 +38,7 @@ public class ServerMeasurementTask implements Callable { public ServerMeasurementTask(MeasurementTask task, MeasurementScheduler scheduler, ResourceCapManager manager) { realTask = task; + realTask.setContext(null); this.scheduler = scheduler; this.contextCollector = new ContextCollector(); this.rManager = manager; diff --git a/Mobilyzer/src/com/mobilyzer/UpdateIntent.java b/Mobilyzer/src/com/mobilyzer/UpdateIntent.java index e16b5fa..a9d662a 100644 --- a/Mobilyzer/src/com/mobilyzer/UpdateIntent.java +++ b/Mobilyzer/src/com/mobilyzer/UpdateIntent.java @@ -44,6 +44,7 @@ public class UpdateIntent extends Intent { public static final String VIDEO_TASK_PAYLOAD_REBUFFER_TIME = "VIDEO_TASK_PAYLOAD_REBUFFER_TIME"; public static final String VIDEO_TASK_PAYLOAD_BBA_SWITCH_TIME = "VIDEO_TASK_PAYLOAD_BBA_SWITCH_TIME"; public static final String VIDEO_TASK_PAYLOAD_BYTE_USED = "VIDEO_TASK_PAYLOAD_BYTE_USED"; + public static final String INTERMEDIATE_RESULT_PAYLOAD = "INTERMEDIATE_RESULT_PAYLOAD"; // Different types of actions that this intent can represent: @@ -81,6 +82,9 @@ public class UpdateIntent extends Intent { PACKAGE_PREFIX + ".DATA_USAGE_ACTION"; public static final String AUTH_ACCOUNT_ACTION = PACKAGE_PREFIX + ".AUTH_ACCOUNT_ACTION"; + + public static final String MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION = + APP_PREFIX + ".MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION"; /** * Creates an intent of the specified action with an optional message diff --git a/Mobilyzer/src/com/mobilyzer/UserMeasurementTask.java b/Mobilyzer/src/com/mobilyzer/UserMeasurementTask.java index 59ec096..46d34fd 100644 --- a/Mobilyzer/src/com/mobilyzer/UserMeasurementTask.java +++ b/Mobilyzer/src/com/mobilyzer/UserMeasurementTask.java @@ -17,7 +17,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.Callable; - +import android.content.Context; import android.content.Intent; import com.mobilyzer.MeasurementResult.TaskProgress; @@ -33,6 +33,7 @@ public class UserMeasurementTask implements Callable { public UserMeasurementTask(MeasurementTask task, MeasurementScheduler scheduler) { realTask = task; + realTask.setContext(scheduler.getApplicationContext()); this.scheduler = scheduler; this.contextCollector= new ContextCollector(); } @@ -96,9 +97,11 @@ public MeasurementResult[] call() throws MeasurementError { ArrayList> contextResults = contextCollector.stopCollector(); for (MeasurementResult r: results){ + String testIntermediateFirst = r.toString(); r.addContextResults(contextResults); r.getDeviceProperty().dnResolvability=contextCollector.dnsConnectivity; r.getDeviceProperty().ipConnectivity=contextCollector.ipConnectivity; + String testIntermediateSecond = r.toString(); } } catch (MeasurementError e) { Logger.e("User measurement " + realTask.getDescriptor() + " has failed"); diff --git a/Mobilyzer/src/com/mobilyzer/measurements/ParallelTask.java b/Mobilyzer/src/com/mobilyzer/measurements/ParallelTask.java index 9cf9c36..e9a6c5d 100644 --- a/Mobilyzer/src/com/mobilyzer/measurements/ParallelTask.java +++ b/Mobilyzer/src/com/mobilyzer/measurements/ParallelTask.java @@ -165,6 +165,9 @@ public String getDescriptor() { @Override public MeasurementResult[] call() throws MeasurementError { + for (MeasurementTask task: this.tasks){ + task.setContext(this.getContext()); + } long timeout=duration; executor=Executors.newFixedThreadPool(this.tasks.size()); diff --git a/Mobilyzer/src/com/mobilyzer/measurements/PingTask.java b/Mobilyzer/src/com/mobilyzer/measurements/PingTask.java index 6d26ee3..7674aac 100755 --- a/Mobilyzer/src/com/mobilyzer/measurements/PingTask.java +++ b/Mobilyzer/src/com/mobilyzer/measurements/PingTask.java @@ -18,7 +18,8 @@ import android.os.Parcel; import android.os.Parcelable; import android.util.Log; - +import android.content.Context; +import android.content.Intent; import com.mobilyzer.Config; import com.mobilyzer.MeasurementDesc; @@ -30,6 +31,8 @@ import com.mobilyzer.util.MeasurementJsonConvertor; import com.mobilyzer.util.PhoneUtils; import com.mobilyzer.util.Util; +import com.mobilyzer.MeasurementScheduler; +import com.mobilyzer.UpdateIntent; import java.io.BufferedReader; import java.io.IOException; @@ -70,6 +73,30 @@ public class PingTask extends MeasurementTask{ private String targetIp = null; //Track data consumption for this task to avoid exceeding user's limit private long dataConsumed; + private Context context = null; + + private void broadcastIntermediateResults(MeasurementResult[] results, Context context) { + this.context = context; + Intent intent = new Intent(); + intent.setAction(UpdateIntent.MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION); + //TODO fixed one value priority for all users task? + intent.putExtra(UpdateIntent.TASK_PRIORITY_PAYLOAD, + MeasurementTask.USER_PRIORITY); + intent.putExtra(UpdateIntent.TASKID_PAYLOAD, this.getTaskId()); + intent.putExtra(UpdateIntent.CLIENTKEY_PAYLOAD, this.getKey()); + + if (results != null){ + + //intent.putExtra(UpdateIntent.TASK_STATUS_PAYLOAD, Config.TASK_FINISHED); + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, results); + + this.context.sendBroadcast(intent); + }else{ + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, "No intermediate results are broadcasted"); + + } + + } /** * Encode ping specific parameters, along with common parameters inherited from MeasurmentDesc @@ -356,6 +383,12 @@ private MeasurementResult executePingCmdTask(int ipByteLen) PingDesc pingTask = (PingDesc) this.measurementDesc; String errorMsg = ""; MeasurementResult measurementResult = null; + + MeasurementResult intermediateMeasurementResults = null; + MeasurementResult[] intermediateResult = null; + intermediateResult = new MeasurementResult[1]; + + // TODO(Wenjie): Add a exhaustive list of ping locations for different // Android phones pingTask.pingExe = Util.pingExecutableBasedOnIPType(ipByteLen); @@ -409,6 +442,18 @@ private MeasurementResult executePingCmdTask(int ipByteLen) int packetsReceived = packetLossInfo[1]; packetLoss = 1 - ((double) packetsReceived / (double) packetsSent); } + + this.context = this.getContext(); + if ( this.context != null ){ + if (rrts.size() >= 2 && (rrts.size() < Config.PING_COUNT_PER_MEASUREMENT) && (extractedValues != null) ){ + intermediateMeasurementResults = constructResult(rrts,packetLoss, + rrts.size(),PING_METHOD_CMD); + intermediateResult[0] = intermediateMeasurementResults; + broadcastIntermediateResults(intermediateResult,this.context); + + + } + } Logger.i(line); } @@ -452,6 +497,11 @@ private MeasurementResult executeJavaPingTask() throws MeasurementError { ArrayList rrts = new ArrayList(); String errorMsg = ""; MeasurementResult result = null; + + double packetLoss = Double.MIN_VALUE; + MeasurementResult intermediateMeasurementResults = null; + MeasurementResult[] intermediateResult = null; + intermediateResult = new MeasurementResult[1]; try { int timeOut = (int) (3000 * (double) pingTask.pingTimeoutSec / @@ -466,12 +516,27 @@ private MeasurementResult executeJavaPingTask() throws MeasurementError { if (status) { totalPingDelay += rrtVal; rrts.add((double) rrtVal); + + packetLoss = 1 - + ((double) rrts.size() / (i+1)); + dataConsumed += pingTask.packetSizeByte * (i+1) * 2; + this.context = this.getContext(); + if ( this.context != null){ + intermediateMeasurementResults = constructResult(rrts,packetLoss, + (i+1),PING_METHOD_JAVA); + intermediateResult[0] = intermediateMeasurementResults; + broadcastIntermediateResults(intermediateResult,this.context); + + } } } Logger.i("java ping succeeds"); - double packetLoss = 1 - - ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); - +// double packetLoss = 1 - +// ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); + if (packetLoss == Double.MIN_VALUE) { + packetLoss = 1 - + ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); + } dataConsumed += pingTask.packetSizeByte * Config.PING_COUNT_PER_MEASUREMENT * 2; result = constructResult(rrts, packetLoss, @@ -505,7 +570,10 @@ private MeasurementResult executeHttpPingTask() throws MeasurementError { PingDesc pingTask = (PingDesc) this.measurementDesc; String errorMsg = ""; MeasurementResult result = null; - + double packetLoss = Double.MIN_VALUE; + MeasurementResult intermediateMeasurementResults = null; + MeasurementResult[] intermediateResult = null; + intermediateResult = new MeasurementResult[1]; try { long totalPingDelay = 0; @@ -525,12 +593,28 @@ private MeasurementResult executeHttpPingTask() throws MeasurementError { pingEndTime = System.currentTimeMillis(); httpClient.disconnect(); rrts.add((double) (pingEndTime - pingStartTime)); + packetLoss = 1 - + ((double) rrts.size() / (i+1)); + dataConsumed += pingTask.packetSizeByte * (i+1) * 2; + this.context = this.getContext(); + if ( this.context != null){ + intermediateMeasurementResults = constructResult(rrts,packetLoss, + (i+1),PING_METHOD_HTTP); + intermediateResult[0] = intermediateMeasurementResults; + broadcastIntermediateResults(intermediateResult,this.context); + + } } Logger.i("HTTP get ping succeeds"); Logger.i("RTT is " + rrts.toString()); - double packetLoss = 1 - - ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); +// double packetLoss = 1 +// - ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); + + if (packetLoss == Double.MIN_VALUE) { + packetLoss = 1 - + ((double) rrts.size() / (double) Config.PING_COUNT_PER_MEASUREMENT); + } dataConsumed += pingTask.packetSizeByte * Config.PING_COUNT_PER_MEASUREMENT * 2; diff --git a/Mobilyzer/src/com/mobilyzer/measurements/SequentialTask.java b/Mobilyzer/src/com/mobilyzer/measurements/SequentialTask.java index 7dd1a38..61a4cb8 100644 --- a/Mobilyzer/src/com/mobilyzer/measurements/SequentialTask.java +++ b/Mobilyzer/src/com/mobilyzer/measurements/SequentialTask.java @@ -169,6 +169,7 @@ public MeasurementResult[] call() throws MeasurementError { try { // futures=executor.invokeAll(this.tasks,timeout,TimeUnit.MILLISECONDS); for(MeasurementTask mt: tasks){ + mt.setContext(this.getContext()); if(stopFlag){ throw new MeasurementError("Cancelled"); } diff --git a/Mobilyzer/src/com/mobilyzer/measurements/TCPThroughputTask.java b/Mobilyzer/src/com/mobilyzer/measurements/TCPThroughputTask.java index 66a3550..e3ff31c 100644 --- a/Mobilyzer/src/com/mobilyzer/measurements/TCPThroughputTask.java +++ b/Mobilyzer/src/com/mobilyzer/measurements/TCPThroughputTask.java @@ -24,8 +24,12 @@ import com.mobilyzer.util.MLabNS; import com.mobilyzer.util.MeasurementJsonConvertor; import com.mobilyzer.util.PhoneUtils; +import com.mobilyzer.MeasurementScheduler; +import com.mobilyzer.UpdateIntent; + import android.content.Context; +import android.content.Intent; import android.os.Parcel; import android.os.Parcelable; @@ -88,6 +92,35 @@ public class TCPThroughputTask extends MeasurementTask { private long duration; private TaskProgress taskProgress; private volatile boolean stopFlag; + + private Context IM_context = null; + private TaskProgress Intermediate_TaskProgress = TaskProgress.COMPLETED; + + // add broadcast to send the intermediate results + + private void broadcastIntermediateMeasurement(MeasurementResult[] results, Context context) { + this.IM_context = context; + Intent intent = new Intent(); + intent.setAction(UpdateIntent.MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION); + + intent.putExtra(UpdateIntent.TASK_PRIORITY_PAYLOAD, + MeasurementTask.USER_PRIORITY); + intent.putExtra(UpdateIntent.TASKID_PAYLOAD, this.getTaskId()); + intent.putExtra(UpdateIntent.CLIENTKEY_PAYLOAD, this.getKey()); + + if (results != null){ + + //intent.putExtra(UpdateIntent.TASK_STATUS_PAYLOAD, Config.TASK_FINISHED); + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, results); + + this.IM_context.sendBroadcast(intent); + }else{ + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, "No intermediate results are broadcasted"); + + } + + } + // class constructor public TCPThroughputTask(MeasurementDesc desc) { @@ -679,6 +712,7 @@ private void downlink() throws MeasurementError, IOException { * @param time period increment */ private void updateSize(int delta) { + MeasurementResult IntermediateResult = null; double gtime = System.currentTimeMillis() - this.taskStartTime; //ignore slow start if (gtime<((TCPThroughputDesc)measurementDesc).slow_start_period_sec*this.KSEC) @@ -696,6 +730,22 @@ private void updateSize(int delta) { this.samplingResults = this.insertWithOrder(this.samplingResults, throughput); this.accumulativeSize = 0; this.startSampleTime = System.currentTimeMillis(); + + this.IM_context = this.getContext(); + if (this.IM_context != null){ + PhoneUtils Intermediate_phoneUtils = PhoneUtils.getPhoneUtils(); + IntermediateResult = new MeasurementResult(Intermediate_phoneUtils.getDeviceInfo().deviceId, + Intermediate_phoneUtils.getDeviceProperty(this.getKey()),TCPThroughputTask.TYPE, + System.currentTimeMillis()*1000,Intermediate_TaskProgress,this.measurementDesc); + IntermediateResult.addResult("tcp_speed_results", this.samplingResults); + IntermediateResult.addResult("data_limit_exceeded", this.DATA_LIMIT_EXCEEDED); + IntermediateResult.addResult("duration", time); + IntermediateResult.addResult("server_version", this.serverVersion); + MeasurementResult[] IM_mrArray = new MeasurementResult[1]; + IM_mrArray[0] = IntermediateResult; + broadcastIntermediateMeasurement(IM_mrArray,this.IM_context); + + } } } diff --git a/Mobilyzer/src/com/mobilyzer/measurements/TracerouteTask.java b/Mobilyzer/src/com/mobilyzer/measurements/TracerouteTask.java index 14fba2f..af95e28 100755 --- a/Mobilyzer/src/com/mobilyzer/measurements/TracerouteTask.java +++ b/Mobilyzer/src/com/mobilyzer/measurements/TracerouteTask.java @@ -13,7 +13,8 @@ */ package com.mobilyzer.measurements; - +import android.content.Context; +import android.content.Intent; import android.os.Parcel; import android.os.Parcelable; @@ -50,6 +51,8 @@ import com.mobilyzer.util.MeasurementJsonConvertor; import com.mobilyzer.util.PhoneUtils; import com.mobilyzer.util.Util; +import com.mobilyzer.MeasurementScheduler; +import com.mobilyzer.UpdateIntent; /** @@ -86,6 +89,33 @@ public class TracerouteTask extends MeasurementTask implements PreemptibleMeasur // Track data consumption for this task to avoid exceeding user's limit private long dataConsumed; + private Context context = null; + private TaskProgress Intermediate_TaskProgress = TaskProgress.COMPLETED; + + // add broadcast to send the intermediate results + + private void broadcastIntermediateMeasurement(MeasurementResult[] results, Context context) { + this.context = context; + Intent intent = new Intent(); + intent.setAction(UpdateIntent.MEASUREMENT_INTERMEDIATE_PROGRESS_UPDATE_ACTION); + //TODO fixed one value priority for all users task? + intent.putExtra(UpdateIntent.TASK_PRIORITY_PAYLOAD, + MeasurementTask.USER_PRIORITY); + intent.putExtra(UpdateIntent.TASKID_PAYLOAD, this.getTaskId()); + intent.putExtra(UpdateIntent.CLIENTKEY_PAYLOAD, this.getKey()); + + if (results != null){ + + //intent.putExtra(UpdateIntent.TASK_STATUS_PAYLOAD, Config.TASK_FINISHED); + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, results); + + this.context.sendBroadcast(intent); + }else{ + intent.putExtra(UpdateIntent.INTERMEDIATE_RESULT_PAYLOAD, "No intermediate results are broadcasted"); + + } + + } /** * The description of the Traceroute measurement @@ -323,6 +353,7 @@ public MeasurementResult[] call() throws MeasurementError { throw new MeasurementError("target " + target + " cannot be resolved"); } MeasurementResult result = null; + MeasurementResult intermediateResult = null; ExecutorService hopExecutorService = Executors.newFixedThreadPool(task.parallelProbeNum); @@ -397,6 +428,31 @@ public MeasurementResult[] call() throws MeasurementError { } } + this.context = this.getContext(); + if (this.context != null){ + + PhoneUtils Intermediate_phoneUtils = PhoneUtils.getPhoneUtils(); + intermediateResult = new MeasurementResult(Intermediate_phoneUtils.getDeviceInfo().deviceId, + Intermediate_phoneUtils.getDeviceProperty(this.getKey()),TracerouteTask.TYPE, + System.currentTimeMillis()*1000,Intermediate_TaskProgress,this.measurementDesc); + + intermediateResult.addResult("num_hops", hop.ttl); + + for (int i = 0; i < hopHosts.size(); i++) { + HopInfo intermediateHopInfo = hopHosts.get(i); + int intermediateHostIdx = 1; + for (String intermediateHost : intermediateHopInfo.hosts) { + intermediateResult.addResult("hop_" + intermediateHopInfo.ttl + "_addr_" + intermediateHostIdx++, intermediateHost); + } + intermediateResult.addResult("hop_" + intermediateHopInfo.ttl + "_rtt_ms", String.format("%.3f", intermediateHopInfo.rtt)); + + } + + + MeasurementResult[] intermediateMrArray = new MeasurementResult[1]; + intermediateMrArray[0] = intermediateResult; + broadcastIntermediateMeasurement(intermediateMrArray,this.context); + } } catch (InterruptedException e) { e.printStackTrace();