Redis6.0.6 sentinel

Redis6.0.6 sentinel

redis sentinel redis redis sentinel

Sentinel

  • Sentinel
  • Sentinel Sentinel

sentinel monitor <master-name> <ip> <port> <count>

count sentinel sentinel +1

sentinel down-after-milliseconds <master-name> 100000

sentinel failover-timeout mymaster 180000

char *sentinelHandleConfiguration(char **argv, int argc) { sentinelRedisInstance *ri; if (!strcasecmp(argv[0],"monitor") && argc == 5) { /* monitor <name> <host> <port> <quorum> */ //quorum failover int quorum = atoi(argv[4]); if (quorum <= 0) return "Quorum must be 1 or greater."; //master if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2], atoi(argv[3]),quorum,NULL) == NULL) { switch(errno) { case EBUSY: return "Duplicated master name."; case ENOENT: return "Can't resolve master instance hostname."; case EINVAL: return "Invalid port number"; } } //down-after-milliseconds } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) { /* down-after-milliseconds <name> <milliseconds> */ // ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->down_after_period = atoi(argv[2]); if (ri->down_after_period <= 0) return "negative or zero time parameter."; sentinelPropagateDownAfterPeriod(ri); // } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) { /* failover-timeout <name> <milliseconds> */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->failover_timeout = atoi(argv[2]); if (ri->failover_timeout <= 0) return "negative or zero time parameter."; //Redis } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { /* parallel-syncs <name> <milliseconds> */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->parallel_syncs = atoi(argv[2]); // }

sentinelHandleConfiguration createSentinelRedisInstance

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) { sentinelRedisInstance *ri; sentinelAddr *addr; dict *table = NULL; char slavename[NET_PEER_ID_LEN], *sdsname; serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL)); serverAssert((flags & SRI_MASTER) || master != NULL); /* Check address validity. */ addr = createSentinelAddr(hostname,port); if (addr == NULL) return NULL; /* For slaves use ip:port as name. */ if (flags & SRI_SLAVE) { anetFormatAddr(slavename, sizeof(slavename), hostname, port); name = slavename; } if (flags & SRI_MASTER) table = sentinel.masters; else if (flags & SRI_SLAVE) table = master->slaves; else if (flags & SRI_SENTINEL) table = master->sentinels; // sdsname = sdsnew(name); if (dictFind(table,sdsname)) { releaseSentinelAddr(addr); sdsfree(sdsname); errno = EBUSY; return NULL; }

sentinel.masters slaves sentinels

void initSentinel(void) { unsigned int j; //redis dictEmpty(server.commands,NULL); for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) { int retval; struct redisCommand *cmd = sentinelcmds+j; retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); serverAssert(retval == DICT_OK); if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR) serverPanic("Unsupported command flag"); } // sentinel.current_epoch = 0; sentinel.masters = dictCreate(&instancesDictType,NULL); sentinel.tilt = 0; sentinel.tilt_start_time = 0; sentinel.previous_time = mstime(); sentinel.running_scripts = 0; sentinel.scripts_queue = listCreate(); sentinel.announce_ip = NULL; sentinel.announce_port = 0; sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG; memset(sentinel.myid,0,sizeof(sentinel.myid)); }

initSentinel current_epoch raft term

sentinel

  • 1S ping ping
  • 2S hello hello master master epoch
  • 10s info info slave

  • ping

sentinel raft

void sentinelTimer(void) { // sentinelCheckTiltCondition(); sentinelHandleDictOfRedisInstances(sentinel.masters); sentinelRunPendingScripts(); sentinelCollectTerminatedScripts(); sentinelKillTimedoutScripts();

/* Enter tilt mode when the time elapsed since sentinel.previous_time is
 * negative or larger than SENTINEL_TILT_TRIGGER, then refresh
 * sentinel.previous_time with the current time. */
void sentinelCheckTiltCondition(void) {
    mstime_t since_last_run = mstime() - sentinel.previous_time;
    int clock_anomaly = since_last_run < 0 ||
                        since_last_run > SENTINEL_TILT_TRIGGER;

    if (clock_anomaly) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}

sentinelCheckTiltCondition 2s tilt tilt

/* Run sentinelHandleRedisInstance() on every instance stored in
 * `instances`. For instances flagged SRI_MASTER the function recurses
 * into their slaves and sentinels dictionaries. If a master is found in
 * the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state, the switch to the
 * promoted slave is performed once the iteration is over (only the last
 * such master seen is switched). */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *switch_to_promoted = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *ri = dictGetVal(entry);

        sentinelHandleRedisInstance(ri);
        if (!(ri->flags & SRI_MASTER)) continue;

        /* Masters own nested dictionaries that need handling too. */
        sentinelHandleDictOfRedisInstances(ri->slaves);
        sentinelHandleDictOfRedisInstances(ri->sentinels);
        if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            switch_to_promoted = ri;
    }

    /* Deferred until after the loop, while the iterator is still held. */
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(iter);
}

