diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java index 1f07312f..0f9d07a3 100644 --- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java +++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; +import org.apache.flink.training.exercises.common.utils.GeoUtils; /** * The Ride Cleansing exercise from the Flink training. @@ -80,7 +80,7 @@ public JobExecutionResult execute() throws Exception { public static class NYCFilter implements FilterFunction { @Override public boolean filter(TaxiRide taxiRide) throws Exception { - throw new MissingSolutionException(); + return GeoUtils.isInNYC(taxiRide.startLon,taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon,taxiRide.endLat); } } } diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 0662dfc0..1d552fcd 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -19,6 +19,8 @@ package org.apache.flink.training.exercises.ridesandfares; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,7 +33,6 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; import org.apache.flink.util.Collector; /** @@ -98,20 +99,32 @@ public static void main(String[] args) throws Exception { public static class EnrichmentFunction extends RichCoFlatMapFunction { - + private transient ValueState fareState; + private transient ValueState rideState; @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("fareState", TaxiFare.class)); + rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("rideState", TaxiRide.class)); } @Override public void flatMap1(TaxiRide ride, Collector out) throws Exception { - throw new MissingSolutionException(); + if(fareState.value() == null){ + rideState.update(ride); + }else{ + out.collect(new RideAndFare(ride,fareState.value())); + fareState.clear(); + } } @Override public void flatMap2(TaxiFare fare, Collector out) throws Exception { - throw new MissingSolutionException(); + if(rideState.value() == null){ + fareState.update(fare); + }else{ + out.collect(new RideAndFare(rideState.value(),fare)); + rideState.clear(); + } } } }