From ff8adf6ca4825cad301c4670bbbe7ea1e79c6bef Mon Sep 17 00:00:00 2001 From: manoellins Date: Wed, 8 May 2024 15:18:58 -0700 Subject: [PATCH 1/2] Module 1 labs - Enrichment and Filtering --- .../ridecleansing/RideCleansingExercise.java | 4 ++-- .../ridesandfares/RidesAndFaresExercise.java | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java index 1f07312f..0f9d07a3 100644 --- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java +++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; +import org.apache.flink.training.exercises.common.utils.GeoUtils; /** * The Ride Cleansing exercise from the Flink training. @@ -80,7 +80,7 @@ public JobExecutionResult execute() throws Exception { public static class NYCFilter implements FilterFunction { @Override public boolean filter(TaxiRide taxiRide) throws Exception { - throw new MissingSolutionException(); + return GeoUtils.isInNYC(taxiRide.startLon,taxiRide.startLat) && GeoUtils.isInNYC(taxiRide.endLon,taxiRide.endLat); } } } diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 0662dfc0..01883224 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -19,6 +19,8 @@ package org.apache.flink.training.exercises.ridesandfares; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,7 +33,6 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiRide; import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator; import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; -import org.apache.flink.training.exercises.common.utils.MissingSolutionException; import org.apache.flink.util.Collector; /** @@ -98,20 +99,32 @@ public static void main(String[] args) throws Exception { public static class EnrichmentFunction extends RichCoFlatMapFunction { - + private ValueState fareState; + private ValueState rideState; @Override public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); + fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("fareState", TaxiFare.class)); + rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("rideState", TaxiRide.class)); } @Override public void flatMap1(TaxiRide ride, Collector out) throws Exception { - throw new MissingSolutionException(); + if(fareState.value() == null){ + rideState.update(ride); + }else{ + out.collect(new RideAndFare(ride,fareState.value())); + fareState.clear(); + } } @Override public void flatMap2(TaxiFare fare, Collector out) throws Exception { - throw new MissingSolutionException(); + if(rideState.value() == null){ + fareState.update(fare); + }else{ + out.collect(new RideAndFare(rideState.value(),fare)); + rideState.clear(); + } } } } From f90a9d703b940ad87d81594221cded642fd86936 Mon Sep 17 00:00:00 2001 From: manoellins Date: Thu, 9 May 2024 16:40:32 -0700 Subject: [PATCH 2/2] Module 1 labs - Enrichment and Filtering Changing the state variables to transient. --- .../exercises/ridesandfares/RidesAndFaresExercise.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 01883224..1d552fcd 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -99,8 +99,8 @@ public static void main(String[] args) throws Exception { public static class EnrichmentFunction extends RichCoFlatMapFunction { - private ValueState fareState; - private ValueState rideState; + private transient ValueState fareState; + private transient ValueState rideState; @Override public void open(Configuration config) throws Exception { fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("fareState", TaxiFare.class));