Skip to content

Commit

Permalink
fix: fix fe internal rpc host addr issues (#18241) (#18243)
Browse files Browse the repository at this point in the history
Co-authored-by: August <[email protected]>
  • Loading branch information
github-actions[bot] and yezizp2012 authored Aug 26, 2024
1 parent 940ee8b commit 7c389f1
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
is_serving: Set(pb_property.is_serving),
is_unschedulable: Set(pb_property.is_unschedulable),
parallelism: Set(worker.worker_node.parallelism() as _),
internal_rpc_host_addr: Set(pb_property.internal_rpc_host_addr.clone()),
internal_rpc_host_addr: Set(Some(pb_property.internal_rpc_host_addr.clone())),
};
WorkerProperty::insert(property)
.exec(&meta_store_sql.conn)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/model_v2/src/worker_property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Model {
pub is_streaming: bool,
pub is_serving: bool,
pub is_unschedulable: bool,
pub internal_rpc_host_addr: String,
pub internal_rpc_host_addr: Option<String>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
21 changes: 19 additions & 2 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl From<WorkerInfo> for PbWorkerNode {
is_streaming: p.is_streaming,
is_serving: p.is_serving,
is_unschedulable: p.is_unschedulable,
internal_rpc_host_addr: p.internal_rpc_host_addr.clone(),
internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(),
}),
transactional_id: info.0.transaction_id.map(|id| id as _),
resource: info.2.resource,
Expand Down Expand Up @@ -653,6 +653,23 @@ impl ClusterControllerInner {
self.update_worker_ttl(worker.worker_id, ttl)?;
self.update_resource_and_started_at(worker.worker_id, resource)?;
Ok(worker.worker_id)
} else if worker.worker_type == WorkerType::Frontend && property.is_none() {
let worker_property = worker_property::ActiveModel {
worker_id: Set(worker.worker_id),
parallelism: Set(add_property
.worker_node_parallelism
.try_into()
.expect("invalid parallelism")),
is_streaming: Set(add_property.is_streaming),
is_serving: Set(add_property.is_serving),
is_unschedulable: Set(add_property.is_unschedulable),
internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
};
WorkerProperty::insert(worker_property).exec(&txn).await?;
txn.commit().await?;
self.update_worker_ttl(worker.worker_id, ttl)?;
self.update_resource_and_started_at(worker.worker_id, resource)?;
Ok(worker.worker_id)
} else {
self.update_worker_ttl(worker.worker_id, ttl)?;
self.update_resource_and_started_at(worker.worker_id, resource)?;
Expand Down Expand Up @@ -681,7 +698,7 @@ impl ClusterControllerInner {
is_streaming: Set(add_property.is_streaming),
is_serving: Set(add_property.is_serving),
is_unschedulable: Set(add_property.is_unschedulable),
internal_rpc_host_addr: Set(add_property.internal_rpc_host_addr),
internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)),
};
WorkerProperty::insert(property).exec(&txn).await?;
}
Expand Down

0 comments on commit 7c389f1

Please sign in to comment.