Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flaky test by skipping connectivity checks #955

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,21 @@ type QueryConfig struct {
// operation. A DefaultQuorum of 0 means that we search the network until
// we have exhausted the keyspace.
DefaultQuorum int

// SkipConnectivityCheck defines whether we do a connectivity check before
// we add peers to the routing table.
SkipConnectivityCheck bool
}

// DefaultQueryConfig returns the default query configuration options for a DHT.
func DefaultQueryConfig() *QueryConfig {
return &QueryConfig{
Concurrency: 3, // MAGIC
Timeout: 5 * time.Minute, // MAGIC
RequestConcurrency: 3, // MAGIC
RequestTimeout: time.Minute, // MAGIC
DefaultQuorum: 0, // MAGIC
Concurrency: 3, // MAGIC
Timeout: 5 * time.Minute, // MAGIC
RequestConcurrency: 3, // MAGIC
RequestTimeout: time.Minute, // MAGIC
DefaultQuorum: 0, // MAGIC
SkipConnectivityCheck: false,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this rather be on a RoutingConfig struct?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

What about just using the following to make the RoutingConfig usable externally

type RoutingConfig = internal.coord.RoutingConfig

}
}

Expand Down
1 change: 1 addition & 0 deletions v2/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
} else if cfg.ProtocolID == ProtocolIPFS {
d.backends, err = d.initAminoBackends()
if err != nil {
return nil, fmt.Errorf("init amino backends: %w", err)

Check warning on line 103 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L103

Added line #L103 was not covered by tests
}
}

Expand Down Expand Up @@ -128,6 +128,7 @@
coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)
coordCfg.Routing.IncludeSkipCheck = cfg.Query.SkipConnectivityCheck

rtr := &router{
host: h,
Expand Down Expand Up @@ -172,18 +173,18 @@
)

if d.cfg.Datastore != nil {
dstore = d.cfg.Datastore

Check warning on line 176 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L176

Added line #L176 was not covered by tests
} else if dstore, err = InMemoryDatastore(); err != nil {
return nil, fmt.Errorf("new default datastore: %w", err)
}

Check warning on line 179 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L178-L179

Added lines #L178 - L179 were not covered by tests

// wrap datastore in open telemetry tracing
dstore = trace.New(dstore, d.tele.Tracer)

pbeCfg, err := DefaultProviderBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 187 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L186-L187

Added lines #L186 - L187 were not covered by tests
pbeCfg.Logger = d.cfg.Logger
pbeCfg.AddressFilter = d.cfg.AddressFilter
pbeCfg.Tele = d.tele
Expand All @@ -191,26 +192,26 @@

pbe, err := NewBackendProvider(d.host.Peerstore(), dstore, pbeCfg)
if err != nil {
return nil, fmt.Errorf("new provider backend: %w", err)
}

Check warning on line 196 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L195-L196

Added lines #L195 - L196 were not covered by tests

rbeCfg, err := DefaultRecordBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 201 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L200-L201

Added lines #L200 - L201 were not covered by tests
rbeCfg.Logger = d.cfg.Logger
rbeCfg.Tele = d.tele
rbeCfg.clk = d.cfg.Clock

ipnsBe, err := NewBackendIPNS(dstore, d.host.Peerstore(), rbeCfg)
if err != nil {
return nil, fmt.Errorf("new ipns backend: %w", err)
}

Check warning on line 209 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L208-L209

Added lines #L208 - L209 were not covered by tests

pkBe, err := NewBackendPublicKey(dstore, rbeCfg)
if err != nil {
return nil, fmt.Errorf("new public key backend: %w", err)
}

Check warning on line 214 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L213-L214

Added lines #L213 - L214 were not covered by tests

return map[string]Backend{
namespaceIPNS: ipnsBe,
Expand All @@ -226,11 +227,11 @@
}

if err := d.sub.Close(); err != nil {
d.debugErr(err, "failed closing event bus subscription")

Check warning on line 230 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L230

Added line #L230 was not covered by tests
}

