Skip to content

Commit

Permalink
Adressing comments on first commit: changed disable_sync_crc naming t…
Browse files Browse the repository at this point in the history
…o bypass_crc, encapsulated condition checks for skipping CRC, and chenged connIsTLS condition to connIntegrityChecked in ConnectionType. Some changes in the test as well

Signed-off-by: Tal Shachar <[email protected]>
  • Loading branch information
talxsha committed Dec 29, 2024
1 parent 67fa361 commit 2e5314e
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 85 deletions.
1 change: 0 additions & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3156,7 +3156,6 @@ static int applyClientMaxMemoryUsage(const char **err) {
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
Expand Down
7 changes: 7 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ typedef struct ConnectionType {

/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);

/* Miselenious */
int (*connIntegrityChecked)(void); // return 1 iff connection type has built-in integrity checks
} ConnectionType;

struct connection {
Expand Down Expand Up @@ -483,4 +486,8 @@ static inline void connSetPostponeUpdateState(connection *conn, int on) {
}
}

static inline int connIsIntegrityChecked(connection *conn) {
return conn->type->connIntegrityChecked && conn->type->connIntegrityChecked();
}

#endif /* __REDIS_CONNECTION_H */
20 changes: 10 additions & 10 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++;
if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3355,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) {
if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3548,9 +3548,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC.
* Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC.
*/
int disable_sync_crc_capa = server.disable_sync_crc;
int bypass_crc_capa = server.bypass_crc;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
Expand Down Expand Up @@ -3585,9 +3585,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it
if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0)
disable_sync_crc_capa = 0;
// do not bypass CRC on the primary if TLS is disabled or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC))
bypass_crc_capa = 0;

}

Expand All @@ -3612,10 +3612,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (disable_sync_crc_capa == 1) {
if (bypass_crc_capa) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
rdb.flags |= RIO_FLAG_DISABLE_CRC;
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
Expand Down Expand Up @@ -3683,7 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (disable_sync_crc_capa) server.stat_total_crc_disabled_syncs_stated++;
if (bypass_crc_capa) server.stat_total_sync_bypass_crc++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
3 changes: 3 additions & 0 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,9 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,

/* Miselenious */
.connIntegrityChecked = NULL,
};

ConnectionType *connectionTypeRdma(void) {
Expand Down
37 changes: 18 additions & 19 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1244,12 +1244,12 @@ void syncCommand(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel|disable_sync_crc>
* - capa <eof|psync2|dual-channel|bypass-crc>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* disable_sync_crc: supports disabling CRC during TLS enabled diskless sync.
* bypass-crc: supports skipping CRC calculations during TLS enabled diskless sync.
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1315,8 +1315,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR))
c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC;
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR))
c->replica_capa |= REPLICA_CAPA_BYPASS_CRC;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -1974,6 +1974,11 @@ static int useDisklessLoad(void) {
return enabled;
}

/* Returns 1 if the replica can skip CRC calculations during full sync */
int replicationBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) {
return server.bypass_crc && is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn);
}

/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from primary. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
Expand Down Expand Up @@ -2087,11 +2092,6 @@ void readSyncBulkPayload(connection *conn) {
(long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk");
}

// Set a flag to determin later whether or not the replica will skip CRC calculations for this sync -
// Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskelss sync enabled on both replica and primary.
// Otherwise, CRC should be enabled/disabled as per server.rdb_checksum
if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark)
server.repl_meet_disable_crc_cond = 1;
return;
}

