11package ledgerbackend
22
33import (
4- "bufio "
4+ "bytes "
55 "io"
66 "time"
77
@@ -54,29 +54,28 @@ type metaResult struct {
5454// while previous ledger are being processed.
5555// - Limits memory usage in case of large ledgers are closed by the network.
5656//
57- // Internally, it keeps two buffers: bufio.Reader with binary ledger data and
58- // buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for
59- // processing. The first buffer removes overhead time connected to reading from
60- // a file. The second buffer allows unmarshaling binary data into XDR objects
61- // (which can be a bottleneck) while clients are processing previous ledgers.
57+ // Internally, it reads framed XDR data directly into a reusable buffer and uses
58+ // a reusable Decoder for optimized decoding. The buffered channel stores unmarshaled
59+ // xdr.LedgerCloseMeta objects ready for processing, allowing unmarshaling to
60+ // proceed while clients process previous ledgers.
6261//
6362// Finally, when a large ledger (larger than binary buffer) is closed it waits
6463// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory
6564// exhaustion when network closes a series a large ledgers.
6665type bufferedLedgerMetaReader struct {
67- r * bufio.Reader
68- c chan metaResult
69- decoder * xdr3.Decoder
66+ r io.Reader
67+ c chan metaResult
68+ decoder * xdr3.Decoder
69+ frameBuffer bytes.Buffer
7070}
7171
7272// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
7373// when stellar-core terminates.
7474func newBufferedLedgerMetaReader (reader io.Reader ) * bufferedLedgerMetaReader {
75- r := bufio .NewReaderSize (reader , metaPipeBufferSize )
7675 return & bufferedLedgerMetaReader {
7776 c : make (chan metaResult , ledgerReadAheadBufferSize ),
78- r : r ,
79- decoder : xdr3 .NewDecoder (r ),
77+ r : reader ,
78+ decoder : xdr3 .NewDecoder (nil ),
8079 }
8180}
8281
@@ -86,21 +85,41 @@ func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader {
8685// - The next ledger available in the buffer exceeds the meta pipe buffer size.
8786// In such case the method will block until LedgerCloseMeta buffer is empty.
8887func (b * bufferedLedgerMetaReader ) readLedgerMetaFromPipe () (* xdr.LedgerCloseMeta , error ) {
89- frameLength , err := xdr .ReadFrameLength (b .decoder )
88+ frameLength , err := xdr .ReadFrameLength (b .r )
9089 if err != nil {
91- return nil , errors .Wrap (err , "error reading frame length" )
90+ if err == io .EOF {
91+ return nil , err
92+ }
93+ return nil , errors .Wrap (err , "reading frame length" )
9294 }
9395
9496 for frameLength > metaPipeBufferSize && len (b .c ) > 0 {
9597 // Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
9698 <- time .After (time .Second )
9799 }
98100
101+ // Read frame data directly into reusable buffer
102+ b .frameBuffer .Reset ()
103+ b .frameBuffer .Grow (int (frameLength ))
104+ n , err := b .frameBuffer .ReadFrom (io .LimitReader (b .r , int64 (frameLength )))
105+ if err != nil {
106+ return nil , errors .Wrap (err , "reading frame data" )
107+ }
108+ if n != int64 (frameLength ) {
109+ return nil , errors .Errorf ("read %d bytes, expected %d" , n , frameLength )
110+ }
111+
112+ // Decode using reusable Decoder for optimized performance
99113 var xlcm xdr.LedgerCloseMeta
100- _ , err = xlcm .DecodeFrom (b .decoder , xdr3 .DecodeDefaultMaxDepth )
114+ b .decoder .Reset (b .frameBuffer .Bytes ())
115+ bytesRead , err := b .decoder .Decode (& xlcm )
101116 if err != nil {
102117 return nil , errors .Wrap (err , "unmarshaling framed LedgerCloseMeta" )
103118 }
119+ if bytesRead != int (frameLength ) {
120+ return nil , errors .Errorf ("unmarshaled %d bytes, expected %d" , bytesRead , frameLength )
121+ }
122+
104123 return & xlcm , nil
105124}
106125
0 commit comments