Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,6 @@ typedef struct {
int32_t resetOffsetCfg;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
int64_t ownerId;
} SMqConsumerObj;

int32_t tNewSMqConsumerObj(int64_t consumerId, char* cgroup, int8_t updateType, char* topic, SCMSubscribeReq* subscribe,
Expand Down
8 changes: 2 additions & 6 deletions source/dnode/mnode/impl/src/mndConsumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "tcompare.h"
#include "tname.h"

#define MND_CONSUMER_VER_NUMBER 4
#define MND_CONSUMER_VER_NUMBER 3
#define MND_CONSUMER_RESERVE_SIZE 64

#define MND_MAX_GROUP_PER_TOPIC 100
Expand Down Expand Up @@ -680,15 +680,12 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
int32_t code = 0;
int32_t lino = 0;
SUserObj *pOperUser = NULL;
SMqConsumerObj *pConsumerNew = NULL;
STrans *pTrans = NULL;

PRINT_LOG_START
SCMSubscribeReq subscribe = {0};
MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen));
MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pMsg), &pOperUser));
subscribe.ownerId = pOperUser->uid;
bool unSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0);
if(unSubscribe){
SMqConsumerObj *pConsumerTmp = NULL;
Expand All @@ -715,7 +712,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {

END:
mndTransDrop(pTrans);
mndReleaseUser(pMnode, pOperUser);
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe.topicNames, NULL);
code = (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
Expand Down Expand Up @@ -1131,7 +1127,7 @@ static int32_t retrieveOneConsumer(SRpcMsg *pReq, SMqConsumerObj *pConsumer, SUs

for (int32_t i = 0; i < topicSz; i++) {
char *pTopicFName = taosArrayGetP(pConsumer->assignedTopics, i);
if (!showAll && (pOperUser->uid != pConsumer->ownerId)) {
if (!showAll && (strncmp(pOperUser->name, pConsumer->user, TSDB_USER_LEN) != 0)) {
bool showConsumer = false;
SMqTopicObj *pTopic = NULL;
(void)mndAcquireTopic(pMnode, pTopicFName, &pTopic);
Expand Down
5 changes: 0 additions & 5 deletions source/dnode/mnode/impl/src/mndDef.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType,
pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
pConsumer->ownerId = subscribe->ownerId;

pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
if (pConsumer->rebNewTopics == NULL) {
Expand Down Expand Up @@ -266,7 +265,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
tlen += taosEncodeString(buf, pConsumer->user);
tlen += taosEncodeString(buf, pConsumer->fqdn);
tlen += taosEncodeFixedI64(buf, pConsumer->ownerId);
return tlen;
}

Expand Down Expand Up @@ -347,9 +345,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
}
if (sver > 3) {
buf = taosDecodeFixedI64(buf, &pConsumer->ownerId);
}

return (void *)buf;
}
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/mnode/impl/src/mndSubscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -1853,7 +1853,7 @@ static int32_t retrieveSub(SRpcMsg *pReq, SMqSubscribeObj *pSub, SUserObj *pOper
if (pConsumer != NULL) {
user = pConsumer->user;
fqdn = pConsumer->fqdn;
if (pConsumer->ownerId == pOperUser->uid) {
if (strncmp(pConsumer->user, pOperUser->name, TSDB_USER_LEN) == 0) {
subscribeOwner = true;
}
sdbRelease(pSdb, pConsumer);
Expand Down
Loading