sentinelHandleDictOfRedisInstances master slave sentinel sentinelHandleRedisInstance

/* Per-instance periodic work: reconnect, send the periodic commands and,
 * unless tilt mode is active, evaluate the subjective down state. For
 * masters it also evaluates the objective down state, starts a failover
 * if needed, advances the failover state machine and asks the other
 * sentinels about the master state. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* Always performed, even in tilt mode. */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* While in tilt mode nothing else is done until SENTINEL_TILT_PERIOD
     * has elapsed since the tilt started; after that tilt mode is left. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    sentinelCheckSubjectivelyDown(ri);

    /* Everything below applies to masters only. */
    if (!(ri->flags & SRI_MASTER)) return;

    sentinelCheckObjectivelyDown(ri);
    if (sentinelStartFailoverIfNeeded(ri))
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
    sentinelFailoverStateMachine(ri);
    sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}

sentinelHandleRedisInstance

sentinelReconnectInstance PUB/SUB host port

sentinelSendPeriodicCommands INFO PING HELLO

/* Send the periodic INFO, PING and hello messages to the instance `ri`
 * when the respective periods have elapsed. Nothing is sent when the link
 * is disconnected or too many commands are already pending on it. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int master_in_trouble;

    if (ri->link->disconnected) return;

    /* Cap on in-flight commands, scaled by the link reference count. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* A slave is polled with INFO every second (instead of every
     * SENTINEL_INFO_PERIOD) when its master is O_DOWN or under failover,
     * or when the slave reports a non-zero master link down time. */
    master_in_trouble =
        (ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0));
    info_period = master_in_trouble ? 1000 : SENTINEL_INFO_PERIOD;

    /* The ping period is down_after_period, clamped to at most
     * SENTINEL_PING_PERIOD. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    /* INFO: never sent to sentinels; sent when no INFO was ever received
     * or the last one is older than info_period. */
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period))
    {
        int retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* PING: last pong older than ping_period and last ping older than
     * half of it. */
    if ((now - ri->link->last_pong_time) > ping_period &&
        (now - ri->link->last_ping_time) > ping_period/2)
    {
        sentinelSendPing(ri);
    }

    /* Hello: last publish older than SENTINEL_PUBLISH_PERIOD. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}

  • 100 100
  • INFO Master Slave 10S 1S
  • ping 1s
  • hello 2s
ping

/* Queue a PING for the instance `ri`.
 *
 * Returns 1 when the command was queued (in which case the pending
 * commands counter and the ping timestamps are updated), 0 otherwise. */
int sentinelSendPing(sentinelRedisInstance *ri) {
    if (redisAsyncCommand(ri->link->cc,
            sentinelPingReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"PING")) != C_OK)
    {
        return 0;
    }

    ri->link->pending_commands++;
    ri->link->last_ping_time = mstime();
    /* The active ping time is only set when it is currently zero, i.e.
     * when the previous ping was answered; otherwise we are still waiting
     * since the first ping that got no reply. */
    if (ri->link->act_ping_time == 0)
        ri->link->act_ping_time = ri->link->last_ping_time;
    return 1;
}

