storage: factor StorageCollections out of StorageController #27057

Merged

Conversation

@aljoscha (Contributor) commented May 13, 2024

Note

The first commits are fixes that are required to pass CI, I have opened separate PRs for them. You should only have to look at the last commit.

Preparatory work for #24845, where we want to introduce more concurrency
to the Coordinator and Controllers.

The considerations/design are described in
doc/developer/design/20240117_decoupled_storage_controller.md

This is an intermediate step where we factor a StorageCollections out
of StorageController, and let the StorageController use its
interface instead of holding collections state/sinces itself.

One of the next steps is to change usage sites of StorageController to
use their own handle to a StorageCollections, bypassing the
StorageController for query-processing (PEEKS, SUBSCRIBE, etc.) code
paths. This will let us introduce more concurrency in the Coordinator
and do less work in the main Coordinator loop.

The important parts in this change are:

  • StorageCollections::new and StorageController::new: these closely
    mirror each other.
  • StorageCollections::create_collections and
    StorageController::create_collections, ditto!
  • A lot of the rest is "boilerplate", passing through calls to the
    internal StorageCollections, and using StorageCollections from the
    controller instead of using owned state.
  • The last interesting thing to look at is the new BackgroundTask: it
    takes over the work that persist_handles::PersistReadWorker was
    doing before, plus it continually listens for upper changes and
    forwards the since frontier of collections/their since handles. (A
    simplified sketch of the overall split follows below.)
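For orientation, here is a deliberately simplified sketch of the shape this split takes — illustrative names, plain `u64` frontiers, none of the real signatures (those live in src/storage-client/src/storage_collections.rs):

```rust
// Illustrative sketch only: `u64` stands in for `Antichain<T>`.
use std::collections::BTreeMap;

type GlobalId = u64;
type Frontier = u64;

/// The since/frontier-tracking half that is factored out of the controller.
trait CollectionsLike {
    fn create_collections(&mut self, ids: Vec<GlobalId>);
    /// Returns `(since, upper)` for the collection, if it exists.
    fn collection_frontiers(&self, id: GlobalId) -> Option<(Frontier, Frontier)>;
}

#[derive(Default)]
struct Collections {
    frontiers: BTreeMap<GlobalId, (Frontier, Frontier)>,
}

impl CollectionsLike for Collections {
    fn create_collections(&mut self, ids: Vec<GlobalId>) {
        for id in ids {
            self.frontiers.entry(id).or_insert((0, 0));
        }
    }
    fn collection_frontiers(&self, id: GlobalId) -> Option<(Frontier, Frontier)> {
        self.frontiers.get(&id).copied()
    }
}

/// The controller no longer owns since state; it holds a `Collections` and
/// passes calls through (the "boilerplate" mentioned in the list above).
struct Controller<C: CollectionsLike> {
    collections: C,
}

impl<C: CollectionsLike> Controller<C> {
    fn collection_frontiers(&self, id: GlobalId) -> Option<(Frontier, Frontier)> {
        self.collections.collection_frontiers(id)
    }
}
```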


@aljoscha force-pushed the storage-factor-out-storage-collections branch 5 times, most recently from 9b45260 to 529bf9d on May 14, 2024 at 13:39
@aljoscha (Contributor, Author)

@nrainer-materialize I kicked off a nightly run already because this is quite a deep change to how the StorageController works

@aljoscha marked this pull request as ready for review May 14, 2024 17:30
@aljoscha requested review from a team as code owners May 14, 2024 17:30
@aljoscha requested a review from jkosh44 May 14, 2024 17:30

shepherdlybot bot commented May 14, 2024

Risk Score: 82/100 · Bug Hotspots: 4 · Resilience Coverage: 50%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Feature Flag
  • (Required) Integration Test
  • (Required) Observability 🔍 Detected
  • (Required) QA Review 🔍 Detected
  • (Required) Run Nightly Tests
  • Unit Test
Risk Summary:

The pull request carries a high risk, with a score of 82, indicating a significant chance of introducing bugs. This assessment is based on predictors such as the average age of files, cognitive complexity within files, and changes to executable lines. Historically, pull requests with similar characteristics are 117% more likely to cause a bug compared to the repository baseline. Additionally, 4 files modified in this pull request have a recent history of frequent bug fixes, which contributes to the risk. The repository's observed bug trend is on an upward trajectory, although this is not directly tied to the risk score.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:

File Percentile
../sequencer/inner.rs 99
../src/lib.rs 100
../src/controller.rs 94
../controller/instance.rs 91

@aljoscha (Contributor, Author) commented May 15, 2024

@nrainer-materialize there is a regression in the feature benchmark, but I don't see how the change could only affect full outer joins. It might be a flake?

Other than that, nightly seems good? I did restart RQG dbt3-joins workload once because it timed out.

@nrainer-materialize (Contributor)

> @nrainer-materialize there is a regression in the feature benchmark, but I don't see how the change could only affect full outer joins. It might be a flake?

Let's trigger a retry and see...

@jkosh44 (Contributor) left a comment

Adapter parts LGTM, leaving the main review to storage folks.

Comment on lines 771 to 873
// use std::sync::Arc;
//
// use mz_build_info::DUMMY_BUILD_INFO;
// use mz_ore::metrics::MetricsRegistry;
// use mz_ore::now::SYSTEM_TIME;
// use mz_persist_client::cache::PersistClientCache;
// use mz_persist_client::cfg::PersistConfig;
// use mz_persist_client::rpc::PubSubClientConnection;
// use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
// use mz_persist_types::codec_impls::UnitSchema;
// use mz_repr::{RelationDesc, Row};
//
// use super::*;
//
// #[mz_ore::test(tokio::test)]
// #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr`
// async fn snapshot_stats(&self) {
// let client = PersistClientCache::new(
// PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone()),
// &MetricsRegistry::new(),
// |_, _| PubSubClientConnection::noop(),
// )
// .open(PersistLocation {
// blob_uri: "mem://".to_owned(),
// consensus_uri: "mem://".to_owned(),
// })
// .await
// .unwrap();
// let shard_id = ShardId::new();
// let since_handle = client
// .open_critical_since(
// shard_id,
// PersistClient::CONTROLLER_CRITICAL_SINCE,
// Diagnostics::for_tests(),
// )
// .await
// .unwrap();
// let mut write_handle = client
// .open_writer::<SourceData, (), u64, i64>(
// shard_id,
// Arc::new(RelationDesc::empty()),
// Arc::new(UnitSchema),
// Diagnostics::for_tests(),
// )
// .await
// .unwrap();
//
// let worker = PersistReadWorker::<u64>::new();
// worker.register(GlobalId::User(1), since_handle);
//
// // No stats for unknown GlobalId.
// let stats = worker
// .snapshot_stats(
// GlobalId::User(2),
// SnapshotStatsAsOf::Direct(Antichain::from_elem(0)),
// )
// .await;
// assert!(stats.is_err());
//
// // Stats don't resolve for as_of past the upper.
// let stats_fut = worker.snapshot_stats(
// GlobalId::User(1),
// SnapshotStatsAsOf::Direct(Antichain::from_elem(1)),
// );
// assert!(stats_fut.now_or_never().is_none());
// // Call it again because now_or_never consumed our future and it's not clone-able.
// let stats_ts1_fut = worker.snapshot_stats(
// GlobalId::User(1),
// SnapshotStatsAsOf::Direct(Antichain::from_elem(1)),
// );
//
// // Write some data.
// let data = ((SourceData(Ok(Row::default())), ()), 0u64, 1i64);
// let () = write_handle
// .compare_and_append(&[data], Antichain::from_elem(0), Antichain::from_elem(1))
// .await
// .unwrap()
// .unwrap();
//
// // Verify that we can resolve stats for ts 0 while the ts 1 stats call is outstanding.
// let stats = worker
// .snapshot_stats(
// GlobalId::User(1),
// SnapshotStatsAsOf::Direct(Antichain::from_elem(0)),
// )
// .await
// .unwrap();
// assert_eq!(stats.num_updates, 1);
//
// // Write more data and unblock the ts 1 call
// let data = ((SourceData(Ok(Row::default())), ()), 1u64, 1i64);
// let () = write_handle
// .compare_and_append(&[data], Antichain::from_elem(1), Antichain::from_elem(2))
// .await
// .unwrap()
// .unwrap();
// let stats = stats_ts1_fut.await.unwrap();
// assert_eq!(stats.num_updates, 2);
// }
// }
Contributor

Is this coming in a later PR or later commit?

Contributor Author

dayum! forgot about this one 🙈

Contributor

omg I didn't scroll enough and almost re-asked this question in another comment

Comment on lines +180 to +159
/// Checks whether a collection exists under the given `GlobalId`. Returns
/// an error if the collection does not exist.
fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
Contributor

I'm not sure if this is existing code movement, but it's not clear to me whether inactive collections (i.e. collections that have been dropped but still have outstanding ReadHolds) are included in this.

/// associated metadata needed to ingest the particular source.
///
/// This command installs collection state for the indicated sources, and
/// the are now valid to use in queries at times beyond the initial `since`
Contributor

Suggested change
/// the are now valid to use in queries at times beyond the initial `since`
/// they are now valid to use in queries at times beyond the initial `since`

Comment on lines +274 to +272
/// Drops the read capability for the sources and allows their resources to
/// be reclaimed.
///
/// TODO(jkosh44): This method does not validate the provided identifiers.
/// Currently when the controller starts/restarts it has no durable state.
/// That means that it has no way of remembering any past commands sent. In
/// the future we plan on persisting state for the controller so that it is
/// aware of past commands. Therefore this method is for dropping sources
/// that we know to have been previously created, but have been forgotten by
/// the controller due to a restart. Once command history becomes durable we
/// can remove this method and use the normal `drop_sources`.
fn drop_collections_unvalidated(
&mut self,
storage_metadata: &StorageMetadata,
identifiers: Vec<GlobalId>,
);
Contributor

Not necessary for this PR, but I'm just realizing that we can probably get rid of all these drop_.*_unvalidated methods and inline them into drop_.*.

@guswynn (Contributor) left a comment

The use of non-persist feedback to move uppers was something that always annoyed me, so that change is a huge + in my opinion!

I also liked the use of extra_state on CollectionState, the alternative of having lots of random maps in the state was quite annoying!


Unfortunately, I think my review is going to require 1 round of back and forth; I asked some questions that I need answered to get enough context to review the rest of the read hold and storage controller changes!


Comment on lines +588 to +609
let dependency_read_holds = self
.storage_collections
.acquire_read_holds(storage_dependencies)
.expect("can acquire read holds");
Contributor

What happens when we are restarting envd and recreating collections in this StorageCollections, but before we acquire read holds for deps? Couldn't the compute controller downgrade the since through its StorageCollections if it boots before us?

Contributor Author

Currently that can't happen, because create_collections is always called from the StorageController. In the future, the protocol has to be that the StorageController gets a chance to run before anything else happens. And it (the StorageController that is "local" to that cluster/responsible for that cluster) will install read holds.

Also, for the future: there will be a SinceHandle per cluster, and on the cluster that is responsible for maintaining a collection, the StorageController acquires since holds through that. But the SinceHandle of other clusters doesn't need to be held back beyond what its ComputeController/the adapter have as requirements.

if !dependency_read_holds.is_empty() {
let mut dependency_since = Antichain::from_elem(T::minimum());
for read_hold in dependency_read_holds.iter() {
dependency_since.join_assign(read_hold.since());
Contributor

I like using join instead of the bespoke logic above!!

also, we should calculate the dependency_since above the if statement, to avoid copying this code 2 times below!
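A toy version of that suggestion, with `u64` standing in for `Antichain<T>` so that the join degenerates to `max`; the real loop uses `join_assign` on each hold's since, as in the snippet above:

```rust
// Sketch only: `u64` frontiers, so "join" degenerates to `max`.
type Timestamp = u64;

/// Compute the joint since of all dependency read holds once, before any
/// branching, so the loop is not duplicated in both branches.
fn dependency_since(read_hold_sinces: &[Timestamp]) -> Timestamp {
    // Stand-in for Antichain::from_elem(T::minimum()).
    let mut since = Timestamp::MIN;
    for s in read_hold_sinces {
        // Stand-in for dependency_since.join_assign(read_hold.since()).
        since = since.max(*s);
    }
    since
}
```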

Contributor Author

👍

Comment on lines +1799 to +1754
let dropped_subsources = self
.dropped_ingestions
.remove(id)
.expect("missing dropped subsources");

// The cluster is not sending these, so we take
// matters into our own hands!
tracing::debug!(?dropped_subsources, "synthesizing DroppedIds messages for subsources and the remap shard");
self.internal_response_sender
.send(StorageResponse::DroppedIds(
dropped_subsources.into_iter().collect(),
))
.expect("we are still alive");
Contributor

I don't see the code this is replacing... is this fixing a separate known issue as part of this PR? Additionally, why don't we just find the set of subsources/remap shard at this moment, instead of using this intermediate state? My understanding is that self.collections still has all that data at this point?

Contributor Author

It's new, and it fixes part of the issue that we're never cleaning up subsource state! I fixed this because I was annoyed that the StorageController never drops the hold that it has with StorageCollections, meaning the latter never cleans up its state.

> Additionally, why don't we just find the set of subsources/remap shard at this moment, instead of using this intermediate state? My understanding is that self.collections still has all that data at this point?

Unfortunately, we don't have that state anymore. drop_sources_unvalidated removes the IngestionExport from the Ingestion, and I didn't want to touch that flow more than necessary. 🙈 In drop_sources_unvalidated, I now basically move the state over, so that we still have it here where we then synthesize messages. That whole flow around DroppedIds needs some love (before this PR and certainly also after!).
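A rough sketch of that stashing, reusing the names visible in the snippet above (`dropped_ingestions`, `StorageResponse::DroppedIds`) but with everything else simplified:

```rust
// Sketch only: simplified stand-ins, not the real controller types.
use std::collections::BTreeMap;

type GlobalId = u64;

enum StorageResponse {
    DroppedIds(Vec<GlobalId>),
}

#[derive(Default)]
struct ControllerSketch {
    // Ingestion id -> subsource/remap-shard ids, stashed at drop time while
    // drop_sources_unvalidated still knows them.
    dropped_ingestions: BTreeMap<GlobalId, Vec<GlobalId>>,
    // Stand-in for the internal response channel.
    internal_responses: Vec<StorageResponse>,
}

impl ControllerSketch {
    // Called on the drop path: move the subsource ids aside before the
    // ingestion's own state is torn down.
    fn stash_dropped_subsources(&mut self, ingestion: GlobalId, subsources: Vec<GlobalId>) {
        self.dropped_ingestions.insert(ingestion, subsources);
    }

    // Called when the cluster reports DroppedIds for the ingestion itself;
    // the cluster never reports the subsources, so they are synthesized here.
    fn handle_dropped_ingestion(&mut self, ingestion: GlobalId) {
        if let Some(subsources) = self.dropped_ingestions.remove(&ingestion) {
            self.internal_responses
                .push(StorageResponse::DroppedIds(subsources));
        }
    }
}
```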

Contributor

I agree, let's not touch flows more than necessary!

It's very clarifying to my reading of these changes to know this is new!

Contributor

One thing I realized is that now that we no longer use FrontierUppers feedback (instead just watching the persist shard), we might not need to wait for DroppedIds before deleting state; this may let us avoid this dance, I think a TODO to reconsider that is worth adding here!

Contributor Author

I consider DroppedIds to be a separate part of the protocol. Petros (not summoning him here 😅) has opinions about how the protocol should evolve, so I don't want to put anything down here right now.

Comment on lines 2870 to 2926
// TODO(guswynn): we need to be more careful about the update time we get here:
// <https://github.com/MaterializeInc/materialize/issues/25349>
Contributor

As an aside: can we guarantee at this point, that the old storage controller is fenced out, and that the upper from .collections_frontier() below is linearizably the greatest upper?

Contributor Author

I don't know! I think maybe we never could, and I think certainly information about uppers can always be outdated.

@guswynn (Contributor) commented May 16, 2024

I think ideally we need to ensure another writer to a shard is fenced... I'll have to come back to this; certainly as we end up with multiple controllers writing to the statistics shard, we have to be a bit careful

Comment on lines 124 to 130
fn collection_frontiers(
&self,
id: GlobalId,
) -> Result<
(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
StorageError<Self::Timestamp>,
>;
Contributor

this should be a provided method built on top of collections_frontiers

Contributor Author

will do!
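A sketch of what such a provided method could look like — stand-in types, not the actual trait:

```rust
// Simplified stand-ins; not the real signatures.
type GlobalId = u64;
type Frontier = Vec<u64>; // stand-in for Antichain<Self::Timestamp>

#[derive(Debug)]
struct StorageError(String);

trait CollectionFrontiers {
    /// Batch lookup; the only method implementors must provide.
    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Frontier, Frontier)>, StorageError>;

    /// Provided single-id convenience, built on top of the batch method.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Frontier, Frontier), StorageError> {
        let mut frontiers = self.collections_frontiers(vec![id])?;
        let (_id, since, upper) = frontiers
            .pop()
            .ok_or_else(|| StorageError(format!("collection {id} does not exist")))?;
        Ok((since, upper))
    }
}
```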

Comment on lines 135 to 136
/// requested collections, ensures that we can get a consistent "snapshot"
/// of collection state. If we had separate methods instead, and/or would
Contributor

What semantics does this snapshot have? There is nothing relating the upper frontier of any collections, right?

Contributor Author

I put this comment in largely because collection state is now behind a lock and can be modified concurrently when you get frontiers (or other things) one-by-one. With this method we lock once, take the state we need, and release.

Should I just remove this comment about "snapshot"?

Contributor

I would replace this with "atomically", I think!
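To make the "atomically" point concrete, a toy illustration — illustrative names and state, not the real StorageCollections internals:

```rust
// Illustrative only: `u64` frontiers and a plain Mutex; the point is the
// single lock acquisition, not the concrete state layout.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

type GlobalId = u64;

#[derive(Clone, Copy)]
struct Frontiers {
    since: u64,
    upper: u64,
}

struct Collections {
    state: Arc<Mutex<BTreeMap<GlobalId, Frontiers>>>,
}

impl Collections {
    // Atomic across all requested ids: one lock acquisition, so no concurrent
    // update can interleave between the individual lookups.
    fn collections_frontiers(&self, ids: &[GlobalId]) -> Vec<(GlobalId, Frontiers)> {
        let state = self.state.lock().expect("lock poisoned");
        ids.iter()
            .filter_map(|id| state.get(id).map(|f| (*id, *f)))
            .collect()
    }

    // One-by-one lookups each take the lock separately, so a sequence of
    // calls may observe different points in time.
    fn collection_frontiers(&self, id: GlobalId) -> Option<Frontiers> {
        self.state.lock().expect("lock poisoned").get(&id).copied()
    }
}
```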

///
/// This is a separate set from `finalizable_shards` because we know that
/// some environments have many, many finalizable shards that we are
/// struggling to finalize.
Contributor

I'm not sure this explains the fact that we have both very well...finalized_shards is in fact cleared periodically, right?

Contributor Author

I copied these as-is from StorageController, but yes, the explanation is not super clear. I'll give it a think.

Contributor

thank you!

@guswynn (Contributor) left a comment

Structurally I am happy with this, thank you for cleaning up some of the messier code/structure of the controller while you were at it! I think the change to pubsub-for-upper-tracking as opposed to StorageResponse's is phenomenal, that always bugged me.

I have some minor nits, questions, and comments, as well as some slightly-more-substantial suggestions on how to wield async-rust! (Also I am trying to avoid getting nerdsniped into cleaning up our source dropping/shard finalization logic)

Other than those, I have one larger point: I want to look at the read hold/read capability code again. I am going to accept and not block on this, because deep in my soul I believe CI will catch any bugs, but I want to make the point that I think this frontier management stuff has gotten to the boundary of what is understandable by a single person. I think there are 3 things we can do:

  • Firstly, at some point, modularize all the read capability code into its own module, instead of leaving it in storage_collections.
  • Clarify the connection between ReadHolds, ReadPolicys, and "read capabilities" (as far as I can tell a "read capability" is a frontier that encapsulates a ReadPolicy + all the outstanding read holds; see the toy sketch after this list). I think that we could come up with an abstraction that unifies these into some set of subtypes, instead of having them spread around the controller state.
  • Rename (and clarify) what an "implied capability" is. As far as I can tell, it's the since frontier of a collection as understood by the storage controller, ignoring anyone else's read holds. And, as far as I can tell, its only real use is to manage the ReadHold that the controller holds. This can also be abstracted into the ReadHold struct, I feel.
    • Maybe this is just me, but "implied" makes me think that it's the capability that meets all ReadHolds??
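The toy sketch referenced above, spelling out that reading of the terms with `u64` timestamps (a made-up `LagPolicy` stands in for ReadPolicy; the real code works on antichains):

```rust
// Toy model with `u64` timestamps; `min` plays the role of the meet of
// frontiers.
type Timestamp = u64;

struct LagPolicy {
    lag: Timestamp,
}

impl LagPolicy {
    // The "implied capability": what the policy alone would allow as the
    // since, ignoring everyone else's read holds.
    fn implied_capability(&self, upper: Timestamp) -> Timestamp {
        upper.saturating_sub(self.lag)
    }
}

// The effective since may not advance past the implied capability or any
// outstanding read hold.
fn effective_since(policy: &LagPolicy, upper: Timestamp, holds: &[Timestamp]) -> Timestamp {
    holds
        .iter()
        .copied()
        .chain(std::iter::once(policy.implied_capability(upper)))
        .min()
        .expect("at least the implied capability is present")
}
```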

I think this pr is good as is, and has absolutely made the right choice to leave things mostly as they were. But as we begin to split the storage controller into more pieces (and into separate clusters), I think we are going to hit huge issues with this complexity, which I think points to at least some future refactoring after this pr merges!

Comment on lines -748 to -772
/// The capability (hold on the since) that this export needs from its
/// dependencies (inputs). When the upper of the export changes, we
/// downgrade this, which in turn downgrades holds we have on our
/// dependencies' sinces.
pub read_capability: Antichain<T>,
Contributor

this drove me nuts; thank you for making it just a ReadHold

Comment on lines 77 to 78
/// - Hands out [ReadHold] that prevent a collection's since from advancing
/// while it needs to be read at a specific time.
Contributor

Suggested change
/// - Hands out [ReadHold] that prevent a collection's since from advancing
/// while it needs to be read at a specific time.
/// - Hands out [ReadHold]s that prevent a collection's since from advancing
/// while it needs to be read at a specific time.

Contributor Author

I'll change to [ReadHolds](ReadHold)

Comment on lines +87 to +97
init_ids: BTreeSet<GlobalId>,
drop_ids: BTreeSet<GlobalId>,
Contributor

documenting these would be great!

Contributor Author

👍

Comment on lines 107 to 113
/// Marks the end of any initialization commands.
///
/// The implementor may wait for this method to be called before
/// implementing prior commands, and so it is important for a user to invoke
/// this method as soon as it is comfortable. This method can be invoked
/// immediately, at the potential expense of performance.
fn initialization_complete(&mut self);
Contributor

what commands?

Contributor

I had the same question! On the controllers this refers to ComputeCommand/StorageCommand but StorageCollections is not concerned with these, right?

Contributor Author

In my mind, any calls from the adapter/Coordinator to StorageCollections are commands, because you can also imagine them having a channel as interface between them. So I kept the wording from StorageController.

I'll now remove this method from the trait altogether, because its existence and documentation confused both of you. 😅

/// [StorageCollections::drop_collections]. Collections that have been
/// dropped but still have outstanding [ReadHolds](ReadHold) are not
/// considered active for this method.
fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
Contributor

nit: can we group this with collection_metadata?

Contributor Author

👌

Comment on lines +1428 to +1374
// Webhooks and tables are dropped differently from
// ingestions and other collections.
Contributor

Suggested change
// Webhooks and tables are dropped differently from
// ingestions and other collections.
// Webhooks and tables are dropped differently from
// ingestions and other collections. We can immediately compact
// them, because they don't interact with clusterd.

Contributor Author

👌


}
CollectionStateExtra::None => {
// No read holds for other types of collections!
tracing::info!("DroppedIds for collection {id}");
Contributor

nit: debug!

Contributor Author

👌

Comment on lines +2544 to +2614
#[instrument(level = "debug", fields(updates))]
fn update_write_frontiers(&mut self, updates: &[(GlobalId, Antichain<T>)]) {
Contributor

document that it's only for compute?

Ideally we could use persist feedback to drive this in the future, as well, right?

Contributor Author

It's actually not for compute, it's only used internally in the controller. I left the controller/cluster protocol unchanged, so we still get FrontierUppers and DroppedIds, and we drive around the read holds of ingestions based on that. It could be changed, but I didn't want to go that far.

Comment on lines 3464 to 3538
/// The policy that drives how we downgrade our read hold. That is how we
/// derive our since from our upper.
pub hold_policy: ReadPolicy<T>,
Contributor

could we replace this (and all the derive_since and read_capabilities stuff) if we had a way to add a read policy to the policies set externally? Not a blocker, but definitely a bit weird that we hand-manage a policy just to hold some ReadHolds correctly

Contributor Author

I actually like it like this: StorageCollections has a policy that is set by the adapter. The StorageController has a policy for its internal needs, which it uses to drive forward the read handle that it has at StorageCollections.

The ReadPolicy basically encodes: "have we been dropped". And we could probably make that more explicit. But I again didn't want to go down that road right now.

Contributor

consider me convinced! might be worth saying in the comment: This is a _storage-controller-internal_ policy used to derive its personal read hold on the collection.

@teskje (Contributor) left a comment

Compute parts lgtm. I only skimmed over the storage controller changes.

})
.collect();
self.storage_controller
.update_write_frontiers(&storage_updates);
Contributor

🥳

src/controller/src/lib.rs (resolved conversation, outdated)
src/controller/src/lib.rs (resolved conversation)

Comment on lines +320 to +299
/// Applies `updates` and sends any appropriate compaction command.
///
/// This is a legacy interface that should _not_ be used! It is only used by
/// the compute controller.
Contributor

If it's only used by the compute controller, the answer is "hopefully soon". I plan on porting the compute controller to the new ReadHolds interface once the whole storage controller refactor is merged.

src/storage-client/src/storage_collections.rs (resolved conversation, outdated)
#[derive(Debug, Clone)]
pub struct StorageCollectionsImpl<
T: TimelyTimestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
> {
Contributor

I noticed that in contrast to the StorageController, for StorageCollections the trait impl does not live in a separate crate. AFAIU the purpose of the StorageController trait is to free clients from having to depend on the implementation crate. Is that understanding wrong or does the split have a different purpose for StorageCollections?

Contributor Author

That was the motivation, yes! But all of StorageCollections is pretty much a "client thing", which is why its impl can live in the client crate. I did keep the customary separation out of a sense of consistency, plus it does hide away some of the implementation details.

Contributor

Fine for me! I think there is some friction introduced by having this trait in that "go to definition" leads you to the trait method definitions and there is no good way to get from there to the implementation (or at least I haven't found one apart from ctrl+f). But the point about hiding implementation details is also valid.

src/storage-client/src/controller.rs (resolved conversation)
@aljoscha (Contributor, Author)

@guswynn thanks for the very thorough review, I pushed a lot of individual fixup commits that address your comments!

@teskje also pushed commits for your comments. And also, thanks!

@aljoscha force-pushed the storage-factor-out-storage-collections branch from a7970b7 to 2dff0af on May 23, 2024 at 14:12
@guswynn (Contributor) commented May 23, 2024

fixup commits look good!

@aljoscha force-pushed the storage-factor-out-storage-collections branch from 2dff0af to 92e9b95 on May 24, 2024 at 09:26
Preparatory work for MaterializeInc#24845, where we want to introduce more concurrency
to the Coordinator and Controllers.

The considerations/design are described in
[doc/developer/design/20240117_decoupled_storage_controller.md](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20240117_decoupled_storage_controller.md)

This is an intermediate step where we factor a `StorageCollections` out
of `StorageController`, and let the `StorageController` use its
interface instead of holding collections state/sinces itself. Note that
there are now a lot of methods in `StorageController` that pass through
to `StorageCollections`. A large part of this can be removed once we
change usage sites to use a `StorageCollections` directly.

One of the next steps is to change usage sites of `StorageController` to
use their own handle to a `StorageCollections`, bypassing the
`StorageController` for query-processing (PEEKS, SUBSCRIBE, etc.) code
paths. This will let us introduce more concurrency in the Coordinator
and do less work in the main Coordinator loop.

The important parts in this change are:

- `StorageCollections::new` and `StorageController::new`: these closely
  mirror each other.
- `StorageCollections::create_collections` and
  `StorageController::create_collections`, ditto!
- A lot of the rest is "boilerplate", passing through calls to the
  internal `StorageCollections`, and using `StorageCollections` from the
  controller instead of using owned state.
- The last interesting thing to look at is the new `BackgroundTask`: it
  takes over the work that `persist_handles::PersistReadWorker` was
  doing before plus it continually listens for upper changes and
  forwards the since frontier of collections/their since handles.

@aljoscha force-pushed the storage-factor-out-storage-collections branch from 92e9b95 to a986ee0 on May 24, 2024 at 15:36
@aljoscha merged commit 01ea188 into MaterializeInc:main May 24, 2024
77 of 78 checks passed
@aljoscha deleted the storage-factor-out-storage-collections branch May 24, 2024 16:42
aljoscha added a commit to aljoscha/materialize that referenced this pull request May 27, 2024
Fixes MaterializeInc#27304

The inline comment explains the mechanism/problem that this "fixes".

My recent change that factors a `StorageCollections` out of the
`StorageController`
(MaterializeInc#27057) made an
existing bug more problematic. See below!

Before the mentioned change, this could happen:

1. create cluster c
2. create source/sink on cluster c
3. do a DROP CLUSTER c cascade
4. cluster processes are killed before they get a chance to send back
   DroppedIds messages
5. controller does not clean out state about that collection (until the
   next restart)

With my refactor, this would happen, which manifests in the observed bug:

1. create cluster c
2. create source/sink on cluster c, **this acquires a read hold at
   StorageCollections and stores it in that collection's state**
3. do a DROP CLUSTER c cascade
4. cluster processes are killed before they get a chance to send back
   DroppedIds messages
5. controller does not clean out state about that collection (until the
   next restart)
6. **the read hold is never released**, meaning StorageCollections does
   not clean out some of its state and we still report frontiers for
   these "active collections"
aljoscha added a commit to aljoscha/materialize that referenced this pull request May 28, 2024
Fixes MaterializeInc#27304

My recent change that factors a `StorageCollections` out of the
`StorageController`
(MaterializeInc#27057) made an
existing bug more problematic. See below!

Before the mentioned change, this could happen:

1. create cluster c
2. create source/sink on cluster c
3. do a DROP CLUSTER c cascade
4. cluster processes are killed before they get a chance to send back
   DroppedIds messages
5. controller does not clean out state about that collection (until the
   next restart)

With my refactor, this would happen, which manifests in the observed bug:

1. create cluster c
2. create source/sink on cluster c, **this acquires a read hold at
   StorageCollections and stores it in that collection's state**
3. do a DROP CLUSTER c cascade
4. cluster processes are killed before they get a chance to send back
   DroppedIds messages
5. controller does not clean out state about that collection (until the
   next restart)
6. **the read hold is never released**, meaning StorageCollections does
   not clean out some of its state and we still report frontiers for
   these "active collections"

Due to reasons (tm) my previous changes did make it so that we only drop
read holds when getting a DroppedIds message: the previous code would
only attempt shard finalization after getting a DroppedIds, and
StorageCollections starts to attempt shard finalization when all read
holds have been dropped. This preserved the previous behavior of
StorageController.

With this here change, we allow eagerly dropping the read holds
(advancing their since to the empty frontier), which we previously did
not allow, on purpose. This makes it so that we correctly drop their
state, and no longer report their frontiers. But it also makes it so
that we attempt shard finalization slightly earlier. I think that is
okay, though.
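A toy sketch of the mechanism described above — hypothetical names, with `None` playing the role of the empty frontier that the real code advances a ReadHold's since to:

```rust
// Sketch only, not the real ReadHold API: `Some(ts)` is a hold at `ts`,
// `None` plays the role of the empty frontier (hold released).
type Timestamp = u64;

struct ReadHoldSketch {
    since: Option<Timestamp>,
}

impl ReadHoldSketch {
    // Eagerly release the hold when the collection is dropped, instead of
    // waiting for a DroppedIds message that may never arrive.
    fn release(&mut self) {
        self.since = None;
    }

    // A collection whose holds are all released can have its state cleaned
    // up and its shard considered for finalization.
    fn is_released(&self) -> bool {
        self.since.is_none()
    }
}
```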