Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,10 @@ bin/
.vscode/

### Mac OS ###
.DS_Store
.DS_Store

# resources 안의 certs 폴더 무시
/src/main/resources/certs/

# resources 안의 application.properties 무시
/src/main/resources/application.properties
Empty file.
11 changes: 9 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@ plugins {
group = 'org.example'
version = '1.0-SNAPSHOT'

configurations {
compileOnly {
extendsFrom annotationProcessor
}
}

repositories {
mavenCentral()
}

dependencies {
testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
compileOnly("org.projectlombok:lombok:1.18.38")
implementation 'org.slf4j:slf4j-api:1.7.32'
compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

implementation 'ch.qos.logback:logback-classic:1.2.6'

// flink
Expand Down
30 changes: 6 additions & 24 deletions src/main/java/com/monitory/data/FlinkApplication.java
Original file line number Diff line number Diff line change
@@ -1,46 +1,28 @@
package com.monitory.data;

import com.monitory.data.sources.MqttSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;

import java.util.Arrays;
import java.util.List;

public class FlinkApplication {
public static void main (String [] args) throws Exception {
// 1. Flink 환경 설정
Configuration conf = new Configuration();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

// 2. 커스텀 소스 사용 (예시로 간단한 데이터 소스)
DataStream<String> sourceStream = env.addSource(new SourceFunction<String>() {
private boolean running = true;

@Override
public void run(SourceContext<String> ctx) throws Exception {
List<String> data = Arrays.asList("Hello", "Flink", "Stream");
for (String item : data) {
if (!running) {
return;
}
ctx.collect(item); // 데이터 한 항목씩 전달
}
}
// 2. 데이터 소스
DataStream<String> sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source");

@Override
public void cancel() {
running = false; // 소스 중지
}
});
// 3. 데이터 처리: 단순하게 문자열을 대문자로 변환하는 예시
DataStream<String> transformedStream = sourceStream
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
Thread.sleep(2000000);
// Thread.sleep(2000000);
System.out.println("💡 received: " + value);
return value.toUpperCase();
}
});
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/monitory/data/config/MqttConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.monitory.data.config;

import java.io.InputStream;
import java.util.Properties;

public class MqttConfig {
private static final Properties properties = new Properties();

static {
try (InputStream input = MqttConfig.class.getClassLoader()
.getResourceAsStream("application.properties")) {
if (input == null) {
throw new RuntimeException("❌ application.properties 파일을 찾을 수 없습니다.");
}
properties.load(input);
} catch (Exception e) {
throw new RuntimeException("❌ properties 파일 로딩 실패", e);
}
}

public static String get(String key) {
return properties.getProperty(key);
}
}
164 changes: 164 additions & 0 deletions src/main/java/com/monitory/data/sources/MqttReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.monitory.data.sources;

import com.monitory.data.config.MqttConfig;
import com.monitory.data.utils.SslUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import javax.net.ssl.SSLSocketFactory;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* MQTT 메시지를 읽어오는 SourceReader
* - Flink에서 데이터를 가져오는 역할
*/
@Slf4j
public class MqttReader implements SourceReader <String, MqttSplit> {
// Flink에서 SourceReader와 상호작용하는 데 필요한 context
private final SourceReaderContext sourceReaderContext;
// MQTT로 읽은 메시지를 임시로 저장할 큐
private final Queue<String> messageQueue = new ConcurrentLinkedQueue<>();
// 메시지를 계속 읽어오게 할지 여부
private volatile boolean running = true;
private MqttClient client;

IMqttMessageListener mqttMessageListener = new IMqttMessageListener() {
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
messageQueue.add(new String(mqttMessage.getPayload()));
}
};

public MqttReader(SourceReaderContext sourceReaderContext) {
this.sourceReaderContext = sourceReaderContext;
}
/**
* SourceReader 시작 시 호출됨
* MQTT 클라이언트 연결 및 구독 등을 처리할 수 있음
* (현재는 비어 있음)
*/
@Override
public void start() {
try {
String broker = MqttConfig.get("AWS_IOT_BROKER");
String topic = MqttConfig.get("AWS_IOT_TOPIC");
String clientId = MqttConfig.get("AWS_IOT_CLIENT_ID");

SslUtil sslUtil = new SslUtil();
SSLSocketFactory temp = sslUtil.getSocketFactory(
MqttConfig.get("AWS_IOT_CA_PEM_PATH"),
MqttConfig.get("AWS_IOT_CERT_PATH"),
MqttConfig.get("AWS_IOT_PRIVATE_KEY_PATH")
);
MqttConnectOptions options = new MqttConnectOptions();
options.setSocketFactory(temp);
options.setCleanSession(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
options.setAutomaticReconnect(true);
options.setConnectionTimeout(60);

client = new MqttClient(broker, clientId);
log.info("⭐️ Connecting to MQTT broker: {}", broker);
client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("✅ MQTT 연결 완료: reconnect={}, serverURI={}", reconnect, serverURI);
try {
if (client.isConnected()) {
client.subscribe(topic, mqttMessageListener);
log.info("📡 MQTT subscribe 완료: topic = {}", topic);
} else {
log.warn("⚠️ MQTT 클라이언트가 연결되지 않았습니다.");
}
} catch (Exception e) {
log.error("❌ MQTT subscribe 실패", e);
}
}

@Override
public void connectionLost(Throwable cause) {
log.warn("⚠️ MQTT 연결 끊김", cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) {
messageQueue.add(new String(message.getPayload()));
}

@Override
public void deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) {}
});

client.connect(options);
if (!client.isConnected()) {
log.error("❌ MQTT 연결 실패: 연결이 되지 않았습니다.");
throw new RuntimeException("MQTT 연결 실패");
} else {
log.info("✅ MQTT 연결 성공");
}

} catch (Exception e) {
// log.error("❌ MQTT Connection setup 중 예외 발생", e);
}
}
/**
* 큐에서 메시지를 하나씩 꺼내서 Flink에 전달
* - 메시지가 있으면 데이터를 전달하고, 계속 받을 수 있음을 알림
* - 메시지가 없으면 더 이상 데이터를 받을 수 없다고 알림
*/
@Override
public InputStatus pollNext(ReaderOutput<String> output) {
String msg = messageQueue.poll();
if (msg != null) {
output.collect(msg);
return InputStatus.MORE_AVAILABLE;
}
return InputStatus.NOTHING_AVAILABLE;
}
/**
* 체크포인트 상태를 스냅샷으로 저장하는 메소드
* - 현재 상태가 없으므로 빈 리스트 반환
*/
@Override
public List<MqttSplit> snapshotState(long l) {
return List.of();
}
/**
* 메시지가 수신될 때까지 대기
* - 여기서는 구현되지 않음
*/
@Override
public CompletableFuture<Void> isAvailable() {
return CompletableFuture.completedFuture(null);
}