ping last_ping_time ping sentinelPingReplyCallback

/* Reply callback for PING. Replies starting with PONG, LOADING or
 * MASTERDOWN refresh last_avail_time and clear act_ping_time. A BUSY
 * reply from an instance flagged SRI_S_DOWN triggers a single
 * "SCRIPT KILL". last_pong_time is refreshed for every reply. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type == REDIS_REPLY_STATUS || r->type == REDIS_REPLY_ERROR) {
        int acceptable = strncmp(r->str,"PONG",4) == 0 ||
                         strncmp(r->str,"LOADING",7) == 0 ||
                         strncmp(r->str,"MASTERDOWN",10) == 0;

        if (acceptable) {
            link->last_avail_time = mstime();
            link->act_ping_time = 0;
        } else if (strncmp(r->str,"BUSY",4) == 0 &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* Try to kill the running script; the flag is set whether or
             * not the command could be queued, so this is attempted once. */
            if (redisAsyncCommand(ri->link->cc,
                    sentinelDiscardReplyCallback, ri,
                    "%s KILL",
                    sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
            {
                ri->link->pending_commands++;
            }
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }

    /* Unlike last_avail_time, last_pong_time is updated for any reply. */
    link->last_pong_time = mstime();
}

last_pong_time last_avail_time

hello

snprintf(payload,sizeof(payload), "%s,%d,%s,%llu," // "%s,%s,%d,%llu", //master announce_ip, announce_port, sentinel.myid, (unsigned long long) sentinel.current_epoch, /* --- */ master->name,master_addr->ip,master_addr->port, (unsigned long long) master->config_epoch); retval = redisAsyncCommand(ri->link->cc, sentinelPublishReplyCallback, ri, "%s %s %s", sentinelInstanceMapCommand(ri,"PUBLISH"), SENTINEL_HELLO_CHANNEL,payload); if (retval != C_OK) return C_ERR; ri->link->pending_commands++; return C_OK;

hello hello leader ip port myid master_name current_epoch master current_epoch

/* PUBLISH as implemented by Sentinel: only messages published to
 * SENTINEL_HELLO_CHANNEL are accepted, and they are handed to
 * sentinelProcessHelloMessage(). Any other channel yields an error reply. */
void sentinelPublishCommand(client *c) {
    int is_hello_channel = strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL) == 0;

    if (!is_hello_channel) {
        addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
        return;
    }

    sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    addReplyLongLong(c,1);
}

hello sentinelProcessHelloMessage