if err := d.kad.Close(); err != nil {
d.debugErr(err, "failed closing coordinator")

Check warning on line 234 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L234

Added line #L234 was not covered by tests
}

for ns, b := range d.backends {
Expand All @@ -240,7 +241,7 @@
}

if err := closer.Close(); err != nil {
d.warnErr(err, "failed closing backend", "namespace", ns)

Check warning on line 244 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L244

Added line #L244 was not covered by tests
}
}

Expand All @@ -254,7 +255,7 @@
if d.cfg.ProtocolID == ProtocolIPFS && d.cfg.Datastore == nil {
if pbe, err := typedBackend[*ProvidersBackend](d, namespaceProviders); err == nil {
if err := pbe.datastore.Close(); err != nil {
d.warnErr(err, "failed closing in memory datastore")

Check warning on line 258 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L258

Added line #L258 was not covered by tests
}
}
}
Expand All @@ -268,7 +269,7 @@
}

if err := s.Reset(); err != nil {
d.debugErr(err, "failed closing stream")

Check warning on line 272 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L272

Added line #L272 was not covered by tests
}
}
}
Expand Down Expand Up @@ -326,7 +327,7 @@
}

if err := s.Reset(); err != nil {
d.debugErr(err, "failed closing stream")

Check warning on line 330 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L330

Added line #L330 was not covered by tests
}
}
}
Expand All @@ -345,22 +346,22 @@
d.log.Warn(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Warn(msg, tele.LogAttrError(err))

Check warning on line 349 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L349

Added line #L349 was not covered by tests
}

// debugErr is a helper method that uses the slogger of the DHT and writes a
// debug log line with the given message alongside the error. args is a list of
// key/value pairs or slog.Attrs that will be included with the log message. If the error
// is nil, this method is a no-op.
func (d *DHT) debugErr(err error, msg string, args ...any) {
if err == nil {
return
}
if len(args) == 0 {
d.log.Debug(msg, tele.LogAttrError(err))
return
}
d.log.With(args...).Debug(msg, tele.LogAttrError(err))

Check warning on line 364 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L356-L364

Added lines #L356 - L364 were not covered by tests
}

// AddAddresses suggests peers and their associated addresses to be added to the routing table.
Expand Down
21 changes: 21 additions & 0 deletions v2/internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
// ProbeCheckInterval is the time interval the behaviour should use between connectivity checks for the same node in the routing table.
ProbeCheckInterval time.Duration

// IncludeSkipCheck indicates whether we perform connectivity checks before we add a peer to the routing table.
IncludeSkipCheck bool

// IncludeQueueCapacity is the maximum number of nodes the behaviour should keep queued as candidates for inclusion in the routing table.
IncludeQueueCapacity int

Expand Down Expand Up @@ -268,6 +271,7 @@
ProbeRequestConcurrency: 3, // MAGIC
ProbeCheckInterval: 6 * time.Hour, // MAGIC

IncludeSkipCheck: false,
IncludeRequestConcurrency: 3, // MAGIC
IncludeQueueCapacity: 128, // MAGIC

Expand Down Expand Up @@ -307,12 +311,28 @@
ready chan struct{}
}

type Recording2SM[E any, S any] struct {
State S
Received E
}

func NewRecording2SM[E any, S any](response S) *Recording2SM[E, S] {
return &Recording2SM[E, S]{
State: response,
}

Check warning on line 322 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L319-L322

Added lines #L319 - L322 were not covered by tests
}

func (r *Recording2SM[E, S]) Advance(ctx context.Context, e E) S {
r.Received = e
return r.State

Check warning on line 327 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L325-L327

Added lines #L325 - L327 were not covered by tests
}

