Skip to content

Commit

Permalink
Merge branch 'main' into v2
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jun 13, 2024
2 parents 1f31f25 + 61567c8 commit 825dbbe
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 38 deletions.
1 change: 1 addition & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
elasticsearch:
image: elasticsearch:8.13.0
ports:
Expand Down
9 changes: 6 additions & 3 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: debezium/postgres:16-alpine
image: postgres:16-alpine
command: -c config_file=/etc/postgresql.conf
ports:
- 9901:5432
environment:
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- pgdata:/var/lib/postgresql/data
- ./volumes/postgresql.conf:/etc/postgresql.conf
- ./volumes/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
healthcheck:
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
Expand All @@ -73,7 +77,7 @@ services:
ports:
- 7233:7233
volumes:
- ./temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
- ./volumes/temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
labels:
kompose.volume.type: configMap

Expand Down Expand Up @@ -165,7 +169,6 @@ services:
dockerfile: stacks/peerdb-server.Dockerfile
environment:
<<: *catalog-config
PEERDB_LOG_DIR: /var/log/peerdb
PEERDB_PASSWORD: peerdb
PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
RUST_LOG: info
Expand Down
9 changes: 6 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,22 @@ x-flow-worker-env: &flow-worker-env
services:
catalog:
container_name: catalog
image: debezium/postgres:16-alpine
image: postgres:16-alpine
command: -c config_file=/etc/postgresql.conf
restart: unless-stopped
ports:
- 9901:5432
environment:
PGUSER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- pgdata:/var/lib/postgresql/data
- ./volumes/postgresql.conf:/etc/postgresql.conf
- ./volumes/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
healthcheck:
test: ["CMD", "pg_isready", "-d", "postgres", "-U", "postgres"]
interval: 10s
Expand All @@ -68,7 +72,7 @@ services:
ports:
- 7233:7233
volumes:
- ./temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
- ./volumes/temporal-dynamicconfig:/etc/temporal/config/dynamicconfig
labels:
kompose.volume.type: configMap

Expand Down Expand Up @@ -146,7 +150,6 @@ services:
restart: unless-stopped
environment:
<<: *catalog-config
PEERDB_LOG_DIR: /var/log/peerdb
PEERDB_PASSWORD:
PEERDB_FLOW_SERVER_ADDRESS: grpc://flow_api:8112
RUST_LOG: info
Expand Down
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
}
return bqConn.CreateTablesFromExisting(ctx, req)
}
a.Alerter.LogFlowError(ctx, req.FlowJobName, err)
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s KafkaSuite) TestMessage() {
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

_, err = s.Conn().Exec(context.Background(), "select pg_logical_emit_message(false, 'topic', convert_to('heartbeat', 'utf-8'))")
_, err = s.Conn().Exec(context.Background(), "select pg_logical_emit_message(false, 'topic', '\\x686561727462656174'::bytea)")
require.NoError(s.t, err)

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize message", func() bool {
Expand Down
34 changes: 31 additions & 3 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nexus/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ tokio-postgres = { version = "0.7.6", features = [
"with-serde_json-1",
"with-uuid-1",
] }
tracing = "0.1.29"
tracing.workspace = true
postgres-connection = { path = "../postgres-connection" }
2 changes: 1 addition & 1 deletion nexus/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ time = "0.3"
tokio = { version = "1", features = ["full"] }
tracing.workspace = true
tracing-appender = "0.2"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = "1.0"
cargo-deb = "2.0"

Expand Down
53 changes: 28 additions & 25 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::Mutex;
use tokio::{io::AsyncWriteExt, net::TcpListener};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

mod cursor;

Expand Down Expand Up @@ -1079,11 +1079,8 @@ struct Args {
tls_key: Option<String>,

/// Path to the directory where peerdb logs will be written to.
///
/// This is only respected in release mode. In debug mode the logs
/// will exlusively be written to stdout.
#[clap(short, long, default_value = "/var/log/peerdb", env = "PEERDB_LOG_DIR")]
log_dir: String,
#[clap(short, long, env = "PEERDB_LOG_DIR")]
log_dir: Option<String>,

/// Password for the postgres interface.
///
Expand Down Expand Up @@ -1130,28 +1127,34 @@ impl ServerParameterProvider for NexusServerParameterProvider {
}
}

struct TracerGuards {
_rolling_guard: WorkerGuard,
}

// setup tracing
fn setup_tracing(log_dir: &str) -> TracerGuards {
// also log to peerdb.log in log_dir
let file_appender = tracing_appender::rolling::never(log_dir, "peerdb.log");
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
let fmt_file_layer = fmt::layer().with_target(false).with_writer(non_blocking);
type TracerGuards = Option<WorkerGuard>;

fn setup_tracing(log_dir: Option<&str>) -> TracerGuards {
let fmt_stdout_layer = fmt::layer().with_target(false).with_writer(std::io::stdout);

tracing_subscriber::registry()
.with(fmt_stdout_layer)
.with(fmt_file_layer)
.init();
// add min tracing as info
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();

// return guard so that the file appender is not dropped
// and the file is not closed
TracerGuards {
_rolling_guard: _guard,
let tracing = tracing_subscriber::registry()
.with(fmt_stdout_layer)
.with(env_filter);

// return guard so file appender is not dropped, which would close file
match log_dir {
Some(log_dir) if !log_dir.is_empty() => {
// also log to peerdb.log in log_dir
let file_appender = tracing_appender::rolling::never(log_dir, "peerdb.log");
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
let fmt_file_layer = fmt::layer().with_target(false).with_writer(non_blocking);
tracing.with(fmt_file_layer).init();
Some(guard)
}
_ => {
tracing.init();
None
}
}
}

Expand Down Expand Up @@ -1182,7 +1185,7 @@ pub async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();

let args = Args::parse();
let _guard = setup_tracing(&args.log_dir);
let _guard = setup_tracing(args.log_dir.as_ref().map(|s| &s[..]));

let authenticator = MakeSASLScramAuthStartupHandler::new(
Arc::new(FixedPasswordAuthSource::new(args.peerdb_password.clone())),
Expand Down
2 changes: 2 additions & 0 deletions volumes/docker-entrypoint-initdb.d/pg-hba-replication.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
echo "host replication $POSTGRES_USER 0.0.0.0/0 trust" >> "$PGDATA/pg_hba.conf"
5 changes: 5 additions & 0 deletions volumes/postgresql.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
listen_addresses = '*'

wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 825dbbe

Please sign in to comment.