void sentinelProcessHelloMessage(char *hello, int hello_len) { /* * 0=ip,1=port,2=runid,3=current_epoch,4=master_name, * 5=master_ip,6=master_port,7=master_config_epoch. */ int numtokens, port, removed, master_port; uint64_t current_epoch, master_config_epoch; char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens); sentinelRedisInstance *si, *master; if (numtokens == 8) { //master master = sentinelGetMasterByName(token[4]); if (!master) goto cleanup; port = atoi(token[1]); master_port = atoi(token[6]); // si = getSentinelRedisInstanceByAddrAndRunID( master->sentinels,token[0],port,token[2]); current_epoch = strtoull(token[3],NULL,10); master_config_epoch = strtoull(token[7],NULL,10); if (!si) { //master removed = removeMatchingSentinelFromMaster(master,token[2]); if (removed) { sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master, "%@ ip %s port %d for %s", token[0],port,token[2]); } else { // 0 sentinelRedisInstance *other = getSentinelRedisInstanceByAddrAndRunID( master->sentinels, token[0],port,NULL); if (other) { sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@"); other->addr->port = 0; //master sentinelUpdateSentinelAddressInAllMasters(other); } } // si = createSentinelRedisInstance(token[2],SRI_SENTINEL, token[0],port,master->quorum,master); if (si) { if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@"); si->runid = sdsnew(token[2]); sentinelTryConnectionSharing(si); if (removed) sentinelUpdateSentinelAddressInAllMasters(si); sentinelFlushConfig(); } } //current_epoch if (current_epoch > sentinel.current_epoch) { sentinel.current_epoch = current_epoch; sentinelFlushConfig(); sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu", (unsigned long long) sentinel.current_epoch); } //master if (si && master->config_epoch < master_config_epoch) { master->config_epoch = master_config_epoch; if (master_port != master->addr->port || strcmp(master->addr->ip, token[5])) { sentinelAddr *old_addr; 
sentinelEvent(LL_WARNING,"+config-update-from",si,"%@"); sentinelEvent(LL_WARNING,"+switch-master", master,"%s %s %d %s %d", master->name, master->addr->ip, master->addr->port, token[5], master_port); old_addr = dupSentinelAddr(master->addr); sentinelResetMasterAndChangeAddress(master, token[5], master_port); sentinelCallClientReconfScript(master, SENTINEL_OBSERVER,"start", old_addr,master->addr); releaseSentinelAddr(old_addr); } } /* Update the state of the Sentinel. */ if (si) si->last_hello_time = mstime(); } cleanup: sdsfreesplitres(token,numtokens); }

  • master sentinel
  • master sentinel sentinel sentinel
  • raft current_epoch current_epoch
  • master_epoch master

++hello callback ++

info
if (defsections || allsections || !strcasecmp(section,"sentinel")) { dictIterator *di; dictEntry *de; int master_id = 0; if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Sentinel\r\n" "sentinel_masters:%lu\r\n" "sentinel_tilt:%d\r\n" "sentinel_running_scripts:%d\r\n" "sentinel_scripts_queue_length:%ld\r\n" "sentinel_simulate_failure_flags:%lu\r\n", dictSize(sentinel.masters), sentinel.tilt, sentinel.running_scripts, listLength(sentinel.scripts_queue), sentinel.simfailure_flags); di = dictGetIterator(sentinel.masters); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); char *status = "ok"; if (ri->flags & SRI_O_DOWN) status = "odown"; else if (ri->flags & SRI_S_DOWN) status = "sdown"; info = sdscatprintf(info, "master%d:name=%s,status=%s,address=%s:%d," "slaves=%lu,sentinels=%lu\r\n", master_id++, ri->name, status, ri->addr->ip, ri->addr->port, dictSize(ri->slaves), dictSize(ri->sentinels)+1); } dictReleaseIterator(di); } addReplyBulkSds(c, info); }

info info

/* Reply callback for INFO: decrements the pending commands counter of the
 * link and, for string replies, passes the text to
 * sentinelRefreshInstanceInfo(). */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo(ri,r->str);
}

info callback sentinelRefreshInstanceInfo

if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) { if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, atoi(port), ri->quorum, ri)) != NULL) { sentinelEvent(LL_NOTICE,"+slave",slave,"%@"); sentinelFlushConfig(); } }

host port info

  • info
  • slaveOf
  • slaveOf