func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *RoutingConfig) (*RoutingBehaviour, error) {
if cfg == nil {
cfg = DefaultRoutingConfig()

Check warning on line 332 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L332

Added line #L332 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 335 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L334-L335

Added lines #L334 - L335 were not covered by tests

bootstrapCfg := routing.DefaultBootstrapConfig()
bootstrapCfg.Clock = cfg.Clock
Expand All @@ -324,21 +344,22 @@

bootstrap, err := routing.NewBootstrap[kadt.Key](self, bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("bootstrap: %w", err)
}

Check warning on line 348 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L347-L348

Added lines #L347 - L348 were not covered by tests

includeCfg := routing.DefaultIncludeConfig()
includeCfg.Clock = cfg.Clock
includeCfg.Tracer = cfg.Tracer
includeCfg.Meter = cfg.Meter
includeCfg.SkipCheck = cfg.IncludeSkipCheck
includeCfg.Timeout = cfg.ConnectivityCheckTimeout
includeCfg.QueueCapacity = cfg.IncludeQueueCapacity
includeCfg.Concurrency = cfg.IncludeRequestConcurrency

include, err := routing.NewInclude[kadt.Key, kadt.PeerID](rt, includeCfg)
if err != nil {
return nil, fmt.Errorf("include: %w", err)
}

Check warning on line 362 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L361-L362

Added lines #L361 - L362 were not covered by tests

probeCfg := routing.DefaultProbeConfig()
probeCfg.Clock = cfg.Clock
Expand All @@ -350,8 +371,8 @@

probe, err := routing.NewProbe[kadt.Key](rt, probeCfg)
if err != nil {
return nil, fmt.Errorf("probe: %w", err)
}

Check warning on line 375 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L374-L375

Added lines #L374 - L375 were not covered by tests

exploreCfg := routing.DefaultExploreConfig()
exploreCfg.Clock = cfg.Clock
Expand All @@ -363,13 +384,13 @@

schedule, err := routing.NewDynamicExploreSchedule(cfg.ExploreMaximumCpl, cfg.Clock.Now(), cfg.ExploreInterval, cfg.ExploreIntervalMultiplier, cfg.ExploreIntervalJitter)
if err != nil {
return nil, fmt.Errorf("explore schedule: %w", err)
}

Check warning on line 388 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L387-L388

Added lines #L387 - L388 were not covered by tests

explore, err := routing.NewExplore[kadt.Key](self, rt, cplutil.GenRandPeerID, schedule, exploreCfg)
if err != nil {
return nil, fmt.Errorf("explore: %w", err)
}

Check warning on line 393 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L392-L393

Added lines #L392 - L393 were not covered by tests

return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, cfg)
}
Expand All @@ -385,10 +406,10 @@
cfg *RoutingConfig,
) (*RoutingBehaviour, error) {
if cfg == nil {
cfg = DefaultRoutingConfig()

Check warning on line 409 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L409

Added line #L409 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 412 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L411-L412

Added lines #L411 - L412 were not covered by tests

r := &RoutingBehaviour{
self: self,
Expand Down Expand Up @@ -452,8 +473,8 @@
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 477 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L476-L477

Added lines #L476 - L477 were not covered by tests

case *EventGetCloserNodesSuccess:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesSuccess"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
Expand Down Expand Up @@ -483,35 +504,35 @@
NodeID: ev.To,
}
} else {
cmd = &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}
}

Check warning on line 511 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L507-L511

Added lines #L507 - L511 were not covered by tests
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

case ProbeQueryID:
var cmd routing.ProbeEvent
// require that the node responded with at least one closer node
if len(ev.CloserNodes) > 0 {
cmd = &routing.EventProbeConnectivityCheckSuccess[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
}
} else {
cmd = &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}
}

Check warning on line 530 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L518-L530

Added lines #L518 - L530 were not covered by tests
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 535 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L532-L535

Added lines #L532 - L535 were not covered by tests

case routing.ExploreQueryID:
for _, info := range ev.CloserNodes {
Expand All @@ -525,11 +546,11 @@
}
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 550 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L549-L550

Added lines #L549 - L550 were not covered by tests

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))

Check warning on line 553 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L552-L553

Added lines #L552 - L553 were not covered by tests
}
case *EventGetCloserNodesFailure:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
Expand All @@ -543,8 +564,8 @@
// attempt to advance the bootstrap
next, ok := r.advanceBootstrap(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 568 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L567-L568

Added lines #L567 - L568 were not covered by tests
case IncludeQueryID:
cmd := &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Expand All @@ -553,18 +574,18 @@
// attempt to advance the include state machine
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}
case ProbeQueryID:
cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 588 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L577-L588

