diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index a27628b4229..a9d0812093c 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -10,10 +10,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing" - notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" - ipdht "gx/ipfs/QmY1y2M1aCcVhy8UuTbZJBvuFbegZm47f9cDAdgxiehQfx/go-libp2p-kad-dht" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" @@ -23,73 +20,16 @@ var ErrNotDHT = errors.New("routing service is not a DHT") type DhtAPI CoreAPI -func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) { - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT - } - - outChan := make(chan ma.Multiaddr) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - go func() { - defer close(outChan) - - sendAddrs := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - for _, addr := range response.Addrs { - select { - case outChan <- addr: - case <-ctx.Done(): - return ctx.Err() - } - } - } - return nil - } - - for event := range events { - if event.Type == notif.FinalPeer { - err := sendAddrs(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - pi, err := dht.FindPeer(ctx, peer.ID(p)) - if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return - } - - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.FinalPeer, - Responses: []*pstore.PeerInfo{&pi}, - }) - }() - - return outChan, nil +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { + return api.node.Routing.FindPeer(ctx, p) } -func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) { +func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) { settings, err := caopts.DhtFindProvidersOptions(opts...) if err != nil { return nil, err } - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT - } - p, err = api.core().ResolvePath(ctx, p) if err != nil { return nil, err @@ -99,50 +39,10 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... numProviders := settings.NumProviders if numProviders < 1 { - return nil, fmt.Errorf("number of providers must be greater than 0") + return nil, fmt.Errorf("number of providers to find must be greater than 0") } - outChan := make(chan peer.ID) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - pchan := dht.FindProvidersAsync(ctx, c, numProviders) - go func() { - defer close(outChan) - - sendProviders := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - select { - case outChan <- response.ID: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - } - - for event := range events { - if event.Type == notif.Provider { - err := sendProviders(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - for p := range pchan { - np := p - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.Provider, - Responses: []*pstore.PeerInfo{&np}, - }) - } - }() - - return outChan, nil + return api.node.Routing.FindProvidersAsync(ctx, c, numProviders), nil } func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error { @@ -155,43 +55,19 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return errors.New("cannot provide in offline mode") } - if len(api.node.PeerHost.Network().Conns()) == 0 { - return errors.New("cannot provide, no connected peers") - } - - c := path.Cid() - - has, err := api.node.Blockstore.Has(c) + has, err := api.node.Blockstore.Has(path.Cid()) if err != nil { return err } if !has { - return fmt.Errorf("block %s not found locally, cannot provide", c) + return fmt.Errorf("block %s not found locally, cannot provide", path.Cid()) } - //TODO: either remove or use - //outChan := make(chan interface{}) - - //events := make(chan *notif.QueryEvent) - //ctx = notif.RegisterForQueryEvents(ctx, events) - - /*go func() { - defer close(outChan) - for range events { - select { - case <-ctx.Done(): - return - default: - } - } - }()*/ - - //defer close(events) if settings.Recursive { - err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c}) + err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{path.Cid()}) } else { - err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c}) + err = provideKeys(ctx, api.node.Routing, []*cid.Cid{path.Cid()}) } if err != nil { return err @@ -211,10 +87,11 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error { - provided := cid.NewSet() + provided := cid.NewSet() //TODO: Use a bloom filter for _, c := range cids { kset := cid.NewSet() + //TODO: After https://github.com/ipfs/go-ipfs/pull/4333 is merged, use n.Provider for this err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) if err != nil { return err diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go index 4f5e2590038..e194a8fd2a3 100644 --- a/core/coreapi/dht_test.go +++ b/core/coreapi/dht_test.go @@ -9,6 +9,7 @@ import ( coreapi "github.com/ipfs/go-ipfs/core/coreapi" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" ) @@ -25,7 +26,10 @@ func TestDhtFindPeer(t *testing.T) { t.Fatal(err) } - addr := <-out + var addr ma.Multiaddr + if len(out.Addrs) > 0 { + addr = out.Addrs[0] + } if addr == nil || addr.String() != "/ip4/127.0.0.1/tcp/4001" { t.Errorf("got unexpected address from FindPeer: %s", addr) @@ -36,7 +40,10 @@ func TestDhtFindPeer(t *testing.T) { t.Fatal(err) } - addr = <-out + addr = nil + if len(out.Addrs) > 0 { + addr = out.Addrs[0] + } if addr == nil || addr.String() != "/ip4/127.0.2.1/tcp/4001" { t.Errorf("got unexpected address from FindPeer: %s", addr) @@ -62,8 +69,8 @@ func TestDhtFindProviders(t *testing.T) { provider := <-out - if provider.String() != nds[0].Identity.String() { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } } @@ -91,8 +98,8 @@ func TestDhtProvide(t *testing.T) { provider := <-out - if provider.String() != "" { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != "" { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } err = apis[0].Dht().Provide(ctx, p) @@ -107,7 +114,7 @@ func TestDhtProvide(t *testing.T) { provider = <-out - if provider.String() != nds[0].Identity.String() { - t.Errorf("got wrong provider: %s != %s", provider.String(), nds[0].Identity.String()) + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) } } diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index c49903bee76..6da33c3fdb2 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -5,7 +5,7 @@ import ( options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" ) @@ -13,11 +13,11 @@ import ( type DhtAPI interface { // FindPeer queries the DHT for all of the multiaddresses associated with a // Peer ID - FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error) + FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error) // FindProviders finds peers in the DHT who can provide a specific value // given a key. - FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan peer.ID, error) //TODO: is path the right choice here? + FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) // Provide announces to the network that you are providing given values Provide(context.Context, Path, ...options.DhtProvideOption) error