Skip to content

Commit 26152af

Browse files
committed
refactor: 주가 수신 총 인원 메트릭 추가
1 parent 2c0a8cf commit 26152af

5 files changed

Lines changed: 51 additions & 46 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ dependencies {
5050
implementation 'org.springframework.boot:spring-boot-starter-security'
5151
implementation 'io.jsonwebtoken:jjwt-api:0.11.5'
5252
implementation 'org.springframework.boot:spring-boot-starter-actuator'
53+
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
5354
runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.11.5'
5455
runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.11.5'
5556

src/main/java/com/jypLord/domain/trade/service/TradeService.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.jypLord.domain.trade.service;
22

33
import com.jypLord.api.BrokerageFirm;
4-
import com.jypLord.api.dto.response.AssetPrice;
54
import com.jypLord.api.dto.request.buy.request.BuyRequest;
65
import com.jypLord.api.dto.request.sell.SellRequest;
6+
import com.jypLord.api.dto.response.AssetPrice;
77
import com.jypLord.api.handler.BrokerClient;
88
import com.jypLord.domain.trade.TradeStatus;
99
import com.jypLord.domain.trade.dto.request.RegisterTradeInfoRequest;
@@ -47,7 +47,7 @@ public Mono<Void> registerTradeInfo(Long userId, RegisterTradeInfoRequest dto) {
4747
return tradeRepository.findByUserIdAndStockCodeAndStatus(userId, dto.stockCode(), TradeStatus.ACTIVE)
4848
.flatMap(trade -> {
4949
if (trade.getUserSetPrice() == dto.price()) {
50-
return Mono.error(new AlreadyBoundException("이미 추가한 종목:" + dto.stockCode()));
50+
return Mono.error(new AlreadyBoundException("이미 등록된 종목:" + dto.stockCode()));
5151
}
5252
return Mono.error(new AlreadyBoundException());
5353
})
@@ -63,17 +63,12 @@ public Flux<AssetPrice> receiveAssetInfo(Long userId, BrokerageFirm firm) {
6363
Mono<User> userCached = userRepository.findById(userId).cache();
6464

6565
return tradeRepository.findValidTradeByUserId(userId, firm)
66-
.switchIfEmpty(Mono.error(new NoValidTradeException("저장된 종목이 없음")))
66+
.switchIfEmpty(Mono.error(new NoValidTradeException("유효한 종목 데이터가 없음")))
6767
.take(10)
68-
.doOnNext(trade ->
69-
userSubscribeStockMap
70-
.computeIfAbsent(userId, key -> ConcurrentHashMap.newKeySet())
71-
.add(trade.getStockCode())
72-
)
7368
.filterWhen(trade -> redisPricePublisher.acquireLockIfAbsent(trade.getStockCode(), userId))
7469
.flatMap(trade ->
7570
userCached.flatMapMany(user ->
76-
brokerClient.receivePrice(
71+
brokerClient.receivePrice(
7772
DTOMapper.toPriceRequest(userId, firm, user.getMarketAccessToken(), trade.getStockCode())
7873
)
7974
.flatMap(asset ->
@@ -94,13 +89,9 @@ public Flux<Void> manageAsset(Long userId, BrokerageFirm firm) {
9489
Mono<User> userCached = userRepository.findById(userId).cache();
9590

9691
return tradeRepository.findValidTradeByUserId(userId, firm)
97-
.switchIfEmpty(Mono.error(new NoValidTradeException("저장된 종목이 없음")))
92+
.switchIfEmpty(Mono.error(new NoValidTradeException("저장된 종목데이터가 없음")))
9893
.take(10)
99-
.doOnNext(trade ->
100-
userSubscribeStockMap
101-
.computeIfAbsent(userId, key -> ConcurrentHashMap.newKeySet())
102-
.add(trade.getStockCode())
103-
)
94+
.doOnNext(trade -> registerMonitoring(userId, trade.getStockCode()))
10495
.flatMap(trade ->
10596
userCached.flatMapMany(user ->
10697
losscutMonitoring(
@@ -133,9 +124,14 @@ public Mono<Void> losscutMonitoring(
133124
brokerClient.sell(new SellRequest(firm, stockCode, userSetPrice, quantity, marketAccessToken))
134125
.and(redisStockEventPublisher.publishLosscutEvent(userId, tradeId, stockCode, firm, userSetPrice, quantity))
135126
)
127+
.doFinally(signalType -> unregisterMonitoring(userId, stockCode))
136128
.then();
137129
}
138130

131+
public int currentMonitoringUserCount() {
132+
return userSubscribeStockMap.size();
133+
}
134+
139135
public Mono<Boolean> updateTradeStatusForIdempotency(Long tradeId, TradeStatus expectedStatus, TradeStatus newStatus) {
140136
return tradeRepository.updateTradeStatus(tradeId, expectedStatus, newStatus);
141137
}
@@ -168,4 +164,17 @@ public Mono<Void> reBuyAfterLossCut(
168164
)
169165
.then();
170166
}
167+
168+
private void registerMonitoring(Long userId, String stockCode) {
169+
userSubscribeStockMap
170+
.computeIfAbsent(userId, key -> ConcurrentHashMap.newKeySet())
171+
.add(stockCode);
172+
}
173+
174+
private void unregisterMonitoring(Long userId, String stockCode) {
175+
userSubscribeStockMap.computeIfPresent(userId, (key, stockCodes) -> {
176+
stockCodes.remove(stockCode);
177+
return stockCodes.isEmpty() ? null : stockCodes;
178+
});
179+
}
171180
}
Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,25 @@
11
package com.jypLord.metrics;
22

3-
4-
import com.jypLord.redis.sub.RedisStockPriceSubscriber;
3+
import com.jypLord.domain.trade.service.TradeService;
54
import io.micrometer.core.instrument.MeterRegistry;
6-
7-
import java.util.Map;
8-
import java.util.concurrent.ConcurrentHashMap;
9-
import org.springframework.stereotype.Component;
10-
115
import java.util.concurrent.atomic.AtomicInteger;
12-
6+
import org.springframework.stereotype.Component;
137

148
@Component
159
public class MarketDataMetrics {
1610

17-
// 현재 브로드캐스팅 중인 종목 수
18-
private final AtomicInteger activePriceBroadcasts = new AtomicInteger(0);
19-
20-
// 손절 후 재매수 대기 이벤트 수
2111
private final AtomicInteger pendingRebuyEvents = new AtomicInteger(0);
2212

23-
public MarketDataMetrics(MeterRegistry registry, RedisStockPriceSubscriber subscriber) {
24-
registry.gauge("active_price_broadcasts", subscriber.activeBroadcastCount());
25-
13+
public MarketDataMetrics(MeterRegistry registry, TradeService tradeService) {
14+
registry.gauge("active_monitoring_users", tradeService, TradeService::currentMonitoringUserCount);
2615
registry.gauge("pending_rebuy_events", pendingRebuyEvents);
2716
}
2817

29-
30-
// ---- Streams 재매수 이벤트 ----
3118
public void rebuyEventEnqueued() {
3219
pendingRebuyEvents.incrementAndGet();
3320
}
3421

3522
public void rebuyEventDone() {
3623
pendingRebuyEvents.decrementAndGet();
3724
}
38-
3925
}

src/main/java/com/jypLord/redis/sub/RedisStockPriceSubscriber.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ public class RedisStockPriceSubscriber {
2121

2222
private final ConcurrentHashMap<String, State> states = new ConcurrentHashMap<>();
2323

24-
25-
2624
private static final class State {
2725
final Sinks.Many<StockPrice> sink =
2826
Sinks.many().multicast().directBestEffort();
@@ -40,7 +38,6 @@ public Flux<StockPrice> subscribe(String stockCode) {
4038
startRedisSubscription(stockCode, state);
4139
}
4240

43-
4441
return state.sink.asFlux()
4542
.doFinally(sig -> release(stockCode));
4643
});
@@ -49,36 +46,41 @@ public Flux<StockPrice> subscribe(String stockCode) {
4946
private void release(String stockCode) {
5047

5148
State state = states.get(stockCode);
52-
if (state == null) return;
49+
if (state == null) {
50+
return;
51+
}
5352

5453
int n = state.refCount.decrementAndGet();
55-
if (n > 0) return;
54+
if (n > 0) {
55+
return;
56+
}
5657

57-
// 0 이하로 내려가는 걸 방지
5858
if (n < 0) {
5959
state.refCount.compareAndSet(n, 0);
6060
return;
6161
}
6262

6363
synchronized (state) {
64-
if (state.refCount.get() != 0) return;
64+
if (state.refCount.get() != 0) {
65+
return;
66+
}
6567

6668
if (state.redisSub != null) {
67-
state.redisSub.dispose(); // Redis 구독 해제
69+
state.redisSub.dispose();
6870
state.redisSub = null;
6971
}
7072

71-
// sink 종료 + 상태 제거
7273
state.sink.tryEmitComplete();
7374
states.remove(stockCode, state);
7475
}
7576
}
7677

7778
private void startRedisSubscription(String stockCode, State state) {
78-
// 동시에 여러 구독자가 붙어도 redisSub가 중복으로 열리지 않게 보호
7979
synchronized (state) {
8080

81-
if (state.redisSub != null && !state.redisSub.isDisposed()) return;
81+
if (state.redisSub != null && !state.redisSub.isDisposed()) {
82+
return;
83+
}
8284

8385
String channel = "stock:price:" + stockCode;
8486

@@ -98,6 +100,7 @@ private void startRedisSubscription(String stockCode, State state) {
98100
});
99101
}
100102
}
103+
101104
public int activeBroadcastCount() {
102105
return states.size();
103106
}
@@ -110,6 +113,4 @@ private StockPrice toEvent(String stockCode, Message<String, String> msg) {
110113

111114
return new StockPrice(stockCode, price);
112115
}
113-
114116
}
115-

src/main/resources/application.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,11 @@ jwt:
5858
secret:
5959
key: ${JWT_SECRET_KEY}
6060

61+
management:
62+
metrics:
63+
export:
64+
cloudwatch:
65+
enabled: ${CLOUDWATCH_METRICS_ENABLED:true}
66+
namespace: ${CLOUDWATCH_NAMESPACE:autoInvest}
67+
step: ${CLOUDWATCH_STEP:1m}
68+

0 commit comments

Comments
 (0)