Added lines #L577 - L588 were not covered by tests
case routing.ExploreQueryID:
cmd := &routing.EventExploreFindCloserFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Expand All @@ -573,17 +594,17 @@
// attempt to advance the explore
next, ok := r.advanceExplore(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

Check warning on line 598 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L597-L598

Added lines #L597 - L598 were not covered by tests

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))

Check warning on line 601 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L600-L601

Added lines #L600 - L601 were not covered by tests
}
case *EventNotifyConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))
// ignore self
if r.self.Equal(ev.NodeID) {
break

Check warning on line 607 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L607

Added line #L607 was not covered by tests
}
r.cfg.Logger.Debug("peer has connectivity", tele.LogAttrPeerID(ev.NodeID))

Expand All @@ -602,8 +623,8 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

Check warning on line 627 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L626-L627

Added lines #L626 - L627 were not covered by tests
case *EventNotifyNonConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

Expand All @@ -613,13 +634,13 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

Check warning on line 638 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L637-L638

Added lines #L637 - L638 were not covered by tests
case *EventRoutingPoll:
r.pollChildren(ctx)

default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))

Check warning on line 643 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L642-L643

Added lines #L642 - L643 were not covered by tests
}

if len(r.pending) > 0 {
Expand Down Expand Up @@ -671,13 +692,13 @@
func (r *RoutingBehaviour) pollChildren(ctx context.Context) {
ev, ok := r.advanceBootstrap(ctx, &routing.EventBootstrapPoll{})
if ok {
r.pending = append(r.pending, ev)
}

Check warning on line 696 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L695-L696

Added lines #L695 - L696 were not covered by tests

ev, ok = r.advanceInclude(ctx, &routing.EventIncludePoll{})
if ok {
r.pending = append(r.pending, ev)
}

Check warning on line 701 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L700-L701

Added lines #L700 - L701 were not covered by tests

ev, ok = r.advanceProbe(ctx, &routing.EventProbePoll{})
if ok {
Expand All @@ -704,7 +725,7 @@
Notify: r,
}, true

case *routing.StateBootstrapWaiting:

Check warning on line 728 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L728

Added line #L728 was not covered by tests
// bootstrap waiting for a message response, nothing to do
case *routing.StateBootstrapFinished:
r.cfg.Logger.Debug("bootstrap finished", slog.Duration("elapsed", st.Stats.End.Sub(st.Stats.Start)), slog.Int("requests", st.Stats.Requests), slog.Int("failures", st.Stats.Failure))
Expand All @@ -713,8 +734,8 @@
}, true
case *routing.StateBootstrapIdle:
// bootstrap not running, nothing to do
default:
panic(fmt.Sprintf("unexpected bootstrap state: %T", st))

Check warning on line 738 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L737-L738

Added lines #L737 - L738 were not covered by tests
}

return nil, false
Expand Down Expand Up @@ -751,16 +772,16 @@
return &EventRoutingUpdated{
NodeID: st.NodeID,
}, true
case *routing.StateIncludeWaitingAtCapacity:

Check warning on line 775 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L775

Added line #L775 was not covered by tests
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingWithCapacity:
// nothing to do except wait for message response or timeout
case *routing.StateIncludeWaitingFull:

Check warning on line 779 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L779

Added line #L779 was not covered by tests
// nothing to do except wait for message response or timeout
case *routing.StateIncludeIdle:
// nothing to do except wait for new nodes to be added to queue
default:
panic(fmt.Sprintf("unexpected include state: %T", st))

Check warning on line 784 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L783-L784

Added lines #L783 - L784 were not covered by tests
}

return nil, false
Expand Down Expand Up @@ -793,17 +814,17 @@
r.notify(ctx, &EventAddNode{
NodeID: st.NodeID,
})
case *routing.StateProbeWaitingAtCapacity:

Check warning on line 817 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L817