/* Evaluate whether the instance `ri` is subjectively down (SRI_S_DOWN)
 * from our point of view, closing the command and Pub/Sub connections
 * first when they look stale. */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    int cmd_link_stale, pubsub_link_stale, looks_down;

    /* Time since the oldest unanswered ping or, with no ping pending and
     * the link disconnected, since the instance was last available. */
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

    /* 1) Command link: connected for longer than
     *    SENTINEL_MIN_LINK_RECONNECT_PERIOD, a ping is pending, and both
     *    the pending ping and the last pong are older than half of
     *    down_after_period. */
    cmd_link_stale =
        ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2);
    if (cmd_link_stale)
        instanceLinkCloseConnection(ri->link,ri->link->cc);

    /* 2) Pub/Sub link: connected for longer than
     *    SENTINEL_MIN_LINK_RECONNECT_PERIOD with no activity for more
     *    than three publish periods. */
    pubsub_link_stale =
        ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3);
    if (pubsub_link_stale)
        instanceLinkCloseConnection(ri->link,ri->link->pc);

    /* The instance is considered down when either:
     * 1) `elapsed` exceeds down_after_period, or
     * 2) it is a master that has been reporting the slave role for longer
     *    than down_after_period plus two INFO periods. */
    looks_down =
        elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
         (ri->down_after_period+SENTINEL_INFO_PERIOD*2));

    if (looks_down) {
        /* Is subjectively down */
        if (!(ri->flags & SRI_S_DOWN)) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else if (ri->flags & SRI_S_DOWN) {
        /* Is subjectively up */
        sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
        ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
    }
}

SDOWN

/* Set or clear SRI_O_DOWN on `master`. The master is objectively down
 * when it is flagged SRI_S_DOWN here and the number of agreeing
 * sentinels (this one included) reaches master->quorum. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    unsigned int votes = 0, odown = 0;

    if (master->flags & SRI_S_DOWN) {
        dictIterator *iter;
        dictEntry *entry;

        /* Our own S_DOWN view counts as the first vote; add one for every
         * known sentinel flagged SRI_MASTER_DOWN. */
        votes = 1;
        iter = dictGetIterator(master->sentinels);
        for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
            sentinelRedisInstance *peer = dictGetVal(entry);

            if (peer->flags & SRI_MASTER_DOWN) votes++;
        }
        dictReleaseIterator(iter);
        if (votes >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown && !(master->flags & SRI_O_DOWN)) {
        sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
            votes, master->quorum);
        master->flags |= SRI_O_DOWN;
        master->o_down_since_time = mstime();
    } else if (!odown && (master->flags & SRI_O_DOWN)) {
        sentinelEvent(LL_WARNING,"-odown",master,"%@");
        master->flags &= ~SRI_O_DOWN;
    }
}

sentinels flags SRI_MASTER_DOWN

/* Start a failover for `master` when it is objectively down, no failover
 * is already in progress, and at least twice the failover timeout has
 * passed since the last failover start time.
 *
 * Returns 1 if a failover was started, 0 otherwise. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* Requires O_DOWN and no failover already running. */
    if (!(master->flags & SRI_O_DOWN) ||
        (master->flags & SRI_FAILOVER_IN_PROGRESS))
    {
        return 0;
    }

    if (mstime() - master->failover_start_time >=
        master->failover_timeout*2)
    {
        sentinelStartFailover(master);
        return 1;
    }

    /* Too early for a new attempt: log the earliest allowed start time,
     * once per failover_start_time value. */
    if (master->failover_delay_logged != master->failover_start_time) {
        time_t clock = (master->failover_start_time +
                        master->failover_timeout*2)/1000;
        char ctimebuf[26];

        ctime_r(&clock,ctimebuf);
        ctimebuf[24] = '\0'; /* Remove newline. */
        master->failover_delay_logged = master->failover_start_time;
        serverLog(LL_WARNING,
            "Next failover delay: I will not start a failover before %s",
            ctimebuf);
    }
    return 0;
}

sentinelAskMasterStateToOtherSentinels

if (sentinelStartFailoverIfNeeded(ri)) sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); sentinelFailoverStateMachine(ri); sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);

sentinelAskMasterStateToOtherSentinels sentinelAskMasterStateToOtherSentinels

/* Put `master` into the first failover state
 * (SENTINEL_FAILOVER_STATE_WAIT_START), bump the current epoch and record
 * it as the failover epoch. The failover start time is the current time
 * plus a random offset below SENTINEL_MAX_DESYNC. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;

    /* A new failover attempt always runs under a new epoch. */
    sentinel.current_epoch++;
    master->failover_epoch = sentinel.current_epoch;

    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");

    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

