Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
MaxConcurrentReconciles: concurrentReconciles,
}

mgr, err = controller.NewManager(restConfig, managerOptions, controllerOptions)
mgr, err = controller.NewManager(ctx, restConfig, managerOptions, controllerOptions)
if err != nil {
mainLog.Error(err, "unable to start manager")
os.Exit(1)
Expand Down
15 changes: 10 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ var (

const TaskRunLabel = "tekton.dev/taskRun"

func NewManager(cfg *rest.Config, managerOptions ctrl.Options, controllerOptions controller.Options) (ctrl.Manager, error) {
func NewManager(ctx context.Context, cfg *rest.Config, managerOptions ctrl.Options, controllerOptions controller.Options) (ctrl.Manager, error) {
// do not check tekton in kcp
// we have seen in e2e testing that this path can get invoked prior to the TaskRun CRD getting generated,
// and controller-runtime does not retry on missing CRDs.
// so we are going to wait on the CRDs existing before moving forward.
apiextensionsClient := apiextensionsclient.NewForConfigOrDie(cfg)
if err := wait.PollUntilContextTimeout(context.Background(), time.Second*5, time.Minute*5, true, func(ctx context.Context) (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, time.Second*5, time.Minute*5, true, func(ctx context.Context) (done bool, err error) {
_, err = apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, "taskruns.tekton.dev", metav1.GetOptions{})
if err != nil {
controllerLog.Info("get of taskrun CRD failed with: " + err.Error())
Expand Down Expand Up @@ -104,15 +104,20 @@ func NewManager(cfg *rest.Config, managerOptions ctrl.Options, controllerOptions

ticker := time.NewTicker(time.Hour * 24)
go func() {
for range ticker.C {
taskrun.UpdateHostPools(operatorNamespace, mgr.GetClient(), &controllerLog)
for {
select {
case <-ticker.C:
taskrun.UpdateHostPools(ctx, operatorNamespace, mgr.GetClient(), &controllerLog)
case <-ctx.Done():
return
}
}
}()
timer := time.NewTimer(time.Minute)
go func() {
<-timer.C
//update the nodes on startup
taskrun.UpdateHostPools(operatorNamespace, mgr.GetClient(), &controllerLog)
taskrun.UpdateHostPools(ctx, operatorNamespace, mgr.GetClient(), &controllerLog)
}()

if err := mpcmetrics.AddTaskRunMetricsExporter(mgr); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/taskrun/hostupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var _ = Describe("HostUpdateTaskRunTest", func() {

// when: host pools are updated
log := logr.FromContextOrDiscard(ctx)
UpdateHostPools(testNamespace, k8sClient, &log)
UpdateHostPools(ctx, testNamespace, k8sClient, &log)

// then: no host pool update tasks are created
list := v1.TaskRunList{}
Expand Down Expand Up @@ -150,7 +150,7 @@ var _ = Describe("HostUpdateTaskRunTest", func() {

// when: host pools are updated
log := logr.FromContextOrDiscard(ctx)
UpdateHostPools(testNamespace, k8sClient, &log)
UpdateHostPools(ctx, testNamespace, k8sClient, &log)

// when: spawned threads run to completion
waitGroup.Wait()
Expand Down Expand Up @@ -192,7 +192,7 @@ var _ = Describe("HostUpdateTaskRunTest", func() {

// when: host pools are updated
log := logr.FromContextOrDiscard(ctx)
UpdateHostPools(testNamespace, k8sClient, &log)
UpdateHostPools(ctx, testNamespace, k8sClient, &log)

// test everything in TaskRun creation that is not part of the table testing
Eventually(func(g Gomega) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/taskrun/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
)

// UpdateHostPools Run the host update task periodically
func UpdateHostPools(operatorNamespace string, client client.Client, log *logr.Logger) {
func UpdateHostPools(ctx context.Context, operatorNamespace string, client client.Client, log *logr.Logger) {
log.Info("running pooled host update")
cm := v12.ConfigMap{}
err := client.Get(context.Background(), types.NamespacedName{Namespace: operatorNamespace, Name: HostConfig}, &cm)
err := client.Get(ctx, types.NamespacedName{Namespace: operatorNamespace, Name: HostConfig}, &cm)
if err != nil {
log.Error(err, "Failed to read config to update hosts", "audit", "true")
return
Expand Down Expand Up @@ -105,7 +105,7 @@ func UpdateHostPools(operatorNamespace string, client client.Client, log *logr.L
Value: *v1.NewStructuredValues(hostsConcurrency[host.Name]),
},
}
err = client.Create(context.Background(), &provision)
err = client.Create(ctx, &provision)
}()
}
}
Loading