
Doing partial reduceByKey in Flow created in func init() #84

@jdelamar

Description


Hello.

I apologize if I have missed something obvious, but I am using glow to map and reduce time series. I would like to do a reduce or reduceByKey on every time slice (for instance, a reduceByKey over all events received in the last minute).
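
To make the intent concrete, here is roughly the grouping I am after, with made-up names (Event and windowKey are only for illustration, using one-minute slices):

import (
	"fmt"
	"time"
)

// Event is a stand-in for my real record type (hypothetical).
type Event struct {
	Key  string
	Time time.Time
	Val  float64
}

// windowKey is a made-up helper: it combines the event's key with the
// one-minute slice the event falls into, so reducing by this string
// would aggregate per key and per time slice.
func windowKey(e Event) string {
	slice := e.Time.Truncate(time.Minute)
	return fmt.Sprintf("%s|%s", e.Key, slice.Format(time.RFC3339))
}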

Right now, I am setting the code up to be distributed and, following the tutorial, have put my flows in the func init() section so that they are static (instantiated only once, and the same way, on every node).

The data comes from an unbounded stream (i.e., not from a finite file). So I have something like this:

func init() {
	mapRecordsToMetadata.
		Channel(mapRecordsToMetadataInput).
		Map(mapTimeSeriesToTimeSliceFunc).
		Map(mapConvertColumnValuesFunc).
		// ... some more maps and filters
		ReduceByKey(reduceByFlowKey).
		AddOutput(mapRecordsToMetadataOutput)
}

// letsDoIt uses mapRecordsToMetadata to map and reduce all events for a given key during a time slice.
func letsDoIt(streamedEvents chan []string) chan GroupedEventsByKeyChan {
	out := make(chan GroupedEventsByKeyChan)

	go func() {
		for evt := range streamedEvents {
			mapRecordsToMetadataInput <- evt
		}
	}()

	go func() {
		for evt := range mapRecordsToMetadataOutput {
			out <- evt
		}
	}()

	return out
}
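
For completeness, this is roughly how I consume the output (simplified; my ingestion code and the cluster/driver wiring from the tutorial are omitted, and consumeAggregates is just an illustrative name):

// consumeAggregates drains the grouped results coming out of letsDoIt.
// In the real code the aggregates are forwarded downstream; printing is
// only for illustration.
func consumeAggregates(streamedEvents chan []string) {
	for grouped := range letsDoIt(streamedEvents) {
		fmt.Printf("aggregated slice: %+v\n", grouped)
	}
}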

I have simplified this a bit, but hopefully it is enough to get the idea. Now, ReduceByKey blocks until I close the mapRecordsToMetadataInput channel (which makes sense). However, once I do that, I cannot really use my mapRecordsToMetadata flow anymore (is there a way to replace the input channel and restart it?).

Conceptually, I would "close" my input channel (mapRecordsToMetadataInput) at the end of every time slice where I want the aggregate to run (i.e., every 30 seconds) so that my ReduceByKey would run on that interval's inputs.

My only option seems to be keeping the map operations in the flow defined in init() (the one fed by mapRecordsToMetadataInput) and moving the ReduceByKey() into a dynamic flow, recreating that dynamic flow every 30 seconds in my case.

Something like this:

func init() {
	mapRecordsToMetadata.
		Channel(mapRecordsToMetadataInput).
		Map(mapTimeSeriesToTimeSliceFunc).
		Map(mapConvertColumnValuesFunc).
		// ... some more maps and filters
		// ReduceByKey removed here
		AddOutput(mapRecordsToMetadataOutput)
}

func letsDoIt(streamedEvents chan []string) chan GroupedEventsByKeyChan {
	out := make(chan GroupedEventsByKeyChan)

	go func() {
		for evt := range streamedEvents {
			mapRecordsToMetadataInput <- evt
		}
	}()

	go func() {
		nextInterval := time.Now().Add(30 * time.Second)
		for {
			// Build a fresh dynamic flow for the current 30-second interval.
			reduceFlow := flow.New()
			reduceInChan := make(chan GroupedEventsByKeyChan)
			reduceFlow.
				Channel(reduceInChan).
				ReduceByKey(reduceByFlowKey).
				AddOutput(out)

			for evt := range mapRecordsToMetadataOutput {
				reduceInChan <- evt

				if evt.Time.After(nextInterval) {
					// Flush and reduce for that interval, then start over
					// with a new dynamic flow for the next interval.
					close(reduceInChan)
					nextInterval = nextInterval.Add(30 * time.Second)
					break
				}
			}
		}
	}()

	return out
}

Is this the "right", canonical way to proceed? Does it scale? Or is there a small missing feature that would let us "flush" our static flows at fixed intervals or on demand, so that streaming use cases could be handled in a more streamlined fashion?