Added line #L817 was not covered by tests
// the probe state machine is waiting for responses for checks and the maximum number of concurrent checks has been reached.
// nothing to do except wait for message response or timeout
case *routing.StateProbeWaitingWithCapacity:

Check warning on line 820 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L820

Added line #L820 was not covered by tests
// the probe state machine is waiting for responses for checks but has capacity to perform more
// nothing to do except wait for message response or timeout
case *routing.StateProbeIdle:
// the probe state machine is not running any checks.
// nothing to do except wait for message response or timeout
default:
panic(fmt.Sprintf("unexpected include state: %T", st))

Check warning on line 827 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L826-L827

Added lines #L826 - L827 were not covered by tests
}

return nil, false
Expand All @@ -824,18 +845,18 @@
Notify: r,
}, true

case *routing.StateExploreWaiting:

Check warning on line 848 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L848

Added line #L848 was not covered by tests
// explore waiting for a message response, nothing to do
case *routing.StateExploreQueryFinished:

Check warning on line 850 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L850

Added line #L850 was not covered by tests
// nothing to do except notify via telemetry
case *routing.StateExploreQueryTimeout:

Check warning on line 852 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L852

Added line #L852 was not covered by tests
// nothing to do except notify via telemetry
case *routing.StateExploreFailure:
r.cfg.Logger.Warn("explore failure", slog.Int("cpl", st.Cpl), tele.LogAttrError(st.Error))

Check warning on line 855 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L854-L855

Added lines #L854 - L855 were not covered by tests
case *routing.StateExploreIdle:
// bootstrap not running, nothing to do
default:
panic(fmt.Sprintf("unexpected explore state: %T", st))

Check warning on line 859 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L858-L859

Added lines #L858 - L859 were not covered by tests
}

return nil, false
Expand Down
19 changes: 13 additions & 6 deletions v2/internal/coord/routing/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,9 @@
Concurrency int // the maximum number of include checks that may be in progress at any one time
Timeout time.Duration // the time to wait before terminating a check that is not making progress
Clock clock.Clock // a clock that may replaced by a mock when testing

// Tracer is the tracer that should be used to trace execution.
Tracer trace.Tracer

// Meter is the meter that should be used to record metrics.
Meter metric.Meter
SkipCheck bool // whether to skip connectivity checks and add any node passed to this state machine
Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution.
Meter metric.Meter // Meter is the meter that should be used to record metrics.
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand Down Expand Up @@ -100,18 +97,18 @@
}

if cfg.Tracer == nil {
return &errs.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("tracer must not be nil"),
}
}

Check warning on line 104 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L100-L104

Added lines #L100 - L104 were not covered by tests

if cfg.Meter == nil {
return &errs.ConfigurationError{
Component: "IncludeConfig",
Err: fmt.Errorf("meter must not be nil"),
}
}

Check warning on line 111 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L107-L111

Added lines #L107 - L111 were not covered by tests

return nil
}
Expand All @@ -124,6 +121,7 @@
Tracer: tele.NoopTracer(),
Meter: tele.NoopMeter(),

SkipCheck: false,
Concurrency: 3,
Timeout: time.Minute,
QueueCapacity: 128,
Expand All @@ -132,10 +130,10 @@