flags current_epoch

/* Ask every known sentinel of `master` whether it also considers the
 * master down, using:
 *
 *   SENTINEL is-master-down-by-addr <ip> <port> <current_epoch> <runid>
 *
 * <runid> is our own ID when a failover is in progress for this master
 * (failover_state > SENTINEL_FAILOVER_STATE_NONE), otherwise "*".
 *
 * With SENTINEL_ASK_FORCED in `flags` the request is sent even if the
 * last reply is more recent than SENTINEL_ASK_PERIOD. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* The state reported by this sentinel is too old: forget both its
         * down vote and the leader it reported. */
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask when:
         * 1) we consider the master subjectively down,
         * 2) the link with the sentinel is connected,
         * 3) the request is forced, or the last reply is at least
         *    SENTINEL_ASK_PERIOD old. */
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        if (ri->link->disconnected) continue;
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        /* sentinel is-master-down-by-addr <ip> <port> <current_epoch> <runid> */
        ll2string(port,sizeof(port),master->addr->port);
        /* The epoch is cast explicitly so the variadic argument matches
         * the %llu conversion, as done by the other users of
         * sentinel.current_epoch. */
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    (unsigned long long) sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

sentinel

sentinel is-master-down-by-addr <ip> <port> <current_epoch> <runid>

sentinel runid runid * id runid sentinel

ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, c->argv[2]->ptr,port,NULL); // if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && (ri->flags & SRI_MASTER)) isdown = 1; //runid if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) { leader = sentinelVoteLeader(ri,(uint64_t)req_epoch, c->argv[5]->ptr, &leader_epoch); } /* Reply with a three-elements multi-bulk reply: * down state, leader, vote epoch. */ addReplyArrayLen(c,3); addReply(c, isdown ? shared.cone : shared.czero); addReplyBulkCString(c, leader ? leader : "*"); addReplyLongLong(c, (long long)leader_epoch); if (leader) sdsfree(leader);

is-master-down-by-addr sentinel runid sentinelVoteLeader

++ epoch sentinel epoch raft ++

// sentinel ri->last_master_down_reply_time = mstime(); if (r->element[0]->integer == 1) { ri->flags |= SRI_MASTER_DOWN; } else { ri->flags &= ~SRI_MASTER_DOWN; } // leader if (strcmp(r->element[1]->str,"*")) { /* If the runid in the reply is not "*" the Sentinel actually * replied with a vote. */ sdsfree(ri->leader); if ((long long)ri->leader_epoch != r->element[2]->integer) serverLog(LL_WARNING, "%s voted for %s %llu", ri->name, r->element[1]->str, (unsigned long long) r->element[2]->integer); ri->leader = sdsnew(r->element[1]->str); ri->leader_epoch = r->element[2]->integer; } }

/* Dispatch to the handler of the current failover state of the master
 * `ri`. Does nothing when no failover is in progress, or when the state
 * has no handler listed here. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0) return;

    switch(ri->failover_state) {
    case SENTINEL_FAILOVER_STATE_WAIT_START:
        /* Waiting to be elected (or to time out). */
        sentinelFailoverWaitStart(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
        /* Pick the slave to promote. */
        sentinelFailoverSelectSlave(ri);
        break;
    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
        /* Turn the selected slave into a master. */
        sentinelFailoverSendSlaveOfNoOne(ri);
        break;
    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
        /* Wait for the promotion to show up. */
        sentinelFailoverWaitPromotion(ri);
        break;
    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
        /* Reconfigure the remaining slaves. */
        sentinelFailoverReconfNextSlave(ri);
        break;
    default:
        break;
    }
}

info

  • SENTINEL_FAILOVER_STATE_WAIT_START
