From 5ea8d6d01b9deaee1906e19c68d1300973a26c30 Mon Sep 17 00:00:00 2001 From: CookiePieWw Date: Fri, 10 Jan 2025 11:24:19 +0800 Subject: [PATCH] test(integration): add pg kvbackend --- .github/workflows/develop.yml | 19 +++++++++++++ tests-integration/Cargo.toml | 2 ++ tests-integration/src/cluster.rs | 47 ++++++++++++++++++++++---------- 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 4fada893ca30..d6e313ac2d7e 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -703,6 +703,25 @@ jobs: GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 UNITTEST_LOG_DIR: "__unittest_logs" + - name: Run integration test with pg kvbackend + run: cargo test integration + env: + CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" + RUST_BACKTRACE: 1 + CARGO_INCREMENTAL: 0 + GT_S3_BUCKET: ${{ vars.AWS_CI_TEST_BUCKET }} + GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }} + GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }} + GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }} + GT_MINIO_BUCKET: greptime + GT_MINIO_ACCESS_KEY_ID: superpower_ci_user + GT_MINIO_ACCESS_KEY: superpower_password + GT_MINIO_REGION: us-west-2 + GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000 + GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres + GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 + GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 + UNITTEST_LOG_DIR: "__unittest_logs" - name: Codecov upload uses: codecov/codecov-action@v4 with: diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 057890b6395d..25cfb599d838 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true license.workspace = true [features] +pg_kvbackend = ["common-meta/pg_kvbackend", "meta-srv/pg_kvbackend"] +default = ["pg_kvbackend"] dashboard = [] [lints] diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 93664c2f19cd..054592e9b56b 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -36,11 +36,14 @@ use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "pg_kvbackend")] +use common_meta::kv_backend::postgres::PgStore; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::runtime::BuilderBuild; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::info; use common_test_util::temp_dir::create_temp_dir; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; @@ -94,21 +97,35 @@ pub struct GreptimeDbClusterBuilder { impl GreptimeDbClusterBuilder { pub async fn new(cluster_name: &str) -> Self { - let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); - - let kv_backend: KvBackendRef = if endpoints.is_empty() { - Arc::new(MemoryKvBackend::new()) - } else { - let endpoints = endpoints - .split(',') - .map(|s| s.to_string()) - .collect::>(); - let backend = EtcdStore::with_endpoints(endpoints, 128) - .await - .expect("malformed endpoints"); - // Each retry requires a new isolation namespace. - let chroot = format!("{}{}", cluster_name, Uuid::new_v4()); - Arc::new(ChrootKvBackend::new(chroot.into(), backend)) + let etcd_endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + let pg_endpoint = env::var("GT_PG_ENDPOINTS").unwrap_or_default(); + + let kv_backend: KvBackendRef = match (etcd_endpoints.is_empty(), pg_endpoint.is_empty()) { + (true, true) => { + info!("Using memory kv backend"); + Arc::new(MemoryKvBackend::new()) + } + (false, _) => { + info!("Using etcd endpoints: {}", etcd_endpoints); + let endpoints = etcd_endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(); + let backend = EtcdStore::with_endpoints(endpoints, 128) + .await + .expect("malformed endpoints"); + // Each retry requires a new isolation namespace. + let chroot = format!("{}{}", cluster_name, Uuid::new_v4()); + Arc::new(ChrootKvBackend::new(chroot.into(), backend)) + } + (true, false) => { + info!("Using pg endpoint: {}", pg_endpoint); + let backend = PgStore::with_url(&pg_endpoint, "greptime_metakv", 128) + .await + .expect("malformed pg endpoint"); + let chroot = format!("{}{}", cluster_name, Uuid::new_v4()); + Arc::new(ChrootKvBackend::new(chroot.into(), backend)) + } }; Self {