diff --git a/attribution/example/kafka/kafka_produce_message_example.go b/attribution/example/kafka/kafka_produce_message_example.go
new file mode 100644
index 0000000..49fd787
--- /dev/null
+++ b/attribution/example/kafka/kafka_produce_message_example.go
@@ -0,0 +1,79 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/TencentAd/attribution/attribution/pkg/storage/attribution/kafka"
+ click2 "github.com/TencentAd/attribution/attribution/proto/click"
+ "strconv"
+ "time"
+)
+
+func main() {
+ flag.Parse()
+
+ //conversionProducer := kafka.NewAmsKafkaAttributionStore().(*kafka.AmsKafkaAttributionStore)
+ clickProducer := kafka.NewAmsKafkaClickStore().(*kafka.AmsKafkaClickStore)
+
+ index := 1
+
+ start := time.Now().Unix()
+
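+ // Produce ten click logs, one every two seconds.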
+ for index <= 10 {
+ now := time.Now()
+ clickLog := &click2.ClickLog{
+ ClickTime: now.Unix(),
+ ClickId: strconv.Itoa(index),
+ AdId: int64(index),
+ Platform: click2.Platform(index%3 + 1),
+ }
+
+ if err := clickProducer.Store(clickLog); err != nil {
+ fmt.Println("send message to kafka failed:", err)
+ break
+ }
+ //// The conversion time is 10 seconds after the click time
+ //conversionLog := &conv.ConversionLog{
+ // EventTime: now.Add(10 * time.Second).Unix(),
+ // AppId: "appId " + strconv.Itoa(index),
+ // ConvId: "convId" + strconv.Itoa(index),
+ // Index: int32(index),
+ // MatchClick: &conv.MatchClick{
+ // ClickLog: clickLog,
+ // },
+ //}
+ //if err := conversionProducer.Store(conversionLog); err != nil {
+ // fmt.Println(err)
+ // fmt.Println("send message to kafka failed")
+ // break
+ //}
+
+ time.Sleep(2 * time.Second)
+
+ index++
+ }
+
+ //for index <= 15 {
+ // conversionLog := &conv.ConversionLog{
+ // EventTime: time.Now().Unix(),
+ // AppId: "appId " + strconv.Itoa(index),
+ // ConvId: "convId" + strconv.Itoa(index),
+ // Index: int32(index),
+ // MatchClick: &conv.MatchClick{
+ // ClickLog: nil,
+ // },
+ // }
+ // if err := conversionProducer.Store(conversionLog); err != nil {
+ // fmt.Println(err)
+ // fmt.Println("send message to kafka failed")
+ // break
+ // }
+ //
+ // time.Sleep(2 * time.Second)
+ //
+ // index++
+ //}
+
+ end := time.Now().Unix()
+
+ fmt.Println("cost time: ", end-start)
+}
diff --git a/attribution/pkg/storage/attribution/kafka/kafka_attribution_store.go b/attribution/pkg/storage/attribution/kafka/kafka_attribution_store.go
index e14116b..13026a9 100644
--- a/attribution/pkg/storage/attribution/kafka/kafka_attribution_store.go
+++ b/attribution/pkg/storage/attribution/kafka/kafka_attribution_store.go
@@ -3,6 +3,7 @@ package kafka
import (
"context"
"flag"
+ click2 "github.com/TencentAd/attribution/attribution/proto/click"
"github.com/TencentAd/attribution/attribution/proto/conv"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
@@ -16,25 +17,50 @@ var (
Namespace: "attribution",
Subsystem: "",
Name: "ams_conv_kafka_store_count",
- Help: "amd conv kafka store count",
+ Help: "ams conv kafka store count",
},
[]string{"conv_id", "status"},
)
- Address = flag.String("kafka_address", "localhost:9092", "kafka address, split with comma")
- AttributionTopic = flag.String("attribution_kafka_topic", "attribution_test", "")
- ClickTopic = flag.String("click_kafka_topic", "click_test", "")
- ConversionTopic = flag.String("conversion_kafka_topic", "conversion_test", "")
+ AmsClickKafkaStoreCount = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "attribution",
+ Subsystem: "",
+ Name: "ams_click_kafka_store_count",
+ Help: "ams click kafka store count",
+ },
+ []string{"click_id", "status"})
+
+ Address = flag.String("kafka_address", "9.134.188.241:9093", "kafka address, split with comma")
+ AttributionTopic = flag.String("attribution_kafka_topic", "conversion_test", "")
+ ClickTopic = flag.String("click_kafka_topic", "click_test", "")
+ ConversionTopic = flag.String("conversion_kafka_topic", "conversion_test", "")
)
func init() {
 prometheus.MustRegister(AmsConvKafkaStoreCount)
+ prometheus.MustRegister(AmsClickKafkaStoreCount)
}
-type AmsKafkaAttributionStore struct {
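+// AmsStoreInterface is the common Store interface implemented by the Kafka-backed stores below.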
+type AmsStoreInterface interface {
+ Store(message interface{}) error
+}
+
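+// AmsKafkaStore wraps the kafka-go writer that the concrete stores share.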
+type AmsKafkaStore struct {
writer *kafka.Writer
}
+type AmsKafkaClickStore struct {
+ store *AmsKafkaStore
+}
+
+type AmsKafkaAttributionStore struct {
+ store *AmsKafkaStore
+}
+
+type AmsKafkaConversionStore struct {
+ store *AmsKafkaStore
+}
+
func NewAmsKafkaAttributionStore() interface{} {
writer := &kafka.Writer{
Addr: kafka.TCP(*Address),
@@ -42,7 +68,7 @@ func NewAmsKafkaAttributionStore() interface{} {
Balancer: &kafka.Hash{},
}
- return &AmsKafkaAttributionStore{writer: writer}
+ return &AmsKafkaAttributionStore{store: &AmsKafkaStore{writer: writer}}
}
func NewAmsKafkaClickStore() interface{} {
@@ -52,7 +78,7 @@ func NewAmsKafkaClickStore() interface{} {
Balancer: &kafka.Hash{},
}
- return &AmsKafkaAttributionStore{writer: writer}
+ return &AmsKafkaClickStore{store: &AmsKafkaStore{writer: writer}}
}
func NewAmsKafkaConversionStore() interface{} {
@@ -62,7 +88,7 @@ func NewAmsKafkaConversionStore() interface{} {
Balancer: &kafka.Hash{},
}
- return &AmsKafkaAttributionStore{writer: writer}
+ return &AmsKafkaConversionStore{store: &AmsKafkaStore{writer: writer}}
}
func (a *AmsKafkaAttributionStore) Store(message interface{}) error {
@@ -79,7 +105,6 @@ func (a *AmsKafkaAttributionStore) Store(message interface{}) error {
}
AmsConvKafkaStoreCount.WithLabelValues(conversionLog.ConvId, "success").Add(1)
return nil
-
}
func (a *AmsKafkaAttributionStore) doStore(conv *conv.ConversionLog) error {
@@ -87,7 +112,30 @@ func (a *AmsKafkaAttributionStore) doStore(conv *conv.ConversionLog) error {
if err != nil {
return err
}
- err = a.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
+ err = a.store.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
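+// Store serializes the ClickLog with protobuf and writes it to Kafka, recording success/failure in Prometheus.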
+func (a *AmsKafkaClickStore) Store(message interface{}) error {
+ clickLog := message.(*click2.ClickLog)
+ if err := a.doStore(clickLog); err != nil {
+ AmsClickKafkaStoreCount.WithLabelValues(clickLog.GetClickId(), "fail").Add(1)
+ glog.Errorf("fail to store click result, err: %s", err)
+ return err
+ }
+ AmsClickKafkaStoreCount.WithLabelValues(clickLog.ClickId, "success").Add(1)
+ return nil
+}
+
+func (a *AmsKafkaClickStore) doStore(click *click2.ClickLog) error {
+ value, err := proto.Marshal(click)
+ if err != nil {
+ return err
+ }
+ err = a.store.writer.WriteMessages(context.Background(), kafka.Message{Value: value})
if err != nil {
return err
}
diff --git a/attribution/pkg/storage/attribution/kafka/kafka_attribution_store_test.go b/attribution/pkg/storage/attribution/kafka/kafka_attribution_store_test.go
deleted file mode 100644
index 4c36f47..0000000
--- a/attribution/pkg/storage/attribution/kafka/kafka_attribution_store_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package kafka
-
-import (
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "github.com/TencentAd/attribution/attribution/proto/conv"
- "github.com/golang/protobuf/proto"
- "github.com/segmentio/kafka-go"
- "github.com/stretchr/testify/assert"
- "log"
- "testing"
- "time"
-)
-
-func testNewAmsKafkaAttributionStore(t *testing.T) {
- flag.Parse()
-
- store := NewAmsKafkaAttributionStore().(*AmsKafkaAttributionStore)
-
- c := &conv.ConversionLog{
- UserData: nil,
- EventTime: 0,
- AppId: "test appid!!!",
- ConvId: "test convid!!!",
- CampaignId: 0,
- Index: 0,
- MatchClick: &conv.MatchClick{
- ClickLog: nil,
- MatchIdType: 1,
- },
- OriginalContent: "hello world",
- }
-
- assert.NoError(t, store.Store(c))
-}
-
-func testConsumer(t *testing.T) {
- topic := "flink_result_test"
- partition := 0
-
- conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9810", topic, partition)
- if err != nil {
- log.Fatal("failed to dial leader:", err)
- }
-
- _ = conn.SetReadDeadline(time.Now().Add(10 * time.Second))
- batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
-
- b := make([]byte, 10e3) // 10KB max per message
- for {
- read, err := batch.Read(b)
- if err != nil {
- assert.Error(t, err)
- break
- }
- conversionLog := &conv.ConversionLog{}
- if err := proto.Unmarshal(b[0:read], conversionLog); err != nil {
- fmt.Println(string(b[0:read]))
- } else {
- value, _ := json.Marshal(conversionLog)
- fmt.Println(string(value))
- }
- }
-
- if err := batch.Close(); err != nil {
- log.Fatal("failed to close batch:", err)
- }
-
- if err := conn.Close(); err != nil {
- log.Fatal("failed to close connection:", err)
- }
-}
diff --git a/datacube/README.md b/datacube/README.md
new file mode 100644
index 0000000..ab6c49f
--- /dev/null
+++ b/datacube/README.md
@@ -0,0 +1,64 @@
+# README
+
+## Starting the project
+
+### Packaging the project
+
+From the project root directory, package the project with Maven:
+
+```shell
+mvn clean install
+```
+
+### Editing the configuration file
+
+Edit the relevant settings in `datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf`.
+
+The first level of the configuration file selects the environment, for example `test` or `prod`:
+
+```
+prod {
+  flattened_conversion {
+    log-type: "conversion"
+    saving-dir: "cosn://attribution-log-test-1257943044/conversion-test/"
+    checkpoint-dir: "cosn://attribution-log-test-1257943044/checkpoint-test/"
+    checkpoint-interval: "60000"
+    stream-config {
+      type: "kafka"
+      bootstrap-servers: "9.134.188.241:9093"
+      group-id: "test-consumer-group"
+      topic: "conversion_test"
+      pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+    }
+    stream-sink-config {
+      bootstrap-servers: "9.134.188.241:9092"
+      topic: "conversion_sink_test"
+    }
+  }
+}
+```
+
+Taking the `prod` environment as an example, the second level is `flattened_conversion`; it can be named according to the use case.
+
+The main settings to adjust are:
+
+- log-type: the type of log to flatten, either `click` or `conversion`
+- saving-dir: the storage path for the flattened logs
+- checkpoint-dir: the path where checkpoints are saved
+- stream-config: settings for the data source
+  - bootstrap-servers: the Kafka broker addresses
+  - topic: the topic to consume from
+  - pb-class-name: the proto class of the messages to consume
+- stream-sink-config: settings for the log sink
+  - bootstrap-servers: the Kafka broker addresses
+  - topic: the topic to produce to
+
+### Running the job
+
+Use the Flink web UI to deploy and run the job.
+
+After uploading the executable jar, fill in the fully qualified name of the entry class and the program arguments: based on the configuration edited above, pass the environment and the job name, for example `prod flattened_conversion`.
+
+Then submit the job to run it.
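+
+If the web UI is not available, the job can also be submitted with the Flink CLI. This is only a sketch: the entry class below is the streaming flatten job from this repository, while the jar path is an assumption and should be replaced with the artifact actually produced by `mvn clean install`.
+
+```shell
+# Assumed jar location; replace with your actual build output.
+flink run -c com.attribution.datacube.flatten.StreamingLogFlattenJob \
+  datacube/datacube-flatten/target/datacube-flatten-1.0-SNAPSHOT.jar \
+  prod flattened_conversion
+```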
diff --git a/datacube/datacube-common/pom.xml b/datacube/datacube-common/pom.xml
index 775add8..fecf9ef 100644
--- a/datacube/datacube-common/pom.xml
+++ b/datacube/datacube-common/pom.xml
@@ -14,7 +14,7 @@
8
8
- <flink.version>1.11.2</flink.version>
+ <flink.version>1.12.0</flink.version>
3.6.0
@@ -41,7 +41,7 @@
<groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.12</artifactId>
+ <artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
@@ -74,7 +74,7 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.8.2</version>
+ <version>1.9.2</version>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -101,13 +101,13 @@
<groupId>org.apache.flink</groupId>
- <artifactId>flink-parquet</artifactId>
- <version>1.7.1</version>
+ <artifactId>flink-parquet_2.11</artifactId>
+ <version>${flink.version}</version>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
- <version>1.10.1</version>
+ <version>1.11.1</version>
<artifactId>avro</artifactId>
@@ -118,7 +118,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
- <version>1.7.0</version>
+ <version>${flink.version}</version>
<groupId>org.apache.hadoop</groupId>
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/factories/record/FlattenedRecordClassFactory.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/factories/record/FlattenedRecordClassFactory.java
new file mode 100644
index 0000000..8456bf7
--- /dev/null
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/factories/record/FlattenedRecordClassFactory.java
@@ -0,0 +1,21 @@
+package com.attribution.datacube.common.factories.record;
+
+import com.attribution.datacube.common.flatten.record.FlattenedClickLog;
+import com.attribution.datacube.common.flatten.record.FlattenedConversionLog;
+import com.attribution.datacube.common.flatten.record.FlattenedRecord;
+
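+/**
+ * Maps a log-type string ("conversion" or "click") to the corresponding flattened record class.
+ */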
+public class FlattenedRecordClassFactory {
+ public static Class<? extends FlattenedRecord> getLogClass(String type) throws Exception {
+ switch (type) {
+ case "conversion": {
+ return FlattenedConversionLog.class;
+ }
+ case "click": {
+ return FlattenedClickLog.class;
+ }
+ default: {
+ throw new Exception("no such type");
+ }
+ }
+ }
+}
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenParserFactory.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenParserFactory.java
index 29ab2dc..2c7b574 100644
--- a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenParserFactory.java
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenParserFactory.java
@@ -8,7 +8,7 @@ public static FlattenParser getFlattenedParser(String type) throws Exception {
case "click": {
return new FlattenedClickLogParser();
}
- case "conversion" : {
+ case "conversion": {
return new FlattenedConversionLogParser();
}
default: {
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedClickLogParser.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedClickLogParser.java
index 6409790..61deb69 100644
--- a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedClickLogParser.java
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedClickLogParser.java
@@ -6,6 +6,9 @@
import com.tencent.attribution.proto.click.Click;
import com.tencent.attribution.proto.user.User;
+/**
+ * Parser that flattens click data; parse() turns a protobuf Message into a FlattenedRecord.
+ */
public class FlattenedClickLogParser extends FlattenParser{
@Override
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedConversionLogParser.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedConversionLogParser.java
index e044786..c5e0445 100644
--- a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedConversionLogParser.java
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/parser/FlattenedConversionLogParser.java
@@ -7,14 +7,16 @@
import com.tencent.attribution.proto.conv.Conv;
import com.tencent.attribution.proto.user.User;
+/**
+ * Parser that flattens conversion data.
+ */
public class FlattenedConversionLogParser extends FlattenParser{
@Override
public FlattenedRecord parse(Message message) {
- System.out.println("start to parse message");
Conv.ConversionLog conversionLog = (Conv.ConversionLog)message;
User.UserData userData = conversionLog.getUserData();
Click.ClickLog clickLog = conversionLog.getMatchClick().getClickLog();
- FlattenedConversionLog log = FlattenedConversionLog.builder()
+ return FlattenedConversionLog.builder()
.eventTime(conversionLog.getEventTime())
.appId(conversionLog.getAppId())
.convId(conversionLog.getConvId())
@@ -62,7 +64,5 @@ public FlattenedRecord parse(Message message) {
.billingEvent(clickLog.getBillingEventValue())
.platform(clickLog.getPlatformValue())
.build();
- System.out.println("parse message done");
- return log;
}
}
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedClickLog.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedClickLog.java
index 7c6780f..5d08323 100644
--- a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedClickLog.java
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedClickLog.java
@@ -185,6 +185,6 @@ public class FlattenedClickLog extends FlattenedRecord{
@Override
public long getEventTime() {
- return 0;
+ return clickTime;
}
}
diff --git a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedConversionLog.java b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedConversionLog.java
index 04a1cfb..7c2e8ad 100644
--- a/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedConversionLog.java
+++ b/datacube/datacube-common/src/main/java/com/attribution/datacube/common/flatten/record/FlattenedConversionLog.java
@@ -201,6 +201,6 @@ public class FlattenedConversionLog extends FlattenedRecord {
@Override
public long getEventTime() {
- return getProcessTime();
+ return eventTime;
}
}
diff --git a/datacube/datacube-common/src/test/java/com/attribution/datacube/common/parser/RTAPageviewRequestParserTest.java b/datacube/datacube-common/src/test/java/com/attribution/datacube/common/parser/RTAPageviewRequestParserTest.java
deleted file mode 100644
index 042f7da..0000000
--- a/datacube/datacube-common/src/test/java/com/attribution/datacube/common/parser/RTAPageviewRequestParserTest.java
+++ /dev/null
@@ -1,15 +0,0 @@
-//package com.attribution.datacube.common.parser;
-//
-//import com.attribution.datacube.proto.pageview.PageviewService;
-//import org.junit.Test;
-//
-//public class RTAPageviewRequestParserTest {
-//
-// @Test
-// public void test() {
-// PageviewService.Pageview pageview = PageviewService.Pageview.getDefaultInstance();
-// RTAPageviewRequestParser parser = new RTAPageviewRequestParser();
-// System.out.println(parser.parse(pageview));
-// }
-//
-//}
\ No newline at end of file
diff --git a/datacube/datacube-flatten/pom.xml b/datacube/datacube-flatten/pom.xml
index 2f4b4d8..e33fe73 100644
--- a/datacube/datacube-flatten/pom.xml
+++ b/datacube/datacube-flatten/pom.xml
@@ -23,6 +23,12 @@
<artifactId>datacube-common</artifactId>
<version>1.0-SNAPSHOT</version>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.12.1</version>
+ </dependency>
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/BatchLogFlattenJob.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/BatchLogFlattenJob.java
new file mode 100644
index 0000000..2eb5739
--- /dev/null
+++ b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/BatchLogFlattenJob.java
@@ -0,0 +1,31 @@
+package com.attribution.datacube.flatten;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
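+/**
+ * Skeleton for a batch version of the log-flatten job: it currently only loads the
+ * configuration and creates the execution environment, without wiring up a pipeline.
+ */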
+public class BatchLogFlattenJob {
+ private static final Logger LOG = LoggerFactory.getLogger(BatchLogFlattenJob.class);
+
+ public static void main(String[] args) {
+ String env = args[0];
+ String jobName = args[1];
+
+ Config config = ConfigFactory.load("conv-flattened-streaming.conf")
+ .getConfig(env).getConfig(jobName);
+
+ String logType = config.getString("log-type");
+ String savingPath = config.getString("saving-dir");
+
+ Config batchConfig = config.getConfig("batch-config");
+
+ int checkpointInterval = config.getInt("checkpoint-interval");
+ String checkpointDir = config.getString("checkpoint-dir");
+
+ ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
+
+ }
+}
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/LogFlattenJob.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/LogFlattenJob.java
deleted file mode 100644
index a9d9dcc..0000000
--- a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/LogFlattenJob.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.attribution.datacube.flatten;
-
-import com.attribution.datacube.common.factories.datasource.StreamClientFactory;
-import com.attribution.datacube.common.flatten.parser.FlattenParserFactory;
-import com.attribution.datacube.common.flatten.record.FlattenedConversionLog;
-import com.attribution.datacube.common.flatten.record.FlattenedRecord;
-import com.attribution.datacube.flatten.flatMapper.LogFlatMapper;
-import com.google.protobuf.Message;
-import com.tencent.attribution.proto.conv.Conv;
-import com.twitter.chill.protobuf.ProtobufSerializer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LogFlattenJob {
- private static final Logger LOG = LoggerFactory.getLogger(LogFlattenJob.class);
-
- public static void main(String[] args) throws Exception {
- // todo 这里的逻辑就是消费数据,然后使用一个flatMapper来进行打平
- String env = "test";
- String jobName = "flattened";
-
- // todo 这里的config需要增加配置
- Config config = ConfigFactory.load("conv-flattened-streaming.conf")
- .getConfig(env).getConfig(jobName);
-
- String logType = config.getString("log-type");
- String path = config.getString("saving-dir");
-
- Config streamConfig = config.getConfig("stream-config");
-
- // todo 这里后面需要加入 checkpoint
- int checkpointInterval = config.getInt("checkpoint-interval");
- String checkpointDir = config.getString("checkpoint-dir");
-
- StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
- see.getConfig().registerTypeWithKryoSerializer(Conv.ConversionLog.class, ProtobufSerializer.class);
-
- // 设置checkpoint
-// see.enableCheckpointing(checkpointInterval);
-// CheckpointConfig checkpointConfig = see.getCheckpointConfig();
-// checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-// checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-// checkpointConfig.setMinPauseBetweenCheckpoints(500);
-// checkpointConfig.setCheckpointTimeout(60000);
-// checkpointConfig.setMaxConcurrentCheckpoints(1);
-
- DataStreamSource messageDataStreamSource = see
- .addSource(StreamClientFactory.getStreamClient(streamConfig).getSourceFunction());
-
- LOG.info("set datasource done");
-
- // todo 中间的处理逻辑
- SingleOutputStreamOperator flattenedResultStream = messageDataStreamSource
- .flatMap(new LogFlatMapper(FlattenParserFactory.getFlattenedParser(logType)));
-
- LOG.info("flat map done");
-
- StreamingFileSink sink = StreamingFileSink
- .forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(FlattenedRecord.class))
- .build();
- flattenedResultStream.addSink(sink);
-
- see.execute("test kafka");
- }
-}
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/StreamingLogFlattenJob.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/StreamingLogFlattenJob.java
new file mode 100644
index 0000000..a047f52
--- /dev/null
+++ b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/StreamingLogFlattenJob.java
@@ -0,0 +1,107 @@
+package com.attribution.datacube.flatten;
+
+import com.attribution.datacube.common.factories.datasource.StreamClientFactory;
+import com.attribution.datacube.common.factories.record.FlattenedRecordClassFactory;
+import com.attribution.datacube.common.flatten.parser.FlattenParserFactory;
+import com.attribution.datacube.common.flatten.record.FlattenedRecord;
+import com.attribution.datacube.flatten.flatMapper.LogFlatMapper;
+import com.attribution.datacube.flatten.tool.FlattenedMessageSchema;
+import com.google.protobuf.Message;
+import com.tencent.attribution.proto.conv.Conv;
+import com.twitter.chill.protobuf.ProtobufSerializer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class StreamingLogFlattenJob {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingLogFlattenJob.class);
+
+ public static void main(String[] args) throws Exception {
+ String env = args[0];
+ String jobName = args[1];
+
+ // TODO: more configuration entries may be needed here
+ Config config = ConfigFactory.load("conv-flattened-streaming.conf")
+ .getConfig(env).getConfig(jobName);
+
+ String logType = config.getString("log-type");
+ String savingPath = config.getString("saving-dir");
+
+ Config streamConfig = config.getConfig("stream-config");
+ Config streamSinkConfig = config.getConfig("stream-sink-config");
+
+ int checkpointInterval = config.getInt("checkpoint-interval");
+ String checkpointDir = config.getString("checkpoint-dir");
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // Configure checkpointing
+ see.enableCheckpointing(checkpointInterval);
+ CheckpointConfig checkpointConfig = see.getCheckpointConfig();
+ checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ checkpointConfig.setMinPauseBetweenCheckpoints(10000);
+ checkpointConfig.setCheckpointTimeout(60000);
+ see.setStateBackend(new FsStateBackend(checkpointDir, true));
+
+ see.getConfig().registerTypeWithKryoSerializer(Conv.ConversionLog.class, ProtobufSerializer.class);
+
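+ // Read protobuf messages from the Kafka source described by stream-config.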
+ DataStreamSource<Message> messageDataStreamSource = see
+ .addSource(StreamClientFactory.getStreamClient(streamConfig).getSourceFunction());
+
+ LOG.info("set datasource done");
+
+ // Intermediate processing: flatten each incoming message
+ SingleOutputStreamOperator<FlattenedRecord> flattenedResultStream = messageDataStreamSource
+ .flatMap(new LogFlatMapper(FlattenParserFactory.getFlattenedParser(logType)))
+ .name("flatten map");
+ LOG.info("flat map done");
+
+ // Sink the data to COS object storage
+ StreamingFileSink sink = StreamingFileSink
+ .forBulkFormat(
+ new Path(savingPath),
+ ParquetAvroWriters.forReflectRecord(FlattenedRecordClassFactory.getLogClass(logType)))
+ // Bucket the output files by date (one directory per day, yyyyMMdd)
+ .withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMdd"))
+ .build();
+
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", streamSinkConfig.getString("bootstrap-servers"));
+ // Set the producer transaction timeout to 5 minutes; it must stay below the broker's default transaction.max.timeout.ms of 15 minutes
+ properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");
+
+ // Sink the data to Kafka
+ FlinkKafkaProducer<FlattenedRecord> kafkaProducer = new FlinkKafkaProducer<>(
+ streamSinkConfig.getString("topic"), // target topic
+ new KafkaSerializationSchemaWrapper<>(
+ streamSinkConfig.getString("topic"),
+ new FlinkFixedPartitioner<>(),
+ false,
+ new FlattenedMessageSchema()), // serialization schema
+ properties, // producer configuration
+ FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
+
+ flattenedResultStream.addSink(sink).name("sink to file");
+
+ flattenedResultStream.addSink(kafkaProducer).name("sink to kafka");
+
+ see.execute("flatten log");
+ }
+}
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/flatMapper/LogFlatMapper.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/flatMapper/LogFlatMapper.java
index 8421890..aae3d96 100644
--- a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/flatMapper/LogFlatMapper.java
+++ b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/flatMapper/LogFlatMapper.java
@@ -18,8 +18,7 @@ public LogFlatMapper(FlattenParser flattenParser) {
@Override
public void flatMap(Message message, Collector collector) {
- System.out.println("get message");
- collector.collect(flattenParser.parse(message));
- System.out.println("collect message done");
+ FlattenedRecord flattenedRecord = flattenParser.parse(message);
+ collector.collect(flattenedRecord);
}
}
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/policy/CustomRollingPolicy.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/policy/CustomRollingPolicy.java
new file mode 100644
index 0000000..01a4c88
--- /dev/null
+++ b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/policy/CustomRollingPolicy.java
@@ -0,0 +1,20 @@
+package com.attribution.datacube.flatten.policy;
+
+import com.attribution.datacube.common.flatten.record.FlattenedRecord;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+
+import java.io.IOException;
+
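+/**
+ * Rolling policy that rolls the in-progress part file on every record and on every
+ * processing-time check, in addition to the checkpoint-triggered rolls inherited from
+ * CheckpointRollingPolicy.
+ */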
+public class CustomRollingPolicy extends CheckpointRollingPolicy<FlattenedRecord, String> {
+
+ @Override
+ public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, FlattenedRecord flattenedRecord) throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long l) throws IOException {
+ return true;
+ }
+}
diff --git a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/tool/FlattenedMessageSchema.java b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/tool/FlattenedMessageSchema.java
index 5bb038b..7103a24 100644
--- a/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/tool/FlattenedMessageSchema.java
+++ b/datacube/datacube-flatten/src/main/java/com/attribution/datacube/flatten/tool/FlattenedMessageSchema.java
@@ -1,6 +1,8 @@
package com.attribution.datacube.flatten.tool;
import com.attribution.datacube.common.flatten.record.FlattenedRecord;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -28,17 +30,13 @@ public boolean isEndOfStream(FlattenedRecord o) {
@Override
public byte[] serialize(FlattenedRecord o) {
- byte[] bytes = null;
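+ // Serialize the flattened record to JSON bytes.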
+ ObjectMapper objectMapper = new ObjectMapper();
try {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
- outputStream.writeObject(o);
- outputStream.flush();
- bytes = byteArrayOutputStream.toByteArray();
- } catch (IOException e) {
+ return objectMapper.writeValueAsBytes(o);
+ } catch (JsonProcessingException e) {
e.printStackTrace();
}
- return bytes;
+ return null;
}
@Override
diff --git a/datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf b/datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf
index 0e94f78..24668fa 100644
--- a/datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf
+++ b/datacube/datacube-flatten/src/main/resources/conv-flattened-streaming.conf
@@ -1,28 +1,114 @@
test {
- flattened {
- log-type = "conversion"
- saving-dir = "file:////Users/zelfaliu/tmp/attribution-log"
- checkpoint-dir = "cosn://attribution-log-test-1257943044/checkpoint-test"
- checkpoint-interval = "1000"
+ flattened_conversion {
+ log-type: "conversion"
+ saving-dir: "cosn://attribution-log-test-1257943044/conversion-test/"
+ checkpoint-dir: "cosn://attribution-log-test-1257943044/checkpoint-test/"
+ checkpoint-interval: "60000"
stream-config {
- type = "kafka"
- bootstrap-servers = "localhost:9092"
- group-id = "test-consumer-group"
- topic = "attribution_test"
- pb-class-name = "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+ type: "kafka"
+ bootstrap-servers: "9.134.188.241:9093"
+ group-id: "test-consumer-group"
+ topic: "conversion_test"
+ pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+ }
+ stream-sink-config {
+ bootstrap-servers: "9.134.188.241:9092"
+ topic: "conversion_sink_test"
+ }
+ s3-config {
+ type: "tencent"
+ }
+ }
+
+ flattened_click {
+ log-type: "click"
+ saving-dir: "cosn://attribution-log-test-1257943044/click-test/"
+ checkpoint-dir: "cosn://attribution-log-test-1257943044/checkpoint-test/"
+ checkpoint-interval: "60000"
+ stream-config {
+ type: "kafka"
+ bootstrap-servers: "9.134.188.241:9093"
+ group-id: "test-consumer-group"
+ topic: "click_test"
+ pb-class-name: "com.tencent.attribution.proto.click.Click.ClickLog"
+ }
+ stream-sink-config {
+ bootstrap-servers: "9.134.188.241:9092"
+ topic: "click_sink_test"
+ }
+ s3-config {
+ type: "tencent"
+ }
+ }
+
+ flattened_hdfs {
+ log-type: "conversion"
+ saving-dir: "cosn://attribution-log-test-1257943044/attribution-test/"
+ checkpoint-dir: "hdfs://9.134.188.241:9000/checkpoint-test"
+ checkpoint-interval: "60000"
+ stream-config {
+ type: "kafka"
+ bootstrap-servers: "10.43.32.128:9092"
+ group-id: "test-consumer-group"
+ topic: "attribution_test"
+ pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
}
s3-config {
- type = "tencent"
+ type: "tencent"
}
}
- string {
+ flattened_local {
+ log-type: "conversion"
+ saving-dir: "file:///Users/zelfaliu/tmp/attribution-test/"
+ checkpoint-dir: "file:///Users/zelfaliu/tmp/checkpoint-test/"
+ checkpoint-interval: "10000"
stream-config {
- type = "kafka"
- bootstrap-servers = "localhost:9810"
- group-id = "test-consumer-group"
- topic = "string_test"
- pb-class-name = "com.attribution.datacube.proto.conv.Conv.ConversionLog"
+ type: "kafka"
+ bootstrap-servers: "10.43.32.128:9092"
+ group-id: "test-consumer-group"
+ topic: "attribution_test"
+ pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+ }
+ s3-config {
+ type: "tencent"
+ }
+ }
+
+ flattened_mix {
+ log-type: "conversion"
+ saving-dir: "cosn://attribution-log-test-1257943044/attribution-test/"
+ checkpoint-dir: "file:///Users/zelfaliu/tmp/checkpoint-test/"
+ checkpoint-interval: "10000"
+ stream-config {
+ type: "kafka"
+ bootstrap-servers: "10.43.32.128:9092"
+ group-id: "test-consumer-group"
+ topic: "attribution_test"
+ pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+ }
+ s3-config {
+ type: "tencent"
+ }
+ }
+}
+
+prod {
+ flattened_conversion {
+ log-type: "conversion"
+ saving-dir: "cosn://attribution-log-test-1257943044/conversion-test/"
+ checkpoint-dir: "cosn://attribution-log-test-1257943044/checkpoint-test/"
+ checkpoint-interval: "60000"
+ stream-config {
+ type: "kafka"
+ bootstrap-servers: "9.134.188.241:9093"
+ group-id: "test-consumer-group"
+ topic: "conversion_test"
+ pb-class-name: "com.tencent.attribution.proto.conv.Conv.ConversionLog"
+ }
+ stream-sink-config {
+ bootstrap-servers: "9.134.188.241:9092"
+ topic: "conversion_sink_test"
}
}
}
\ No newline at end of file
diff --git a/datacube/datacube-join/pom.xml b/datacube/datacube-join/pom.xml
deleted file mode 100644
index 05f0359..0000000
--- a/datacube/datacube-join/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-
-
-
- attribution-datacube
- com.attribution.datacube
- 1.0-SNAPSHOT
-
- 4.0.0
-
- datacube-join
-
-
- 8
- 8
-
-
-
-
- com.attribution.datacube
- datacube-common
- 1.0-SNAPSHOT
-
-
- com.attribution.datacube
- datacube-proto
- 1.0-SNAPSHOT
-
-
-
-
\ No newline at end of file
diff --git a/datacube/datacube-join/src/main/java/com/attribution/datacube/join/JoinJob.java b/datacube/datacube-join/src/main/java/com/attribution/datacube/join/JoinJob.java
deleted file mode 100644
index 3d09444..0000000
--- a/datacube/datacube-join/src/main/java/com/attribution/datacube/join/JoinJob.java
+++ /dev/null
@@ -1,71 +0,0 @@
-//package com.attribution.datacube.join;
-//
-//
-//import com.google.protobuf.Message;
-//import com.twitter.chill.protobuf.ProtobufSerializer;
-//import com.typesafe.config.Config;
-//import com.typesafe.config.ConfigFactory;
-//import com.attribution.datacube.common.factories.datasource.StreamClient;
-//import com.attribution.datacube.common.factories.datasource.StreamClientFactory;
-//import com.attribution.datacube.join.process.JoinProcessFunction;
-//import com.attribution.datacube.proto.pageview.PageviewService;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.java.functions.KeySelector;
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//
-//public class JoinJob {
-// public static void main(String[] args) throws Exception {
-// String env = args[0];
-// String jobName = args[1];
-//
-// // todo 这里的config需要增加配置
-// Config config = ConfigFactory.load("rta-streaming.conf")
-// .getConfig(env).getConfig(jobName);
-// int checkpointInterval = config.getInt("checkpoint-interval");
-// String checkpointDir = config.getString("checkpoint-dir");
-//
-// StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-// see.getConfig().registerTypeWithKryoSerializer(PageviewService.Pageview.class, ProtobufSerializer.class);
-//
-// StreamClient requestStreamClient = StreamClientFactory.getStreamClient(config.getConfig("request").getConfig("stream-config"));
-// StreamClient strategyStreamClient = StreamClientFactory.getStreamClient(config.getConfig("strategy").getConfig("stream-config"));
-//
-// DataStream> requestDataStream = see
-// .addSource(requestStreamClient.getSourceFunction())
-// .name("join_request")
-// .uid("join_request")
-// .map(new MapFunction>() {
-// @Override
-// public Tuple2 map(Message message) throws Exception {
-// return Tuple2.of("request", (PageviewService.Pageview) message);
-// }
-// });
-//
-// DataStream> strategyDataStream = see
-// .addSource(strategyStreamClient.getSourceFunction())
-// .name("join_strategy")
-// .uid("join_strategy")
-// .map(new MapFunction>() {
-// @Override
-// public Tuple2 map(Message message) throws Exception {
-// return Tuple2.of("strategy", (PageviewService.Pageview) message);
-// }
-// });
-//
-// // 这里是将两个数据流结合起来进行join操作
-// SingleOutputStreamOperator> enrichedRequestDataStream =
-// (requestDataStream.union(strategyDataStream))
-// .keyBy(new KeySelector, String>() {
-// @Override
-// public String getKey(Tuple2 value) throws Exception {
-// return value.f1.getTraceId();
-// }
-// })
-// .process(new JoinProcessFunction()).name("process_function").uid("process_function");
-//
-// see.execute("test kafka");
-// }
-//}
diff --git a/datacube/datacube-join/src/main/java/com/attribution/datacube/join/process/JoinProcessFunction.java b/datacube/datacube-join/src/main/java/com/attribution/datacube/join/process/JoinProcessFunction.java
deleted file mode 100644
index 84e19c1..0000000
--- a/datacube/datacube-join/src/main/java/com/attribution/datacube/join/process/JoinProcessFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-//package com.attribution.datacube.join.process;
-//
-//import com.attribution.datacube.proto.pageview.PageviewService;
-//import org.apache.flink.api.common.state.ListState;
-//import org.apache.flink.api.common.state.ListStateDescriptor;
-//import org.apache.flink.api.common.state.ValueState;
-//import org.apache.flink.api.common.state.ValueStateDescriptor;
-//import org.apache.flink.api.java.tuple.Tuple;
-//import org.apache.flink.api.java.tuple.Tuple2;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-//import org.apache.flink.util.Collector;
-//
-//import java.util.Iterator;
-//
-//public class JoinProcessFunction
-// extends KeyedProcessFunction, Tuple2> {
-// private transient ValueState requestState;
-// private transient ListState strategyState;
-// private transient ListState impressionState;
-//
-// @Override
-// public void open(Configuration parameters) throws Exception {
-// ValueStateDescriptor requestStateDescriptor =
-// new ValueStateDescriptor<>("request state", PageviewService.Pageview.class);
-// ListStateDescriptor strategyStateDescriptor =
-// new ListStateDescriptor("strategy state", PageviewService.Pageview.class);
-// ListStateDescriptor impressionStateDescriptor =
-// new ListStateDescriptor("impression statae", PageviewService.Pageview.class);
-// requestState = getRuntimeContext().getState(requestStateDescriptor);
-// strategyState = getRuntimeContext().getListState(strategyStateDescriptor);
-// impressionState = getRuntimeContext().getListState(impressionStateDescriptor);
-//
-// // todo 这里可能后期添加prometheus监控
-// }
-//
-// @Override
-// public void processElement(
-// Tuple2 value,
-// Context ctx,
-// Collector> out)
-// throws Exception {
-// // todo 这里实现join的逻辑
-// String dataSource = value.f0;
-// PageviewService.Pageview message = value.f1;
-//
-// // TODO 添加用户画像
-//
-// // 这里添加策略的数据
-// if ("request".equals(dataSource)) {
-// PageviewService.Pageview request = PageviewService.Pageview.newBuilder()
-// .setProcessTime(message.getProcessTime())
-// .setTraceId(message.getTraceId())
-// .setDevice(message.getDevice())
-// .setFromPlatform(message.getFromPlatform())
-// .setInnerCostMs(message.getInnerCostMs())
-// .build();
-//
-// requestState.update(request);
-//
-// // 设置时间窗口为30分钟
-// // TODO 这里的30分钟应该添加到配置
-// ctx.timerService().registerEventTimeTimer(request.getProcessTime() * 1000L + 30 * 60 * 1000);
-//
-// Iterable strategies = strategyState.get();
-// if (strategies != null) {
-// Iterator iterator = strategies.iterator();
-// while (iterator.hasNext()) {
-// PageviewService.Pageview strategy = iterator.next();
-// out.collect(Tuple2.of("strategy", paddingData(request, strategy, "strategy")));
-// }
-// }
-// strategyState.clear();
-// }
-//
-// if ("strategy".equals(dataSource)) {
-// PageviewService.Pageview request = requestState.value();
-// if (request == null) {
-// strategyState.add(message);
-// ctx.timerService().registerEventTimeTimer(message.getProcessTime() * 1000L + 30 * 60 * 1000);
-// } else {
-// out.collect(Tuple2.of("strategy", paddingData(request, message, "strategy")));
-// }
-// }
-// // TODO 后续还有imp需要处理
-// if ("impression".equals(dataSource)) {
-// PageviewService.Pageview request = requestState.value();
-// if (request == null) {
-// impressionState.add(message);
-// ctx.timerService().registerEventTimeTimer(message.getProcessTime() * 1000L + 30 * 60 * 1000);
-// } else {
-// out.collect(Tuple2.of("impression", paddingData(request, message, "impression")));
-// }
-// }
-// }
-//
-// /**
-// * 定时器,负责清理状态
-// *
-// * @param timestamp
-// * @param ctx
-// * @param out
-// * @throws Exception
-// */
-// @Override
-// public void onTimer(long timestamp, OnTimerContext ctx, Collector> out) throws Exception {
-// requestState.clear();
-// Iterable strategies = strategyState.get();
-// for (PageviewService.Pageview strategy : strategies) {
-// out.collect(Tuple2.of("strategy_empty", strategy));
-// }
-// strategyState.clear();
-// }
-//
-// /**
-// * 这个方法是将数据(比如策略、曝光、点击等)追加到请求的数据后面
-// *
-// * @param request
-// * @param paddingItem
-// * @return
-// */
-// private PageviewService.Pageview paddingData(PageviewService.Pageview request, PageviewService.Pageview paddingItem, String type) throws Exception {
-// PageviewService.Pageview.Builder builder = PageviewService.Pageview.newBuilder();
-// builder.setProcessTime(request.getProcessTime())
-// .setTraceId(request.getTraceId())
-// .setDevice(request.getDevice())
-// .setFromPlatform(request.getFromPlatform())
-// .setInnerCostMs(request.getInnerCostMs());
-//
-// switch (type) {
-// case "strategy": {
-// PageviewService.Pageview result = builder
-// .addAllHitStrategy(paddingItem.getHitStrategyList())
-// .build();
-// return result;
-// }
-// case "impression": {
-// PageviewService.Pageview result = builder
-// .addAllImp(paddingItem.getImpList())
-// .build();
-// return result;
-// }
-// default: {
-// throw new Exception("no such type");
-// }
-// }
-// }
-//}
diff --git a/datacube/datacube-proto/pom.xml b/datacube/datacube-proto/pom.xml
index 1fb87f3..6a0df4b 100644
--- a/datacube/datacube-proto/pom.xml
+++ b/datacube/datacube-proto/pom.xml
@@ -14,7 +14,7 @@
8
8
- <flink.version>1.11.2</flink.version>
+ <flink.version>1.12.0</flink.version>
3.6.0
1.18.0
diff --git a/datacube/datacube-service/pom.xml b/datacube/datacube-service/pom.xml
deleted file mode 100644
index 11eaf8e..0000000
--- a/datacube/datacube-service/pom.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-
-
-
- attribution-datacube
- com.attribution.datacube
- 1.0-SNAPSHOT
-
- 4.0.0
-
- datacube-service
-
-
- 8
- 8
-
-
-
-
- com.attribution.datacube
- datacube-common
- 1.0-SNAPSHOT
-
-
-
-
\ No newline at end of file
diff --git a/datacube/datacube-service/src/main/java/com/attribution/datacube/service/RTAStreamingProcess.java b/datacube/datacube-service/src/main/java/com/attribution/datacube/service/RTAStreamingProcess.java
deleted file mode 100644
index 48bd46d..0000000
--- a/datacube/datacube-service/src/main/java/com/attribution/datacube/service/RTAStreamingProcess.java
+++ /dev/null
@@ -1,34 +0,0 @@
-//package com.attribution.datacube.service;
-//
-//import com.google.protobuf.Message;
-//import com.twitter.chill.protobuf.ProtobufSerializer;
-//import com.typesafe.config.Config;
-//import com.typesafe.config.ConfigFactory;
-//import com.attribution.datacube.common.factories.datasource.StreamClientFactory;
-//import com.attribution.datacube.proto.pageview.PageviewService;
-//import org.apache.flink.streaming.api.datastream.DataStreamSource;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//
-//public class RTAStreamingProcess {
-// public static void main(String[] args) throws Exception {
-// String env = args[0];
-// String jobName = args[1];
-//
-// // todo 这里的config需要增加配置
-// Config config = ConfigFactory.load("rta-streaming.conf")
-// .getConfig(env).getConfig(jobName);
-// int checkpointInterval = config.getInt("checkpoint-interval");
-// String checkpointDir = config.getString("checkpoint-dir");
-//
-// StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-// see.getConfig().registerTypeWithKryoSerializer(PageviewService.Pageview.class, ProtobufSerializer.class);
-//
-// DataStreamSource messageDataStreamSource = see
-// .addSource(StreamClientFactory.getStreamClient(config).getSourceFunction());
-//
-// // todo 中间的处理逻辑
-// messageDataStreamSource.print();
-//
-// see.execute("test kafka");
-// }
-//}
diff --git a/datacube/datacube-service/src/main/resources/rta-streaming.conf b/datacube/datacube-service/src/main/resources/rta-streaming.conf
deleted file mode 100644
index 8ab77b8..0000000
--- a/datacube/datacube-service/src/main/resources/rta-streaming.conf
+++ /dev/null
@@ -1,11 +0,0 @@
-test {
- streaming-rta {
- type = "kafka"
- pb-class-name = "com.attribution.datacube.proto.pageview.PageviewService.Pageview"
- checkpoint-interval = "60000"
- checkpoint-dir = ""
- topic = ""
- group-id = ""
- bootstrap-servers = ""
- }
-}
\ No newline at end of file
diff --git a/datacube/imgs/image-20210408113817724.png b/datacube/imgs/image-20210408113817724.png
new file mode 100644
index 0000000..5a1c16f
Binary files /dev/null and b/datacube/imgs/image-20210408113817724.png differ
diff --git a/datacube/pom.xml b/datacube/pom.xml
index b0ae845..281fcb2 100644
--- a/datacube/pom.xml
+++ b/datacube/pom.xml
@@ -11,8 +11,6 @@
datacube-common
datacube-proto
- datacube-service
- datacube-join
datacube-flatten
datacube-attribution