Skip to content

Commit c16d864

Browse files
committed
Support hash field expiration
1 parent 7dd0248 commit c16d864

File tree

6 files changed

+534
-22
lines changed

6 files changed

+534
-22
lines changed

src/commands/cmd_hash.cc

+243-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "error_constants.h"
2424
#include "scan_base.h"
2525
#include "server/server.h"
26+
#include "time_util.h"
2627
#include "types/redis_hash.h"
2728

2829
namespace redis {
@@ -429,6 +430,238 @@ class CommandHRandField : public Commander {
429430
bool no_parameters_ = true;
430431
};
431432

433+
434+
435+
class CommandFiledExpireBase : public Commander {
436+
protected:
437+
Status commonParse(const std::vector<std::string> &args, int start_idx) {
438+
int idx = start_idx;
439+
CommandParser parser(args, idx);
440+
std::string_view expire_flag, num_flag;
441+
uint64_t fields_num = 0;
442+
while (parser.Good()) {
443+
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
444+
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
445+
idx += 2;
446+
break;
447+
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
448+
idx += 1;
449+
field_expire_type_ = HashFieldExpireType::NX;
450+
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
451+
idx += 1;
452+
field_expire_type_ = HashFieldExpireType::XX;
453+
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
454+
idx += 1;
455+
field_expire_type_ = HashFieldExpireType::GT;
456+
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
457+
idx += 1;
458+
field_expire_type_ = HashFieldExpireType::LT;
459+
} else {
460+
// TODO: 更明确的错误信息
461+
return parser.InvalidSyntax();
462+
}
463+
}
464+
465+
if (args.size() != idx + fields_num) {
466+
return { Status::RedisParseErr, errWrongNumOfArguments };
467+
}
468+
469+
for (size_t i = idx; i < args.size(); i++) {
470+
fields_.emplace_back(args_[i]);
471+
}
472+
473+
return Status::OK();
474+
}
475+
476+
Status expireFieldExecute(Server *srv, Connection *conn, std::string *output) {
477+
std::vector<int8_t> ret;
478+
redis::Hash hash_db(srv->storage, conn->GetNamespace());
479+
auto s = hash_db.ExpireFields(args_[1], expire_, fields_, field_expire_type_, &ret);
480+
if (!s.ok()) {
481+
return {Status::RedisExecErr, s.ToString()};
482+
}
483+
484+
*output = redis::MultiLen(ret.size());
485+
for (const auto i : ret) {
486+
output->append(redis::Integer(i));
487+
}
488+
489+
return Status::OK();
490+
}
491+
492+
Status ttlExpireExecute(Server *srv, Connection *conn, std::vector<int64_t> &ret) {
493+
redis::Hash hash_db(srv->storage, conn->GetNamespace());
494+
auto s = hash_db.TTLFields(args_[1], fields_, &ret);
495+
if (!s.ok()) {
496+
return {Status::RedisExecErr, s.ToString()};
497+
}
498+
return Status::OK();
499+
}
500+
501+
uint64_t expire_ = 0;
502+
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
503+
std::vector<Slice> fields_;
504+
};
505+
506+
507+
class CommandHExpire : public CommandFiledExpireBase {
508+
public:
509+
Status Parse(const std::vector<std::string> &args) override {
510+
auto parse_result = ParseInt<uint64_t>(args[2], 10);
511+
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };
512+
513+
expire_ = *parse_result * 1000 + util::GetTimeStampMS();
514+
return CommandFiledExpireBase::commonParse(args, 3);
515+
}
516+
517+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
518+
return expireFieldExecute(srv, conn, output);
519+
}
520+
};
521+
522+
class CommandHExpireAt : public CommandFiledExpireBase {
523+
public:
524+
Status Parse(const std::vector<std::string> &args) override {
525+
auto parse_result = ParseInt<uint64_t>(args[2], 10);
526+
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };
527+
528+
expire_ = *parse_result * 1000;
529+
return CommandFiledExpireBase::commonParse(args, 3);
530+
}
531+
532+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
533+
return expireFieldExecute(srv, conn, output);
534+
}
535+
};
536+
537+
class CommandHPExpire : public CommandFiledExpireBase {
538+
public:
539+
Status Parse(const std::vector<std::string> &args) override {
540+
auto parse_result = ParseInt<uint64_t>(args[2], 10);
541+
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };
542+
543+
expire_ = *parse_result + util::GetTimeStampMS();
544+
return CommandFiledExpireBase::commonParse(args, 3);
545+
}
546+
547+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
548+
return expireFieldExecute(srv, conn, output);
549+
}
550+
};
551+
552+
class CommandHPExpireAt : public CommandFiledExpireBase {
553+
public:
554+
Status Parse(const std::vector<std::string> &args) override {
555+
auto parse_result = ParseInt<uint64_t>(args[2], 10);
556+
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };
557+
558+
expire_ = *parse_result;
559+
return CommandFiledExpireBase::commonParse(args, 3);
560+
}
561+
562+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
563+
return expireFieldExecute(srv, conn, output);
564+
}
565+
};
566+
567+
568+
class CommandHExpireTime : public CommandFiledExpireBase {
569+
public:
570+
Status Parse(const std::vector<std::string> &args) override {
571+
return CommandFiledExpireBase::commonParse(args, 2);
572+
}
573+
574+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
575+
std::vector<int64_t> ret;
576+
auto s = ttlExpireExecute(srv, conn, ret);
577+
if (!s.IsOK()) {
578+
return {Status::RedisExecErr, s.Msg()};
579+
}
580+
auto now = util::GetTimeStampMS();
581+
*output = redis::MultiLen(ret.size());
582+
for (const auto ttl : ret) {
583+
uint64_t expire = ttl;
584+
if (ttl > 0) {
585+
expire = (now + expire) / 1000;
586+
}
587+
output->append(redis::Integer(expire));
588+
}
589+
return Status::OK();
590+
}
591+
};
592+
593+
class CommandHPExpireTime : public CommandFiledExpireBase {
594+
public:
595+
Status Parse(const std::vector<std::string> &args) override {
596+
return CommandFiledExpireBase::commonParse(args, 2);
597+
}
598+
599+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
600+
std::vector<int64_t> ret;
601+
auto s = ttlExpireExecute(srv, conn, ret);
602+
if (!s.IsOK()) {
603+
return {Status::RedisExecErr, s.Msg()};
604+
}
605+
auto now = util::GetTimeStampMS();
606+
*output = redis::MultiLen(ret.size());
607+
for (const auto ttl : ret) {
608+
uint64_t expire = ttl;
609+
if (ttl > 0) {
610+
expire = now + expire;
611+
}
612+
output->append(redis::Integer(expire));
613+
}
614+
return Status::OK();
615+
}
616+
};
617+
618+
class CommandHTTL : public CommandFiledExpireBase {
619+
public:
620+
Status Parse(const std::vector<std::string> &args) override {
621+
return CommandFiledExpireBase::commonParse(args, 2);
622+
}
623+
624+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
625+
std::vector<int64_t> ret;
626+
auto s = ttlExpireExecute(srv, conn, ret);
627+
if (!s.IsOK()) {
628+
return {Status::RedisExecErr, s.Msg()};
629+
}
630+
*output = redis::MultiLen(ret.size());
631+
for (const auto ttl : ret) {
632+
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
633+
}
634+
return Status::OK();
635+
}
636+
};
637+
638+
class CommandHPTTL : public CommandFiledExpireBase {
639+
public:
640+
Status Parse(const std::vector<std::string> &args) override {
641+
return CommandFiledExpireBase::commonParse(args, 2);
642+
}
643+
644+
Status Execute(Server *srv, Connection *conn, std::string *output) override {
645+
std::vector<int64_t> ret;
646+
auto s = ttlExpireExecute(srv, conn, ret);
647+
if (!s.IsOK()) {
648+
return {Status::RedisExecErr, s.Msg()};
649+
}
650+
*output = redis::MultiLen(ret.size());
651+
for (const auto ttl : ret) {
652+
output->append(redis::Integer(ttl));
653+
}
654+
return Status::OK();
655+
}
656+
};
657+
658+
659+
class CommandHPersist : public CommandFiledExpireBase {
660+
public:
661+
Status Parse(const std::vector<std::string> &args) override {}
662+
Status Execute(Server *srv, Connection *conn, std::string *output) override {}
663+
};
664+
432665
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
433666
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
434667
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
@@ -445,6 +678,15 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1
445678
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only", 1, 1, 1),
446679
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
447680
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
448-
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1), )
681+
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1),
682+
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
683+
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
684+
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
685+
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
686+
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
687+
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
688+
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
689+
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
690+
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )
449691

