Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Run Gosec Security Scanner
uses: securego/gosec@master
with:
args: -exclude=G115 ./...
args: -exclude=G115 -exclude-generated ./...

- name: Build
run: go build -v ./...
Expand Down
16 changes: 16 additions & 0 deletions connectors/common/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Teardownable interface {
Teardown()
}

type OnTaskCompletionBarrierHandlerServicable interface {
OnTaskCompletionBarrierHandler(uint) error
}

type ConnectorSettings struct {
NumParallelCopiers int
NumParallelWriters int
Expand Down Expand Up @@ -623,6 +627,7 @@ func (c *connector) StartReadToChannel(flowId iface.FlowID, options iface.Connec
DataBatch: r.GetData(),
MutationType: iface.MutationType_InsertBatch,
Loc: destNs,
TaskId: uint(task.Id),
}
dataChannel <- dataMessage
}
Expand All @@ -635,6 +640,7 @@ func (c *connector) StartReadToChannel(flowId iface.FlowID, options iface.Connec
DataBatch: transformed.Msg.GetData(),
MutationType: iface.MutationType_InsertBatch,
Loc: destNs,
TaskId: uint(task.Id),
}
dataChannel <- dataMessage
}
Expand All @@ -643,6 +649,7 @@ func (c *connector) StartReadToChannel(flowId iface.FlowID, options iface.Connec
DataBatch: data,
MutationType: iface.MutationType_InsertBatch,
Loc: destinationNamespace,
TaskId: uint(task.Id),
}
dataChannel <- dataMessage
}
Expand Down Expand Up @@ -978,6 +985,14 @@ func (c *connector) HandlerError(err error) error {
func (c *connector) HandleBarrierMessage(barrierMsg iface.DataMessage) error {
switch barrierMsg.BarrierType {
case iface.BarrierType_TaskComplete:
// Call the optional OnTaskCompletionBarrier hook if implemented
if onTaskCompletionBarrierHandlerServicable, ok := c.maybeOptimizedImpl.(OnTaskCompletionBarrierHandlerServicable); ok {
err := onTaskCompletionBarrierHandlerServicable.OnTaskCompletionBarrierHandler(barrierMsg.BarrierTaskId)
if err != nil {
return err
}
}

// notify the coordinator that the task is done from our side
if err := c.coord.NotifyTaskDone(c.flowID, c.id, (iface.ReadPlanTaskID)(barrierMsg.BarrierTaskId), nil); err != nil {
return err
Expand Down Expand Up @@ -1080,6 +1095,7 @@ func (c *connector) ProcessDataMessages(dataMsgs []iface.DataMessage) error {
Namespace: dataMsg.Loc,
Data: dataMsg.DataBatch,
Type: c.settings.DestinationDataType,
TaskId: uint32(dataMsg.TaskId),
}))
if err != nil {
return err
Expand Down
Loading