Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added SELECT redis command #482

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions redis/include/userver/storages/redis/impl/base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ struct ConnectionInfo {
bool read_only = false;
ConnectionSecurity connection_security = ConnectionSecurity::kNone;
using HostVector = std::vector<std::string>;
std::optional<size_t> database_index{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New connections always use database index 0 so it is not really an optional:

Suggested change
std::optional<size_t> database_index{};
std::size_t database_index = 0;

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


ConnectionInfo() = default;
ConnectionInfo(std::string host, int port, Password password,
bool read_only = false,
ConnectionSecurity security = ConnectionSecurity::kNone)
ConnectionSecurity security = ConnectionSecurity::kNone,
std::optional<size_t> database_index = {})
: host{std::move(host)},
port{port},
password{std::move(password)},
read_only{read_only},
connection_security(security) {}
connection_security(security),
database_index{database_index} {}
};

struct Stat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct RedisSettings {

std::vector<std::string> shards;
std::vector<HostPort> sentinels;
std::optional<size_t> database_index;
redis::Password password{std::string()};
redis::ConnectionSecurity secure_connection{redis::ConnectionSecurity::kNone};
};
Expand Down
80 changes: 64 additions & 16 deletions redis/src/storages/redis/impl/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <storages/redis/impl/redis_stats.hpp>
#include <storages/redis/impl/tcp_socket.hpp>
#include <userver/storages/redis/impl/reply.hpp>
#include <userver/utils/scope_guard.hpp>

#include "command_control_impl.hpp"

Expand Down Expand Up @@ -141,7 +142,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
~RedisImpl();

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password, std::optional<size_t> database_index);
void Disconnect();

bool AsyncCommand(const CommandPtr& command);
Expand Down Expand Up @@ -228,6 +229,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
void ProcessCommand(const CommandPtr& command);

void Authenticate();
void SelectDatabase();
void SendReadOnly();
void FreeCommands();

Expand All @@ -246,7 +248,8 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
static bool WatchCommandTimerEnabled(
const CommandsBufferingSettings& commands_buffering_settings);

bool Connect(const std::string& host, int port, const Password& password);
bool Connect(const std::string& host, int port, const Password& password,
std::optional<size_t> database_index);

Redis* redis_obj_;
engine::ev::ThreadControl ev_thread_control_;
Expand All @@ -267,6 +270,7 @@ class Redis::RedisImpl : public std::enable_shared_from_this<Redis::RedisImpl> {
uint16_t port_ = 0;
std::string server_;
Password password_{std::string()};
std::optional<size_t> database_index_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not optional and 0 by default

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

std::atomic<size_t> commands_size_ = 0;
size_t sent_count_ = 0;
size_t cmd_counter_ = 0;
Expand Down Expand Up @@ -331,8 +335,9 @@ Redis::~Redis() {
}

void Redis::Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password) {
impl_->Connect(host_addrs, port, password);
const Password& password,
std::optional<size_t> database_index) {
impl_->Connect(host_addrs, port, password, database_index);
}