@Override
public void addSplits(List<MqttSplit> list) {

}

@Override
public void notifyNoMoreSplits() {

}

@Override
public void close() throws Exception {
running = false;
if (client != null && client.isConnected()) {
client.disconnect();
client.close();
log.info("🛑 MQTT Client disconnected and closed.");
}
}
}
73 changes: 73 additions & 0 deletions src/main/java/com/monitory/data/sources/MqttSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.monitory.data.sources;

import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
/**
* Flink에서 사용하는 커스텀 MQTT Source
* Source<T, SplitT, CheckpointT>
* - T: 읽어올 데이터 타입 (여기선 String)
* - SplitT: 데이터를 나누는 단위 (MqttSplit)
* - CheckpointT: 체크포인트에 저장할 상태 (여기선 Void)
*/
public class MqttSource implements Source<String, MqttSplit, Void> {
/**
* 이 소스는 실시간 데이터 스트림이기 때문에 "Unbounded" (끝이 없음)
*/
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

/**
* SplitEnumerator 생성: 각 parallel task에 split을 분배하는 역할
*/
@Override
public SplitEnumerator<MqttSplit, Void> createEnumerator(SplitEnumeratorContext<MqttSplit> splitEnumeratorContext) throws Exception {
return new MqttSplitEnumerator(splitEnumeratorContext);
}

/**
* 장애 복구 시 사용: 이전 상태에서 SplitEnumerator 복구
*/
@Override
public SplitEnumerator<MqttSplit, Void> restoreEnumerator(SplitEnumeratorContext<MqttSplit> splitEnumeratorContext, Void checkpoint) throws Exception {
return createEnumerator(splitEnumeratorContext);
}

/**
* Split을 직렬화/역직렬화하는 방법 정의 (필수)
*/
@Override
public SimpleVersionedSerializer<MqttSplit> getSplitSerializer() {
return new SimpleVersionedSerializer<>() {
@Override public int getVersion() { return 1; }
@Override public byte[] serialize(MqttSplit split) {
return split.splitId().getBytes(StandardCharsets.UTF_8);
}

@Override public MqttSplit deserialize(int version, byte[] serialized) {
return new MqttSplit(new String(serialized, StandardCharsets.UTF_8));
}
};
}
/**
* Enumerator의 체크포인트 상태 직렬화 (우린 상태 없음)
*/
@Override
public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
return new SimpleVersionedSerializer<>() {
@Override public int getVersion() { return 1; }
@Override public byte[] serialize(Void obj) { return new byte[0]; }
@Override public Void deserialize(int version, byte[] serialized) { return null; }
};
}
/**
* SourceReader 생성: 실제 MQTT 데이터를 읽어오는 곳
*/
@Override
public SourceReader<String, MqttSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
return new MqttReader(sourceReaderContext);
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/monitory/data/sources/MqttSplit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.monitory.data.sources;

import org.apache.flink.api.connector.source.SourceSplit;

public class MqttSplit implements SourceSplit {
private final String splitId;

public MqttSplit(String splitId) {
this.splitId = splitId;
}
@Override
public String splitId() {
return splitId;
}
}
Loading