79 changes: 79 additions & 0 deletions attribution/example/kafka/kafka_produce_message_example.go
@@ -0,0 +1,79 @@
package main

import (
"flag"
"fmt"
"github.com/TencentAd/attribution/attribution/pkg/storage/attribution/kafka"
click2 "github.com/TencentAd/attribution/attribution/proto/click"
"strconv"
"time"
)

func main() {
flag.Parse()

//conversionProducer := kafka.NewAmsKafkaAttributionStore().(*kafka.AmsKafkaAttributionStore)
clickProducer := kafka.NewAmsKafkaClickStore().(*kafka.AmsKafkaClickStore)

index := 1

start := time.Now().Unix()

for index <= 10 {
now := time.Now()
clickLog := &click2.ClickLog{
ClickTime: now.Unix(),
ClickId: strconv.Itoa(index),
AdId: int64(index),
Platform: click2.Platform(index%3 + 1),
}

	if err := clickProducer.Store(clickLog); err != nil {
		fmt.Println("send click message to kafka failed:", err)
		break
	}
	//// The conversion time is 10 seconds after the click time
//conversionLog := &conv.ConversionLog{
// EventTime: now.Add(10 * time.Second).Unix(),
// AppId: "appId " + strconv.Itoa(index),
// ConvId: "convId" + strconv.Itoa(index),
// Index: int32(index),
// MatchClick: &conv.MatchClick{
// ClickLog: clickLog,
// },
//}
//if err := conversionProducer.Store(conversionLog); err != nil {
// fmt.Println(err)
// fmt.Println("send message to kafka failed")
// break
//}

time.Sleep(2 * time.Second)

index++
}

//for index <= 15 {
// conversionLog := &conv.ConversionLog{
// EventTime: time.Now().Unix(),
// AppId: "appId " + strconv.Itoa(index),
// ConvId: "convId" + strconv.Itoa(index),
// Index: int32(index),
// MatchClick: &conv.MatchClick{
// ClickLog: nil,
// },
// }
// if err := conversionProducer.Store(conversionLog); err != nil {
// fmt.Println(err)
// fmt.Println("send message to kafka failed")
// break
// }
//
// time.Sleep(2 * time.Second)
//
// index++
//}

end := time.Now().Unix()

fmt.Println("cost time: ", end-start)
}
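
For reference, one way to run this producer example from the repository root; the broker address is an assumption, while `-kafka_address` and `-click_kafka_topic` are the flags defined in the store package below:

```shell
# Assumes a broker reachable at localhost:9092.
go run ./attribution/example/kafka \
  -kafka_address=localhost:9092 \
  -click_kafka_topic=click_test
```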
@@ -3,6 +3,7 @@ package kafka
import (
"context"
"flag"
click2 "github.com/TencentAd/attribution/attribution/proto/click"
"github.com/TencentAd/attribution/attribution/proto/conv"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
@@ -16,33 +17,58 @@ var (
Namespace: "attribution",
Subsystem: "",
Name: "ams_conv_kafka_store_count",
Help: "amd conv kafka store count",
Help: "ams conv kafka store count",
},
[]string{"conv_id", "status"},
)

Address = flag.String("kafka_address", "localhost:9092", "kafka address, split with comma")
AttributionTopic = flag.String("attribution_kafka_topic", "attribution_test", "")
ClickTopic = flag.String("click_kafka_topic", "click_test", "")
ConversionTopic = flag.String("conversion_kafka_topic", "conversion_test", "")
AmsClickKafkaStoreCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "attribution",
Subsystem: "",
Name: "ams_click_kafka_store_count",
Help: "ams click kafka store count",
},
[]string{"click_id", "status"})

Address = flag.String("kafka_address", "9.134.188.241:9093", "kafka address, split with comma")
AttributionTopic = flag.String("attribution_kafka_topic", "conversion_test", "")
ClickTopic = flag.String("click_kafka_topic", "click_test", "")
ConversionTopic = flag.String("conversion_kafka_topic", "conversion_test", "")
)

func init() {
	prometheus.MustRegister(AmsConvKafkaStoreCount)
	// Register the new click counter as well; otherwise its samples are never exposed.
	prometheus.MustRegister(AmsClickKafkaStoreCount)
}

type AmsKafkaAttributionStore struct {
type AmsStoreInterface interface {
Store(message interface{}) error
}

type AmsKafkaStore struct {
writer *kafka.Writer
}

type AmsKafkaClickStore struct {
store *AmsKafkaStore
}

type AmsKafkaAttributionStore struct {
store *AmsKafkaStore
}

type AmsKafkaConversionStore struct {
store *AmsKafkaStore
}

func NewAmsKafkaAttributionStore() interface{} {
writer := &kafka.Writer{
Addr: kafka.TCP(*Address),
Topic: *AttributionTopic,
Balancer: &kafka.Hash{},
}

return &AmsKafkaAttributionStore{writer: writer}
return &AmsKafkaAttributionStore{store: &AmsKafkaStore{writer: writer}}
}

func NewAmsKafkaClickStore() interface{} {
@@ -52,7 +78,7 @@ func NewAmsKafkaClickStore() interface{} {
Balancer: &kafka.Hash{},
}

return &AmsKafkaAttributionStore{writer: writer}
return &AmsKafkaClickStore{store: &AmsKafkaStore{writer: writer}}
}

func NewAmsKafkaConversionStore() interface{} {
@@ -62,7 +88,7 @@ func NewAmsKafkaConversionStore() interface{} {
Balancer: &kafka.Hash{},
}

return &AmsKafkaAttributionStore{writer: writer}
return &AmsKafkaConversionStore{store: &AmsKafkaStore{writer: writer}}
}

func (a *AmsKafkaAttributionStore) Store(message interface{}) error {
@@ -79,15 +105,37 @@ func (a *AmsKafkaAttributionStore) Store(message interface{}) error {
}
AmsConvKafkaStoreCount.WithLabelValues(conversionLog.ConvId, "success").Add(1)
return nil

}

func (a *AmsKafkaAttributionStore) doStore(conv *conv.ConversionLog) error {
value, err := proto.Marshal(conv)
if err != nil {
return err
}
err = a.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
err = a.store.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
if err != nil {
return err
}
return nil
}

func (a *AmsKafkaClickStore) Store(message interface{}) error {
clickLog := message.(*click2.ClickLog)
if err := a.doStore(clickLog); err != nil {
AmsClickKafkaStoreCount.WithLabelValues(clickLog.GetClickId(), "fail").Add(1)
glog.Errorf("fail to store click result, err: %s", err)
return err
}
AmsClickKafkaStoreCount.WithLabelValues(clickLog.ClickId, "success").Add(1)
return nil
}

func (a *AmsKafkaClickStore) doStore(click *click2.ClickLog) error {
value, err := proto.Marshal(click)
if err != nil {
return err
}
err = a.store.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
if err != nil {
return err
}
	return nil
}
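
Since every concrete store now wraps `AmsKafkaStore` and satisfies `AmsStoreInterface`, callers can program against the interface instead of asserting to a concrete type. A minimal sketch; the `main` package and the error handling are illustrative, not part of this PR:

```go
package main

import (
	"flag"
	"log"

	"github.com/TencentAd/attribution/attribution/pkg/storage/attribution/kafka"
	click "github.com/TencentAd/attribution/attribution/proto/click"
)

func main() {
	flag.Parse()

	// Assert to the interface rather than to *AmsKafkaClickStore,
	// so the producer stays swappable with the other stores.
	store, ok := kafka.NewAmsKafkaClickStore().(kafka.AmsStoreInterface)
	if !ok {
		log.Fatal("click store does not implement AmsStoreInterface")
	}
	if err := store.Store(&click.ClickLog{ClickId: "1"}); err != nil {
		log.Fatal(err)
	}
}
```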

This file was deleted.

64 changes: 64 additions & 0 deletions datacube/README.md
@@ -0,0 +1,64 @@
# README

## Starting the Project

### Packaging the Project

From the project root, package the project with Maven:

```shell
mvn clean install
```

### Editing the Configuration File

Edit the relevant settings in `datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf`.

The first level of the configuration file selects the environment, e.g. `test` or `prod`:

```hocon
prod {
flattened_conversion {
log-type: "conversion"
saving-dir: "cosn://attribution-log-test-1257943044/conversion-test/"
checkpoint-dir: "cosn://attribution-log-test-1257943044/checkpoint-test/"
checkpoint-interval: "60000"
stream-config {
type: "kafka"
bootstrap-servers: "9.134.188.241:9093"
group-id: "test-consumer-group"
topic: "conversion_test"
pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
}
stream-sink-config {
bootstrap-servers: "9.134.188.241:9092"
topic: "conversion_sink_test"
}
}
}
```

Taking the `prod` environment as an example: the second level is `flattened_conversion`, and it can be named according to the use case.

The main settings to edit are the following (a hypothetical `click` variant is sketched after this list):

- `log-type`: the type of record to flatten; either `click` or `conversion`
- `saving-dir`: storage path for the flattened logs
- `checkpoint-dir`: path where checkpoints are saved
- `checkpoint-interval`: checkpoint interval in milliseconds
- `stream-config`: settings for the data source
    - `bootstrap-servers`: Kafka broker address
    - `topic`: the consumer topic
    - `pb-class-name`: the proto class of the records to consume
- `stream-sink-config`: settings for the log sink
    - `bootstrap-servers`: Kafka broker address
    - `topic`: the producer topic
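
For example, a hypothetical `click` flattening scenario could look like the block below. The bucket paths, sink topic, and `pb-class-name` are placeholders rather than values documented in this repository; the broker addresses and consumer topic mirror the `conversion` example above:

```hocon
prod {
  flattened_click {
    log-type: "click"
    # placeholder paths; point these at your own COS bucket
    saving-dir: "cosn://<bucket>/click/"
    checkpoint-dir: "cosn://<bucket>/checkpoint-click/"
    checkpoint-interval: "60000"
    stream-config {
      type: "kafka"
      bootstrap-servers: "9.134.188.241:9093"
      group-id: "test-consumer-group"
      topic: "click_test"
      # placeholder proto class for the click stream
      pb-class-name: "com.tencent.attribution.proto.click.Click.ClickLog"
    }
    stream-sink-config {
      bootstrap-servers: "9.134.188.241:9092"
      topic: "click_sink_test"
    }
  }
}
```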

### Running the Job

Deploy and run the job through the Flink web UI.

![image-20210408113817724](imgs/image-20210408113817724.png)

After uploading the executable jar, fill in the fully qualified name of the entry class and the program arguments. Based on the configuration edited above, pass the environment and the scenario, e.g. `prod flattened_conversion`.

Then submit the job to run it. A CLI alternative is sketched below.
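
For reference, the same submission via the Flink CLI might look like this; the entry class and jar path are placeholders, since this README only documents the web UI flow:

```shell
# Hypothetical entry class and jar path; substitute the real ones.
flink run -c com.tencent.attribution.datacube.flatten.Main \
  datacube/datacube-flatten/target/datacube-flatten.jar \
  prod flattened_conversion
```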