Skip to content

Commit 7d77bcd

Browse files
committed
feat: update node metric to get BytesSend and BytesReceived
1 parent ae2731d commit 7d77bcd

File tree

6 files changed

+106
-11
lines changed

6 files changed

+106
-11
lines changed

isaac/network/node_metric.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ package isaacnetwork
22

33
import (
44
"math"
5+
"reflect"
56
"sync"
67
"sync/atomic"
78
"time"
89

910
"github.com/ProtoconNet/mitum2/network/quicstream"
1011
"github.com/ProtoconNet/mitum2/util"
1112
"github.com/ProtoconNet/mitum2/util/hint"
13+
"github.com/ProtoconNet/mitum2/util/localtime"
1214
)
1315

1416
var NodeMetricsHint = hint.MustNewHint("node-metrics-v0.0.1")
@@ -30,22 +32,64 @@ type NodeMetrics struct {
3032

3133
func (m NodeMetrics) MarshalJSON() ([]byte, error) {
3234
type alias struct {
33-
Hint hint.Type `json:"hint"`
34-
Timestamp time.Time `json:"timestamp"`
35-
Uptime string `json:"uptime"`
35+
hint.BaseHinter
36+
Timestamp localtime.Time `json:"timestamp"`
37+
Uptime util.ReadableDuration `json:"uptime"`
3638
Cumulative CumulativeMetrics `json:"cumulative"`
3739
Intervals map[string]IntervalMetrics `json:"intervals"`
3840
}
3941

4042
return util.MarshalJSON(alias{
41-
Hint: m.Hint().Type(),
42-
Timestamp: m.Timestamp,
43-
Uptime: m.Uptime.String(),
43+
BaseHinter: m.BaseHinter,
44+
Timestamp: localtime.New(m.Timestamp),
45+
Uptime: util.ReadableDuration(m.Uptime),
4446
Cumulative: m.Cumulative,
4547
Intervals: m.Intervals,
4648
})
4749
}
4850

51+
func (m *NodeMetrics) UnmarshalJSON(b []byte) error {
52+
e := util.StringError("unmarshal NodeMetrics")
53+
54+
type alias struct {
55+
hint.BaseHinter
56+
Timestamp time.Time `json:"timestamp"`
57+
Uptime *util.ReadableDuration `json:"uptime"`
58+
Cumulative CumulativeMetrics `json:"cumulative"`
59+
Intervals map[string]IntervalMetrics `json:"intervals"`
60+
}
61+
62+
var u alias
63+
64+
if err := util.UnmarshalJSON(b, &u); err != nil {
65+
return e.Wrap(err)
66+
}
67+
68+
m.BaseHinter = u.BaseHinter
69+
m.Timestamp = u.Timestamp
70+
m.Cumulative = u.Cumulative
71+
m.Intervals = u.Intervals
72+
73+
durArgs := [][2]interface{}{
74+
{u.Uptime, &m.Uptime},
75+
}
76+
77+
for i := range durArgs {
78+
v := durArgs[i][0].(*util.ReadableDuration) //nolint:forcetypeassert //...
79+
t := durArgs[i][1].(*time.Duration) //nolint:forcetypeassert //...
80+
81+
if reflect.ValueOf(v).IsZero() {
82+
continue
83+
}
84+
85+
if err := util.SetInterfaceValue(time.Duration(*v), t); err != nil {
86+
return err
87+
}
88+
}
89+
90+
return nil
91+
}
92+
4993
type CumulativeMetrics struct {
5094
QuicBytesSent uint64 `json:"quic_bytes_sent"`
5195
QuicBytesReceived uint64 `json:"quic_bytes_received"`

isaac/network/quicstream_handlers.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package isaacnetwork
33
import (
44
"bytes"
55
"context"
6-
"fmt"
76
"io"
87
"net"
98
"net/url"
@@ -542,7 +541,7 @@ func QuicstreamHandlerNodeInfo(
542541
broker *quicstreamheader.HandlerBroker, _ NodeInfoRequestHeader,
543542
) (context.Context, error) {
544543
e := util.StringError("handle node info")
545-
fmt.Println("QuicstreamHandlerNodeInfo QuicstreamHandlerNodeInfo")
544+
546545
b, err, _ := util.SingleflightDo[[]byte](&sg, HandlerNameNodeInfo.String(), func() ([]byte, error) {
547546
return getNodeInfo()
548547
})

launch/hinters.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ var Hinters = []encoder.DecodeDetail{
7373
{Hint: isaacnetwork.NodeChallengeRequestHeaderHint, Instance: isaacnetwork.NodeChallengeRequestHeader{}},
7474
{Hint: isaacnetwork.NodeInfoHint, Instance: isaacnetwork.NodeInfo{}},
7575
{Hint: isaacnetwork.NodeInfoRequestHeaderHint, Instance: isaacnetwork.NodeInfoRequestHeader{}},
76+
{Hint: isaacnetwork.NodeMetricsHint, Instance: isaacnetwork.NodeMetrics{}},
7677
{Hint: isaacnetwork.NodeMetricsRequestHeaderHint, Instance: isaacnetwork.NodeMetricsRequestHeader{}},
7778
{Hint: isaacnetwork.OperationRequestHeaderHint, Instance: isaacnetwork.OperationRequestHeader{}},
7879
{Hint: isaacnetwork.ProposalRequestHeaderHint, Instance: isaacnetwork.ProposalRequestHeader{}},

network/quicstream/connection.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ func (c *Connection) Stream(ctx context.Context, f StreamFunc) error {
5757
defer collector.RecordQuicStreamClosed()
5858
}
5959

60+
r, w := wrapMetricsIO(ctx, stream, stream)
61+
6062
defer func() {
6163
stream.CancelRead(0)
6264
_ = stream.Close()
6365
}()
6466

6567
return util.AwareContext(ctx, func(ctx context.Context) error {
66-
return f(ctx, stream, stream)
68+
return f(ctx, r, w)
6769
})
6870
}
6971

@@ -77,7 +79,9 @@ func (c *Connection) OpenStream(ctx context.Context) (io.Reader, io.WriteCloser,
7779
collector.RecordQuicStreamOpened()
7880
}
7981

80-
return stream, stream, func() error {
82+
r, w := wrapMetricsIO(ctx, stream, stream)
83+
84+
return r, w, func() error {
8185
if collector := GetMetricsCollector(ctx); collector != nil {
8286
collector.RecordQuicStreamClosed()
8387
}

network/quicstream/metrics.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package quicstream
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/ProtoconNet/mitum2/util"
78
)
@@ -47,3 +48,47 @@ func GetMetricsCollector(ctx context.Context) MetricsCollector {
4748

4849
return nil
4950
}
51+
52+
func wrapMetricsIO(ctx context.Context, reader io.Reader, writer io.WriteCloser) (io.Reader, io.WriteCloser) {
53+
collector := GetMetricsCollector(ctx)
54+
55+
switch {
56+
case collector == nil:
57+
return reader, writer
58+
case reader == nil:
59+
return reader, &metricsWriteCloser{WriteCloser: writer, collector: collector}
60+
case writer == nil:
61+
return &metricsReader{reader: reader, collector: collector}, writer
62+
default:
63+
return &metricsReader{reader: reader, collector: collector},
64+
&metricsWriteCloser{WriteCloser: writer, collector: collector}
65+
}
66+
}
67+
68+
type metricsReader struct {
69+
reader io.Reader
70+
collector MetricsCollector
71+
}
72+
73+
func (r *metricsReader) Read(p []byte) (int, error) {
74+
n, err := r.reader.Read(p)
75+
if n > 0 && r.collector != nil {
76+
r.collector.RecordQuicBytesReceived(uint64(n))
77+
}
78+
79+
return n, err
80+
}
81+
82+
type metricsWriteCloser struct {
83+
io.WriteCloser
84+
collector MetricsCollector
85+
}
86+
87+
func (w *metricsWriteCloser) Write(p []byte) (int, error) {
88+
n, err := w.WriteCloser.Write(p)
89+
if n > 0 && w.collector != nil {
90+
w.collector.RecordQuicBytesSent(uint64(n))
91+
}
92+
93+
return n, err
94+
}

network/quicstream/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ func (srv *Server) handleStream(ctx context.Context, remoteAddr net.Addr, stream
165165
sctx, cancel := srv.streamTimeoutContext(ctx)
166166
defer cancel()
167167

168+
r, w := wrapMetricsIO(ctx, stream, stream)
169+
168170
if collector := GetMetricsCollector(ctx); collector != nil {
169171
collector.RecordQuicStreamOpened()
170172
defer collector.RecordQuicStreamClosed()
@@ -173,7 +175,7 @@ func (srv *Server) handleStream(ctx context.Context, remoteAddr net.Addr, stream
173175
var errcode quic.StreamErrorCode
174176

175177
if err := util.AwareContext(sctx, func(context.Context) error {
176-
_, err := srv.handler(sctx, remoteAddr, stream, stream)
178+
_, err := srv.handler(sctx, remoteAddr, r, w)
177179

178180
return err
179181
}); err != nil {

0 commit comments

Comments
 (0)