450692
} // namespace redis

src/storage/redis_db.cc

+13-1
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
568568
} else {
569569
start_key = match_prefix_key;
570570
}
571+
auto now = util::GetTimeStampMS();
571572
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
572573
if (!cursor.empty() && iter->key() == start_key) {
573574
// if cursor is not empty, then we need to skip start_key
@@ -578,9 +579,20 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
578579
break;
579580
}
580581
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
582+
auto value = iter->value().ToString();
583+
// filter expired hash feild
584+
if (type == kRedisHash && (static_cast<HashMetadata*>(&metadata))->IsEncodedFieldExpire()) {
585+
uint64_t expire = 0;
586+
rocksdb::Slice data(value.data(), value.size());
587+
GetFixed64(&data, &expire);
588+
value = data.ToString();
589+
if (expire != 0 && expire <= now) {
590+
continue;
591+
}
592+
}
581593
keys->emplace_back(ikey.GetSubKey().ToString());
582594
if (values != nullptr) {
583-
values->emplace_back(iter->value().ToString());
595+
values->emplace_back(value);
584596
}
585597
cnt++;
586598
if (limit > 0 && cnt >= limit) {

src/storage/redis_metadata.cc

+4
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,10 @@ bool Metadata::IsEmptyableType() const {
334334

335335
bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
336336

337+
bool HashMetadata::IsEncodedFieldExpire() const {
338+
return flags & METADATA_HASH_FIELD_EXPIRE_MASK;
339+
}
340+
337341
ListMetadata::ListMetadata(bool generate_version)
338342
: Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {}
339343

src/storage/redis_metadata.h

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class InternalKey {
134134
};
135135

136136
constexpr uint8_t METADATA_64BIT_ENCODING_MASK = 0x80;
137+
constexpr uint8_t METADATA_HASH_FIELD_EXPIRE_MASK = 0x40;
137138
constexpr uint8_t METADATA_TYPE_MASK = 0x0f;
138139

139140
class Metadata {
@@ -203,6 +204,8 @@ class Metadata {
203204
class HashMetadata : public Metadata {
204205
public:
205206
explicit HashMetadata(bool generate_version = true) : Metadata(kRedisHash, generate_version) {}
207+
208+
bool IsEncodedFieldExpire() const;
206209
};
207210

208211
class SetMetadata : public Metadata {

0 commit comments

Comments
 (0)