-
Notifications
You must be signed in to change notification settings - Fork 7
Exploratory refactor-2 of libp2p + HTTP #103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,16 +15,23 @@ import ( | |
| "github.com/ipld/go-ipld-prime" | ||
| "github.com/ipld/go-ipld-prime/codec/dagjson" | ||
| "github.com/ipld/go-ipld-prime/datamodel" | ||
| "github.com/ipld/go-ipld-prime/fluent" | ||
| "github.com/ipld/go-ipld-prime/linking" | ||
| cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
| "github.com/ipld/go-ipld-prime/node/basicnode" | ||
| "github.com/ipld/go-ipld-prime/storage/memstore" | ||
| "github.com/ipld/go-ipld-prime/traversal" | ||
| selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" | ||
| "github.com/ipni/go-libipni/announce" | ||
| "github.com/ipni/go-libipni/announce/message" | ||
| "github.com/ipni/go-libipni/dagsync/httpsync" | ||
| "github.com/libp2p/go-libp2p" | ||
| "github.com/libp2p/go-libp2p/core/crypto" | ||
| "github.com/libp2p/go-libp2p/core/peer" | ||
| "github.com/libp2p/go-libp2p/core/protocol" | ||
| libp2phttp "github.com/libp2p/go-libp2p/p2p/http" | ||
| "github.com/multiformats/go-multiaddr" | ||
| "github.com/multiformats/go-multicodec" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
|
|
@@ -96,6 +103,151 @@ func TestNewPublisherForListener(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| func TestPublisherWithLibp2pHTTP(t *testing.T) { | ||
| ctx := context.Background() | ||
| req := require.New(t) | ||
|
|
||
| publisherStore := &correctedMemStore{&memstore.Store{ | ||
| Bag: make(map[string][]byte), | ||
| }} | ||
| publisherLsys := cidlink.DefaultLinkSystem() | ||
| publisherLsys.TrustedStorage = true | ||
| publisherLsys.SetReadStorage(publisherStore) | ||
| publisherLsys.SetWriteStorage(publisherStore) | ||
|
|
||
| privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader) | ||
| req.NoError(err) | ||
|
|
||
| publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey) | ||
| req.NoError(err) | ||
|
|
||
| // Use same identity as publisher. This is necessary so that same ID that | ||
| // the publisher uses to sign head/ query responses is the same as the ID | ||
| // used to identify the publisherStreamHost. Otherwise, it would be | ||
| // necessary for the sync client to know both IDs: one for the stream host | ||
| // to connect to, and one for the publisher to validate the signature with. | ||
| publisherStreamHost, err := libp2p.New(libp2p.Identity(privKey), libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) | ||
| req.NoError(err) | ||
|
|
||
| // This is the "HTTP Host". It's like the libp2p "stream host" (aka core | ||
| // host.Host), but it uses HTTP semantics instead of stream semantics. | ||
| // | ||
| // You can pass in options on creation like a stream host to do HTTP over | ||
| // libp2p streams, and multiaddrs to create listeners on. | ||
| publisherHost, err := libp2phttp.New( | ||
| libp2phttp.StreamHost(publisherStreamHost), | ||
| libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be obvious, but I forgot to note this here: In production this MUST be an |
||
| ) | ||
| req.NoError(err) | ||
|
|
||
| go publisherHost.Serve() | ||
| defer publisherHost.Close() | ||
|
|
||
| protoID := protocol.ID("/ipni-sync/1") | ||
|
|
||
| serverStreamMa := publisherHost.Addrs()[0] | ||
| serverHTTPMa := publisherHost.Addrs()[1] | ||
| req.Contains(serverHTTPMa.String(), "/http") | ||
|
|
||
| t.Log("libp2p stream server address:", serverStreamMa.String()) | ||
| t.Log("libp2p http server address:", serverHTTPMa.String()) | ||
|
|
||
| // Here is where we attach our request handler. Note that we are mounting | ||
| // the "/ipni-sync/1" protocol at /ipni/. libp2phttp manages this mapping | ||
| // and clients can learn about the mapping at .well-known/libp2p. | ||
| // | ||
| // In this case we also want our HTTP handler to not even know about the | ||
| // prefix, so we use the stdlib http.StripPrefix. | ||
| publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher)) | ||
|
|
||
| link, err := publisherLsys.Store( | ||
| ipld.LinkContext{Ctx: ctx}, | ||
| cidlink.LinkPrototype{ | ||
| Prefix: cid.Prefix{ | ||
| Version: 1, | ||
| Codec: uint64(multicodec.DagJson), | ||
| MhType: uint64(multicodec.Sha2_256), | ||
| MhLength: -1, | ||
| }, | ||
| }, | ||
| fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { | ||
| na.AssembleEntry("fish").AssignString("lobster") | ||
| na.AssembleEntry("fish1").AssignString("lobster1") | ||
| na.AssembleEntry("fish2").AssignString("lobster2") | ||
| na.AssembleEntry("fish0").AssignString("lobster0") | ||
| })) | ||
| req.NoError(err) | ||
| publisher.SetRoot(link.(cidlink.Link).Cid) | ||
|
|
||
| testCases := []struct { | ||
| name string | ||
| publisher peer.AddrInfo | ||
| newClientHost func(t *testing.T) *libp2phttp.HTTPHost | ||
| }{ | ||
| { | ||
| "HTTP transport", | ||
| peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}}, | ||
| func(t *testing.T) *libp2phttp.HTTPHost { | ||
| clientHost, err := libp2phttp.New() | ||
| req.NoError(err) | ||
| return clientHost | ||
| }, | ||
| }, | ||
| { | ||
| "libp2p stream transport", | ||
| peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}}, | ||
| func(t *testing.T) *libp2phttp.HTTPHost { | ||
| clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs) | ||
| req.NoError(err) | ||
| clientHost, err := libp2phttp.New(libp2phttp.StreamHost(clientStreamHost)) | ||
| req.NoError(err) | ||
| return clientHost | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range testCases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| // Plumbing to set up the test. | ||
| clientStore := &correctedMemStore{&memstore.Store{ | ||
| Bag: make(map[string][]byte), | ||
| }} | ||
| clientLsys := cidlink.DefaultLinkSystem() | ||
| clientLsys.TrustedStorage = true | ||
| clientLsys.SetReadStorage(clientStore) | ||
| clientLsys.SetWriteStorage(clientStore) | ||
| clientSync := httpsync.NewLibp2pSync(clientLsys, tc.newClientHost(t), protoID, nil) | ||
|
|
||
| // In a dagsync Subscriber, the clientSync is created once and | ||
| // lives for the lifetime of the Subscriber (lifetime of indexer), | ||
| // The clientSyncer is created for each sync operation and only | ||
| // lives for the duration of the sync. The publisher's address may | ||
| // change from one sync to the next, and we do not know the | ||
| // addresses ahead of time. | ||
| clientSyncer, err := clientSync.NewSyncer(tc.publisher.ID, tc.publisher.Addrs) | ||
| req.NoError(err) | ||
|
|
||
| headCid, err := clientSyncer.GetHead(ctx) | ||
| req.NoError(err) | ||
|
|
||
| req.Equal(link.(cidlink.Link).Cid, headCid) | ||
|
|
||
| clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint) | ||
| require.NoError(t, err) | ||
|
|
||
| // Assert that data is loadable from the link system. | ||
| wantLink := cidlink.Link{Cid: headCid} | ||
| node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any) | ||
| require.NoError(t, err) | ||
|
|
||
| // Assert synced node link matches the computed link, i.e. is spec-compliant. | ||
| gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node) | ||
| require.NoError(t, err) | ||
| require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String()) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func mapKeys(t *testing.T, n ipld.Node) []string { | ||
| var keys []string | ||
| require.Equal(t, n.Kind(), datamodel.Kind_Map) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ import ( | |
| "github.com/ipni/go-libipni/maurl" | ||
| ic "github.com/libp2p/go-libp2p/core/crypto" | ||
| "github.com/libp2p/go-libp2p/core/peer" | ||
| "github.com/libp2p/go-libp2p/core/protocol" | ||
| libp2phttp "github.com/libp2p/go-libp2p/p2p/http" | ||
| "github.com/multiformats/go-multiaddr" | ||
| "github.com/multiformats/go-multihash" | ||
| ) | ||
|
|
@@ -34,6 +36,10 @@ type Sync struct { | |
| blockHook func(peer.ID, cid.Cid) | ||
| client *http.Client | ||
| lsys ipld.LinkSystem | ||
|
|
||
| // libp2phttp | ||
| clientHost *libp2phttp.HTTPHost | ||
| protoID protocol.ID | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can probably be a constant value here. It is simply some identifier for this protocol. Maybe |
||
| } | ||
|
|
||
| // NewSync creates a new Sync. | ||
|
|
@@ -50,19 +56,82 @@ func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID, | |
| } | ||
| } | ||
|
|
||
| // NewSyncer creates a new Syncer to use for a single sync operation against a peer. | ||
| var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer") | ||
|
|
||
| // Syncer provides sync functionality for a single sync with a peer. | ||
| type Syncer struct { | ||
| client *http.Client | ||
| peerID peer.ID | ||
| rootURL url.URL | ||
| urls []*url.URL | ||
| sync *Sync | ||
| } | ||
|
|
||
| func NewLibp2pSync(lsys ipld.LinkSystem, clientHost *libp2phttp.HTTPHost, protoID protocol.ID, blockHook func(peer.ID, cid.Cid)) *Sync { | ||
| return &Sync{ | ||
| blockHook: blockHook, | ||
| lsys: lsys, | ||
|
|
||
| clientHost: clientHost, | ||
| protoID: protoID, | ||
| } | ||
| } | ||
|
|
||
| // NewSyncer creates a new Syncer to use for a single sync operation against a | ||
| // peer. A value for peerID is optional for the HTTP transport. | ||
| // | ||
| // TODO: Replace arguments with peer.AddrInfo | ||
| func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Syncer, error) { | ||
| urls := make([]*url.URL, len(peerAddrs)) | ||
| for i := range peerAddrs { | ||
| peerInfo := peer.AddrInfo{ | ||
| ID: peerID, | ||
| Addrs: peerAddrs, | ||
| } | ||
| if s.clientHost != nil { | ||
| return s.newLibp2pSyncer(peerInfo) | ||
| } | ||
| return s.newSyncer(peerInfo) | ||
| } | ||
|
|
||
| func (s *Sync) newLibp2pSyncer(peerInfo peer.AddrInfo) (*Syncer, error) { | ||
| httpClient, err := s.clientHost.NamespacedClient(s.protoID, peerInfo) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the best go-libp2p api to expose optional server authentication to users? My original idea was to determine if a user wanted to authenticate the server if
* A user might not know if the multiaddr they are passing is an HTTP transport or Stream transport.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think having an option to allow authentication to be bypassed works. This is also not an unfamiliar thing to do, as it is similar to |
||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a note, if we see an
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @MarcoPolo This would work for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I'm misunderstanding something. In the current version of master we have the notion of a syncer's rootURL. This represents the prefix that the IPNI protocol is mounted at. If we want to GET the head we do I'm noting here that we could save a round trip to .well-known/libp2p telling us the prefix by using the rootURL. We would do something like You probably need to do this for backwards compatibility as well as an optimization, since existing deployed publishers won't have set up a I don't quite understand why you'd want to do a prefix match, what am I missing?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am probably not fully understanding the correct use. The code immediately following this comment is my attempt to use this feature. Maybe I need to omit the "head" or any other thing following the root prefix? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah okay. I think I see. You don't actually need to do any of the below. The path in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. https://gist.github.com/MarcoPolo/30c5c138a126473a949ab1879c16c37a We learn about this prefix out-of-band by using the |
||
| // Cache protocol mapping for this well-known path. | ||
| if peerInfo.ID != "" { | ||
| protos, err := s.clientHost.GetAndStorePeerProtoMap(httpClient.Transport, peerInfo.ID) | ||
| if err == nil { | ||
| meta := libp2phttp.WellKnownProtocolMeta{ | ||
| Path: "/head", | ||
| } | ||
| protos[s.protoID] = meta | ||
| s.clientHost.AddPeerMetadata(peerInfo.ID, protos) | ||
| } | ||
| } | ||
|
|
||
| return &Syncer{ | ||
| client: &httpClient, | ||
| peerID: peerInfo.ID, | ||
| rootURL: url.URL{Path: "/"}, | ||
| urls: nil, | ||
| sync: s, | ||
| }, nil | ||
| } | ||
|
|
||
| func (s *Sync) newSyncer(peerInfo peer.AddrInfo) (*Syncer, error) { | ||
| urls := make([]*url.URL, len(peerInfo.Addrs)) | ||
| for i, addr := range peerInfo.Addrs { | ||
| var err error | ||
| urls[i], err = maurl.ToURL(peerAddrs[i]) | ||
| urls[i], err = maurl.ToURL(addr) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| return &Syncer{ | ||
| peerID: peerID, | ||
| client: s.client, | ||
| peerID: peerInfo.ID, | ||
| rootURL: *urls[0], | ||
| urls: urls[1:], | ||
| sync: s, | ||
|
|
@@ -73,16 +142,6 @@ func (s *Sync) Close() { | |
| s.client.CloseIdleConnections() | ||
| } | ||
|
|
||
| var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer") | ||
|
|
||
| // Syncer provides sync functionality for a single sync with a peer. | ||
| type Syncer struct { | ||
| peerID peer.ID | ||
| rootURL url.URL | ||
| urls []*url.URL | ||
| sync *Sync | ||
| } | ||
|
|
||
| // GetHead fetches the head of the peer's advertisement chain. | ||
| func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { | ||
| var head cid.Cid | ||
|
|
@@ -102,7 +161,9 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { | |
| return cid.Undef, err | ||
| } | ||
|
|
||
| if peerIDFromSig != s.peerID { | ||
| if s.peerID == "" { | ||
| log.Warn("cannot verify publisher signature without peer ID") | ||
| } else if peerIDFromSig != s.peerID { | ||
| return cid.Undef, errHeadFromUnexpectedPeer | ||
| } | ||
|
|
||
|
|
@@ -136,7 +197,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error | |
| } | ||
| } | ||
|
|
||
| s.sync.client.CloseIdleConnections() | ||
| s.client.CloseIdleConnections() | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -205,7 +266,7 @@ nextURL: | |
| return err | ||
| } | ||
|
|
||
| resp, err := s.sync.client.Do(req) | ||
| resp, err := s.client.Do(req) | ||
| if err != nil { | ||
| if len(s.urls) != 0 { | ||
| log.Errorw("Fetch request failed, will retry with next address", "err", err) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.