For Mentor Review | Kafka Cron | Berkeli #61

Open
berkeli wants to merge 17 commits into main from kafka-cron-1

Conversation

berkeli (Owner) commented Dec 12, 2022

Learners, PR Template

Self checklist

  • I have titled my PR with For Mentor Review | {Project Name} | {My Name}
  • I have included a list of changes
  • I have tested my changes
  • I have explained my changes
  • I have asked someone to review my changes

Description

Implementation of the Kafka cron project

Change list

  • Docker compose with:
    • single ZooKeeper and broker for Kafka
    • 1 producer
    • 3 consumers (cluster-a, cluster-b, cluster-a-retries) each reading from their own topic
    • Implementation and tests with a mock producer using the Sarama library
    • Prometheus, alerts and Grafana set up in Docker

illicitonion left a comment


Looks really good! I've only looked at the raw Go code, not the metrics yet - I'll take a look at the metrics stuff soon :)

Comment on lines 87 to 89
<-chDone
log.Println("Interrupt is detected, shutting down gracefully...")
wgWorkers.Wait()


What's the sequence of events which could cause you to need to wait on the waitgroup here? What would go wrong if you removed it completely?


pretty sure he's waiting for running workers to complete, but it could use a comment alright

berkeli (Owner, Author) commented:


Yeah so I didn't want to terminate the program in case a command was already running.


Can you talk through the series of events which could cause the program to terminate while a job was running? Bearing in mind that once a single case in a select is picked, it will synchronously run until that case completes.

berkeli (Owner, Author) commented:


So my thought process:

  1. We have a consumer (an instance of the application) that picks up a job that runs for, let's say, 10 minutes.
  2. In the 5th minute the consumer receives a signal to terminate (this could be from a user or a CI/CD tool that's upgrading/replacing the consumer).
  3. I thought that in such a case it shouldn't just shut down; it should wait for the last job to finish.


Here's a small program which I think models your situation - before you try running it, try to reason through what it will print and do if you run it then send it a SIGINT.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "time"
)

func main() {
    ch := make(chan struct{}, 1)
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    ch <- struct{}{}

    for {
        select {
        case <-signals:
            fmt.Println("Got signal - terminating 1")
            os.Exit(1)
        case <-ch:
            fmt.Println("Sleeping for 30 seconds then terminating 0")
            time.Sleep(30 * time.Second)
            os.Exit(0)
        }   
    }   
}

Once you've made your prediction, try running it - does it do what you expect?

Compare it with the structure of your actual program - what are the similarities or differences between your control flow and mine?

berkeli (Owner, Author) commented:


After doing some testing, it didn't work as expected and I had to add the following option to cmd:

cmd.SysProcAttr = &syscall.SysProcAttr{
    Setpgid: true,
}

and run it in its own goroutine.
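
Roughly the shape it ended up as - a simplified sketch rather than the exact code (the package name, "sh -c" invocation and helper name are just illustrative):

package consumer

import (
    "log"
    "os/exec"
    "sync"
    "syscall"
)

// runJob starts the command in its own goroutine so the select loop stays
// free to react to signals; wgWorkers lets shutdown wait for anything still running.
func runJob(wgWorkers *sync.WaitGroup, command string) {
    wgWorkers.Add(1)
    go func() {
        defer wgWorkers.Done()
        cmd := exec.Command("sh", "-c", command)
        // Own process group, so an interrupt sent to the consumer isn't also
        // delivered straight to the running job.
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
        if err := cmd.Run(); err != nil {
            log.Println(err)
        }
    }()
}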

berkeli (Owner, Author) commented:


I see what you mean now. For some reason I tunnel-visioned inside one case of the select and wasn't thinking about the first thing that happens when a signal is received; I'll have a think about the implementation.


What's specifically your goal? To wait for running processes to finish? Or to interrupt them and make sure they die before we do?


func PublishMessages(prod sarama.SyncProducer, msg string, clusters []string) error {
    for _, cluster := range clusters {
        topic := fmt.Sprintf("%s-%s", *topicPrefix, cluster)


I'd probably extract a function, or even a type, to do this "%s-%s" joining behaviour, so that you don't need to update it in multiple places if it changes
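
Something along these lines, say (just a sketch; the package and function names are made up):

package producer

import "fmt"

// topicName is the one place that knows how cluster topic names are built.
func topicName(prefix, cluster string) string {
    return fmt.Sprintf("%s-%s", prefix, cluster)
}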

func InitMonitoring(port int) (Metrics, error) {
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        http.ListenAndServe(fmt.Sprintf(":%d", port), nil)


What will happen if something is already listening on this port?
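
One option (sketch, package name is illustrative) is to at least fail loudly rather than silently running with no metrics endpoint:

package monitoring

import (
    "fmt"
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics exposes /metrics and exits if the listener can't start,
// e.g. because something else is already bound to the port.
func serveMetrics(port int) {
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
            log.Fatalf("metrics server failed: %v", err)
        }
    }()
}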

"github.com/berkeli/kafka-cron/types"
)

func AssertCommandsEqual(t *testing.T, want, got types.Command) error {


"Assert" as a prefix to a function name strongly suggests that if things aren't equal, the test will be failed via a call to t.Fatal or t.Error - I'd suggest either making that be the case, or renaming the function.

    PublishMessages(prod, wantMsg, clusters)
}

func Test_CommandPublisher(t *testing.T) {


Do you have any tests around the actual repetition aspect of the schedule parsing? i.e. any tests which show that commands will get run more than once?
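
Something along these lines is what I mean - just a sketch, and it assumes a robfig/cron-style parser, so swap in whatever you actually use for schedule parsing:

package producer

import (
    "testing"
    "time"

    "github.com/robfig/cron/v3"
)

// TestScheduleRepeats checks that a parsed schedule keeps producing future
// run times rather than firing only once.
func TestScheduleRepeats(t *testing.T) {
    sched, err := cron.ParseStandard("*/5 * * * *")
    if err != nil {
        t.Fatal(err)
    }
    start := time.Date(2022, 12, 12, 10, 0, 0, 0, time.UTC)
    first := sched.Next(start)
    second := sched.Next(first)
    if second.Sub(first) != 5*time.Minute {
        t.Errorf("expected runs 5 minutes apart, got %v", second.Sub(first))
    }
}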

log.Println(err)
}
wgWorkers.Add(1)
// TODO: Add a worker pool with semaphore? What if jobs are dependent on each other?


there's no affordance in the system for the moment for job dependencies so i wouldn't worry about that


some concept of a maximum running number of concurrent tasks is probably useful though
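
a buffered channel works as a simple semaphore for that - rough sketch, where the package name, jobs, runJob and maxConcurrent are placeholders:

package consumer

import "sync"

// consume caps the number of jobs running at once.
func consume(jobs <-chan string, maxConcurrent int) {
    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrent)
    for job := range jobs {
        sem <- struct{}{} // blocks once maxConcurrent jobs are in flight
        wg.Add(1)
        go func(job string) {
            defer wg.Done()
            defer func() { <-sem }()
            runJob(job)
        }(job)
    }
    wg.Wait()
}

func runJob(job string) { /* execute the command */ }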

for {
    select {
    case err := <-consumer.Errors():
        log.Println(err)


anywhere you're catching errors consider whether you may want a metric
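
e.g. a counter labelled by topic (sketch - the metric name is made up, and promauto registers it on the default registry for you):

package consumer

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// consumerErrors counts errors surfaced by the Kafka consumer, per topic.
var consumerErrors = promauto.NewCounterVec(
    prometheus.CounterOpts{
        Name: "cron_consumer_errors_total",
        Help: "Errors returned by the Kafka consumer.",
    },
    []string{"topic"},
)

Then the error case above just gains a consumerErrors.WithLabelValues(*topic).Inc() alongside the log.Println.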

log.Println("Starting a job for: ", cmd.Description)
//metrics
timer := prometheus.NewTimer(JobDuration.WithLabelValues(*topic, cmd.Description))
defer timer.ObserveDuration()


recording duration is certainly useful.
so is knowing the age of a job when you dequeue it - how long has it been sitting on the queue?

(you can put a timestamp in on the producer side, which will work fine as long as producer and consumer clocks are reasonably in sync. it's OK to use wall clock for a metric, basing system logic on it isn't great because clocks can differ on different hosts).
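
A sketch of the consumer side, using the timestamp Kafka records on the message when it's produced (metric name and buckets are illustrative):

package consumer

import (
    "time"

    "github.com/Shopify/sarama"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// jobQueueAge measures how long a job sat on the topic before being picked up.
var jobQueueAge = promauto.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "cron_job_queue_age_seconds",
        Help:    "Time between a job being published and being dequeued.",
        Buckets: prometheus.DefBuckets,
    },
    []string{"topic"},
)

func observeQueueAge(msg *sarama.ConsumerMessage) {
    jobQueueAge.WithLabelValues(msg.Topic).Observe(time.Since(msg.Timestamp).Seconds())
}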

    <-sigs
}

func CreateTopics(admin sarama.ClusterAdmin, cmds []types.Command) error {


So your approach here is to

  • read all the commands from config file
  • create conceptual clusters and corresponding topics based on the commands

I'd be more inclined to define the known clusters separately in a specific config file, and then treat reference to any unknown cluster as a failure. What would happen if a user misspelled a cluster name in a config? You'd create new topics automagically but you'd have no consumers running for them, presumably.


func ReadConfig(path string) ([]types.Command, error) {
    var cmds struct {
        Cron []types.Command `json:"cron" yaml:"cron"`


consider some validation here.
what happens if a user specifies an unknown cluster?
what happens if they specify 100000 retries?
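
A sketch of the kind of check I mean - the Command field names here are guesses, so adjust them to your actual struct:

package producer

import (
    "fmt"

    "github.com/berkeli/kafka-cron/types"
)

const maxAllowedRetries = 5 // pick something sensible

// validateCommands rejects configs that reference unknown clusters or ask
// for an unreasonable number of retries.
func validateCommands(cmds []types.Command, knownClusters map[string]bool) error {
    for _, c := range cmds {
        for _, cluster := range c.Clusters {
            if !knownClusters[cluster] {
                return fmt.Errorf("command %q references unknown cluster %q", c.Description, cluster)
            }
        }
        if c.MaxRetries < 0 || c.MaxRetries > maxAllowedRetries {
            return fmt.Errorf("command %q: retries must be between 0 and %d", c.Description, maxAllowedRetries)
        }
    }
    return nil
}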

- /etc/prometheus/alert_rules.yml

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.


Stale comments here

# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'kafka'
metrics_path: '/metrics'
scrape_interval: 5s


you might as well just use the default global scrape interval and set that to 5s

metrics_path: '/metrics'
scrape_interval: 5s
static_configs:
- targets: ['cluster-a:2112', 'cluster-a-2:2112', 'cluster-b:2112', 'cluster-b-2:2112']


you might like to be able to query your consumer metrics based on clusters and whether or not tasks are retries.
you could add that info to your source metrics, or you could try using prom's relabeling (good practice to try out some relabel configs IMO). https://grafana.com/blog/2022/03/21/how-relabeling-in-prometheus-works/

@@ -0,0 +1,42 @@
## prometheus.yml ##

# global settings


For any metrics that you are displaying in your dashboards or where you're using them for alerting rules, you should use recording rules: https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/
Otherwise our dashboards can get very slow in a real production context.

You don't have to do it for everything, but please set up at least one recording rule and use it from a dashboard or alert.

@@ -0,0 +1,9 @@
## grafana_datasources.yml ##


you should be able to export your dashboards as json for review https://grafana.com/docs/grafana/v9.0/dashboards/export-import/

feat!: add cluster list to config

feat!: add validation to commands provided

fix: general refactoring based on comments
feat: additional tests
* feat!: Update metrics for success/fail/retry
* feat: General refactor