diff --git a/pkg/netc/name.go b/pkg/netc/name.go index 7e75fb9..d5d5125 100644 --- a/pkg/netc/name.go +++ b/pkg/netc/name.go @@ -3,9 +3,11 @@ package netc import ( "crypto/rand" "encoding/base32" + "encoding/binary" "fmt" "io" "strings" + "time" ) var DNSSECEncoding = base32.NewEncoding("0123456789abcdefghijklmnopqrstuv").WithPadding(base32.NoPadding) @@ -25,3 +27,13 @@ func GenName() string { } return DNSSECEncoding.EncodeToString(data) } + +func GenTimestampName() string { + data := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, data); err != nil { + panic(err) + } + nanos := time.Now().UnixNano() + binary.BigEndian.PutUint64(data, uint64(nanos)) + return DNSSECEncoding.EncodeToString(data) +} diff --git a/server/control/clients.go b/server/control/clients.go index 30a81ea..6784d73 100644 --- a/server/control/clients.go +++ b/server/control/clients.go @@ -69,9 +69,11 @@ func newClientServer( return nil, fmt.Errorf("client snapshot: %w", err) } - reactivate := map[ClientConnKey][]ClientPeerKey{} + reactivate := map[ClientID]reactivateValue{} for _, msg := range connsMsgs { - reactivate[msg.Key] = []ClientPeerKey{} + v := reactivate[msg.Key.ID] + v.conns = append(v.conns, msg.Key) + reactivate[msg.Key.ID] = v } peersMsgs, peersOffset, err := peers.Snapshot() @@ -79,16 +81,17 @@ func newClientServer( return nil, fmt.Errorf("client peers snapshot: %w", err) } - peersCache := map[cacheKey][]*pbclient.RemotePeer{} + peersCache := map[peerKey][]peerValue{} for _, msg := range peersMsgs { - if reactivePeers, ok := reactivate[ClientConnKey{msg.Key.ID}]; ok { - key := cacheKey{msg.Key.Endpoint, msg.Key.Role} - peersCache[key] = append(peersCache[key], &pbclient.RemotePeer{ + if reactivePeers, ok := reactivate[msg.Key.ID]; ok { + key := peerKey{msg.Key.Endpoint, msg.Key.Role} + peersCache[key] = append(peersCache[key], peerValue{msg.Key.ConnID, &pbclient.RemotePeer{ Id: msg.Key.ID.string, Metadata: msg.Value.Metadata, Peer: msg.Value.Peer, - }) - reactivate[ClientConnKey{msg.Key.ID}] = append(reactivePeers, msg.Key) + }}) + reactivePeers.peers = append(reactivePeers.peers, msg.Key) + reactivate[msg.Key.ID] = reactivePeers } else { logger.Warn("peer without corresponding client, deleting", "endpoint", msg.Key.Endpoint, "role", msg.Key.Role, "id", msg.Key.ID) if err := peers.Del(msg.Key); err != nil { @@ -154,44 +157,67 @@ type clientServer struct { conns logc.KV[ClientConnKey, ClientConnValue] peers logc.KV[ClientPeerKey, ClientPeerValue] - peersCache map[cacheKey][]*pbclient.RemotePeer + peersCache map[peerKey][]peerValue peersOffset int64 peersMu sync.RWMutex - reactivate map[ClientConnKey][]ClientPeerKey + reactivate map[ClientID]reactivateValue reactivateMu sync.RWMutex } -func (s *clientServer) connected(id ClientID, auth ClientAuthentication, remote net.Addr, metadata string) error { +type peerKey struct { + endpoint model.Endpoint + role model.Role +} + +type peerValue struct { + connID ConnID + peer *pbclient.RemotePeer +} + +type reactivateValue struct { + conns []ClientConnKey + peers []ClientPeerKey +} + +func (s *clientServer) connected(id ClientID, connID ConnID, auth ClientAuthentication, remote net.Addr, metadata string) error { s.reactivateMu.Lock() - delete(s.reactivate, ClientConnKey{id}) + delete(s.reactivate, id) s.reactivateMu.Unlock() - return s.conns.Put(ClientConnKey{id}, ClientConnValue{auth, remote.String(), metadata}) + return s.conns.Put(ClientConnKey{id, connID}, ClientConnValue{auth, remote.String(), metadata}) } -func (s *clientServer) disconnected(id ClientID) error { - return s.conns.Del(ClientConnKey{id}) +func (s *clientServer) disconnected(id ClientID, connID ConnID) error { + return s.conns.Del(ClientConnKey{id, connID}) } -func (s *clientServer) announce(endpoint model.Endpoint, role model.Role, id ClientID, metadata string, peer *pbclient.Peer) error { - return s.peers.Put(ClientPeerKey{endpoint, role, id}, ClientPeerValue{peer, metadata}) +func (s *clientServer) announce(endpoint model.Endpoint, role model.Role, id ClientID, connID ConnID, metadata string, peer *pbclient.Peer) error { + return s.peers.Put(ClientPeerKey{endpoint, role, id, connID}, ClientPeerValue{peer, metadata}) } -func (s *clientServer) revoke(endpoint model.Endpoint, role model.Role, id ClientID) error { - return s.peers.Del(ClientPeerKey{endpoint, role, id}) +func (s *clientServer) revoke(endpoint model.Endpoint, role model.Role, id ClientID, connID ConnID) error { + return s.peers.Del(ClientPeerKey{endpoint, role, id, connID}) } -func (s *clientServer) cachedPeers(endpoint model.Endpoint, role model.Role) ([]*pbclient.RemotePeer, int64) { +func (s *clientServer) cachedPeers(endpoint model.Endpoint, role model.Role) ([]peerValue, int64) { s.peersMu.RLock() defer s.peersMu.RUnlock() - return slices.Clone(s.peersCache[cacheKey{endpoint, role}]), s.peersOffset + return slices.Clone(s.peersCache[peerKey{endpoint, role}]), s.peersOffset } func (s *clientServer) listen(ctx context.Context, endpoint model.Endpoint, role model.Role, notify func(peers []*pbclient.RemotePeer) error) error { peers, offset := s.cachedPeers(endpoint, role) - if err := notify(peers); err != nil { + doNotify := func() error { + uniquePeers := map[string]*pbclient.RemotePeer{} + for _, peer := range peers { + uniquePeers[peer.peer.Id] = peer.peer + } + + return notify(slices.Collect(maps.Values(uniquePeers))) + } + if err := doNotify(); err != nil { return err } @@ -208,16 +234,18 @@ func (s *clientServer) listen(ctx context.Context, endpoint model.Endpoint, role } if msg.Delete { - peers = slices.DeleteFunc(peers, func(peer *pbclient.RemotePeer) bool { - return peer.Id == msg.Key.ID.string + peers = slices.DeleteFunc(peers, func(peer peerValue) bool { + return peer.peer.Id == msg.Key.ID.string && peer.connID == msg.Key.ConnID }) } else { - npeer := &pbclient.RemotePeer{ + npeer := peerValue{msg.Key.ConnID, &pbclient.RemotePeer{ Id: msg.Key.ID.string, Metadata: msg.Value.Metadata, Peer: msg.Value.Peer, - } - idx := slices.IndexFunc(peers, func(peer *pbclient.RemotePeer) bool { return peer.Id == msg.Key.ID.string }) + }} + idx := slices.IndexFunc(peers, func(peer peerValue) bool { + return peer.peer.Id == msg.Key.ID.string && peer.connID == msg.Key.ConnID + }) if idx >= 0 { peers[idx] = npeer } else { @@ -230,7 +258,7 @@ func (s *clientServer) listen(ctx context.Context, endpoint model.Endpoint, role offset = nextOffset if changed { - if err := notify(peers); err != nil { + if err := doNotify(); err != nil { return err } } @@ -320,11 +348,11 @@ func (s *clientServer) runPeerCache(ctx context.Context) error { s.peersMu.Lock() defer s.peersMu.Unlock() - key := cacheKey{msg.Key.Endpoint, msg.Key.Role} + key := peerKey{msg.Key.Endpoint, msg.Key.Role} peers := s.peersCache[key] if msg.Delete { - peers = slices.DeleteFunc(peers, func(peer *pbclient.RemotePeer) bool { - return peer.Id == msg.Key.ID.string + peers = slices.DeleteFunc(peers, func(peer peerValue) bool { + return peer.peer.Id == msg.Key.ID.string && peer.connID == msg.Key.ConnID }) if len(peers) == 0 { delete(s.peersCache, key) @@ -332,12 +360,14 @@ func (s *clientServer) runPeerCache(ctx context.Context) error { s.peersCache[key] = peers } } else { - npeer := &pbclient.RemotePeer{ + npeer := peerValue{msg.Key.ConnID, &pbclient.RemotePeer{ Id: msg.Key.ID.string, Metadata: msg.Value.Metadata, Peer: msg.Value.Peer, - } - idx := slices.IndexFunc(peers, func(peer *pbclient.RemotePeer) bool { return peer.Id == msg.Key.ID.string }) + }} + idx := slices.IndexFunc(peers, func(peer peerValue) bool { + return peer.peer.Id == msg.Key.ID.string && peer.connID == msg.Key.ConnID + }) if idx >= 0 { peers[idx] = npeer } else { @@ -384,13 +414,15 @@ func (s *clientServer) runCleaner(ctx context.Context) error { s.reactivateMu.Lock() defer s.reactivateMu.Unlock() - for key, peers := range s.reactivate { - s.logger.Warn("force disconnecting client", "id", key.ID) - if err := s.disconnected(key.ID); err != nil { - return err + for key, value := range s.reactivate { + s.logger.Warn("force disconnecting client", "id", key) + for _, conn := range value.conns { + if err := s.disconnected(conn.ID, conn.ConnID); err != nil { + return err + } } - for _, peer := range peers { - if err := s.revoke(peer.Endpoint, peer.Role, peer.ID); err != nil { + for _, peer := range value.peers { + if err := s.revoke(peer.Endpoint, peer.Role, peer.ID, peer.ConnID); err != nil { return err } } @@ -423,6 +455,7 @@ type clientConn struct { server *clientServer conn *quic.Conn logger *slog.Logger + connID ConnID clientConnAuth } @@ -460,16 +493,17 @@ func (c *clientConn) runErr(ctx context.Context) error { } else { c.clientConnAuth = *auth c.logger = c.logger.With("client-id", c.id) + c.connID = NewConnID() } c.logger.Info("client connected", "addr", c.conn.RemoteAddr(), "metadata", c.metadata) defer c.logger.Info("client disconnected", "addr", c.conn.RemoteAddr(), "metadata", c.metadata) - if err := c.server.connected(c.id, c.auth, c.conn.RemoteAddr(), c.metadata); err != nil { + if err := c.server.connected(c.id, c.connID, c.auth, c.conn.RemoteAddr(), c.metadata); err != nil { return err } defer func() { - if err := c.server.disconnected(c.id); err != nil { + if err := c.server.disconnected(c.id, c.connID); err != nil { c.logger.Warn("failed to disconnect client", "id", c.id, "err", err) } }() @@ -622,11 +656,11 @@ func (s *clientStream) announce(ctx context.Context, req *pbclient.Request_Annou return err } - if err := s.conn.server.announce(endpoint, role, s.conn.id, s.conn.metadata, req.Peer); err != nil { + if err := s.conn.server.announce(endpoint, role, s.conn.id, s.conn.connID, s.conn.metadata, req.Peer); err != nil { return err } defer func() { - if err := s.conn.server.revoke(endpoint, role, s.conn.id); err != nil { + if err := s.conn.server.revoke(endpoint, role, s.conn.id, s.conn.connID); err != nil { s.conn.logger.Warn("failed to revoke client", "id", s.conn.id, "err", err) } }() @@ -655,7 +689,7 @@ func (s *clientStream) announce(ctx context.Context, req *pbclient.Request_Annou return err } - if err := s.conn.server.announce(endpoint, role, s.conn.id, s.conn.metadata, req.Announce.Peer); err != nil { + if err := s.conn.server.announce(endpoint, role, s.conn.id, s.conn.connID, s.conn.metadata, req.Announce.Peer); err != nil { return err } } diff --git a/server/control/conn_id.go b/server/control/conn_id.go new file mode 100644 index 0000000..d941178 --- /dev/null +++ b/server/control/conn_id.go @@ -0,0 +1,38 @@ +package control + +import ( + "encoding/binary" + "time" + + "github.com/connet-dev/connet/pkg/netc" +) + +type ConnID struct{ string } + +var ConnIDNil = ConnID{""} + +func NewConnID() ConnID { + return ConnID{netc.GenTimestampName()} +} + +func (k ConnID) String() string { + return k.string +} + +func (k ConnID) Time() time.Time { + data, err := netc.DNSSECEncoding.DecodeString(k.string) + if err != nil { + return time.Time{} + } + nanos := binary.BigEndian.Uint64(data) + return time.Unix(0, int64(nanos)) +} + +func (k ConnID) MarshalText() ([]byte, error) { + return []byte(k.string), nil +} + +func (k *ConnID) UnmarshalText(b []byte) error { + k.string = string(b) + return nil +} diff --git a/server/control/server.go b/server/control/server.go index ea32f64..438fd7a 100644 --- a/server/control/server.go +++ b/server/control/server.go @@ -106,6 +106,7 @@ func (s *Server) getClients() (map[string]StatusClient, error) { for _, msg := range clientMsgs { clients[msg.Key.ID.string] = StatusClient{ ID: msg.Key.ID, + ConnID: msg.Key.ConnID, Address: msg.Value.Addr, Metadata: msg.Value.Metadata, } @@ -127,9 +128,9 @@ func (s *Server) getEndpoints() (map[string]StatusEndpoint, error) { switch msg.Key.Role { case model.Destination: - ep.Destinations = append(ep.Destinations, msg.Key.ID) + ep.Destinations = append(ep.Destinations, StatusEndpointRemote{msg.Key.ID, msg.Key.ConnID}) case model.Source: - ep.Sources = append(ep.Sources, msg.Key.ID) + ep.Sources = append(ep.Sources, StatusEndpointRemote{msg.Key.ID, msg.Key.ConnID}) default: return nil, fmt.Errorf("unknown role: %s", msg.Key.Role) } @@ -177,14 +178,20 @@ type StatusIngress struct { type StatusClient struct { ID ClientID `json:"id"` + ConnID ConnID `json:"conn_id"` Address string `json:"address"` Metadata string `json:"metadata"` } type StatusEndpoint struct { - Endpoint model.Endpoint `json:"endpoint"` - Destinations []ClientID `json:"destinations"` - Sources []ClientID `json:"sources"` + Endpoint model.Endpoint `json:"endpoint"` + Destinations []StatusEndpointRemote `json:"destinations"` + Sources []StatusEndpointRemote `json:"sources"` +} + +type StatusEndpointRemote struct { + ID ClientID `json:"id"` + ConnID ConnID `json:"conn_id"` } type StatusRelay struct { diff --git a/server/control/store.go b/server/control/store.go index 0638047..c347f69 100644 --- a/server/control/store.go +++ b/server/control/store.go @@ -74,7 +74,8 @@ type ConfigValue struct { } type ClientConnKey struct { - ID ClientID `json:"id"` + ID ClientID `json:"id"` + ConnID ConnID `json:"conn_id"` } type ClientConnValue struct { @@ -87,6 +88,7 @@ type ClientPeerKey struct { Endpoint model.Endpoint `json:"endpoint"` Role model.Role `json:"role"` ID ClientID `json:"id"` // TODO consider using the server cert key or peer id + ConnID ConnID `json:"conn_id"` } type ClientPeerValue struct { @@ -94,11 +96,6 @@ type ClientPeerValue struct { Metadata string `json:"metadata"` } -type cacheKey struct { - endpoint model.Endpoint - role model.Role -} - type RelayConnKey struct { ID RelayID `json:"id"` }