redis源码分析,客户端解读
redis服务器是典型的一对多的服务器应用程序:一个服务器可以与多个客户端连接,每个客户端可以向服务器发送命令请求,而服务器则接受并处理客户端发送的请求,并将处理结果返回给客户端。
通过使用I/O多路复用技术, redis 服务器使用单线程单进程的方式处理命令请求,并与多个客户端连接进行网络通讯。
根据 redis-cli.c 中main函数分析客户端的启动流程
上图因为在 Visio 中画的,截图的时候图方便,看起来像然在一起。(:haha)
下面看一下客户端的两个重要的全局变量
static redisContext *context;
static struct config
启动客户端时,会初始化 config
全局变量,该变量记录了客户端几乎所有的配置参数信息,而 context
用于连接 redis 服务器。看一下 config 的结构
static struct config {
char * hostip ; // IP
int hostport ; // 端口
char * hostsocket ;
long repeat ;
long interval ;
int dbnum ; // 数据库编号,一般默认是 0 - 15
int interactive ; // 交互模式
int shutdown ;
int monitor_mode ;
int pubsub_mode ;
int latency_mode ;
int latency_dist_mode ;
int latency_history ;
int lru_test_mode ;
long long lru_test_sample_size ;
int cluster_mode ;
int cluster_reissue_command ;
int slave_mode ;
int pipe_mode ;
int pipe_timeout ;
int getrdb_mode ;
int stat_mode ;
int scan_mode ;
int intrinsic_latency_mode ;
int intrinsic_latency_duration ;
char * pattern ;
char * rdb_filename ;
int bigkeys ;
int stdinarg ; /* get last arg from stdin. (-x option) */
char * auth ;
int output ; /* output mode, see OUTPUT_* defines */
sds mb_delim ;
char prompt [ 128 ];
char * eval ;
int last_cmd_type ;
} config ;
客户端在 parseOptions()
中设置 config 变量参数
static int parseOptions ( int argc , char ** argv ) {
int i ;
for ( i = 1 ; i < argc ; i ++ ) {
int lastarg = i == argc - 1 ;
if ( ! strcmp ( argv [ i ], "-h" ) && ! lastarg ) { // . / redis - cli - h 10.255 . 245.41
sdsfree ( config . hostip );
config . hostip = sdsnew ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "-h" ) && lastarg ) { // . / redis - cli - h
usage ();
} else if ( ! strcmp ( argv [ i ], "--help" )) { // . / redis - cli -- help
usage ();
} else if ( ! strcmp ( argv [ i ], "-x" )) { // . / redis - cli - x
config . stdinarg = 1 ;
} else if ( ! strcmp ( argv [ i ], "-p" ) && ! lastarg ) { // . / redis - cli - p [ PORT ]
config . hostport = atoi ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "-s" ) && ! lastarg ) { // . / redis - cli - s < socket >
config . hostsocket = argv [ ++ i ];
} else if ( ! strcmp ( argv [ i ], "-r" ) && ! lastarg ) { // . / redis - cli - r < repeat > , execute specified command N times
config . repeat = strtoll ( argv [ ++ i ], NULL , 10 );
} else if ( ! strcmp ( argv [ i ], "-i" ) && ! lastarg ) { // . / redis - cli - i < interval > , used with - r
double seconds = atof ( argv [ ++ i ]);
config . interval = seconds * 1000000 ;
} else if ( ! strcmp ( argv [ i ], "-n" ) && ! lastarg ) { // . / redis - cli - n < dbnum >
config . dbnum = atoi ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "-a" ) && ! lastarg ) { // . / redis - cli - a < password > , connect to server with password
config . auth = argv [ ++ i ];
} else if ( ! strcmp ( argv [ i ], "--raw" )) {
config . output = OUTPUT_RAW ; // no formatted output
} else if ( ! strcmp ( argv [ i ], "--no-raw" )) {
config . output = OUTPUT_STANDARD ;
} else if ( ! strcmp ( argv [ i ], "--csv" )) { // redis - cli -- csv , output in csv format
config . output = OUTPUT_CSV ;
} else if ( ! strcmp ( argv [ i ], "--latency" )) {
config . latency_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--latency-dist" )) {
config . latency_dist_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--mono" )) {
spectrum_palette = spectrum_palette_mono ;
spectrum_palette_size = spectrum_palette_mono_size ;
} else if ( ! strcmp ( argv [ i ], "--latency-history" )) {
config . latency_mode = 1 ;
config . latency_history = 1 ;
} else if ( ! strcmp ( argv [ i ], "--lru-test" ) && ! lastarg ) {
config . lru_test_mode = 1 ;
config . lru_test_sample_size = strtoll ( argv [ ++ i ], NULL , 10 );
} else if ( ! strcmp ( argv [ i ], "--slave" )) {
config . slave_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--stat" )) {
config . stat_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--scan" )) {
config . scan_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--pattern" ) && ! lastarg ) {
config . pattern = argv [ ++ i ];
} else if ( ! strcmp ( argv [ i ], "--intrinsic-latency" ) && ! lastarg ) {
config . intrinsic_latency_mode = 1 ;
config . intrinsic_latency_duration = atoi ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "--rdb" ) && ! lastarg ) { // redis - cli -- rdb < rdbfilename > , transfer an rdb dump from remote server to local file
config . getrdb_mode = 1 ;
config . rdb_filename = argv [ ++ i ];
} else if ( ! strcmp ( argv [ i ], "--pipe" )) {
config . pipe_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "--pipe-timeout" ) && ! lastarg ) {
config . pipe_timeout = atoi ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "--bigkeys" )) {
config . bigkeys = 1 ;
} else if ( ! strcmp ( argv [ i ], "--eval" ) && ! lastarg ) {
config . eval = argv [ ++ i ];
} else if ( ! strcmp ( argv [ i ], "-c" )) {
config . cluster_mode = 1 ;
} else if ( ! strcmp ( argv [ i ], "-d" ) && ! lastarg ) {
sdsfree ( config . mb_delim );
config . mb_delim = sdsnew ( argv [ ++ i ]);
} else if ( ! strcmp ( argv [ i ], "-v" ) || ! strcmp ( argv [ i ], "--version" )) {
sds version = cliVersion ();
printf ( "redis-cli %s \n " , version );
sdsfree ( version );
exit ( 0 );
} else {
if ( argv [ i ][ 0 ] == '-' ) {
fprintf ( stderr ,
"Unrecognized option or bad number of args for: '%s' \n " ,
argv [ i ]);
exit ( 1 );
} else {
/* Likely the command name, stop here. */
break ;
}
}
}
return i ;
}
上述解析命令行参数的各个参数信息,在客户端,使用 --help
就能看到,
void usage(void) {
fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
fprintf(stderr," ./redis-server - (read config from stdin)\n");
fprintf(stderr," ./redis-server -v or --version\n");
fprintf(stderr," ./redis-server -h or --help\n");
fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n");
fprintf(stderr,"Examples:\n");
fprintf(stderr," ./redis-server (run the server with default conf)\n");
fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
fprintf(stderr," ./redis-server --port 7777\n");
fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
fprintf(stderr,"Sentinel mode:\n");
fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n");
exit(1);
}
根据不同的参数,设置 config 的值,然后,根据命令行参数设定的值和模式(mode),选择进入不同的模式与 redis 服务器进行通讯。在本机上,直接启动 redis 客户端,比如 ./redis-cli
,这样客户端启动之后,进入的将是交互模式,config.interactive = 1
,这种模式下,用户可以直接在客户端输入指令,并能立马得到服务器返回的信息。下面,主要介绍的就是交互模式。
交互模式下,首先,需要连接服务器,这时,需要用到 context 变量
/* Connect to the server. If force is not zero the connection is performed
* even if there is already a connected socket. */
static int cliConnect ( int force ) {
if ( context == NULL || force ) {
if ( context != NULL )
redisFree ( context );
if ( config . hostsocket == NULL ) {
context = redisConnect ( config . hostip , config . hostport );
} else {
context = redisConnectUnix ( config . hostsocket );
}
if ( context -> err ) {
fprintf ( stderr , "Could not connect to Redis at " );
if ( config . hostsocket == NULL )
fprintf ( stderr , "%s:%d: %s \n " , config . hostip , config . hostport , context -> errstr );
else
fprintf ( stderr , "%s: %s \n " , config . hostsocket , context -> errstr );
redisFree ( context );
context = NULL ;
return REDIS_ERR ;
}
/* Set aggressive KEEP_ALIVE socket option in the Redis context socket
* in order to prevent timeouts caused by the execution of long
* commands. At the same time this improves the detection of real
* errors. */
anetKeepAlive ( NULL , context -> fd , REDIS_CLI_KEEPALIVE_INTERVAL );
/* Do AUTH and select the right DB. */
if ( cliAuth () != REDIS_OK )
return REDIS_ERR ;
if ( cliSelect () != REDIS_OK )
return REDIS_ERR ;
}
return REDIS_OK ;
}
当服务器连接成功时,context 的 fd 为连接成功后的 sockfd,flags 设置为
REDIS_CONNECTED,redisContext 的结构如下
/* Context for a connection to Redis */
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */
int fd;
int flags;
char *obuf; /* Write buffer */
redisReader *reader; /* Protocol reader */
} redisContext;
当连接服务器或者命令发生错误时,err将设置为非0数字,errstr 中将记录错误信息,连接成功时,将 socket 套接字的文件描述符记录在 fd 中,同时 flags 设置为 REDIS_CONNECTED,obuf 为输出缓存,客户端发送给服务器的命令信息,解析后存放在 obuf 中,reader 作为协议解析器,用于读取和分析服务器返回的信息。
当客户端成功连接 redis 服务器之后,需要对客户端的身份进行验证(前提是服务器打开了验证的功能),cliAuth()
,如果验证失败,出了 AUTH 操作之外, 服务器将决绝客户端发送的一切其他命令操作。
cliSelect()
,用于客户端选择 redis 数据库,通过 select dbnum
的指令进行数据库选择。
服务器连接成功,进入到交互模式下,与服务器交互。但是在交互之前,还需要设置一下终端的模式。
客户端的交互模式
准备工作
客户端进入交互模式如下所示
也就是说,在用户通过客户端与服务器交互之前,还需要一些准备工作。
redis 会将在客户端上操作的所有命令记录在一个历史文件中 historyfile
,如果没有设置,一般默认为 $HOME/.rediscli_history
文件。同时,设置提示信息 config.prompt
,如上图所示的提示信息为 “127.0.0.1:6379>”,这里默认的数据库编号为 0 ,所以没有显示出来,如果是非 0 的数据库,比如是 1,需要重新设置提示信息,为 “127.0.0.1:6379[1]>”。
在交互模式下获取用户输入
/* The high level function that is the main API of the linenoise library.
* This function checks if the terminal has basic capabilities, just checking
* for a blacklist of stupid terminals, and later either calls the line
* editing function or uses dummy fgets() so that you will be able to type
* something even in the most desperate of the conditions. */
char * linenoise ( const char * prompt ) {
char buf [ LINENOISE_MAX_LINE ];
int count ;
if ( isUnsupportedTerm ()) { //no t support these terms , such as dumb , cons25 , emacs
size_t len ;
printf ( "%s" , prompt );
fflush ( stdout );
if ( fgets ( buf , LINENOISE_MAX_LINE , stdin ) == NULL ) return NULL ;
len = strlen ( buf );
while ( len && ( buf [ len - 1 ] == '\n' || buf [ len - 1 ] == '\r' )) {
len -- ;
buf [ len ] = '\0' ;
}
return strdup ( buf ); //s hould be free
} else {
count = linenoiseRaw ( buf , LINENOISE_MAX_LINE , prompt );
if ( count == - 1 ) return NULL ;
return strdup ( buf );
}
}
redis 通过上面的 linenoise()
函数获取用户输入,首先判断当前终端是不是 redis 所支持的终端类型(通过判断环境变量 TERM
),如果不是,通过 fgets 函数获取用户输入;如果是支持的终端,那么首先通过 termios
相关的API,将 term 设置为 raw mode
,该模式下,用户输入一个字符时,程序就会立即处理,类似于ncurses 中的 cbreak 模式,在 linenoiseEdit()
函数中,redis 对用户键盘的各种操作进行处理,并记录用户输入的有效字符
/* This function is the core of the line editing capability of linenoise.
* It expects 'fd' to be already in "raw mode" so that every key pressed
* will be returned ASAP to read().
*
* The resulting string is put into 'buf' when the user type enter, or
* when ctrl+d is typed.
*
* The function returns the length of the current buffer. */
static int linenoiseEdit ( int stdin_fd , int stdout_fd , char * buf , size_t buflen , const char * prompt )
{
struct linenoiseState l ;
/* Populate the linenoise state that we pass to functions implementing
* specific editing functionalities. */
l . ifd = stdin_fd ;
l . ofd = stdout_fd ;
l . buf = buf ;
l . buflen = buflen ;
l . prompt = prompt ;
l . plen = strlen ( prompt );
l . oldpos = l . pos = 0 ;
l . len = 0 ;
l . cols = getColumns ( stdin_fd , stdout_fd );
l . maxrows = 0 ;
l . history_index = 0 ;
/* Buffer starts empty. */
l . buf [ 0 ] = '\0' ;
l . buflen -- ; /* Make sure there is always space for the nulterm */
/* The latest history entry is always our current buffer, that
* initially is just an empty string. */
linenoiseHistoryAdd ( "" );
if ( write ( l . ofd , prompt , l . plen ) == - 1 ) return - 1 ;
while ( 1 ) {
char c ;
int nread ;
char seq [ 3 ];
nread = read ( l . ifd , & c , 1 );
if ( nread <= 0 ) return l . len ; //n read is 0 , server may close the connect
/* Only autocomplete when the callback is set. It returns < 0 when
* there was an error reading from fd. Otherwise it will return the
* character that should be handled next. */
if ( c == 9 && completionCallback != NULL ) {
c = completeLine ( & l );
/* Return on errors */
if ( c < 0 ) return l . len ;
/* Read next character when 0 */
if ( c == 0 ) continue ;
}
switch ( c ) {
case ENTER : /* enter */
history_len -- ;
free ( history [ history_len ]);
if ( mlmode ) linenoiseEditMoveEnd ( & l );
return ( int ) l . len ;
case CTRL_C : /* ctrl-c */
errno = EAGAIN ;
return - 1 ;
case BACKSPACE : /* backspace */
case 8 : /* ctrl-h */
linenoiseEditBackspace ( & l );
break ;
case CTRL_D : /* ctrl-d, remove char at right of cursor, or if the
line is empty, act as end-of-file. */
if ( l . len > 0 ) {
linenoiseEditDelete ( & l );
} else {
history_len -- ;
free ( history [ history_len ]);
return - 1 ;
}
break ;
case CTRL_T : /* ctrl-t, swaps current character with previous. */
if ( l . pos > 0 && l . pos < l . len ) {
int aux = buf [ l . pos - 1 ];
buf [ l . pos - 1 ] = buf [ l . pos ];
buf [ l . pos ] = aux ;
if ( l . pos != l . len - 1 ) l . pos ++ ;
refreshLine ( & l );
}
break ;
case CTRL_B : /* ctrl-b */
linenoiseEditMoveLeft ( & l );
break ;
case CTRL_F : /* ctrl-f */
linenoiseEditMoveRight ( & l );
break ;
case CTRL_P : /* ctrl-p */
linenoiseEditHistoryNext ( & l , LINENOISE_HISTORY_PREV );
break ;
case CTRL_N : /* ctrl-n */
linenoiseEditHistoryNext ( & l , LINENOISE_HISTORY_NEXT );
break ;
case ESC : /* escape sequence */
/* Read the next two bytes representing the escape sequence.
* Use two calls to handle slow terminals returning the two
* chars at different times. */
if ( read ( l . ifd , seq , 1 ) == - 1 ) break ;
if ( read ( l . ifd , seq + 1 , 1 ) == - 1 ) break ;
/* ESC [ sequences. */
if ( seq [ 0 ] == '[' ) {
if ( seq [ 1 ] >= '0' && seq [ 1 ] <= '9' ) {
/* Extended escape, read additional byte. */
if ( read ( l . ifd , seq + 2 , 1 ) == - 1 ) break ;
if ( seq [ 2 ] == '~' ) {
switch ( seq [ 1 ]) {
case '3' : /* Delete key. */
linenoiseEditDelete ( & l );
break ;
}
}
} else {
switch ( seq [ 1 ]) {
case 'A' : /* Up */
linenoiseEditHistoryNext ( & l , LINENOISE_HISTORY_PREV );
break ;
case 'B' : /* Down */
linenoiseEditHistoryNext ( & l , LINENOISE_HISTORY_NEXT );
break ;
case 'C' : /* Right */
linenoiseEditMoveRight ( & l );
break ;
case 'D' : /* Left */
linenoiseEditMoveLeft ( & l );
break ;
case 'H' : /* Home */
linenoiseEditMoveHome ( & l );
break ;
case 'F' : /* End*/
linenoiseEditMoveEnd ( & l );
break ;
}
}
}
/* ESC O sequences. */
else if ( seq [ 0 ] == 'O' ) {
switch ( seq [ 1 ]) {
case 'H' : /* Home */
linenoiseEditMoveHome ( & l );
break ;
case 'F' : /* End*/
linenoiseEditMoveEnd ( & l );
break ;
}
}
break ;
default:
if ( linenoiseEditInsert ( & l , c )) return - 1 ;
break ;
case CTRL_U : /* Ctrl+u, delete the whole line. */
buf [ 0 ] = '\0' ;
l . pos = l . len = 0 ;
refreshLine ( & l );
break ;
case CTRL_K : /* Ctrl+k, delete from current to end of line. */
buf [ l . pos ] = '\0' ;
l . len = l . pos ;
refreshLine ( & l );
break ;
case CTRL_A : /* Ctrl+a, go to the start of the line */
linenoiseEditMoveHome ( & l );
break ;
case CTRL_E : /* ctrl+e, go to the end of the line */
linenoiseEditMoveEnd ( & l );
break ;
case CTRL_L : /* ctrl+l, clear screen */
linenoiseClearScreen ();
refreshLine ( & l );
break ;
case CTRL_W : /* ctrl+w, delete previous word */
linenoiseEditDeletePrevWord ( & l );
break ;
}
}
return l . len ;
}
获取用户输入之后,将用户输入写入到历史文件中 historyfile
中,并将用户输入的命令参数解析后,发送到服务器。
客户端发送消息到服务器
客户端获取用户输入的命令及参数之后
argv = sdssplitargs(line,&argc);
将命令参数放到 argv 数组中,通过 cliSendCommand()
发送给服务器
客户端发送命令和接受结果的函数调用关系如下:
cliSendCommand -> redisAppendCommandArgv -> redisFormatCommandArgv这是根据 redis 协议格式化输出,发送到服务器
cliSendCommand -> cliReadReply -> redisGetReply ->
redisBufferWrite 和 redisBufferRead 发送和接收
redis 协议格式
/* Format a command according to the Redis protocol. This function takes the
* number of arguments, an array with arguments and an array with their
* lengths. If the latter is set to NULL, strlen will be used to compute the
* argument lengths.
*/
int redisFormatCommandArgv ( char ** target , int argc , const char ** argv , const size_t * argvlen ) {
char * cmd = NULL ; /* final command */
int pos ; /* position in final command */
size_t len ;
int totlen , j ;
/* Calculate number of bytes needed for the command */
totlen = 1 + intlen ( argc ) + 2 ; // 1 长度表示开头 * , 2 表示 \ r \ n ,再加上 argc 转换成字符串后的长度
for ( j = 0 ; j < argc ; j ++ ) {
len = argvlen ? argvlen [ j ] : strlen ( argv [ j ]);
totlen += bulklen ( len );
}
/* Build the command at protocol level */
cmd = malloc ( totlen + 1 );
if ( cmd == NULL )
return - 1 ;
pos = sprintf ( cmd , "*%d \r\n " , argc );
for ( j = 0 ; j < argc ; j ++ ) {
len = argvlen ? argvlen [ j ] : strlen ( argv [ j ]);
pos += sprintf ( cmd + pos , "$%zu \r\n " , len );
memcpy ( cmd + pos , argv [ j ], len );
pos += len ;
cmd [ pos ++ ] = '\r' ;
cmd [ pos ++ ] = '\n' ;
}
assert ( pos == totlen );
cmd [ pos ] = '\0' ;
* target = cmd ;
return totlen ;
}
redisFormatCommandArgv
函数是将客户端输入的命令按照 redis protocol
格式化,然后发送给服务器。比如 SET NAME "redis"
,按照 Redis protocol
格式化成
*3\r\n$3\r\nSET\r\n$4\r\nNAME\r\n$5\r\nredis\r\n
每一个元素都是以 \r\n
分割,最前面 *3
表示该条命令有三个元素,后面 $3
表示当前元素的长度为3