Expand Down Expand Up @@ -2259,7 +2259,7 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC;
if (replicationBypassCRC(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_BYPASS_CRC;

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
Expand Down Expand Up @@ -3523,17 +3523,16 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync.
* Inform the primary of this capa only during diskless sync with TLS enabled.
* In disk-based sync, or non-TLS, there is more concern for data corruprion
* so we keep this extra layer of detection.
*
* BYPASS-CRC: supports skipping CRC calculations during full sync.
* Inform the primary of this capa only during diskless sync with TLS enabled.
* In disk-based sync, or non-TLS, there is more concern for data corruprion
* so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts
int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad());
int send_bypass_crc_capa = replicationBypassCRC(conn, useDisklessLoad(), 1);
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
send_disable_crc_capa ? "capa" : "",
send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "",
send_bypass_crc_capa ? "capa" : "",
send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
if (err) goto write_error;
Expand Down
2 changes: 1 addition & 1 deletion src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_DISABLE_CRC) != 0) return; // skip CRC64 calculations
if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations
r->cksum = crc64(r->cksum, buf, len);
}

Expand Down
2 changes: 1 addition & 1 deletion src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_DISABLE_CRC (1 << 2)
#define RIO_FLAG_BYPASS_CRC (1 << 2)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
6 changes: 3 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,7 +2218,7 @@ void initServerConfig(void) {
server.fsynced_reploff_pending = 0;
server.rdb_client_id = -1;
server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;
server.repl_meet_disable_crc_cond = 0;
server.bypass_crc = 1;

/* Replication partial resync backlog */
server.repl_backlog = NULL;
Expand Down Expand Up @@ -2639,7 +2639,7 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_crc_disabled_syncs_stated = 0;
server.stat_total_sync_bypass_crc = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5880,7 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_crc_disabled_syncs_stated:%ld\r\n", server.stat_total_crc_disabled_syncs_stated,
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,
"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
9 changes: 4 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,10 @@ typedef enum {
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. */
#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */
#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1842,7 +1842,7 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs stated with CRC checksum disabled */ // AMZN
size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down Expand Up @@ -1991,7 +1991,7 @@ struct valkeyServer {
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int disable_sync_crc; /* Use RDB checksum during sync? Applicable only for TLS enabled diskless sync */
int bypass_crc; /* Skip RDB checksum? Applicable only for TLS enabled diskless full sync */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
Expand Down Expand Up @@ -2118,7 +2118,6 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */
int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */

/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
Expand Down
3 changes: 3 additions & 0 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ static ConnectionType CT_Socket = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
.connIntegrityChecked = NULL,
};

int connBlock(connection *conn) {
Expand Down
7 changes: 7 additions & 0 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ static int connTLSListen(connListener *listener) {
return listenToPort(listener);
}

static int connTLSIsIntegrityChecked(void) {
return 1;
}

static void connTLSCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
Expand Down Expand Up @@ -1186,6 +1190,9 @@ static ConnectionType CT_TLS = {

/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,

/* Miselenious */
.connIntegrityChecked = connTLSIsIntegrityChecked,
};

int RedisRegisterConnectionTypeTLS(void) {
Expand Down
3 changes: 3 additions & 0 deletions src/unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ static ConnectionType CT_Unix = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
.connIntegrityChecked = NULL,
};

int RedisRegisterConnectionTypeUnix(void) {
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/bypass-crc.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
start_server {tags {"repl tls"} overrides {save {}}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set primary_bypassed_crc_counter 0
foreach mds {no yes} {
foreach sdl {disabled on-empty-db swapdb flush-before-load} {
test "Bypass CRC sync - tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" {
$primary config set repl-diskless-sync $mds
start_server {overrides {save {}}} {
set replica [srv 0 client]
$replica config set repl-diskless-load $sdl
$replica replicaof $primary_host $primary_port

wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica info replication]]
} else {
fail "Replication not started"
}

set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]]
set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]]
set primary_bypass_crc_count [lindex $stats 1]

if {$sdl eq "disabled" || $mds eq "no" || !$::tls} {
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should not bypass CRC in this scenario"
assert_equal 0 $replica_bypassing_crc_count "Replica should not bypass CRC in this scenario"
} else {
incr primary_bypassed_crc_counter
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should bypass CRC in this scenario"
assert_equal 1 $replica_bypassing_crc_count "Replica should bypass CRC in this scenario"
}
}
}
}
}
}
Loading

0 comments on commit 2e5314e

Please sign in to comment.