diff --git a/.gitignore b/.gitignore index b63da45..b72e941 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,10 @@ bin/ .vscode/ ### Mac OS ### -.DS_Store \ No newline at end of file +.DS_Store + +# resources 안의 certs 폴더 무시 +/src/main/resources/certs/ + +# resources 안의 application.properties 무시 +/src/main/resources/application.properties diff --git a/INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck b/INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck new file mode 100644 index 0000000..e69de29 diff --git a/build.gradle b/build.gradle index 337d801..62ea6c9 100644 --- a/build.gradle +++ b/build.gradle @@ -6,6 +6,12 @@ plugins { group = 'org.example' version = '1.0-SNAPSHOT' +configurations { + compileOnly { + extendsFrom annotationProcessor + } +} + repositories { mavenCentral() } @@ -13,8 +19,9 @@ repositories { dependencies { testImplementation platform('org.junit:junit-bom:5.10.0') testImplementation 'org.junit.jupiter:junit-jupiter' - compileOnly("org.projectlombok:lombok:1.18.38") - implementation 'org.slf4j:slf4j-api:1.7.32' + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + implementation 'ch.qos.logback:logback-classic:1.2.6' // flink diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkApplication.java index b3b7cba..52be518 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkApplication.java @@ -1,13 +1,11 @@ package com.monitory.data; +import com.monitory.data.sources.MqttSource; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; - -import java.util.Arrays; -import java.util.List; public class FlinkApplication { public static void main (String [] args) throws Exception { @@ -15,32 +13,16 @@ public static void main (String [] args) throws Exception { Configuration conf = new Configuration(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); - // 2. 커스텀 소스 사용 (예시로 간단한 데이터 소스) - DataStream sourceStream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - List data = Arrays.asList("Hello", "Flink", "Stream"); - for (String item : data) { - if (!running) { - return; - } - ctx.collect(item); // 데이터 한 항목씩 전달 - } - } + // 2. 데이터 소스 + DataStream sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source"); - @Override - public void cancel() { - running = false; // 소스 중지 - } - }); // 3. 데이터 처리: 단순하게 문자열을 대문자로 변환하는 예시 DataStream transformedStream = sourceStream .map(new MapFunction() { @Override public String map(String value) throws Exception { - Thread.sleep(2000000); +// Thread.sleep(2000000); + System.out.println("💡 received: " + value); return value.toUpperCase(); } }); diff --git a/src/main/java/com/monitory/data/config/MqttConfig.java b/src/main/java/com/monitory/data/config/MqttConfig.java new file mode 100644 index 0000000..869c737 --- /dev/null +++ b/src/main/java/com/monitory/data/config/MqttConfig.java @@ -0,0 +1,24 @@ +package com.monitory.data.config; + +import java.io.InputStream; +import java.util.Properties; + +public class MqttConfig { + private static final Properties properties = new Properties(); + + static { + try (InputStream input = MqttConfig.class.getClassLoader() + .getResourceAsStream("application.properties")) { + if (input == null) { + throw new RuntimeException("❌ application.properties 파일을 찾을 수 없습니다."); + } + properties.load(input); + } catch (Exception e) { + throw new RuntimeException("❌ properties 파일 로딩 실패", e); + } + } + + public static String get(String key) { + return properties.getProperty(key); + } +} diff --git a/src/main/java/com/monitory/data/sources/MqttReader.java b/src/main/java/com/monitory/data/sources/MqttReader.java new file mode 100644 index 0000000..d7854d8 --- /dev/null +++ b/src/main/java/com/monitory/data/sources/MqttReader.java @@ -0,0 +1,164 @@ +package com.monitory.data.sources; + +import com.monitory.data.config.MqttConfig; +import com.monitory.data.utils.SslUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import javax.net.ssl.SSLSocketFactory; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * MQTT 메시지를 읽어오는 SourceReader + * - Flink에서 데이터를 가져오는 역할 + */ +@Slf4j +public class MqttReader implements SourceReader { + // Flink에서 SourceReader와 상호작용하는 데 필요한 context + private final SourceReaderContext sourceReaderContext; + // MQTT로 읽은 메시지를 임시로 저장할 큐 + private final Queue messageQueue = new ConcurrentLinkedQueue<>(); + // 메시지를 계속 읽어오게 할지 여부 + private volatile boolean running = true; + private MqttClient client; + + IMqttMessageListener mqttMessageListener = new IMqttMessageListener() { + @Override + public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { + messageQueue.add(new String(mqttMessage.getPayload())); + } + }; + + public MqttReader(SourceReaderContext sourceReaderContext) { + this.sourceReaderContext = sourceReaderContext; + } + /** + * SourceReader 시작 시 호출됨 + * MQTT 클라이언트 연결 및 구독 등을 처리할 수 있음 + * (현재는 비어 있음) + */ + @Override + public void start() { + try { + String broker = MqttConfig.get("AWS_IOT_BROKER"); + String topic = MqttConfig.get("AWS_IOT_TOPIC"); + String clientId = MqttConfig.get("AWS_IOT_CLIENT_ID"); + + SslUtil sslUtil = new SslUtil(); + SSLSocketFactory temp = sslUtil.getSocketFactory( + MqttConfig.get("AWS_IOT_CA_PEM_PATH"), + MqttConfig.get("AWS_IOT_CERT_PATH"), + MqttConfig.get("AWS_IOT_PRIVATE_KEY_PATH") + ); + MqttConnectOptions options = new MqttConnectOptions(); + options.setSocketFactory(temp); + options.setCleanSession(true); + options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); + options.setAutomaticReconnect(true); + options.setConnectionTimeout(60); + + client = new MqttClient(broker, clientId); + log.info("⭐️ Connecting to MQTT broker: {}", broker); + client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallbackExtended() { + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("✅ MQTT 연결 완료: reconnect={}, serverURI={}", reconnect, serverURI); + try { + if (client.isConnected()) { + client.subscribe(topic, mqttMessageListener); + log.info("📡 MQTT subscribe 완료: topic = {}", topic); + } else { + log.warn("⚠️ MQTT 클라이언트가 연결되지 않았습니다."); + } + } catch (Exception e) { + log.error("❌ MQTT subscribe 실패", e); + } + } + + @Override + public void connectionLost(Throwable cause) { + log.warn("⚠️ MQTT 연결 끊김", cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + messageQueue.add(new String(message.getPayload())); + } + + @Override + public void deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) {} + }); + + client.connect(options); + if (!client.isConnected()) { + log.error("❌ MQTT 연결 실패: 연결이 되지 않았습니다."); + throw new RuntimeException("MQTT 연결 실패"); + } else { + log.info("✅ MQTT 연결 성공"); + } + + } catch (Exception e) { +// log.error("❌ MQTT Connection setup 중 예외 발생", e); + } + } + /** + * 큐에서 메시지를 하나씩 꺼내서 Flink에 전달 + * - 메시지가 있으면 데이터를 전달하고, 계속 받을 수 있음을 알림 + * - 메시지가 없으면 더 이상 데이터를 받을 수 없다고 알림 + */ + @Override + public InputStatus pollNext(ReaderOutput output) { + String msg = messageQueue.poll(); + if (msg != null) { + output.collect(msg); + return InputStatus.MORE_AVAILABLE; + } + return InputStatus.NOTHING_AVAILABLE; + } + /** + * 체크포인트 상태를 스냅샷으로 저장하는 메소드 + * - 현재 상태가 없으므로 빈 리스트 반환 + */ + @Override + public List snapshotState(long l) { + return List.of(); + } + /** + * 메시지가 수신될 때까지 대기 + * - 여기서는 구현되지 않음 + */ + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void addSplits(List list) { + + } + + @Override + public void notifyNoMoreSplits() { + + } + + @Override + public void close() throws Exception { + running = false; + if (client != null && client.isConnected()) { + client.disconnect(); + client.close(); + log.info("🛑 MQTT Client disconnected and closed."); + } + } +} diff --git a/src/main/java/com/monitory/data/sources/MqttSource.java b/src/main/java/com/monitory/data/sources/MqttSource.java new file mode 100644 index 0000000..8951950 --- /dev/null +++ b/src/main/java/com/monitory/data/sources/MqttSource.java @@ -0,0 +1,73 @@ +package com.monitory.data.sources; + +import org.apache.flink.api.connector.source.*; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.nio.charset.StandardCharsets; +/** + * Flink에서 사용하는 커스텀 MQTT Source + * Source + * - T: 읽어올 데이터 타입 (여기선 String) + * - SplitT: 데이터를 나누는 단위 (MqttSplit) + * - CheckpointT: 체크포인트에 저장할 상태 (여기선 Void) + */ +public class MqttSource implements Source { + /** + * 이 소스는 실시간 데이터 스트림이기 때문에 "Unbounded" (끝이 없음) + */ + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + /** + * SplitEnumerator 생성: 각 parallel task에 split을 분배하는 역할 + */ + @Override + public SplitEnumerator createEnumerator(SplitEnumeratorContext splitEnumeratorContext) throws Exception { + return new MqttSplitEnumerator(splitEnumeratorContext); + } + + /** + * 장애 복구 시 사용: 이전 상태에서 SplitEnumerator 복구 + */ + @Override + public SplitEnumerator restoreEnumerator(SplitEnumeratorContext splitEnumeratorContext, Void checkpoint) throws Exception { + return createEnumerator(splitEnumeratorContext); + } + + /** + * Split을 직렬화/역직렬화하는 방법 정의 (필수) + */ + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new SimpleVersionedSerializer<>() { + @Override public int getVersion() { return 1; } + @Override public byte[] serialize(MqttSplit split) { + return split.splitId().getBytes(StandardCharsets.UTF_8); + } + + @Override public MqttSplit deserialize(int version, byte[] serialized) { + return new MqttSplit(new String(serialized, StandardCharsets.UTF_8)); + } + }; + } + /** + * Enumerator의 체크포인트 상태 직렬화 (우린 상태 없음) + */ + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new SimpleVersionedSerializer<>() { + @Override public int getVersion() { return 1; } + @Override public byte[] serialize(Void obj) { return new byte[0]; } + @Override public Void deserialize(int version, byte[] serialized) { return null; } + }; + } + /** + * SourceReader 생성: 실제 MQTT 데이터를 읽어오는 곳 + */ + @Override + public SourceReader createReader(SourceReaderContext sourceReaderContext) throws Exception { + return new MqttReader(sourceReaderContext); + } +} diff --git a/src/main/java/com/monitory/data/sources/MqttSplit.java b/src/main/java/com/monitory/data/sources/MqttSplit.java new file mode 100644 index 0000000..1f78807 --- /dev/null +++ b/src/main/java/com/monitory/data/sources/MqttSplit.java @@ -0,0 +1,15 @@ +package com.monitory.data.sources; + +import org.apache.flink.api.connector.source.SourceSplit; + +public class MqttSplit implements SourceSplit { + private final String splitId; + + public MqttSplit(String splitId) { + this.splitId = splitId; + } + @Override + public String splitId() { + return splitId; + } +} diff --git a/src/main/java/com/monitory/data/sources/MqttSplitEnumerator.java b/src/main/java/com/monitory/data/sources/MqttSplitEnumerator.java new file mode 100644 index 0000000..e9359d3 --- /dev/null +++ b/src/main/java/com/monitory/data/sources/MqttSplitEnumerator.java @@ -0,0 +1,42 @@ +package com.monitory.data.sources; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.util.List; + +/** + * MQTT용 SplitEnumerator + * - Flink에서 parallel reader에게 데이터를 분배할 "split"을 지정하는 역할 + */ +public class MqttSplitEnumerator implements SplitEnumerator { + private final SplitEnumeratorContext context; + + // Flink에서 이 enumerator와 상호작용하는 데 필요한 context + public MqttSplitEnumerator(SplitEnumeratorContext context) { + this.context = context; + } + + /** + * Flink가 enumerator를 시작할 때 호출됨 + * 여기서는 단일 split("mqtt-split")을 0번 서브태스크에게 할당하고, + * 더 이상 split이 없다고 알림 + */ + @Override + public void start() { +// context.assignSplit(new MqttSplit("mqtt-split"), 0); +// context.signalNoMoreSplits(0); + } + + @Override public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + @Override public void addSplitsBack(List list, int i) {} + @Override public void addReader(int subtaskId) { + System.out.println("Subtask " + subtaskId + " is ready, assigning split..."); + MqttSplit split = new MqttSplit("mqtt-split-"+subtaskId); // 네가 설계한 split 객체 + context.assignSplit(split, subtaskId); + } + @Override public Void snapshotState(long checkpointId) { return null; } + @Override public void close() {} +} diff --git a/src/main/java/com/monitory/data/utils/SslUtil.java b/src/main/java/com/monitory/data/utils/SslUtil.java new file mode 100644 index 0000000..07bd677 --- /dev/null +++ b/src/main/java/com/monitory/data/utils/SslUtil.java @@ -0,0 +1,87 @@ +package com.monitory.data.utils; + +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; + +import javax.net.ssl.*; +import java.io.*; +import java.security.*; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.stream.Stream; + +/** + * 🔐 AWS IoT MQTT 통신을 위한 SSLContext 생성 유틸리티 클래스 + * - 인증서 (device cert, private key, CA cert)를 이용하여 SSL 소켓을 생성함 + */ +public class SslUtil { + + /** + * 📦 MQTT 연결용 SSLSocketFactory 생성 메서드 + * + * @param caCrtFile AWS 루트 인증서 경로 (root.pem) + * @param crtFile 디바이스 인증서 경로 (.pem.crt) + * @param keyFile 디바이스 개인키 경로 (.pem.key) + * @return SSLSocketFactory 객체 + * @throws Exception 모든 예외 전달 (파일, 키, 인증서 파싱 오류 등) + */ + public static SSLSocketFactory getSocketFactory(String caCrtFile, String crtFile, String keyFile) throws Exception { + + // BouncyCastle Provider 등록 (PEM 파싱용) + Security.addProvider(new BouncyCastleProvider()); + + // CA 인증서와 디바이스 인증서 + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate caCert = (X509Certificate) cf.generateCertificate(new FileInputStream(caCrtFile)); + X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(crtFile)); + + // 디바이스 개인키 PEM → Keypair 변환 + PEMParser pemParser = new PEMParser(new FileReader(keyFile)); + Object object = pemParser.readObject(); + JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); + KeyPair key = converter.getKeyPair((PEMKeyPair) object); + pemParser.close(); + + // 키스토어 구성 (디바이스 인증서 + 개인키) +// KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null, null); + ks.setCertificateEntry("cert-alias", cert); + ks.setKeyEntry("key-alias", key.getPrivate(), "".toCharArray(), new Certificate[]{cert}); + + // 트러스트스토어 구성 (루트 CA 인증서) + KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(null, null); + ts.setCertificateEntry("ca-alias", caCert); + + + // [추가] JVM 기본 TrustManager (기본 TrustStore 포함) + TrustManagerFactory jvmTmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + jvmTmf.init((KeyStore) null); // ← 기본 truststore 사용 + + + // ✅ [기존] root.pem 기반 TrustManager + TrustManagerFactory customTmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + customTmf.init(ts); + + + // ✅ [추가] TrustManager 병합 (기본 + root.pem) + TrustManager[] mergedTrustManagers = Stream + .concat(Arrays.stream(jvmTmf.getTrustManagers()), Arrays.stream(customTmf.getTrustManagers())) + .toArray(TrustManager[]::new); + + + // 5. KeyManagerFactory 구성 + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, "".toCharArray()); + + SSLContext context = SSLContext.getInstance("TLSv1.2"); + context.init(kmf.getKeyManagers(), mergedTrustManagers, null); // ✅ [변경] 병합한 TrustManager 적용 + + return context.getSocketFactory(); + } +} \ No newline at end of file