func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *IncludeConfig) (*Include[K, N], error) {
if cfg == nil {
cfg = DefaultIncludeConfig()

Check warning on line 133 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L133

Added line #L133 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 136 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L135-L136

Added lines #L135 - L136 were not covered by tests

in := &Include[K, N]{
candidates: newNodeQueue[K, N](cfg.QueueCapacity),
Expand All @@ -152,8 +150,8 @@
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create include_checks_sent counter: %w", err)
}

Check warning on line 154 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L153-L154

Added lines #L153 - L154 were not covered by tests

in.counterChecksPassed, err = cfg.Meter.Int64Counter(
"include_checks_passed",
Expand All @@ -161,8 +159,8 @@
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create include_checks_passed counter: %w", err)
}

Check warning on line 163 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L162-L163

Added lines #L162 - L163 were not covered by tests

in.counterChecksFailed, err = cfg.Meter.Int64Counter(
"include_checks_failed",
Expand All @@ -170,8 +168,8 @@
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create include_checks_failed counter: %w", err)
}

Check warning on line 172 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L171-L172

Added lines #L171 - L172 were not covered by tests

in.counterCandidatesDroppedCapacity, err = cfg.Meter.Int64Counter(
"include_candidates_dropped_capacity",
Expand All @@ -179,21 +177,21 @@
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create include_candidates_dropped_capacity counter: %w", err)
}

Check warning on line 181 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L180-L181

Added lines #L180 - L181 were not covered by tests

in.gaugeCandidateCount, err = cfg.Meter.Int64ObservableGauge(
"include_candidate_count",
metric.WithDescription("Total number of nodes in the include state machine's candidate queue"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
o.Observe(in.candidateCount.Load())
return nil
}),

Check warning on line 190 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L188-L190

Added lines #L188 - L190 were not covered by tests
)
if err != nil {
return nil, fmt.Errorf("create include_candidate_count counter: %w", err)
}

Check warning on line 194 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L193-L194

Added lines #L193 - L194 were not covered by tests

return in, nil
}
Expand All @@ -209,6 +207,15 @@

switch tev := ev.(type) {
case *EventIncludeAddCandidate[K, N]:

if in.cfg.SkipCheck {
if in.rt.AddNode(tev.NodeID) {
return &StateIncludeRoutingUpdated[K, N]{NodeID: tev.NodeID}
} else {
return &StateIncludeIdle{}
}
}

// Ignore if already running a check
_, checking := in.checks[key.HexString(tev.NodeID.Key())]
if checking {
Expand Down Expand Up @@ -245,8 +252,8 @@

case *EventIncludePoll:
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))

Check warning on line 256 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L255-L256

Added lines #L255 - L256 were not covered by tests
}

if len(in.checks) == in.cfg.Concurrency {
Expand Down Expand Up @@ -297,12 +304,12 @@
// added and false otherwise.
func (q *nodeQueue[K, N]) Enqueue(ctx context.Context, id N) bool {
if len(q.nodes) == q.capacity {
return false
}

Check warning on line 308 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L307-L308

Added lines #L307 - L308 were not covered by tests

if _, exists := q.keys[key.HexString(id.Key())]; exists {
return false
}

Check warning on line 312 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L311-L312

Added lines #L311 - L312 were not covered by tests

q.nodes = append(q.nodes, id)
q.keys[key.HexString(id.Key())] = struct{}{}
Expand Down Expand Up @@ -364,12 +371,12 @@
}

// includeState() ensures that only Include states can be assigned to an IncludeState.
func (*StateIncludeConnectivityCheck[K, N]) includeState() {}
func (*StateIncludeIdle) includeState() {}
func (*StateIncludeWaitingAtCapacity) includeState() {}
func (*StateIncludeWaitingWithCapacity) includeState() {}
func (*StateIncludeWaitingFull) includeState() {}
func (*StateIncludeRoutingUpdated[K, N]) includeState() {}

Check warning on line 379 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L374-L379

Added lines #L374 - L379 were not covered by tests

// IncludeEvent is an event intended to advance the state of an [Include].
type IncludeEvent interface {
Expand All @@ -396,7 +403,7 @@
}

// includeEvent() ensures that only events accepted by an [Include] can be assigned to the [IncludeEvent] interface.
func (*EventIncludePoll) includeEvent() {}
func (*EventIncludeAddCandidate[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckSuccess[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckFailure[K, N]) includeEvent() {}

Check warning on line 409 in v2/internal/coord/routing/include.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing/include.go#L406-L409

Added lines #L406 - L409 were not covered by tests
2 changes: 2 additions & 0 deletions v2/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ func (suite *SearchValueQuorumTestSuite) SetupTest() {

cfg := DefaultConfig()
cfg.Clock = clk
cfg.Query.SkipConnectivityCheck = true

top := NewTopology(t)

// init privileged DHT server
Expand Down