/* Handler for SENTINEL_FAILOVER_STATE_WAIT_START: move on to the slave
 * selection state if we are the leader for the failover epoch (or the
 * failover is forced); otherwise abort once the election timeout has
 * expired. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader, may_proceed;

    /* Check if we are the leader for the failover epoch. */
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

    may_proceed = isleader || (ri->flags & SRI_FORCE_FAILOVER);
    if (!may_proceed) {
        /* The election timeout is the smaller of
         * SENTINEL_ELECTION_TIMEOUT and the configured failover timeout. */
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;

        /* Not elected in time: give up on this attempt. */
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();

    /* Advance to the next state. */
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

sentinelGetLeader

di = dictGetIterator(master->sentinels); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch) sentinelLeaderIncr(counters,ri->leader); } dictReleaseIterator(di);

  • SENTINEL_FAILOVER_STATE_SELECT_SLAVE
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { sentinelRedisInstance *slave = sentinelSelectSlave(ri);

sentinelSelectSlave

/* Select the slave of `master` to promote.
 *
 * Slaves are filtered out when they are flagged down, disconnected, not
 * recently available, have priority zero, have stale INFO data, or have
 * been disconnected from the master for too long. The survivors are
 * sorted with compareSlavesForPromotion() and the first one is returned.
 * Returns NULL when no slave qualifies. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int ncandidates = 0;
    dictIterator *iter;
    dictEntry *entry;
    mstime_t max_master_down_time = 0;

    /* Maximum tolerated master link down time: ten times
     * down_after_period, plus the time the master has been S_DOWN. */
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    iter = dictGetIterator(master->slaves);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *slave = dictGetVal(entry);
        mstime_t info_validity_time;

        /* Skip slaves that are down, disconnected, not available within
         * the last five ping periods, or with priority 0. */
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
        if (slave->slave_priority == 0) continue;

        /* The INFO data must be fresher when the master is S_DOWN. */
        info_validity_time = (master->flags & SRI_S_DOWN) ?
                             SENTINEL_PING_PERIOD*5 : SENTINEL_INFO_PERIOD*3;

        /* Skip slaves with stale INFO or a too long master link outage. */
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;

        candidates[ncandidates++] = slave;
    }
    dictReleaseIterator(iter);

    if (ncandidates) {
        qsort(candidates,ncandidates,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}

slave

  • SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
/* Handler for SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: call
 * sentinelSendSlaveOf() with a NULL host on the promoted slave and, on
 * success, move to SENTINEL_FAILOVER_STATE_WAIT_PROMOTION. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    /* While the promoted slave is disconnected keep retrying on the next
     * calls, aborting the failover once failover_timeout has elapsed
     * since the last state change. */
    if (ri->promoted_slave->link->disconnected) {
        mstime_t in_state = mstime() - ri->failover_state_change_time;

        if (in_state > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* If the command cannot be sent, stay in this state. */
    if (sentinelSendSlaveOf(ri->promoted_slave,NULL,0) != C_OK) return;

    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

sentinelSendSlaveOf slave of

  • SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
/* Handler for SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: this function only
 * enforces the timeout, aborting the failover when failover_timeout has
 * elapsed since the last state change. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    mstime_t in_state = mstime() - ri->failover_state_change_time;

    if (in_state <= ri->failover_timeout) return;

    sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
    sentinelAbortFailover(ri);
}

info

if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) { // if ((ri->flags & SRI_PROMOTED) && (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && (ri->master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) { // ri->master->config_epoch = ri->master->failover_epoch; ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; ri->master->failover_state_change_time = mstime(); sentinelFlushConfig(); sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@"); if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION) sentinelSimFailureCrash(); sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves", ri->master,"%@"); sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER, "start",ri->master->addr,ri->addr); sentinelForceHelloUpdateForMaster(ri->master); } else {

info

  • SENTINEL_FAILOVER_STATE_RECONF_SLAVES

slave slaveof

info master sentinel sentinel leader slave raft leader