bool Redis::AsyncCommand(const CommandPtr& command) {
Expand Down Expand Up @@ -440,17 +445,19 @@ void Redis::RedisImpl::Detach() {
}

void Redis::RedisImpl::Connect(const ConnectionInfo::HostVector& host_addrs,
int port, const Password& password) {
int port, const Password& password,
std::optional<size_t> database_index) {
for (const auto& host : host_addrs)
if (Connect(host, port, password)) return;
if (Connect(host, port, password, database_index)) return;

LOG_ERROR() << "error async connect to Redis server (host addrs ="
<< host_addrs << ", port=" << port << ")";
SetState(State::kInitError);
}

bool Redis::RedisImpl::Connect(const std::string& host, int port,
const Password& password) {
const Password& password,
std::optional<size_t> database_index) {
UASSERT(context_ == nullptr);
UASSERT(state_ == State::kInit);

Expand All @@ -461,6 +468,7 @@ bool Redis::RedisImpl::Connect(const std::string& host, int port,
log_extra_.Extend("redis_server", GetServer());
log_extra_.Extend("server_id", GetServerId().GetId());
password_ = password;
database_index_ = database_index;
LOG_INFO() << log_extra_ << "Async connect to Redis server=" << GetServer();
context_ = redisAsyncConnect(host.c_str(), port);

Expand Down Expand Up @@ -1038,19 +1046,13 @@ bool Redis::RedisImpl::InitSecureConnection() {

void Redis::RedisImpl::Authenticate() {
if (password_.GetUnderlying().empty()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
ProcessCommand(PrepareCommand(
CmdArgs{"AUTH", password_.GetUnderlying()},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
if (send_readonly_)
SendReadOnly();
else
SetState(State::kConnected);
SendReadOnly();
} else {
if (*reply) {
if (reply->IsUnknownCommandError()) {
Expand All @@ -1077,12 +1079,17 @@ void Redis::RedisImpl::Authenticate() {
}

void Redis::RedisImpl::SendReadOnly() {
if (!send_readonly_) {
SelectDatabase();
return;
}

LOG_DEBUG() << "Send READONLY command to slave "
<< GetServerId().GetDescription() << " in cluster mode";
ProcessCommand(PrepareCommand(CmdArgs{"READONLY"}, [this](const CommandPtr&,
ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(State::kConnected);
SelectDatabase();
} else {
if (*reply) {
LOG_LIMITED_ERROR()
Expand All @@ -1099,6 +1106,47 @@ void Redis::RedisImpl::SendReadOnly() {
}));
}

void Redis::RedisImpl::SelectDatabase() {
if (!database_index_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better yet, if database index equals 0.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

SetState(RedisState::kConnected);
return;
}

ProcessCommand(PrepareCommand(
CmdArgs{"SELECT", *database_index_},
[this](const CommandPtr&, ReplyPtr reply) {
if (*reply && reply->data.IsStatus()) {
SetState(RedisState::kConnected);
LOG_INFO() << log_extra_
<< "Selected redis logical database with index "
<< *database_index_;
return;
}

const utils::ScopeGuard auto_disconnect([this]() { Disconnect(); });

if (!*reply) {
LOG_LIMITED_ERROR()
<< "SELECT failed with status " << reply->status << " ("
<< reply->status_string << ") " << log_extra_;
return;
}

if (reply->IsUnknownCommandError()) {
LOG_WARNING() << log_extra_
<< "SELECT failed: unknown command `SELECT` - "
"possible when connecting to Sentinel instead "
"of Redis master or slave instance";
return;
}

LOG_LIMITED_ERROR()
<< log_extra_
<< "SELECT failed: response type=" << reply->data.GetTypeString()
<< " msg=" << reply->data.ToDebugString();
}));
}

void Redis::RedisImpl::OnRedisReply(redisAsyncContext* c, void* r,
void* privdata) noexcept {
auto* impl = static_cast<Redis::RedisImpl*>(c->data);
Expand Down
3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/redis.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class Redis {
Redis(Redis&& o) = delete;

void Connect(const ConnectionInfo::HostVector& host_addrs, int port,
const Password& password);
const Password& password,
std::optional<size_t> database_index = {});

bool AsyncCommand(const CommandPtr& command);
size_t GetRunningCommands() const;
Expand Down
1 change: 1 addition & 0 deletions redis/src/storages/redis/impl/redis_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const std::string_view kCommandTypes[] = {
"scan",
"scard",
"script",
"select",
"sentinel",
"set",
"setex",
Expand Down
28 changes: 16 additions & 12 deletions redis/src/storages/redis/impl/sentinel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ void ThrowIfCancelled() {

} // namespace

Sentinel::Sentinel(
const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns, std::string shard_group_name,
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard, CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control, ConnectionMode mode)
Sentinel::Sentinel(const std::shared_ptr<ThreadPools>& thread_pools,
const std::vector<std::string>& shards,
const std::vector<ConnectionInfo>& conns,
std::string shard_group_name, const std::string& client_name,
const Password& password,
ConnectionSecurity connection_security,
ReadyChangeCallback ready_callback,
dynamic_config::Source dynamic_config_source,
std::unique_ptr<KeyShard>&& key_shard,
CommandControl command_control,
const testsuite::RedisControl& testsuite_redis_control,
ConnectionMode mode, std::optional<size_t> database_index)
: thread_pools_(thread_pools),
secdist_default_command_control_(command_control),
testsuite_redis_control_(testsuite_redis_control) {
Expand Down Expand Up @@ -82,7 +85,7 @@ Sentinel::Sentinel(
*sentinel_thread_control_, thread_pools_->GetRedisThreadPool(), *this,
shards, conns, std::move(shard_group_name), client_name, password,
connection_security, std::move(ready_callback), std::move(key_shard),
dynamic_config_source, mode);
dynamic_config_source, mode, database_index);
}
});
}
Expand Down Expand Up @@ -152,7 +155,7 @@ std::shared_ptr<Sentinel> Sentinel::CreateSentinel(
// sentinels in cluster mode.
conns.emplace_back(sentinel.host, sentinel.port,
(key_shard ? Password("") : password), false,
settings.secure_connection);
settings.secure_connection, std::nullopt);
}

LOG_DEBUG() << "redis command_control:" << command_control.ToString();
Expand All @@ -162,7 +165,8 @@ std::shared_ptr<Sentinel> Sentinel::CreateSentinel(
thread_pools, shards, conns, std::move(shard_group_name), client_name,
password, settings.secure_connection, std::move(ready_callback),
dynamic_config_source, std::move(key_shard), command_control,
testsuite_redis_control);
testsuite_redis_control, ConnectionMode::kCommands,
settings.database_index);
client->Start();
}

Expand Down
3 changes: 2 additions & 1 deletion redis/src/storages/redis/impl/sentinel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class Sentinel {
std::unique_ptr<KeyShard>&& key_shard = nullptr,
CommandControl command_control = {},
const testsuite::RedisControl& testsuite_redis_control = {},
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
std::optional<size_t> database_index = {});
virtual ~Sentinel();

void Start();
Expand Down
8 changes: 6 additions & 2 deletions redis/src/storages/redis/impl/sentinel_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ SentinelImpl::SentinelImpl(
const std::string& client_name, const Password& password,
ConnectionSecurity connection_security, ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source, ConnectionMode mode)
dynamic_config::Source dynamic_config_source, ConnectionMode mode,
std::optional<size_t> database_index)
: sentinel_obj_(sentinel),
ev_thread_(sentinel_thread_control),
shard_group_name_(std::move(shard_group_name)),
Expand All @@ -79,7 +80,8 @@ SentinelImpl::SentinelImpl(
key_shard_(std::move(key_shard)),
connection_mode_(mode),
slot_info_(IsInClusterMode() ? std::make_unique<SlotInfo>() : nullptr),
dynamic_config_source_(dynamic_config_source) {
dynamic_config_source_(dynamic_config_source),
database_index_(database_index) {
for (size_t i = 0; i < init_shards_->size(); ++i) {
shards_[(*init_shards_)[i]] = i;
connected_statuses_.push_back(std::make_unique<ConnectedStatus>());
Expand Down Expand Up @@ -637,6 +639,7 @@ void SentinelImpl::ReadSentinels() {
for (auto shard_conn : info) {
if (shards_.find(shard_conn.Name()) != shards_.end()) {
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
shard_found[shards_[shard_conn.Name()]] = true;
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
Expand Down Expand Up @@ -675,6 +678,7 @@ void SentinelImpl::ReadSentinels() {
shard_conn.SetName(shard);
shard_conn.SetReadOnly(true);
shard_conn.SetConnectionSecurity(connection_security_);
shard_conn.SetDatabaseIndex(database_index_);
if (shards_.find(shard_conn.Name()) != shards_.end())
watcher->host_port_to_shard[shard_conn.HostPort()] =
shards_[shard_conn.Name()];
Expand Down
4 changes: 3 additions & 1 deletion redis/src/storages/redis/impl/sentinel_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class SentinelImpl : public SentinelImplBase {
ReadyChangeCallback ready_callback,
std::unique_ptr<KeyShard>&& key_shard,
dynamic_config::Source dynamic_config_source,
ConnectionMode mode = ConnectionMode::kCommands);
ConnectionMode mode = ConnectionMode::kCommands,
std::optional<size_t> database_index = {});
~SentinelImpl() override;

std::unordered_map<ServerId, size_t, ServerIdHasher>
Expand Down Expand Up @@ -286,6 +287,7 @@ class SentinelImpl : public SentinelImplBase {
std::optional<CommandsBufferingSettings> commands_buffering_settings_;
dynamic_config::Source dynamic_config_source_;
std::atomic<int> publish_shard_{0};
std::optional<size_t> database_index_;
};

} // namespace redis
Expand Down
Loading
Loading