Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM ubuntu:18.04

# Toolchain for building librdkafka from source, plus Go for the client.
# Installing Go in the same layer keeps apt-get install paired with its
# apt-get update, and cleaning the lists shrinks the image.
RUN apt-get update && \
    apt-get install -y git make g++ golang-go && \
    rm -rf /var/lib/apt/lists/*

# Installing Confluent's Golang Client for Apache Kafka™:
# librdkafka is the native library the Go client binds against.
RUN \
  git clone https://github.com/edenhill/librdkafka.git && \
  cd librdkafka && \
  ./configure --prefix /usr && \
  make && \
  make install

# Fetch the Go client (GOPATH-style; this base image predates Go modules).
RUN go get -u github.com/confluentinc/confluent-kafka-go/kafka

# Build golang_helloooo at image-build time only.
RUN \
  git clone https://github.com/ggdupont/golang_helloooo.git && \
  cd golang_helloooo && \
  go build

WORKDIR /golang_helloooo

# The server listens on :8080 (see testserver4g.go).
EXPOSE 8080

# Fix: the original ended the build with `RUN ./golang_helloooo`, which starts
# the HTTP server during `docker build` and blocks the build forever. The
# server must be started when the container runs, not when the image builds.
CMD ["./golang_helloooo"]
83 changes: 82 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,82 @@
`go run testserver4g.go`
# Golang go

From basic HTTP server to gateway for kafka.

## Work

### DONE

- [x] launch the http server
- [x] explore request headers
- [x] launch kafka
- [x] create topic, produce/consume messages
- [x] make the server produce a message for each incoming request

### IN PROGRESS

- [ ] package in docker + docker-compose (?)
- [ ] explore request content
- [ ] define a data model to handle the request data

### TODO

- [ ] test on kafka cluster
- [ ] optimize kafka connection creation
- [ ] check error handling

## Launch kafka

### From source

https://kafka.apache.org/quickstart

### With docker-compose in cluster mode

https://github.com/simplesteph/kafka-stack-docker-compose

```shell
docker-compose -f zk-single-kafka-single.yml up
```

## Run the server

### Installing Confluent's Golang Client for Apache Kafka™

From https://github.com/confluentinc/confluent-kafka-go

1) Install librdkafka:

``` shell
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure --prefix /usr
make
sudo make install
```
2) Install go client

```shell
go get -u github.com/confluentinc/confluent-kafka-go/kafka
```

### Run baby run!

```shell
cd golang_helloooo
go build
./golang_helloooo
```

### Launch the consumer

```shell
cd golang_helloooo/cons
go build
./cons
```

## Query the HTTP server

```shell
curl --header "Content-Type: application/json" --request POST --data '{"message":"everything is awesome"}' 'http://localhost:8080/'
```
34 changes: 34 additions & 0 deletions cons/consumerTest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main runs a minimal test consumer: it connects to a local Kafka broker,
// subscribes to the "incoming_req" topic (plus a regex pattern), and prints
// every message it receives until the first read error.
func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "myGroup",
		// Start from the beginning of the topic when the group has no
		// committed offset yet.
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	// Deferred so the consumer is closed cleanly (leaving the group) on
	// every exit path, not just the fall-through at the end.
	defer c.Close()

	// Fix: the original discarded this error, so an invalid subscription
	// would silently consume nothing.
	if err := c.SubscribeTopics([]string{"incoming_req", "^aRegex.*[Tt]opic"}, nil); err != nil {
		panic(err)
	}

	// ReadMessage(-1) blocks indefinitely until a message or error arrives.
	for {
		msg, err := c.ReadMessage(-1)
		if err != nil {
			// Treat any read error as terminal for this simple test tool.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
		fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}
3 changes: 3 additions & 0 deletions dummyRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"message" : "everything is awesome!"
}
52 changes: 51 additions & 1 deletion testserver4g.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package main

import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
// start HTTP server
log.Print("Starting test server for G")
http.HandleFunc("/", index)
log.Fatal(http.ListenAndServe(":8080", nil))
Expand All @@ -20,20 +25,65 @@ type response struct {
Ok bool `json:"ok"`
}

// click models one incoming HTTP request's metadata.
// NOTE(review): not referenced by any visible code in this diff — presumably
// the planned "data model to handle the request data" from the README's
// in-progress list; confirm before relying on it. All fields except URL are
// unexported, so they would be skipped by encoding/json — verify that is
// intentional if this is ever serialized.
type click struct {
referer string // value of the request's Referer header
URL *url.URL // requested URL
datetime int64 // request timestamp — units (seconds vs. millis) not established here; confirm
}

func index(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Type", "application/json")
decoder := json.NewDecoder(r.Body)
var request request
if err := decoder.Decode(&request); err != nil {
http.Error(w, `{"error_message": "internal error"}`, http.StatusBadRequest)
http.Error(w, `{"error_message": "internal error - json request expected"}`, http.StatusBadRequest)
return
}

// TODO process request and push to topic
log.Printf("host: %v, referer: %v", r.Host, r.Referer())
log.Printf("language: %v", request.Language)
log.Printf("headers:")
for i := range r.Header {
log.Printf("\t%v => %v", i, r.Header.Get(i))
}

log.Printf("body:")

// initiate kafka connection
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
panic(err)
}
defer p.Close()

// push to topic
topic := "incoming_req"

p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(`"referer": "` + r.Host + `"`),
}, nil)

// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()

// Wait for message deliveries
p.Flush(15 * 1000)

// write http response
payload, err := json.Marshal(response{
Ok: true,
})
Expand Down