diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 418ebd320..c20609be4 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -199,6 +199,11 @@ func main() { orderTopic := order_topic.NewSendTopic() loggerTopic := logger_topic.NewEnableTopic() loggerTopic.SetDataLogger(subloggers[data_logger.Name].(*data_logger.Logger)) + loggerHandler.SetOnStart(func() { + if err := loggerTopic.NotifyStarted(); err != nil { + trace.Error().Err(err).Msg("failed to notify logger started") + } + }) messageTopic := message_topic.NewUpdateTopic() stateOrderTopic := order_topic.NewState(idToBoard, trace.Logger) diff --git a/backend/pkg/broker/topics/logger/enable.go b/backend/pkg/broker/topics/logger/enable.go index 77f464f26..adc74cdb4 100644 --- a/backend/pkg/broker/topics/logger/enable.go +++ b/backend/pkg/broker/topics/logger/enable.go @@ -104,6 +104,11 @@ func (enable *Enable) handleVariables(_ websocket.ClientId, message *websocket.M return nil } +func (enable *Enable) NotifyStarted() error { + enable.isRunning.Store(true) + return enable.broadcastState() +} + func (enable *Enable) broadcastState() error { payload, err := json.Marshal(enable.isRunning.Load()) if err != nil { diff --git a/backend/pkg/logger/logger.go b/backend/pkg/logger/logger.go index fbefe6828..dec89e2ad 100644 --- a/backend/pkg/logger/logger.go +++ b/backend/pkg/logger/logger.go @@ -24,6 +24,8 @@ type Logger struct { subloggers map[abstraction.LoggerName]abstraction.Logger trace zerolog.Logger + + onStart func() } /************** @@ -83,9 +85,17 @@ func (logger *Logger) Start() error { } logger.trace.Info().Msg("started") + + if logger.onStart != nil { + logger.onStart() + } return nil } +func (logger *Logger) SetOnStart(cb func()) { + logger.onStart = cb +} + // PushRecord works as a proxy for the PushRecord method of the subloggers func (logger *Logger) PushRecord(record abstraction.LoggerRecord) error { diff --git a/backend/pkg/transport/transport.go b/backend/pkg/transport/transport.go index acda6f7ef..bc11b11c8 100644 --- a/backend/pkg/transport/transport.go +++ b/backend/pkg/transport/transport.go @@ -119,44 +119,17 @@ func (transport *Transport) HandleServer(config tcp.ServerConfig, local string) // handleTCPConn is used to handle the specific TCP connections to the boards. It detects errors caused // on concurrent reads and writes, so other routines should not worry about closing or handling errors func (transport *Transport) handleTCPConn(conn net.Conn) error { - if tcpConn, ok := conn.(*net.TCPConn); ok { - transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection linger") - err := tcpConn.SetLinger(0) - if err != nil { - transport.errChan <- err - transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set linger") - } - - transport.logger.Trace().Str("remoteAddress", conn.RemoteAddr().String()).Msg("setting connection no delay") - err = tcpConn.SetNoDelay(true) - if err != nil { - transport.errChan <- err - transport.logger.Error().Stack().Err(err).Str("remoteAddress", conn.RemoteAddr().String()).Msg("set no delay") - } - } + transport.configureTCPConn(conn) - target, ok := transport.ipToTarget[conn.RemoteAddr().(*net.TCPAddr).IP.String()] - if !ok { - conn.Close() - transport.logger.Warn().Str("remoteAddress", conn.RemoteAddr().(*net.TCPAddr).IP.String()).Msg("ip target not found") - err := ErrUnknownTarget{Remote: conn.RemoteAddr()} - transport.errChan <- err + target, err := transport.targetFromTCPConn(conn) + if err != nil { return err } connectionLogger := transport.logger.With().Str("remoteAddress", conn.RemoteAddr().String()).Str("target", string(target)).Logger() connectionLogger.Info().Msg("new connection") - if err := func() error { - transport.connectionsMx.Lock() - defer transport.connectionsMx.Unlock() - if _, ok := transport.connections[target]; ok { - conn.Close() - connectionLogger.Debug().Msg("already connected") - return ErrTargetAlreadyConnected{Target: target} - } - return nil - }(); err != nil { + if err := transport.rejectIfConnectedTCPConn(target, conn, connectionLogger); err != nil { transport.errChan <- err return err } @@ -167,56 +140,125 @@ func (transport *Transport) handleTCPConn(conn net.Conn) error { connectionLogger.Info().Msg("close") }() - func() { - transport.connectionsMx.Lock() - defer transport.connectionsMx.Unlock() - connectionLogger.Debug().Msg("added connection") - transport.connections[target] = conn - }() - defer func() { - transport.connectionsMx.Lock() - defer transport.connectionsMx.Unlock() - connectionLogger.Debug().Msg("removed connection") - delete(transport.connections, target) - }() + cleanupConn := transport.registerTCPConn(target, conn, connectionLogger) + defer cleanupConn() transport.api.ConnectionUpdate(target, true) defer transport.api.ConnectionUpdate(target, false) + transport.readLoopTCPConn(conn, connectionLogger) + + err = <-errChan + if err != nil { + connectionLogger.Error().Stack().Err(err).Msg("") + transport.errChan <- err + } + return err +} + +// configureTCPConn sets TCP-level options like linger and no-delay. +func (transport *Transport) configureTCPConn(conn net.Conn) { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return + } + + remote := conn.RemoteAddr().String() + + transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection linger") + err := tcpConn.SetLinger(0) + if err != nil { + transport.errChan <- err + transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set linger") + } + + transport.logger.Trace().Str("remoteAddress", remote).Msg("setting connection no delay") + err = tcpConn.SetNoDelay(true) + if err != nil { + transport.errChan <- err + transport.logger.Error().Stack().Err(err).Str("remoteAddress", remote).Msg("set no delay") + } +} + +// targetFromTCPConn maps the remote IP address of the connection to a TransportTarget +// using the ipToTarget map. +func (transport *Transport) targetFromTCPConn(conn net.Conn) (abstraction.TransportTarget, error) { + remoteAddr := conn.RemoteAddr().(*net.TCPAddr) + ip := remoteAddr.IP.String() + + target, ok := transport.ipToTarget[ip] + if !ok { + conn.Close() + transport.logger.Warn().Str("remoteAddress", ip).Msg("ip target not found") + err := ErrUnknownTarget{Remote: conn.RemoteAddr()} + transport.errChan <- err + var zero abstraction.TransportTarget + return zero, err + + } + return target, nil +} + +// rejectIfConnectedTCPConn closes and rejects conn if target already has an active connection. +func (transport *Transport) rejectIfConnectedTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger,) error { + transport.connectionsMx.Lock() + defer transport.connectionsMx.Unlock() + + if _, ok := transport.connections[target]; ok { + conn.Close() + logger.Debug().Msg("already connected") + err := ErrTargetAlreadyConnected{Target: target} + transport.errChan <- err + return err + } + return nil +} + +// registerTCPConn stores conn for target and returns a cleanup that removes it. +func (transport *Transport) registerTCPConn(target abstraction.TransportTarget, conn net.Conn, logger zerolog.Logger) func() { + transport.connectionsMx.Lock() + logger.Debug().Msg("added connection") + transport.connections[target] = conn + transport.connectionsMx.Unlock() + + return func() { + transport.connectionsMx.Lock() + logger.Debug().Msg("removed connection") + delete(transport.connections, target) + transport.connectionsMx.Unlock() + } +} + +// readLoopTCPConn reads packets from conn and forwards notifications until an error occurs. +func (transport *Transport) readLoopTCPConn(conn net.Conn, logger zerolog.Logger) { + from := conn.RemoteAddr().String() + to := conn.LocalAddr().String() + go func() { for { packet, err := transport.decoder.DecodeNext(conn) if err != nil { - connectionLogger.Error().Stack().Err(err).Msg("decode") + logger.Error().Stack().Err(err).Msg("decode") transport.errChan <- err transport.SendFault() return } if transport.propagateFault && packet.Id() == 0 { - connectionLogger.Info().Msg("replicating packet with id 0 to all boards") + logger.Info().Msg("replicating packet with id 0 to all boards") err := transport.handlePacketEvent(NewPacketMessage(packet)) if err != nil { - connectionLogger.Error().Err(err).Msg("failed to replicate packet") + logger.Error().Err(err).Msg("failed to replicate packet") } } - from := conn.RemoteAddr().String() - to := conn.LocalAddr().String() - - connectionLogger.Trace().Type("type", packet).Msg("packet") + logger.Trace().Type("type", packet).Msg("packet") transport.api.Notification(NewPacketNotification(packet, from, to, time.Now())) } }() - - err := <-errChan - if err != nil { - connectionLogger.Error().Stack().Err(err).Msg("") - transport.errChan <- err - } - return err } + // SendMessage triggers an event to send something to the vehicle. Some messages // might additional means to pass information around (e.g. file read and write) func (transport *Transport) SendMessage(message abstraction.TransportMessage) error { @@ -233,8 +275,10 @@ func (transport *Transport) SendMessage(message abstraction.TransportMessage) er err = ErrUnrecognizedEvent{message.Event()} } // handlePacketEvent already sends the error through the channel, so this avoids duplicates - if _, ok := err.(ErrConnClosed); !ok { - transport.errChan <- err + if err != nil { + if _, ok := err.(ErrConnClosed); !ok { + transport.errChan <- err + } } return err } @@ -326,14 +370,18 @@ func (transport *Transport) handlePacketEvent(message PacketMessage) error { // handleFileWrite writes a file through tftp to the blcu func (transport *Transport) handleFileWrite(message FileWriteMessage) error { _, err := transport.tftp.WriteFile(message.Filename(), tftp.BinaryMode, message) - transport.errChan <- err + if err != nil { + transport.errChan <- err + } return err } // handleFileRead reads a file through tftp from the blcu func (transport *Transport) handleFileRead(message FileReadMessage) error { _, err := transport.tftp.ReadFile(message.Filename(), tftp.BinaryMode, message) - transport.errChan <- err + if err != nil { + transport.errChan <- err + } return err } diff --git a/backend/pkg/transport/transport_test.go b/backend/pkg/transport/transport_test.go index 58325f82a..0a1c9fd6f 100644 --- a/backend/pkg/transport/transport_test.go +++ b/backend/pkg/transport/transport_test.go @@ -89,12 +89,17 @@ type MockBoardServer struct { func NewMockBoardServer(address string) *MockBoardServer { logger := zerolog.Nop() + + enc := presentation.NewEncoder(binary.BigEndian, logger) + dec := presentation.NewDecoder(binary.BigEndian, logger) + wireTestPacketCodec(enc, dec, abstraction.PacketId(100)) + return &MockBoardServer{ address: address, connections: make([]net.Conn, 0), packetsRecv: make([]abstraction.Packet, 0), - encoder: presentation.NewEncoder(binary.BigEndian, logger), - decoder: presentation.NewDecoder(binary.BigEndian, logger), + encoder: enc, + decoder: dec, } } @@ -222,11 +227,18 @@ func (s *MockBoardServer) GetConnectionCount() int { // Test utilities func createTestTransport(t *testing.T) (*Transport, *TestTransportAPI) { - logger := zerolog.New(zerolog.NewTestWriter(t)).With().Timestamp().Logger() - + // if NewTestWriter(t) is used: background goroutines may log after the test ends and cause a panic + //logger := zerolog.New(zerolog.NewTestWriter(t)).With().Timestamp().Logger() + logger := zerolog.New(zerolog.Nop()).With().Timestamp().Logger() + + enc := presentation.NewEncoder(binary.BigEndian, logger) + dec := presentation.NewDecoder(binary.BigEndian, logger) + wireTestPacketCodec(enc, dec, abstraction.PacketId(100)) + + transport := NewTransport(logger). - WithEncoder(presentation.NewEncoder(binary.BigEndian, logger)). - WithDecoder(presentation.NewDecoder(binary.BigEndian, logger)) + WithEncoder(enc). + WithDecoder(dec) api := NewTestTransportAPI() transport.SetAPI(api) @@ -255,6 +267,20 @@ func waitForCondition(condition func() bool, timeout time.Duration, message stri return fmt.Errorf("timeout waiting for condition: %s", message) } +// test wiring: register a trivial codec for a data packet id. +func wireTestPacketCodec(enc *presentation.Encoder, dec *presentation.Decoder, id abstraction.PacketId) { + dataEnc := data.NewEncoder(binary.BigEndian) + dataDec := data.NewDecoder(binary.BigEndian) + + // Empty descriptor = no payload values, just the id header. + var desc data.Descriptor + dataEnc.SetDescriptor(id, desc) + dataDec.SetDescriptor(id, desc) + + enc.SetPacketEncoder(id, dataEnc) + dec.SetPacketDecoder(id, dataDec) +} + // Unit Tests func TestTransport_Creation(t *testing.T) { logger := zerolog.Nop() @@ -307,6 +333,42 @@ func TestTransport_SetTargetIp(t *testing.T) { } } +func TestTransport_InvalidInputs(t *testing.T) { + transport, _ := createTestTransport(t) + + // Test invalid ID input + err := transport.SetIdTarget(0, "") + if err == nil { + t.Errorf("Expected error for invalid ID input, got nil") + } + + // Test invalid IP input + err = transport.SetTargetIp("", "") + if err == nil { + t.Errorf("Expected error for invalid IP input, got nil") + } +} + +func TestTransport_RemoveTargets(t *testing.T) { + transport, _ := createTestTransport(t) + + // Add entries + transport.SetIdTarget(100, "TEST_BOARD") + transport.SetTargetIp("192.168.1.100", "TEST_BOARD") + + // Remove entries + delete(transport.idToTarget, 100) + delete(transport.ipToTarget, "192.168.1.100") + + // Verify removal + if _, exists := transport.idToTarget[100]; exists { + t.Errorf("Expected ID 100 to be removed, but it still exists") + } + if _, exists := transport.ipToTarget["192.168.1.100"]; exists { + t.Errorf("Expected IP 192.168.1.100 to be removed, but it still exists") + } +} + // Integration Tests func TestTransport_ClientServerConnection(t *testing.T) { transport, api := createTestTransport(t) @@ -437,7 +499,8 @@ func TestTransport_PacketSending(t *testing.T) { // Wait for connection err = waitForCondition(func() bool { - return mockBoard.GetConnectionCount() > 0 + updates := api.GetConnectionUpdates() + return len(updates) > 0 && updates[len(updates)-1].Target == target && updates[len(updates)-1].IsConnected }, 2*time.Second, "Should establish connection") if err != nil { t.Fatal(err)