diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs index c491fc3bf44b2..3277f255ebe80 100644 --- a/src/ctl/src/cmd_impl/meta/migration.rs +++ b/src/ctl/src/cmd_impl/meta/migration.rs @@ -164,7 +164,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an is_serving: Set(pb_property.is_serving), is_unschedulable: Set(pb_property.is_unschedulable), parallelism: Set(worker.worker_node.parallelism() as _), - internal_rpc_host_addr: Set(pb_property.internal_rpc_host_addr.clone()), + internal_rpc_host_addr: Set(Some(pb_property.internal_rpc_host_addr.clone())), }; WorkerProperty::insert(property) .exec(&meta_store_sql.conn) diff --git a/src/meta/model_v2/src/worker_property.rs b/src/meta/model_v2/src/worker_property.rs index 204882646d815..ff19cdeb6f65b 100644 --- a/src/meta/model_v2/src/worker_property.rs +++ b/src/meta/model_v2/src/worker_property.rs @@ -26,7 +26,7 @@ pub struct Model { pub is_streaming: bool, pub is_serving: bool, pub is_unschedulable: bool, - pub internal_rpc_host_addr: String, + pub internal_rpc_host_addr: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 9fe1451a3eebc..0340313261d9c 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -84,7 +84,7 @@ impl From for PbWorkerNode { is_streaming: p.is_streaming, is_serving: p.is_serving, is_unschedulable: p.is_unschedulable, - internal_rpc_host_addr: p.internal_rpc_host_addr.clone(), + internal_rpc_host_addr: p.internal_rpc_host_addr.clone().unwrap_or_default(), }), transactional_id: info.0.transaction_id.map(|id| id as _), resource: info.2.resource, @@ -653,6 +653,23 @@ impl ClusterControllerInner { self.update_worker_ttl(worker.worker_id, ttl)?; self.update_resource_and_started_at(worker.worker_id, resource)?; Ok(worker.worker_id) + } else if worker.worker_type == WorkerType::Frontend && property.is_none() { + let worker_property = worker_property::ActiveModel { + worker_id: Set(worker.worker_id), + parallelism: Set(add_property + .worker_node_parallelism + .try_into() + .expect("invalid parallelism")), + is_streaming: Set(add_property.is_streaming), + is_serving: Set(add_property.is_serving), + is_unschedulable: Set(add_property.is_unschedulable), + internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)), + }; + WorkerProperty::insert(worker_property).exec(&txn).await?; + txn.commit().await?; + self.update_worker_ttl(worker.worker_id, ttl)?; + self.update_resource_and_started_at(worker.worker_id, resource)?; + Ok(worker.worker_id) } else { self.update_worker_ttl(worker.worker_id, ttl)?; self.update_resource_and_started_at(worker.worker_id, resource)?; @@ -681,7 +698,7 @@ impl ClusterControllerInner { is_streaming: Set(add_property.is_streaming), is_serving: Set(add_property.is_serving), is_unschedulable: Set(add_property.is_unschedulable), - internal_rpc_host_addr: Set(add_property.internal_rpc_host_addr), + internal_rpc_host_addr: Set(Some(add_property.internal_rpc_host_addr)), }; WorkerProperty::insert(property).exec(&txn).await?; }