Add support for removing nodes from the metadata store cluster #2531

Open
wants to merge 3 commits into
base: main
Changes from all commits
13 changes: 13 additions & 0 deletions crates/metadata-store/src/raft/storage.rs
@@ -251,6 +251,15 @@ impl RocksDbStorage {
self.commit_write_batch(write_batch).await
}

/// Deletes the value stored under `key`, if any.
async fn delete(&mut self, key: impl AsRef<[u8]>) -> Result<(), Error> {
let mut write_batch = WriteBatch::default();
{
let cf = self.get_cf_handle();
write_batch.delete_cf(&cf, key.as_ref());
}
self.commit_write_batch(write_batch).await
}

pub async fn append(&mut self, entries: &Vec<Entry>) -> Result<(), Error> {
let mut write_batch = WriteBatch::default();
let mut buffer = mem::take(&mut self.buffer);
@@ -319,6 +328,10 @@ impl RocksDbStorage {
.await
}

pub async fn delete_raft_configuration(&mut self) -> Result<(), Error> {
self.delete(Self::raft_configuration_key()).await
}

pub fn get_raft_configuration(&self) -> Result<Option<RaftConfiguration>, Error> {
if let Some(bytes) = self.get_bytes(Self::raft_configuration_key())? {
Ok(Some(
187 changes: 161 additions & 26 deletions crates/metadata-store/src/raft/store.rs
@@ -465,6 +465,7 @@ struct Active {
request_rx: RequestReceiver,
join_cluster_rx: JoinClusterReceiver,
status_tx: StatusSender,
nodes_config_watch: watch::Receiver<Version>,
}

impl Active {
@@ -496,6 +497,7 @@ impl Active {
let mut config = Config {
id: to_raft_id(my_member_id.node_id),
read_only_option: ReadOnlyOption::Safe,
pre_vote: true,
..Default::default()
};

@@ -532,6 +534,9 @@ impl Active {
status_tx,
pending_join_requests: HashMap::default(),
read_index_to_request_id: VecDeque::default(),
nodes_config_watch: Metadata::with_current(|m| {
m.watch(MetadataKind::NodesConfiguration)
}),
})
}

@@ -551,23 +556,34 @@

let mut status_update_interval = time::interval(Duration::from_secs(5));

let mut nodes_config_watch =
Metadata::with_current(|m| m.watch(MetadataKind::NodesConfiguration));
nodes_config_watch.mark_changed();

loop {
tokio::select! {
Some(request) = self.request_rx.recv() => {
self.handle_request(request);
},
Some(request) = self.join_cluster_rx.recv() => {
Some(request) = self.join_cluster_rx.recv(), if self.is_caught_up() => {
self.handle_join_request(request);
}
Some(raft) = self.raft_rx.recv() => {
self.raw_node.step(raft)?;
if let Err(err) = self.raw_node.step(raft) {
match err {
RaftError::StepPeerNotFound => {
info!("Received raft message from unknown node. This can happen if \
the node has been removed from the cluster. If not, then this \
indicates a misconfiguration of your cluster!");
}
// escalate, as we can't handle this error
err => Err(err)?
}
}
},
Ok(()) = nodes_config_watch.changed() => {
Ok(()) = self.nodes_config_watch.changed(), if self.is_caught_up() => {
if self.should_leave() {
break;
}

self.update_node_addresses(&Metadata::with_current(|m| m.nodes_config_ref()));
self.check_leave_requests();
},
_ = tick_interval.tick() => {
self.raw_node.tick();
@@ -580,6 +596,56 @@
self.on_ready().await?;
self.update_leadership();
}

self.fail_pending_requests();

let mut storage = self.raw_node.raft.r.raft_log.store;
// prevent us from starting as a member again
storage.delete_raft_configuration().await?;

let passive = Passive::new(
storage,
self.connection_manager,
self.request_rx,
self.join_cluster_rx,
self.metadata_writer,
self.my_member_id.storage_id,
self.status_tx,
);

Ok(passive)
}
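// Sketch only, not code from this PR: the comment above ("prevent us from starting as a member
// again") suggests that, on the next start, a node presumably derives its role from the persisted
// RaftConfiguration. A hypothetical helper to illustrate that decision, reusing only the
// `get_raft_configuration` accessor shown in storage.rs above:
fn describe_startup_role(storage: &RocksDbStorage) -> Result<&'static str, Error> {
    Ok(match storage.get_raft_configuration()? {
        // a persisted configuration means the node previously ran as an active member
        Some(_) => "start as an active metadata store member",
        // after leaving, the configuration was deleted, so the node comes up passive
        None => "start passive and wait to (re)join the cluster",
    })
}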

fn should_leave(&self) -> bool {
let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
let my_node_config = nodes_config
.find_node_by_id(self.my_member_id.node_id)
.expect("my node config should be present");
// we use the NodesConfiguration as the authoritative signal for leaving the cluster
let should_leave = my_node_config.metadata_server_config.metadata_server_state
== MetadataServerState::Outsider;
Comment on lines +624 to +626
@tillrohrmann (Contributor, Author) commented on Jan 23, 2025:
I realized that this won't reliably work in the case where the user removed the node and then shortly after added it back. If the node missed the actual configuration change and then reaches this point, it looks at the MetadataServerState and thinks that everything is ok. The problem is that without sending a join request, the current leader won't add this node back to the configuration.

@tillrohrmann (Contributor, Author):
One idea for solving this problem could be to also introduce a transitional state for joining a cluster (similar to Leaving). That way, adding a node back would not immediately set its state to Member, and a node that missed the configuration change would be told to step down and rejoin.
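As a rough sketch of that idea (not part of this PR; the Joining variant is hypothetical), the state enum could gain a transitional joining state that the leader only promotes to Member once the node's join request has been processed:

pub enum MetadataServerState {
    Outsider,
    /// Hypothetical: the node has been (re-)added in the NodesConfiguration but still has to
    /// send a join request; the leader promotes it to Member once the conf change is applied.
    Joining,
    /// The server is an active member of the metadata store cluster.
    Member,
    /// The server is in the process of leaving the metadata store cluster.
    Leaving,
}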


if should_leave {
let my_member_id = self.my_member_id;
let is_member = self.is_member(my_member_id);

if is_member {
info!("Asked to leave the metadata store cluster as of NodesConfiguration '{}' while \
still being a member of the configuration. This indicates that I missed the \
configuration change to remove me.", nodes_config.version());
} else {
info!(
"Leaving metadata store cluster as of NodesConfiguration '{}'",
nodes_config.version()
);
}
}

should_leave
}

fn is_caught_up(&self) -> bool {
self.raw_node.raft.raft_log.applied == self.raw_node.raft.raft_log.committed
}

fn update_leadership(&mut self) {
@@ -588,23 +654,27 @@

if previous_is_leader && !self.is_leader {
debug!("Lost metadata store leadership");
let known_leader = self.known_leader();

// todo we might fail some of the request too eagerly here because the answer might be
// stored in the unapplied log entries. Better to fail the callbacks based on
// (term, index).
// we lost leadership :-( notify callers that their requests might not get committed
// because we don't know whether the leader will start with the same log as we have.
self.kv_storage.fail_pending_requests(|| {
RequestError::Unavailable("lost leadership".into(), known_leader.clone())
});
self.fail_join_callbacks(|| JoinClusterError::NotLeader(known_leader.clone()));
self.read_index_to_request_id.clear();
self.fail_pending_requests();
} else if !previous_is_leader && self.is_leader {
debug!("Won metadata store leadership");
}
}

fn fail_pending_requests(&mut self) {
let known_leader = self.known_leader();

// todo we might fail some of the request too eagerly here because the answer might be
// stored in the unapplied log entries. Better to fail the callbacks based on
// (term, index).
// we lost leadership :-( notify callers that their requests might not get committed
// because we don't know whether the leader will start with the same log as we have.
self.kv_storage.fail_pending_requests(|| {
RequestError::Unavailable("lost leadership".into(), known_leader.clone())
});
self.fail_join_callbacks(|| JoinClusterError::NotLeader(known_leader.clone()));
self.read_index_to_request_id.clear();
}
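// Sketch only, not part of this PR: one way to act on the todo above would be to key pending
// callbacks by the log index they wait on, so that on leadership loss only callbacks whose
// entries are not yet known to be committed get failed (committed entries will still be applied
// regardless of who leads). The type and method names below are hypothetical.
struct PendingCallbacks<C> {
    by_index: std::collections::BTreeMap<u64, Vec<C>>,
}

impl<C> PendingCallbacks<C> {
    fn register(&mut self, index: u64, callback: C) {
        self.by_index.entry(index).or_default().push(callback);
    }

    // callbacks waiting on indexes beyond `committed` may never be resolved by the new leader,
    // so those are the ones to fail; everything up to `committed` can be kept
    fn drain_uncommitted(&mut self, committed: u64) -> Vec<C> {
        self.by_index
            .split_off(&(committed + 1))
            .into_values()
            .flatten()
            .collect()
    }
}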

fn handle_request(&mut self, request: MetadataStoreRequest) {
let request = request.into_request();
trace!("Handle metadata store request: {request:?}");
@@ -667,7 +737,7 @@ impl Active {
return;
}

let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
let nodes_config = self.kv_storage.last_seen_nodes_configuration();

let Ok(joining_node_config) = nodes_config.find_node_by_id(joining_member_id.node_id)
else {
@@ -695,12 +765,7 @@

// It's possible to batch multiple new joining nodes into a single conf change if we want.
// This will, however, require joint consensus.
let mut conf_change_single = ConfChangeSingle::new();
conf_change_single.change_type = ConfChangeType::AddNode;
conf_change_single.node_id = to_raft_id(joining_member_id.node_id);

let mut conf_change = ConfChangeV2::new();
conf_change.set_changes(vec![conf_change_single].into());
let conf_change = Self::add_node(joining_member_id.node_id);

if let Err(err) = self.raw_node.propose_conf_change(Vec::new(), conf_change) {
let response = match err {
Expand All @@ -718,6 +783,74 @@ impl Active {
}
}

fn add_node(node_id: PlainNodeId) -> ConfChangeV2 {
let mut conf_change_single = ConfChangeSingle::new();
conf_change_single.change_type = ConfChangeType::AddNode;
conf_change_single.node_id = to_raft_id(node_id);

let mut conf_change = ConfChangeV2::new();
conf_change.set_changes(vec![conf_change_single].into());
conf_change
}

fn remove_node(node_id: PlainNodeId) -> ConfChangeV2 {
let mut conf_change_single = ConfChangeSingle::new();
conf_change_single.change_type = ConfChangeType::RemoveNode;
conf_change_single.node_id = to_raft_id(node_id);

let mut conf_change = ConfChangeV2::new();
conf_change.set_changes(vec![conf_change_single].into());
conf_change
}
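// Sketch only (not part of this PR): the comment in handle_join_request notes that batching
// several membership changes into one conf change would require joint consensus. With more than
// one ConfChangeSingle in a ConfChangeV2, raft enters a joint configuration and, with the default
// transition, leaves it automatically once the change is applied. The helper below is
// hypothetical and only reuses the conf-change API already used above.
fn add_and_remove_nodes(to_add: PlainNodeId, to_remove: PlainNodeId) -> ConfChangeV2 {
    let mut add = ConfChangeSingle::new();
    add.change_type = ConfChangeType::AddNode;
    add.node_id = to_raft_id(to_add);

    let mut remove = ConfChangeSingle::new();
    remove.change_type = ConfChangeType::RemoveNode;
    remove.node_id = to_raft_id(to_remove);

    let mut conf_change = ConfChangeV2::new();
    conf_change.set_changes(vec![add, remove].into());
    conf_change
}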

fn check_leave_requests(&mut self) {
if !self.is_leader {
// only the leader should handle leave requests
return;
}

if self.raw_node.raft.has_pending_conf() {
// while another configuration change is pending we can't start a new one
return;
}

if !self.is_caught_up() {
// while we aren't caught up, we risk operating on a stale NodesConfiguration
// schedule another check once we are caught up
self.nodes_config_watch.mark_changed();
return;
}

let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
let mut leaver = None;

for member_id in self.current_members() {
if let Ok(node_config) = nodes_config.find_node_by_id(member_id.node_id) {
if node_config.metadata_server_config.metadata_server_state
== MetadataServerState::Leaving
{
leaver = Some(member_id);
break;
}
}
}

let Some(leaver) = leaver else {
return;
};

let conf_change = Self::remove_node(leaver.node_id);

if let Err(err) = self.raw_node.propose_conf_change(Vec::new(), conf_change) {
warn!("Failed to remove node '{}': {err:?}", leaver);
} else {
info!(
"Triggered reconfiguration of metadata store cluster to remove node '{}'",
leaver
);
}
}

async fn on_ready(&mut self) -> Result<(), Error> {
if !self.raw_node.has_ready() {
return Ok(());
Expand Down Expand Up @@ -915,6 +1048,8 @@ impl Active {
self.update_leadership();
self.update_node_addresses(&Metadata::with_current(|m| m.nodes_config_ref()));
self.update_status();
// schedule a check for leave requests that we might have missed in the meantime
self.nodes_config_watch.mark_changed();

Ok(())
}
42 changes: 39 additions & 3 deletions crates/types/src/nodes_config.rs
@@ -183,12 +183,45 @@ impl NodesConfiguration {
MaybeNode::Node(found) => found,
};

if !Self::is_node_id_matching(node_id, found) {
return Err(NodesConfigError::GenerationMismatch {
expected: node_id,
found: found.current_generation.into(),
});
}

Ok(found)
}

fn is_node_id_matching(node_id: NodeId, found: &NodeConfig) -> bool {
!node_id
.as_generational()
.is_some_and(|requested_generational| {
requested_generational != found.current_generation
})
}
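// Illustrative only: a self-contained restatement of the matching rule factored out above (the
// helper and values here are hypothetical, not the crate's API). A plain node ID always matches;
// a generational ID matches only the node's current generation, otherwise the caller returns
// NodesConfigError::GenerationMismatch.
fn generation_matches(requested_generation: Option<u32>, current_generation: u32) -> bool {
    !requested_generation.is_some_and(|generation| generation != current_generation)
}

// generation_matches(None, 2)    == true  (plain NodeId: the generation is ignored)
// generation_matches(Some(2), 2) == true  (generational NodeId with the current generation)
// generation_matches(Some(3), 2) == false (stale or future generation -> GenerationMismatch)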

/// Find a node by its ID. If called with a generational ID, the node config is only returned
/// if the node has the same generation; otherwise, it returns
/// NodesConfigError::GenerationMismatch. If called with a plain node ID (either PlainNodeId or
/// NodeId::Plain), the generation is ignored and the lookup matches on the ID alone.
pub fn find_node_by_id_mut(
&mut self,
id: impl Into<NodeId>,
) -> Result<&mut NodeConfig, NodesConfigError> {
let node_id: NodeId = id.into();
let maybe = self.nodes.get_mut(&node_id.id());
let Some(maybe) = maybe else {
return Err(NodesConfigError::UnknownNodeId(node_id));
};

let found = match maybe {
MaybeNode::Tombstone => return Err(NodesConfigError::Deleted(node_id)),
MaybeNode::Node(found) => found,
};

if !Self::is_node_id_matching(node_id, found) {
return Err(NodesConfigError::GenerationMismatch {
expected: node_id,
found: found.current_generation.into(),
@@ -398,10 +431,13 @@ impl StorageState {
pub enum MetadataServerState {
/// The server is not considered part of the metadata store cluster. The node can be safely
/// decommissioned.
#[default]
Outsider,
/// The server is an active member of the metadata store cluster.
#[default]
Member,
/// The server is in the process of leaving the metadata store cluster. Right now, it is still
/// considered a member of the cluster.
Leaving,
}
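// Illustrative sketch only (not code from this PR): how the three states could drive the removal
// flow added in store.rs, expressed as a pure decision function so it stays self-contained. The
// LeaveAction type and function name are hypothetical.
#[derive(Debug, Eq, PartialEq)]
pub enum LeaveAction {
    /// the node itself observed that it is an Outsider: delete the raft configuration and fall
    /// back to the passive role
    StepDownToPassive,
    /// the current leader observed a Leaving member: propose a RemoveNode conf change for it
    ProposeRemoveNode,
    /// nothing to do
    None,
}

pub fn leave_action(
    state: MetadataServerState,
    state_is_my_own: bool,
    i_am_leader: bool,
) -> LeaveAction {
    match state {
        MetadataServerState::Outsider if state_is_my_own => LeaveAction::StepDownToPassive,
        MetadataServerState::Leaving if i_am_leader => LeaveAction::ProposeRemoveNode,
        _ => LeaveAction::None,
    }
}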

#[derive(Clone, Default, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)]