#include "fmacros.h"
#include "sockcompat.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef _WIN32
#include <strings.h>
#include <sys/time.h>
#endif
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <limits.h>
#include <math.h>
#include "hiredis.h"
#include "async.h"
#include "adapters/poll.h"
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
#ifdef HIREDIS_TEST_ASYNC
#include "adapters/libevent.h"
#include <event2/event.h>
#endif
#include "net.h"
#include "win32.h"
enum
connection_type {
CONN_TCP,
CONN_UNIX,
CONN_FD,
CONN_SSL
};
struct
config {
enum
connection_type type;
struct
timeval connect_timeout;
struct
{
const
char
*host;
int
port;
} tcp;
struct
{
const
char
*path;
} unix_sock;
struct
{
const
char
*host;
int
port;
const
char
*ca_cert;
const
char
*cert;
const
char
*key;
} ssl;
};
struct
privdata {
int
dtor_counter;
};
struct
pushCounters {
int
nil;
int
str;
};
static
int
insecure_calloc_calls;
#ifdef HIREDIS_TEST_SSL
redisSSLContext *_ssl_ctx = NULL;
#endif
static
int
tests = 0, fails = 0, skips = 0;
#define test(_s) { printf("#%02d ", ++tests); printf(_s); }
#define test_cond(_c) if(_c) printf("\033[0;32mPASSED\033[0;0m\n"); else {printf("\033[0;31mFAILED\033[0;0m\n"); fails++;}
#define test_skipped() { printf("\033[01;33mSKIPPED\033[0;0m\n"); skips++; }
static
void
millisleep(
int
ms)
{
#ifdef _MSC_VER
Sleep(ms);
#else
usleep(ms*1000);
#endif
}
static
long
long
usec(
void
) {
#ifndef _MSC_VER
struct
timeval tv;
gettimeofday(&tv,NULL);
return
(((
long
long
)tv.tv_sec)*1000000)+tv.tv_usec;
#else
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
return
(((
long
long
)ft.dwHighDateTime << 32) | ft.dwLowDateTime) / 10;
#endif
}
#ifdef NDEBUG
#undef assert
#define assert(e) (void)(e)
#endif
#define REDIS_VERSION_FIELD "redis_version:"
void
get_redis_version(redisContext *c,
int
*majorptr,
int
*minorptr) {
redisReply *reply;
char
*eptr, *s, *e;
int
major, minor;
reply = redisCommand(c,
"INFO"
);
if
(reply == NULL || c->err || reply->type != REDIS_REPLY_STRING)
goto
abort
;
if
((s =
strstr
(reply->str, REDIS_VERSION_FIELD)) == NULL)
goto
abort
;
s +=
strlen
(REDIS_VERSION_FIELD);
if
((e =
strstr
(s,
"\r\n"
)) == NULL || (e - s) < 5)
goto
abort
;
major =
strtol
(s, &eptr, 10);
if
(*eptr !=
'.'
)
goto
abort
;
minor =
strtol
(eptr+1, NULL, 10);
if
(majorptr) *majorptr = major;
if
(minorptr) *minorptr = minor;
freeReplyObject(reply);
return
;
abort
:
freeReplyObject(reply);
fprintf
(stderr,
"Error: Cannot determine Redis version, aborting\n"
);
exit
(1);
}
static
redisContext *select_database(redisContext *c) {
redisReply *reply;
reply = redisCommand(c,
"SELECT 9"
);
assert
(reply != NULL);
freeReplyObject(reply);
reply = redisCommand(c,
"DBSIZE"
);
assert
(reply != NULL);
if
(reply->type == REDIS_REPLY_INTEGER && reply->integer == 0) {
freeReplyObject(reply);
}
else
{
printf
(
"Database #9 is not empty, test can not continue\n"
);
exit
(1);
}
return
c;
}
static
void
send_hello(redisContext *c,
int
version) {
redisReply *reply;
int
expected;
reply = redisCommand(c,
"HELLO %d"
, version);
expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY;
assert
(reply != NULL && reply->type == expected);
freeReplyObject(reply);
}
static
void
send_client_tracking(redisContext *c,
const
char
*str) {
redisReply *reply;
reply = redisCommand(c,
"CLIENT TRACKING %s"
, str);
assert
(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
}
static
int
disconnect(redisContext *c,
int
keep_fd) {
redisReply *reply;
reply = redisCommand(c,
"SELECT 9"
);
assert
(reply != NULL);
freeReplyObject(reply);
reply = redisCommand(c,
"FLUSHDB"
);
assert
(reply != NULL);
freeReplyObject(reply);
if
(keep_fd)
return
redisFreeKeepFd(c);
redisFree(c);
return
-1;
}
static
void
do_ssl_handshake(redisContext *c) {
#ifdef HIREDIS_TEST_SSL
redisInitiateSSLWithContext(c, _ssl_ctx);
if
(c->err) {
printf
(
"SSL error: %s\n"
, c->errstr);
redisFree(c);
exit
(1);
}
#else
(
void
) c;
#endif
}
static
redisContext *do_connect(
struct
config config) {
redisContext *c = NULL;
if
(config.type == CONN_TCP) {
c = redisConnect(config.tcp.host, config.tcp.port);
}
else
if
(config.type == CONN_SSL) {
c = redisConnect(config.ssl.host, config.ssl.port);
}
else
if
(config.type == CONN_UNIX) {
c = redisConnectUnix(config.unix_sock.path);
}
else
if
(config.type == CONN_FD) {
redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
if
(dummy_ctx) {
int
fd = disconnect(dummy_ctx, 1);
printf
(
"Connecting to inherited fd %d\n"
, fd);
c = redisConnectFd(fd);
}
}
else
{
assert
(NULL);
}
if
(c == NULL) {
printf
(
"Connection error: can't allocate redis context\n"
);
exit
(1);
}
else
if
(c->err) {
printf
(
"Connection error: %s\n"
, c->errstr);
redisFree(c);
exit
(1);
}
if
(config.type == CONN_SSL) {
do_ssl_handshake(c);
}
return
select_database(c);
}
static
void
do_reconnect(redisContext *c,
struct
config config) {
redisReconnect(c);
if
(config.type == CONN_SSL) {
do_ssl_handshake(c);
}
}
static
void
test_format_commands(
void
) {
char
*cmd;
int
len;
test(
"Format command without interpolation: "
);
len = redisFormatCommand(&cmd,
"SET foo bar"
);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(3+2));
hi_free(cmd);
test(
"Format command with %%s string interpolation: "
);
len = redisFormatCommand(&cmd,
"SET %s %s"
,
"foo"
,
"bar"
);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(3+2));
hi_free(cmd);
test(
"Format command with %%s and an empty string: "
);
len = redisFormatCommand(&cmd,
"SET %s %s"
,
"foo"
,
""
);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$0\r\n\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(0+2));
hi_free(cmd);
test(
"Format command with an empty string in between proper interpolations: "
);
len = redisFormatCommand(&cmd,
"SET %s %s"
,
""
,
"foo"
);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$0\r\n\r\n$3\r\nfoo\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(0+2)+4+(3+2));
hi_free(cmd);
test(
"Format command with %%b string interpolation: "
);
len = redisFormatCommand(&cmd,
"SET %b %b"
,
"foo"
,(
size_t
)3,
"b\0r"
,(
size_t
)3);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nb\0r\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(3+2));
hi_free(cmd);
test(
"Format command with %%b and an empty string: "
);
len = redisFormatCommand(&cmd,
"SET %b %b"
,
"foo"
,(
size_t
)3,
""
,(
size_t
)0);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$0\r\n\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(0+2));
hi_free(cmd);
test(
"Format command with literal %%: "
);
len = redisFormatCommand(&cmd,
"SET %% %%"
);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$1\r\n%\r\n$1\r\n%\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(1+2)+4+(1+2));
hi_free(cmd);
#define INTEGER_WIDTH_TEST(fmt, type) do { \
type value = 123; \
test(
"Format command with printf-delegation ("
#type
"): "
); \
len = redisFormatCommand(&cmd,
"key:%08"
fmt
" str:%s"
, value,
"hello"
); \
test_cond(
strncmp
(cmd,
"*2\r\n$12\r\nkey:00000123\r\n$9\r\nstr:hello\r\n"
,len) == 0 && \
len == 4+5+(12+2)+4+(9+2)); \
hi_free(cmd); \
}
while
(0)
#define FLOAT_WIDTH_TEST(type) do { \
type value = 123.0; \
test(
"Format command with printf-delegation ("
#type
"): "
); \
len = redisFormatCommand(&cmd,
"key:%08.3f str:%s"
, value,
"hello"
); \
test_cond(
strncmp
(cmd,
"*2\r\n$12\r\nkey:0123.000\r\n$9\r\nstr:hello\r\n"
,len) == 0 && \
len == 4+5+(12+2)+4+(9+2)); \
hi_free(cmd); \
}
while
(0)
INTEGER_WIDTH_TEST(
"d"
,
int
);
INTEGER_WIDTH_TEST(
"hhd"
,
char
);
INTEGER_WIDTH_TEST(
"hd"
,
short
);
INTEGER_WIDTH_TEST(
"ld"
,
long
);
INTEGER_WIDTH_TEST(
"lld"
,
long
long
);
INTEGER_WIDTH_TEST(
"u"
, unsigned
int
);
INTEGER_WIDTH_TEST(
"hhu"
, unsigned
char
);
INTEGER_WIDTH_TEST(
"hu"
, unsigned
short
);
INTEGER_WIDTH_TEST(
"lu"
, unsigned
long
);
INTEGER_WIDTH_TEST(
"llu"
, unsigned
long
long
);
FLOAT_WIDTH_TEST(
float
);
FLOAT_WIDTH_TEST(
double
);
test(
"Format command with unhandled printf format (specifier 'p' not supported): "
);
len = redisFormatCommand(&cmd,
"key:%08p %b"
,(
void
*)1234,
"foo"
,(
size_t
)3);
test_cond(len == -1);
test(
"Format command with invalid printf format (specifier missing): "
);
len = redisFormatCommand(&cmd,
"%-"
);
test_cond(len == -1);
const
char
*argv[3];
argv[0] =
"SET"
;
argv[1] =
"foo\0xxx"
;
argv[2] =
"bar"
;
size_t
lens[3] = { 3, 7, 3 };
int
argc = 3;
test(
"Format command by passing argc/argv without lengths: "
);
len = redisFormatCommandArgv(&cmd,argc,argv,NULL);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(3+2));
hi_free(cmd);
test(
"Format command by passing argc/argv with lengths: "
);
len = redisFormatCommandArgv(&cmd,argc,argv,lens);
test_cond(
strncmp
(cmd,
"*3\r\n$3\r\nSET\r\n$7\r\nfoo\0xxx\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(7+2)+4+(3+2));
hi_free(cmd);
sds sds_cmd;
sds_cmd = NULL;
test(
"Format command into sds by passing argc/argv without lengths: "
);
len = redisFormatSdsCommandArgv(&sds_cmd,argc,argv,NULL);
test_cond(
strncmp
(sds_cmd,
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(3+2)+4+(3+2));
sdsfree(sds_cmd);
sds_cmd = NULL;
test(
"Format command into sds by passing argc/argv with lengths: "
);
len = redisFormatSdsCommandArgv(&sds_cmd,argc,argv,lens);
test_cond(
strncmp
(sds_cmd,
"*3\r\n$3\r\nSET\r\n$7\r\nfoo\0xxx\r\n$3\r\nbar\r\n"
,len) == 0 &&
len == 4+4+(3+2)+4+(7+2)+4+(3+2));
sdsfree(sds_cmd);
}
static
void
test_append_formatted_commands(
struct
config config) {
redisContext *c;
redisReply *reply;
char
*cmd;
int
len;
c = do_connect(config);
test(
"Append format command: "
);
len = redisFormatCommand(&cmd,
"SET foo bar"
);
test_cond(redisAppendFormattedCommand(c, cmd, len) == REDIS_OK);
assert
(redisGetReply(c, (
void
*)&reply) == REDIS_OK);
hi_free(cmd);
freeReplyObject(reply);
disconnect(c, 0);
}
static
void
test_tcp_options(
struct
config cfg) {
redisContext *c;
c = do_connect(cfg);
test(
"We can enable TCP_KEEPALIVE: "
);
test_cond(redisEnableKeepAlive(c) == REDIS_OK);
#ifdef TCP_USER_TIMEOUT
test(
"We can set TCP_USER_TIMEOUT: "
);
test_cond(redisSetTcpUserTimeout(c, 100) == REDIS_OK);
#else
test(
"Setting TCP_USER_TIMEOUT errors when unsupported: "
);
test_cond(redisSetTcpUserTimeout(c, 100) == REDIS_ERR && c->err == REDIS_ERR_IO);
#endif
redisFree(c);
}
static
void
test_reply_reader(
void
) {
redisReader *reader;
void
*reply, *root;
int
ret;
int
i;
test(
"Error handling in reply parser: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
"@foo\r\n"
,6);
ret = redisReaderGetReply(reader,NULL);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Protocol error, got \"@\" as reply type byte"
) == 0);
redisReaderFree(reader);
test(
"Memory cleanup in reply parser: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
"*2\r\n"
,4);
redisReaderFeed(reader,(
char
*)
"$5\r\nhello\r\n"
,11);
redisReaderFeed(reader,(
char
*)
"@foo\r\n"
,6);
ret = redisReaderGetReply(reader,NULL);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Protocol error, got \"@\" as reply type byte"
) == 0);
redisReaderFree(reader);
reader = redisReaderCreate();
test(
"Can handle arbitrarily nested multi-bulks: "
);
for
(i = 0; i < 128; i++) {
redisReaderFeed(reader,(
char
*)
"*1\r\n"
, 4);
}
redisReaderFeed(reader,(
char
*)
"$6\r\nLOLWUT\r\n"
,12);
ret = redisReaderGetReply(reader,&reply);
root = reply;
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_ARRAY &&
((redisReply*)reply)->elements == 1);
test(
"Can parse arbitrarily nested multi-bulks correctly: "
);
while
(i--) {
assert
(reply != NULL && ((redisReply*)reply)->type == REDIS_REPLY_ARRAY);
reply = ((redisReply*)reply)->element[0];
}
test_cond(((redisReply*)reply)->type == REDIS_REPLY_STRING &&
!
memcmp
(((redisReply*)reply)->str,
"LOLWUT"
, 6));
freeReplyObject(root);
redisReaderFree(reader);
test(
"Correctly parses LLONG_MAX: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
":9223372036854775807\r\n"
,22);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->integer == LLONG_MAX);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error when > LLONG_MAX: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
":9223372036854775808\r\n"
,22);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bad integer value"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Correctly parses LLONG_MIN: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
":-9223372036854775808\r\n"
,23);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->integer == LLONG_MIN);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error when < LLONG_MIN: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
":-9223372036854775809\r\n"
,23);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bad integer value"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error when array < -1: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"*-2\r\n+asdf\r\n"
,12);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Multi-bulk length out of range"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error when bulk < -1: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"$-2\r\nasdf\r\n"
,11);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bulk string length out of range"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can configure maximum multi-bulk elements: "
);
reader = redisReaderCreate();
reader->maxelements = 1024;
redisReaderFeed(reader,
"*1025\r\n"
, 7);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Multi-bulk length out of range"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Multi-bulk never overflows regardless of maxelements: "
);
size_t
bad_mbulk_len = (SIZE_MAX /
sizeof
(
void
*)) + 3;
char
bad_mbulk_reply[100];
snprintf(bad_mbulk_reply,
sizeof
(bad_mbulk_reply),
"*%llu\r\n+asdf\r\n"
,
(unsigned
long
long
) bad_mbulk_len);
reader = redisReaderCreate();
reader->maxelements = 0;
redisReaderFeed(reader, bad_mbulk_reply,
strlen
(bad_mbulk_reply));
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR && strcasecmp(reader->errstr,
"Out of memory"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
#if LLONG_MAX > SIZE_MAX
test(
"Set error when array > SIZE_MAX: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"*9223372036854775807\r\n+asdf\r\n"
,29);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Multi-bulk length out of range"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error when bulk > SIZE_MAX: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"$9223372036854775807\r\nasdf\r\n"
,28);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bulk string length out of range"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
#endif
test(
"Works with NULL functions for reply: "
);
reader = redisReaderCreate();
reader->fn = NULL;
redisReaderFeed(reader,(
char
*)
"+OK\r\n"
,5);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK && reply == (
void
*)REDIS_REPLY_STATUS);
redisReaderFree(reader);
test(
"Works when a single newline (\\r\\n) covers two calls to feed: "
);
reader = redisReaderCreate();
reader->fn = NULL;
redisReaderFeed(reader,(
char
*)
"+OK\r"
,4);
ret = redisReaderGetReply(reader,&reply);
assert
(ret == REDIS_OK && reply == NULL);
redisReaderFeed(reader,(
char
*)
"\n"
,1);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK && reply == (
void
*)REDIS_REPLY_STATUS);
redisReaderFree(reader);
test(
"Don't reset state after protocol error: "
);
reader = redisReaderCreate();
reader->fn = NULL;
redisReaderFeed(reader,(
char
*)
"x"
,1);
ret = redisReaderGetReply(reader,&reply);
assert
(ret == REDIS_ERR);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR && reply == NULL);
redisReaderFree(reader);
test(
"Don't reset state after protocol error(not segfault): "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$"
, 25);
ret = redisReaderGetReply(reader,&reply);
assert
(ret == REDIS_OK);
redisReaderFeed(reader,(
char
*)
"3\r\nval\r\n"
, 8);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_ARRAY &&
((redisReply*)reply)->elements == 3);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Don't do empty allocation for empty multi bulk: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
"*0\r\n"
,4);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_ARRAY &&
((redisReply*)reply)->elements == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 verbatim strings: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
"=10\r\ntxt:LOLWUT\r\n"
,17);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_VERB &&
!
memcmp
(((redisReply*)reply)->str,
"LOLWUT"
, 6));
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 push messages: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,(
char
*)
">2\r\n$6\r\nLOLWUT\r\n:42\r\n"
,21);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_PUSH &&
((redisReply*)reply)->elements == 2 &&
((redisReply*)reply)->element[0]->type == REDIS_REPLY_STRING &&
!
memcmp
(((redisReply*)reply)->element[0]->str,
"LOLWUT"
,6) &&
((redisReply*)reply)->element[1]->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->element[1]->integer == 42);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 doubles: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
",3.14159265358979323846\r\n"
,25);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_DOUBLE &&
fabs
(((redisReply*)reply)->dval - 3.14159265358979323846) < 0.00000001 &&
((redisReply*)reply)->len == 22 &&
strcmp
(((redisReply*)reply)->str,
"3.14159265358979323846"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error on invalid RESP3 double: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
",3.14159\000265358979323846\r\n"
,26);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bad double value"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Correctly parses RESP3 double INFINITY: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
",inf\r\n"
,6);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_DOUBLE &&
isinf(((redisReply*)reply)->dval) &&
((redisReply*)reply)->dval > 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Correctly parses RESP3 double NaN: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
",nan\r\n"
,6);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_DOUBLE &&
isnan(((redisReply*)reply)->dval));
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Correctly parses RESP3 double -Nan: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
",-nan\r\n"
, 7);
ret = redisReaderGetReply(reader, &reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_DOUBLE &&
isnan(((redisReply*)reply)->dval));
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 nil: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"_\r\n"
,3);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_NIL);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error on invalid RESP3 nil: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"_nil\r\n"
,6);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bad nil value"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 bool (true): "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"#t\r\n"
,4);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_BOOL &&
((redisReply*)reply)->integer);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 bool (false): "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"#f\r\n"
,4);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_BOOL &&
!((redisReply*)reply)->integer);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Set error on invalid RESP3 bool: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"#foobar\r\n"
,9);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_ERR &&
strcasecmp(reader->errstr,
"Bad bool value"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 map: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"%2\r\n+first\r\n:123\r\n$6\r\nsecond\r\n#t\r\n"
,34);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_MAP &&
((redisReply*)reply)->elements == 4 &&
((redisReply*)reply)->element[0]->type == REDIS_REPLY_STATUS &&
((redisReply*)reply)->element[0]->len == 5 &&
!
strcmp
(((redisReply*)reply)->element[0]->str,
"first"
) &&
((redisReply*)reply)->element[1]->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->element[1]->integer == 123 &&
((redisReply*)reply)->element[2]->type == REDIS_REPLY_STRING &&
((redisReply*)reply)->element[2]->len == 6 &&
!
strcmp
(((redisReply*)reply)->element[2]->str,
"second"
) &&
((redisReply*)reply)->element[3]->type == REDIS_REPLY_BOOL &&
((redisReply*)reply)->element[3]->integer);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 set: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"~5\r\n+orange\r\n$5\r\napple\r\n#f\r\n:100\r\n:999\r\n"
,40);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_SET &&
((redisReply*)reply)->elements == 5 &&
((redisReply*)reply)->element[0]->type == REDIS_REPLY_STATUS &&
((redisReply*)reply)->element[0]->len == 6 &&
!
strcmp
(((redisReply*)reply)->element[0]->str,
"orange"
) &&
((redisReply*)reply)->element[1]->type == REDIS_REPLY_STRING &&
((redisReply*)reply)->element[1]->len == 5 &&
!
strcmp
(((redisReply*)reply)->element[1]->str,
"apple"
) &&
((redisReply*)reply)->element[2]->type == REDIS_REPLY_BOOL &&
!((redisReply*)reply)->element[2]->integer &&
((redisReply*)reply)->element[3]->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->element[3]->integer == 100 &&
((redisReply*)reply)->element[4]->type == REDIS_REPLY_INTEGER &&
((redisReply*)reply)->element[4]->integer == 999);
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 bignum: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"(3492890328409238509324850943850943825024385\r\n"
,46);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_BIGNUM &&
((redisReply*)reply)->len == 43 &&
!
strcmp
(((redisReply*)reply)->str,
"3492890328409238509324850943850943825024385"
));
freeReplyObject(reply);
redisReaderFree(reader);
test(
"Can parse RESP3 doubles in an array: "
);
reader = redisReaderCreate();
redisReaderFeed(reader,
"*1\r\n,3.14159265358979323846\r\n"
,31);
ret = redisReaderGetReply(reader,&reply);
test_cond(ret == REDIS_OK &&
((redisReply*)reply)->type == REDIS_REPLY_ARRAY &&
((redisReply*)reply)->elements == 1 &&
((redisReply*)reply)->element[0]->type == REDIS_REPLY_DOUBLE &&
fabs
(((redisReply*)reply)->element[0]->dval - 3.14159265358979323846) < 0.00000001 &&
((redisReply*)reply)->element[0]->len == 22 &&
strcmp
(((redisReply*)reply)->element[0]->str,
"3.14159265358979323846"
) == 0);
freeReplyObject(reply);
redisReaderFree(reader);
}
static
void
test_free_null(
void
) {
void
*redisCtx = NULL;
void
*reply = NULL;
test(
"Don't fail when redisFree is passed a NULL value: "
);
redisFree(redisCtx);
test_cond(redisCtx == NULL);
test(
"Don't fail when freeReplyObject is passed a NULL value: "
);
freeReplyObject(reply);
test_cond(reply == NULL);
}
static
void
*hi_malloc_fail(
size_t
size) {
(
void
)size;
return
NULL;
}
static
void
*hi_calloc_fail(
size_t
nmemb,
size_t
size) {
(
void
)nmemb;
(
void
)size;
return
NULL;
}
static
void
*hi_calloc_insecure(
size_t
nmemb,
size_t
size) {
(
void
)nmemb;
(
void
)size;
insecure_calloc_calls++;
return
(
void
*)0xdeadc0de;
}
static
void
*hi_realloc_fail(
void
*ptr,
size_t
size) {
(
void
)ptr;
(
void
)size;
return
NULL;
}
static
void
test_allocator_injection(
void
) {
void
*ptr;
hiredisAllocFuncs ha = {
.mallocFn = hi_malloc_fail,
.callocFn = hi_calloc_fail,
.reallocFn = hi_realloc_fail,
.strdupFn = strdup,
.freeFn =
free
,
};
hiredisSetAllocators(&ha);
test(
"redisContext uses injected allocators: "
);
redisContext *c = redisConnect(
"localhost"
, 6379);
test_cond(c == NULL);
test(
"redisReader uses injected allocators: "
);
redisReader *reader = redisReaderCreate();
test_cond(reader == NULL);
test(
"hiredis calloc wrapper protects against overflow: "
);
ha.callocFn = hi_calloc_insecure;
hiredisSetAllocators(&ha);
ptr = hi_calloc((SIZE_MAX /
sizeof
(
void
*)) + 3,
sizeof
(
void
*));
test_cond(ptr == NULL && insecure_calloc_calls == 0);
hiredisResetAllocators();
}
#define HIREDIS_BAD_DOMAIN "idontexist-noreally.com"
static
void
test_blocking_connection_errors(
void
) {
struct
addrinfo hints = {.ai_family = AF_INET};
struct
addrinfo *ai_tmp = NULL;
redisContext *c;
int
rv = getaddrinfo(HIREDIS_BAD_DOMAIN,
"6379"
, &hints, &ai_tmp);
if
(rv != 0) {
test(
"Returns error when host cannot be resolved: "
);
c = redisConnect(HIREDIS_BAD_DOMAIN, 6379);
test_cond(
c->err == REDIS_ERR_OTHER &&
(
strcmp
(c->errstr,
"Name or service not known"
) == 0 ||
strcmp
(c->errstr,
"Can't resolve: "
HIREDIS_BAD_DOMAIN) == 0 ||
strcmp
(c->errstr,
"Name does not resolve"
) == 0 ||
strcmp
(c->errstr,
"nodename nor servname provided, or not known"
) == 0 ||
strcmp
(c->errstr,
"node name or service name not known"
) == 0 ||
strcmp
(c->errstr,
"No address associated with hostname"
) == 0 ||
strcmp
(c->errstr,
"Temporary failure in name resolution"
) == 0 ||
strcmp
(c->errstr,
"hostname nor servname provided, or not known"
) == 0 ||
strcmp
(c->errstr,
"no address associated with name"
) == 0 ||
strcmp
(c->errstr,
"No such host is known. "
) == 0));
redisFree(c);
}
else
{
printf
(
"Skipping NXDOMAIN test. Found evil ISP!\n"
);
freeaddrinfo(ai_tmp);
}
#ifndef _WIN32
redisOptions opt = {0};
struct
timeval tv;
test(
"Returns error when the port is not open: "
);
c = redisConnect((
char
*)
"localhost"
, 1);
test_cond(c->err == REDIS_ERR_IO &&
strcmp
(c->errstr,
"Connection refused"
) == 0);
redisFree(c);
test(
"We don't clobber connection exception with setsockopt error: "
);
tv = (
struct
timeval){.tv_sec = 0, .tv_usec = 500000};
opt.command_timeout = opt.connect_timeout = &tv;
REDIS_OPTIONS_SET_TCP(&opt,
"localhost"
, 10337);
c = redisConnectWithOptions(&opt);
test_cond(c->err == REDIS_ERR_IO &&
strcmp
(c->errstr,
"Connection refused"
) == 0);
redisFree(c);
test(
"Returns error when the unix_sock socket path doesn't accept connections: "
);
c = redisConnectUnix((
char
*)
"/tmp/idontexist.sock"
);
test_cond(c->err == REDIS_ERR_IO);
redisFree(c);
#endif
}
void
push_handler(
void
*privdata,
void
*r) {
struct
pushCounters *pcounts = privdata;
redisReply *reply = r, *payload;
assert
(reply && reply->type == REDIS_REPLY_PUSH && reply->elements == 2);
payload = reply->element[1];
if
(payload->type == REDIS_REPLY_ARRAY) {
payload = payload->element[0];
}
if
(payload->type == REDIS_REPLY_STRING) {
pcounts->str++;
}
else
if
(payload->type == REDIS_REPLY_NIL) {
pcounts->nil++;
}
freeReplyObject(reply);
}
void
push_handler_async(redisAsyncContext *ac,
void
*reply) {
(
void
)ac;
(
void
)reply;
}
static
void
test_resp3_push_handler(redisContext *c) {
struct
pushCounters pc = {0};
redisPushFn *old = NULL;
redisReply *reply;
void
*privdata;
send_hello(c, 3);
send_client_tracking(c,
"ON"
);
privdata = c->privdata;
c->privdata = &pc;
reply = redisCommand(c,
"GET key:0"
);
assert
(reply != NULL);
freeReplyObject(reply);
test(
"RESP3 PUSH messages are handled out of band by default: "
);
reply = redisCommand(c,
"SET key:0 val:0"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
assert
((reply = redisCommand(c,
"GET key:0"
)) != NULL);
freeReplyObject(reply);
old = redisSetPushCallback(c, push_handler);
test(
"We can set a custom RESP3 PUSH handler: "
);
reply = redisCommand(c,
"SET key:0 val:0"
);
assert
(reply != NULL);
freeReplyObject(reply);
reply = redisCommand(c,
"PING"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && pc.str == 1);
freeReplyObject(reply);
test(
"We properly handle a NIL invalidation payload: "
);
reply = redisCommand(c,
"FLUSHDB"
);
assert
(reply != NULL);
freeReplyObject(reply);
reply = redisCommand(c,
"PING"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && pc.nil == 1);
freeReplyObject(reply);
test(
"With no handler, PUSH replies come in-band: "
);
redisSetPushCallback(c, NULL);
assert
((reply = redisCommand(c,
"GET key:0"
)) != NULL);
freeReplyObject(reply);
assert
((reply = redisCommand(c,
"SET key:0 invalid"
)) != NULL);
if
(reply->type == REDIS_REPLY_STATUS) {
freeReplyObject(reply);
reply = redisCommand(c,
"PING"
);
}
test_cond(reply->type == REDIS_REPLY_PUSH);
freeReplyObject(reply);
test(
"With no PUSH handler, no replies are lost: "
);
assert
(redisGetReply(c, (
void
**)&reply) == REDIS_OK);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
assert
(old != NULL);
redisSetPushCallback(c, old);
c->privdata = privdata;
send_client_tracking(c,
"OFF"
);
send_hello(c, 2);
}
redisOptions get_redis_tcp_options(
struct
config config) {
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
return
options;
}
static
void
test_resp3_push_options(
struct
config config) {
redisAsyncContext *ac;
redisContext *c;
redisOptions options;
test(
"We set a default RESP3 handler for redisContext: "
);
options = get_redis_tcp_options(config);
assert
((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb != NULL);
redisFree(c);
test(
"We don't set a default RESP3 push handler for redisAsyncContext: "
);
options = get_redis_tcp_options(config);
assert
((ac = redisAsyncConnectWithOptions(&options)) != NULL);
test_cond(ac->c.push_cb == NULL);
redisAsyncFree(ac);
test(
"Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: "
);
options = get_redis_tcp_options(config);
options.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
assert
((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb == NULL);
redisFree(c);
test(
"We can use redisOptions to set a custom PUSH handler for redisContext: "
);
options = get_redis_tcp_options(config);
options.push_cb = push_handler;
assert
((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->push_cb == push_handler);
redisFree(c);
test(
"We can use redisOptions to set a custom PUSH handler for redisAsyncContext: "
);
options = get_redis_tcp_options(config);
options.async_push_cb = push_handler_async;
assert
((ac = redisAsyncConnectWithOptions(&options)) != NULL);
test_cond(ac->push_cb == push_handler_async);
redisAsyncFree(ac);
}
void
free_privdata(
void
*privdata) {
struct
privdata *data = privdata;
data->dtor_counter++;
}
static
void
test_privdata_hooks(
struct
config config) {
struct
privdata data = {0};
redisOptions options;
redisContext *c;
test(
"We can use redisOptions to set privdata: "
);
options = get_redis_tcp_options(config);
REDIS_OPTIONS_SET_PRIVDATA(&options, &data, free_privdata);
assert
((c = redisConnectWithOptions(&options)) != NULL);
test_cond(c->privdata == &data);
test(
"Our privdata destructor fires when we free the context: "
);
redisFree(c);
test_cond(data.dtor_counter == 1);
}
static
void
test_blocking_connection(
struct
config config) {
redisContext *c;
redisReply *reply;
int
major;
c = do_connect(config);
test(
"Is able to deliver commands: "
);
reply = redisCommand(c,
"PING"
);
test_cond(reply->type == REDIS_REPLY_STATUS &&
strcasecmp(reply->str,
"pong"
) == 0)
freeReplyObject(reply);
test(
"Is a able to send commands verbatim: "
);
reply = redisCommand(c,
"SET foo bar"
);
test_cond (reply->type == REDIS_REPLY_STATUS &&
strcasecmp(reply->str,
"ok"
) == 0)
freeReplyObject(reply);
test(
"%%s String interpolation works: "
);
reply = redisCommand(c,
"SET %s %s"
,
"foo"
,
"hello world"
);
freeReplyObject(reply);
reply = redisCommand(c,
"GET foo"
);
test_cond(reply->type == REDIS_REPLY_STRING &&
strcmp
(reply->str,
"hello world"
) == 0);
freeReplyObject(reply);
test(
"%%b String interpolation works: "
);
reply = redisCommand(c,
"SET %b %b"
,
"foo"
,(
size_t
)3,
"hello\x00world"
,(
size_t
)11);
freeReplyObject(reply);
reply = redisCommand(c,
"GET foo"
);
test_cond(reply->type == REDIS_REPLY_STRING &&
memcmp
(reply->str,
"hello\x00world"
,11) == 0)
test(
"Binary reply length is correct: "
);
test_cond(reply->len == 11)
freeReplyObject(reply);
test(
"Can parse nil replies: "
);
reply = redisCommand(c,
"GET nokey"
);
test_cond(reply->type == REDIS_REPLY_NIL)
freeReplyObject(reply);
test(
"Can parse integer replies: "
);
reply = redisCommand(c,
"INCR mycounter"
);
test_cond(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1)
freeReplyObject(reply);
test(
"Can parse multi bulk replies: "
);
freeReplyObject(redisCommand(c,
"LPUSH mylist foo"
));
freeReplyObject(redisCommand(c,
"LPUSH mylist bar"
));
reply = redisCommand(c,
"LRANGE mylist 0 -1"
);
test_cond(reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 2 &&
!
memcmp
(reply->element[0]->str,
"bar"
,3) &&
!
memcmp
(reply->element[1]->str,
"foo"
,3))
freeReplyObject(reply);
test(
"Can handle nested multi bulk replies: "
);
freeReplyObject(redisCommand(c,
"MULTI"
));
freeReplyObject(redisCommand(c,
"LRANGE mylist 0 -1"
));
freeReplyObject(redisCommand(c,
"PING"
));
reply = (redisCommand(c,
"EXEC"
));
test_cond(reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_ARRAY &&
reply->element[0]->elements == 2 &&
!
memcmp
(reply->element[0]->element[0]->str,
"bar"
,3) &&
!
memcmp
(reply->element[0]->element[1]->str,
"foo"
,3) &&
reply->element[1]->type == REDIS_REPLY_STATUS &&
strcasecmp(reply->element[1]->str,
"pong"
) == 0);
freeReplyObject(reply);
test(
"Send command by passing argc/argv: "
);
const
char
*argv[3] = {
"SET"
,
"foo"
,
"bar"
};
size_t
argvlen[3] = {3, 3, 3};
reply = redisCommandArgv(c,3,argv,argvlen);
test_cond(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
test(
"Can pass NULL to redisGetReply: "
);
assert
(redisAppendCommand(c,
"PING"
) == REDIS_OK);
test_cond(redisGetReply(c, NULL) == REDIS_OK);
get_redis_version(c, &major, NULL);
if
(major >= 6) test_resp3_push_handler(c);
test_resp3_push_options(config);
test_privdata_hooks(config);
disconnect(c, 0);
}
static
int
detect_debug_sleep(redisContext *c) {
int
detected;
redisReply *reply = redisCommand(c,
"DEBUG SLEEP 0\r\n"
);
if
(reply == NULL || c->err) {
const
char
*cause = c->err ? c->errstr :
"(none)"
;
fprintf
(stderr,
"Error testing for DEBUG SLEEP (Redis error: %s), exiting\n"
, cause);
exit
(-1);
}
detected = reply->type == REDIS_REPLY_STATUS;
freeReplyObject(reply);
return
detected;
}
static
void
test_blocking_connection_timeouts(
struct
config config) {
redisContext *c;
redisReply *reply;
ssize_t s;
const
char
*sleep_cmd =
"DEBUG SLEEP 3\r\n"
;
struct
timeval tv;
c = do_connect(config);
test(
"Successfully completes a command when the timeout is not exceeded: "
);
reply = redisCommand(c,
"SET foo fast"
);
freeReplyObject(reply);
tv.tv_sec = 0;
tv.tv_usec = 10000;
redisSetTimeout(c, tv);
reply = redisCommand(c,
"GET foo"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STRING &&
memcmp
(reply->str,
"fast"
, 4) == 0);
freeReplyObject(reply);
disconnect(c, 0);
c = do_connect(config);
test(
"Does not return a reply when the command times out: "
);
if
(detect_debug_sleep(c)) {
redisAppendFormattedCommand(c, sleep_cmd,
strlen
(sleep_cmd));
s = c->funcs->write(c);
assert
(s == (ssize_t)sdslen(c->obuf));
sdsfree(c->obuf);
c->obuf = sdsempty();
tv.tv_sec = 0;
tv.tv_usec = 10000;
redisSetTimeout(c, tv);
reply = redisCommand(c,
"GET foo"
);
#ifndef _WIN32
test_cond(s > 0 && reply == NULL && c->err == REDIS_ERR_IO &&
strcmp
(c->errstr,
"Resource temporarily unavailable"
) == 0);
#else
test_cond(s > 0 && reply == NULL && c->err == REDIS_ERR_TIMEOUT &&
strcmp
(c->errstr,
"recv timeout"
) == 0);
#endif
freeReplyObject(reply);
millisleep(3000);
}
else
{
test_skipped();
}
test(
"Reconnect properly reconnects after a timeout: "
);
do_reconnect(c, config);
reply = redisCommand(c,
"PING"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS &&
strcmp
(reply->str,
"PONG"
) == 0);
freeReplyObject(reply);
test(
"Reconnect properly uses owned parameters: "
);
config.tcp.host =
"foo"
;
config.unix_sock.path =
"foo"
;
do_reconnect(c, config);
reply = redisCommand(c,
"PING"
);
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS &&
strcmp
(reply->str,
"PONG"
) == 0);
freeReplyObject(reply);
disconnect(c, 0);
}
static
void
test_blocking_io_errors(
struct
config config) {
redisContext *c;
redisReply *reply;
void
*_reply;
int
major, minor;
c = do_connect(config);
get_redis_version(c, &major, &minor);
test(
"Returns I/O error when the connection is lost: "
);
reply = redisCommand(c,
"QUIT"
);
if
(major > 2 || (major == 2 && minor > 0)) {
test_cond(strcasecmp(reply->str,
"OK"
) == 0 &&
redisGetReply(c,&_reply) == REDIS_ERR);
freeReplyObject(reply);
}
else
{
test_cond(reply == NULL);
}
#ifndef _WIN32
assert
(c->err == REDIS_ERR_EOF &&
strcmp
(c->errstr,
"Server closed the connection"
) == 0);
#endif
redisFree(c);
c = do_connect(config);
test(
"Returns I/O error on socket timeout: "
);
struct
timeval tv = { 0, 1000 };
assert
(redisSetTimeout(c,tv) == REDIS_OK);
int
respcode = redisGetReply(c,&_reply);
#ifndef _WIN32
test_cond(respcode == REDIS_ERR && c->err == REDIS_ERR_IO &&
errno
== EAGAIN);
#else
test_cond(respcode == REDIS_ERR && c->err == REDIS_ERR_TIMEOUT);
#endif
redisFree(c);
}
static
void
test_invalid_timeout_errors(
struct
config config) {
redisContext *c;
test(
"Set error when an invalid timeout usec value is used during connect: "
);
config.connect_timeout.tv_sec = 0;
config.connect_timeout.tv_usec = 10000001;
if
(config.type == CONN_TCP || config.type == CONN_SSL) {
c = redisConnectWithTimeout(config.tcp.host, config.tcp.port, config.connect_timeout);
}
else
if
(config.type == CONN_UNIX) {
c = redisConnectUnixWithTimeout(config.unix_sock.path, config.connect_timeout);
}
else
{
assert
(NULL);
}
test_cond(c->err == REDIS_ERR_IO &&
strcmp
(c->errstr,
"Invalid timeout specified"
) == 0);
redisFree(c);
test(
"Set error when an invalid timeout sec value is used during connect: "
);
config.connect_timeout.tv_sec = (((LONG_MAX) - 999) / 1000) + 1;
config.connect_timeout.tv_usec = 0;
if
(config.type == CONN_TCP || config.type == CONN_SSL) {
c = redisConnectWithTimeout(config.tcp.host, config.tcp.port, config.connect_timeout);
}
else
if
(config.type == CONN_UNIX) {
c = redisConnectUnixWithTimeout(config.unix_sock.path, config.connect_timeout);
}
else
{
assert
(NULL);
}
test_cond(c->err == REDIS_ERR_IO &&
strcmp
(c->errstr,
"Invalid timeout specified"
) == 0);
redisFree(c);
}
void
*hi_malloc_safe(
size_t
size) {
void
*ptr = hi_malloc(size);
if
(ptr == NULL) {
fprintf
(stderr,
"Error: Out of memory\n"
);
exit
(-1);
}
return
ptr;
}
static
void
test_throughput(
struct
config config) {
redisContext *c = do_connect(config);
redisReply **replies;
int
i, num;
long
long
t1, t2;
test(
"Throughput:\n"
);
for
(i = 0; i < 500; i++)
freeReplyObject(redisCommand(c,
"LPUSH mylist foo"
));
num = 1000;
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
t1 = usec();
for
(i = 0; i < num; i++) {
replies[i] = redisCommand(c,
"PING"
);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_STATUS);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx PING: %.3fs)\n"
, num, (t2-t1)/1000000.0);
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
t1 = usec();
for
(i = 0; i < num; i++) {
replies[i] = redisCommand(c,
"LRANGE mylist 0 499"
);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_ARRAY);
assert
(replies[i] != NULL && replies[i]->elements == 500);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx LRANGE with 500 elements: %.3fs)\n"
, num, (t2-t1)/1000000.0);
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
t1 = usec();
for
(i = 0; i < num; i++) {
replies[i] = redisCommand(c,
"INCRBY incrkey %d"
, 1000000);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_INTEGER);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx INCRBY: %.3fs)\n"
, num, (t2-t1)/1000000.0);
num = 10000;
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
for
(i = 0; i < num; i++)
redisAppendCommand(c,
"PING"
);
t1 = usec();
for
(i = 0; i < num; i++) {
assert
(redisGetReply(c, (
void
*)&replies[i]) == REDIS_OK);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_STATUS);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx PING (pipelined): %.3fs)\n"
, num, (t2-t1)/1000000.0);
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
for
(i = 0; i < num; i++)
redisAppendCommand(c,
"LRANGE mylist 0 499"
);
t1 = usec();
for
(i = 0; i < num; i++) {
assert
(redisGetReply(c, (
void
*)&replies[i]) == REDIS_OK);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_ARRAY);
assert
(replies[i] != NULL && replies[i]->elements == 500);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx LRANGE with 500 elements (pipelined): %.3fs)\n"
, num, (t2-t1)/1000000.0);
replies = hi_malloc_safe(
sizeof
(redisReply*)*num);
for
(i = 0; i < num; i++)
redisAppendCommand(c,
"INCRBY incrkey %d"
, 1000000);
t1 = usec();
for
(i = 0; i < num; i++) {
assert
(redisGetReply(c, (
void
*)&replies[i]) == REDIS_OK);
assert
(replies[i] != NULL && replies[i]->type == REDIS_REPLY_INTEGER);
}
t2 = usec();
for
(i = 0; i < num; i++) freeReplyObject(replies[i]);
hi_free(replies);
printf
(
"\t(%dx INCRBY (pipelined): %.3fs)\n"
, num, (t2-t1)/1000000.0);
disconnect(c, 0);
}
#ifdef HIREDIS_TEST_ASYNC
#pragma GCC diagnostic ignored "-Woverlength-strings" /* required on gcc 4.8.x due to assert statements */
struct
event_base *base;
typedef
struct
TestState {
redisOptions *options;
int
checkpoint;
int
resp3;
int
disconnect;
} TestState;
void
async_disconnect(redisAsyncContext *ac) {
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
}
void
timeout_cb(
int
fd,
short
event,
void
*arg) {
(
void
) fd; (
void
) event; (
void
) arg;
printf
(
"Timeout in async testing!\n"
);
exit
(1);
}
void
unexpected_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
(
void
) ac; (
void
) r;
printf
(
"Unexpected call: %s\n"
,(
char
*)privdata);
exit
(1);
}
void
publish_msg(redisOptions *options,
const
char
* channel,
const
char
* msg) {
redisContext *c = redisConnectWithOptions(options);
assert
(c != NULL);
redisReply *reply = redisCommand(c,
"PUBLISH %s %s"
,channel,msg);
assert
(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
freeReplyObject(reply);
disconnect(c, 0);
}
void
integer_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert
(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
if
(state->disconnect) async_disconnect(ac);
}
void
subscribe_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert
(reply != NULL &&
reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);
if
(
strcmp
(reply->element[0]->str,
"subscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"mychannel"
) == 0 &&
reply->element[2]->str == NULL);
publish_msg(state->options,
"mychannel"
,
"Hello!"
);
}
else
if
(
strcmp
(reply->element[0]->str,
"message"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"mychannel"
) == 0 &&
strcmp
(reply->element[2]->str,
"Hello!"
) == 0);
state->checkpoint++;
redisAsyncCommand(ac,unexpected_cb,
(
void
*)
"unsubscribe should call subscribe_cb()"
,
"unsubscribe"
);
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,
"LPUSH mylist foo"
);
}
else
if
(
strcmp
(reply->element[0]->str,
"unsubscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"mychannel"
) == 0 &&
reply->element[2]->str == NULL);
}
else
{
printf
(
"Unexpected pubsub command: %s\n"
, reply->element[0]->str);
exit
(1);
}
}
void
array_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert
(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
if
(state->disconnect) async_disconnect(ac);
}
void
null_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
(
void
) ac;
assert
(r == NULL);
TestState *state = privdata;
state->checkpoint++;
}
static
void
test_pubsub_handling(
struct
config config) {
test(
"Subscribe, handle published message and unsubscribe: "
);
base = event_base_new();
struct
event *timeout = evtimer_new(base, timeout_cb, NULL);
assert
(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct
timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert
(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_cb,&state,
"subscribe mychannel"
);
redisAsyncCommand(ac,array_cb,&state,
"PING"
);
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
assert
(state.checkpoint == 3);
}
void
unexpected_push_cb(redisAsyncContext *ac,
void
*r) {
(
void
) ac; (
void
) r;
printf
(
"Unexpected call to the PUSH callback!\n"
);
exit
(1);
}
static
void
test_pubsub_handling_resp3(
struct
config config) {
test(
"Subscribe, handle published message and unsubscribe using RESP3: "
);
base = event_base_new();
struct
event *timeout = evtimer_new(base, timeout_cb, NULL);
assert
(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct
timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert
(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
redisAsyncSetPushCallback(ac, unexpected_push_cb);
redisAsyncCommand(ac,NULL,NULL,
"HELLO 3"
);
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,
"subscribe mychannel"
);
redisAsyncCommand(ac,integer_cb,&state,
"LPUSH mylist foo"
);
redisAsyncCommand(ac,integer_cb,&state,
"LPUSH mylist foo"
);
redisAsyncCommand(ac,integer_cb,&state,
"LPUSH mylist foo"
);
redisAsyncCommand(ac,array_cb,&state,
"LRANGE mylist 0 2"
);
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
assert
(state.checkpoint == 6);
}
void
subscribe_with_timeout_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
if
(reply == NULL) {
state->checkpoint++;
event_base_loopbreak(base);
return
;
}
assert
(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);
if
(
strcmp
(reply->element[0]->str,
"subscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"mychannel"
) == 0 &&
reply->element[2]->str == NULL);
publish_msg(state->options,
"mychannel"
,
"Hello!"
);
state->checkpoint++;
}
else
if
(
strcmp
(reply->element[0]->str,
"message"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"mychannel"
) == 0 &&
strcmp
(reply->element[2]->str,
"Hello!"
) == 0);
state->checkpoint++;
redisAsyncCommand(ac,null_cb,state,
"DEBUG SLEEP 3"
);
redisAsyncCommand(ac,null_cb,state,
"LPUSH mylist foo"
);
}
else
{
printf
(
"Unexpected pubsub command: %s\n"
, reply->element[0]->str);
exit
(1);
}
}
static
void
test_command_timeout_during_pubsub(
struct
config config) {
test(
"Command timeout during Pub/Sub: "
);
base = event_base_new();
struct
event *timeout = evtimer_new(base,timeout_cb,NULL);
assert
(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct
timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout,&timeout_tv);
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert
(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
struct
timeval command_timeout = {.tv_sec = 2};
redisAsyncSetTimeout(ac,command_timeout);
redisAsyncSetPushCallback(ac,unexpected_push_cb);
redisAsyncCommand(ac,NULL,NULL,
"HELLO 3"
);
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,
"subscribe mychannel"
);
assert
(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
test_cond(state.checkpoint == 5);
}
void
subscribe_channel_a_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert
(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);
if
(
strcmp
(reply->element[0]->str,
"subscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"A"
) == 0);
publish_msg(state->options,
"A"
,
"Hello!"
);
state->checkpoint++;
}
else
if
(
strcmp
(reply->element[0]->str,
"message"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"A"
) == 0 &&
strcmp
(reply->element[2]->str,
"Hello!"
) == 0);
state->checkpoint++;
redisAsyncCommand(ac,unexpected_cb,
(
void
*)
"unsubscribe should not call unexpected_cb()"
,
"unsubscribe B X A A Z"
);
redisAsyncCommand(ac,unexpected_cb,
(
void
*)
"punsubscribe should not call unexpected_cb()"
,
"punsubscribe"
);
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,
"LPUSH mylist foo"
);
}
else
if
(
strcmp
(reply->element[0]->str,
"unsubscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"A"
) == 0);
state->checkpoint++;
}
else
{
printf
(
"Unexpected pubsub command: %s\n"
, reply->element[0]->str);
exit
(1);
}
}
void
subscribe_channel_b_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
(
void
)ac;
assert
(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
reply->elements == 3);
if
(
strcmp
(reply->element[0]->str,
"subscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"B"
) == 0);
state->checkpoint++;
}
else
if
(
strcmp
(reply->element[0]->str,
"unsubscribe"
) == 0) {
assert
(
strcmp
(reply->element[1]->str,
"B"
) == 0);
state->checkpoint++;
}
else
{
printf
(
"Unexpected pubsub command: %s\n"
, reply->element[0]->str);
exit
(1);
}
}
static
void
test_pubsub_multiple_channels(
struct
config config) {
test(
"Subscribe to multiple channels: "
);
base = event_base_new();
struct
event *timeout = evtimer_new(base,timeout_cb,NULL);
assert
(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct
timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout,&timeout_tv);
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert
(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
redisAsyncSetPushCallback(ac,unexpected_push_cb);
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_channel_a_cb,&state,
"subscribe A"
);
redisAsyncCommand(ac,subscribe_channel_b_cb,&state,
"subscribe B"
);
assert
(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
test_cond(state.checkpoint == 6);
}
void
monitor_cb(redisAsyncContext *ac,
void
*r,
void
*privdata) {
redisReply *reply = r;
TestState *state = privdata;
if
(reply == NULL) {
event_base_loopbreak(base);
return
;
}
assert
(reply != NULL && reply->type == REDIS_REPLY_STATUS);
state->checkpoint++;
if
(state->checkpoint == 1) {
redisContext *c = redisConnectWithOptions(state->options);
assert
(c != NULL);
redisReply *reply = redisCommand(c,
"SET first 1"
);
assert
(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
}
else
if
(state->checkpoint == 2) {
assert
(
strstr
(reply->str,
"first"
) != NULL);
redisContext *c = redisConnectWithOptions(state->options);
assert
(c != NULL);
redisReply *reply = redisCommand(c,
"SET second 2"
);
assert
(reply->type == REDIS_REPLY_STATUS);
freeReplyObject(reply);
redisFree(c);
}
else
if
(state->checkpoint == 3) {
assert
(
strstr
(reply->str,
"second"
) != NULL);
redisAsyncCommand(ac,NULL,NULL,
"QUIT"
);
}
}
static
void
test_monitor(
struct
config config) {
test(
"Enable monitoring: "
);
base = event_base_new();
struct
event *timeout = evtimer_new(base, timeout_cb, NULL);
assert
(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct
timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert
(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
redisAsyncSetPushCallback(ac,unexpected_push_cb);
TestState state = {.options = &options};
redisAsyncCommand(ac,monitor_cb,&state,
"monitor"
);
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
assert
(state.checkpoint == 3);
}
#endif /* HIREDIS_TEST_ASYNC */
typedef
enum
astest_no
{
ASTEST_CONNECT=0,
ASTEST_CONN_TIMEOUT,
ASTEST_PINGPONG,
ASTEST_PINGPONG_TIMEOUT,
ASTEST_ISSUE_931,
ASTEST_ISSUE_931_PING
}astest_no;
struct
_astest {
redisAsyncContext *ac;
astest_no testno;
int
counter;
int
connects;
int
connect_status;
int
disconnects;
int
pongs;
int
disconnect_status;
int
connected;
int
err;
char
errstr[256];
};
static
struct
_astest astest;
static
void
asCleanup(
void
* data)
{
struct
_astest *t = (
struct
_astest *)data;
t->ac = NULL;
}
static
void
commandCallback(
struct
redisAsyncContext *ac,
void
* _reply,
void
* _privdata);
static
void
connectCallback(redisAsyncContext *c,
int
status) {
struct
_astest *t = (
struct
_astest *)c->data;
assert
(t == &astest);
assert
(t->connects == 0);
t->err = c->err;
strcpy
(t->errstr, c->errstr);
t->connects++;
t->connect_status = status;
t->connected = status == REDIS_OK ? 1 : -1;
if
(t->testno == ASTEST_ISSUE_931) {
redisAsyncDisconnect(c);
}
else
if
(t->testno == ASTEST_ISSUE_931_PING)
{
redisAsyncCommand(c, commandCallback, NULL,
"PING"
);
}
}
static
void
disconnectCallback(
const
redisAsyncContext *c,
int
status) {
assert
(c->data == (
void
*)&astest);
assert
(astest.disconnects == 0);
astest.err = c->err;
strcpy
(astest.errstr, c->errstr);
astest.disconnects++;
astest.disconnect_status = status;
astest.connected = 0;
}
static
void
commandCallback(
struct
redisAsyncContext *ac,
void
* _reply,
void
* _privdata)
{
redisReply *reply = (redisReply*)_reply;
struct
_astest *t = (
struct
_astest *)ac->data;
assert
(t == &astest);
(
void
)_privdata;
t->err = ac->err;
strcpy
(t->errstr, ac->errstr);
t->counter++;
if
(t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING)
{
assert
(reply != NULL && reply->type == REDIS_REPLY_STATUS &&
strcmp
(reply->str,
"PONG"
) == 0);
t->pongs++;
redisAsyncFree(ac);
}
if
(t->testno == ASTEST_PINGPONG_TIMEOUT)
{
assert
(reply != NULL && reply->type == REDIS_REPLY_STATUS &&
strcmp
(reply->str,
"PONG"
) == 0);
t->pongs++;
if
(t->counter == 1) {
int
status = redisAsyncCommand(ac, commandCallback, NULL,
"PING"
);
assert
(status == REDIS_OK);
}
else
{
redisAsyncFree(ac);
}
}
}
static
redisAsyncContext *do_aconnect(
struct
config config, astest_no testno)
{
redisOptions options = {0};
memset
(&astest, 0,
sizeof
(astest));
astest.testno = testno;
astest.connect_status = astest.disconnect_status = -2;
if
(config.type == CONN_TCP) {
options.type = REDIS_CONN_TCP;
options.connect_timeout = &config.connect_timeout;
REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
}
else
if
(config.type == CONN_SSL) {
options.type = REDIS_CONN_TCP;
options.connect_timeout = &config.connect_timeout;
REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port);
}
else
if
(config.type == CONN_UNIX) {
options.type = REDIS_CONN_UNIX;
options.endpoint.unix_socket = config.unix_sock.path;
}
else
if
(config.type == CONN_FD) {
options.type = REDIS_CONN_USERFD;
redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
if
(dummy_ctx) {
redisFD fd = disconnect(dummy_ctx, 1);
printf
(
"Connecting to inherited fd %d\n"
, (
int
)fd);
options.endpoint.fd = fd;
}
}
redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
assert
(c);
astest.ac = c;
c->data = &astest;
c->dataCleanup = asCleanup;
redisPollAttach(c);
redisAsyncSetConnectCallbackNC(c, connectCallback);
redisAsyncSetDisconnectCallback(c, disconnectCallback);
return
c;
}
static
void
as_printerr(
void
) {
printf
(
"Async err %d : %s\n"
, astest.err, astest.errstr);
}
#define ASASSERT(e) do { \
if
(!(e)) \
as_printerr(); \
assert
(e); \
}
while
(0);
static
void
test_async_polling(
struct
config config) {
int
status;
redisAsyncContext *c;
struct
config defaultconfig = config;
test(
"Async connect: "
);
c = do_aconnect(config, ASTEST_CONNECT);
assert
(c);
while
(astest.connected == 0)
redisPollTick(c, 0.1);
assert
(astest.connects == 1);
ASASSERT(astest.connect_status == REDIS_OK);
assert
(astest.disconnects == 0);
test_cond(astest.connected == 1);
test(
"Async free after connect: "
);
assert
(astest.ac != NULL);
redisAsyncFree(c);
assert
(astest.disconnects == 1);
assert
(astest.ac == NULL);
test_cond(astest.disconnect_status == REDIS_OK);
if
(config.type == CONN_TCP || config.type == CONN_SSL) {
test(
"Async connect timeout: "
);
config.tcp.host =
"192.168.254.254"
;
config.connect_timeout.tv_usec = 100000;
c = do_aconnect(config, ASTEST_CONN_TIMEOUT);
assert
(c);
assert
(c->err == 0);
while
(astest.connected == 0)
redisPollTick(c, 0.1);
assert
(astest.connected == -1);
assert
(astest.ac == NULL);
test_cond(astest.connect_status == REDIS_ERR);
config = defaultconfig;
}
test(
"Async PING/PONG: "
);
c = do_aconnect(config, ASTEST_PINGPONG);
while
(astest.connected == 0)
redisPollTick(c, 0.1);
status = redisAsyncCommand(c, commandCallback, NULL,
"PING"
);
assert
(status == REDIS_OK);
while
(astest.ac)
redisPollTick(c, 0.1);
test_cond(astest.pongs == 1);
if
(config.type == CONN_TCP || config.type == CONN_SSL) {
test(
"Async PING/PONG after connect timeout: "
);
config.connect_timeout.tv_usec = 10000;
c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT);
while
(astest.connected == 0)
redisPollTick(c, 0.1);
millisleep(10);
status = redisAsyncCommand(c, commandCallback, NULL,
"PING"
);
assert
(status == REDIS_OK);
while
(astest.ac)
redisPollTick(c, 0.1);
test_cond(astest.pongs == 2);
config = defaultconfig;
}
test(
"Disconnect from onConnected callback (Issue #931): "
);
c = do_aconnect(config, ASTEST_ISSUE_931);
while
(astest.disconnects == 0)
redisPollTick(c, 0.1);
assert
(astest.connected == 0);
assert
(astest.connects == 1);
test_cond(astest.disconnects == 1);
test(
"Ping/Pong from onConnected callback (Issue #931): "
);
c = do_aconnect(config, ASTEST_ISSUE_931_PING);
while
(astest.ac)
redisPollTick(c, 0.1);
assert
(astest.connected == 0);
assert
(astest.connects == 1);
assert
(astest.disconnects == 1);
test_cond(astest.pongs == 1);
}
int
main(
int
argc,
char
**argv) {
struct
config cfg = {
.tcp = {
.host =
"127.0.0.1"
,
.port = 6379
},
.unix_sock = {
.path =
"/tmp/redis.sock"
}
};
int
throughput = 1;
int
test_inherit_fd = 1;
int
skips_as_fails = 0;
int
test_unix_socket;
argv++; argc--;
while
(argc) {
if
(argc >= 2 && !
strcmp
(argv[0],
"-h"
)) {
argv++; argc--;
cfg.tcp.host = argv[0];
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"-p"
)) {
argv++; argc--;
cfg.tcp.port =
atoi
(argv[0]);
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"-s"
)) {
argv++; argc--;
cfg.unix_sock.path = argv[0];
}
else
if
(argc >= 1 && !
strcmp
(argv[0],
"--skip-throughput"
)) {
throughput = 0;
}
else
if
(argc >= 1 && !
strcmp
(argv[0],
"--skip-inherit-fd"
)) {
test_inherit_fd = 0;
}
else
if
(argc >= 1 && !
strcmp
(argv[0],
"--skips-as-fails"
)) {
skips_as_fails = 1;
#ifdef HIREDIS_TEST_SSL
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"--ssl-port"
)) {
argv++; argc--;
cfg.ssl.port =
atoi
(argv[0]);
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"--ssl-host"
)) {
argv++; argc--;
cfg.ssl.host = argv[0];
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"--ssl-ca-cert"
)) {
argv++; argc--;
cfg.ssl.ca_cert = argv[0];
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"--ssl-cert"
)) {
argv++; argc--;
cfg.ssl.cert = argv[0];
}
else
if
(argc >= 2 && !
strcmp
(argv[0],
"--ssl-key"
)) {
argv++; argc--;
cfg.ssl.key = argv[0];
#endif
}
else
{
fprintf
(stderr,
"Invalid argument: %s\n"
, argv[0]);
exit
(1);
}
argv++; argc--;
}
#ifndef _WIN32
signal
(SIGPIPE, SIG_IGN);
test_unix_socket = access(cfg.unix_sock.path, F_OK) == 0;
#else
test_unix_socket = 0;
#endif
test_allocator_injection();
test_format_commands();
test_reply_reader();
test_blocking_connection_errors();
test_free_null();
printf
(
"\nTesting against TCP connection (%s:%d):\n"
, cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
test_blocking_connection(cfg);
test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
test_invalid_timeout_errors(cfg);
test_append_formatted_commands(cfg);
test_tcp_options(cfg);
if
(throughput) test_throughput(cfg);
printf
(
"\nTesting against Unix socket connection (%s): "
, cfg.unix_sock.path);
if
(test_unix_socket) {
printf
(
"\n"
);
cfg.type = CONN_UNIX;
test_blocking_connection(cfg);
test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
test_invalid_timeout_errors(cfg);
if
(throughput) test_throughput(cfg);
}
else
{
test_skipped();
}
#ifdef HIREDIS_TEST_SSL
if
(cfg.ssl.port && cfg.ssl.host) {
redisInitOpenSSL();
_ssl_ctx = redisCreateSSLContext(cfg.ssl.ca_cert, NULL, cfg.ssl.cert, cfg.ssl.key, NULL, NULL);
assert
(_ssl_ctx != NULL);
printf
(
"\nTesting against SSL connection (%s:%d):\n"
, cfg.ssl.host, cfg.ssl.port);
cfg.type = CONN_SSL;
test_blocking_connection(cfg);
test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
test_invalid_timeout_errors(cfg);
test_append_formatted_commands(cfg);
if
(throughput) test_throughput(cfg);
redisFreeSSLContext(_ssl_ctx);
_ssl_ctx = NULL;
}
#endif
#ifdef HIREDIS_TEST_ASYNC
cfg.type = CONN_TCP;
printf
(
"\nTesting asynchronous API against TCP connection (%s:%d):\n"
, cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
int
major;
redisContext *c = do_connect(cfg);
get_redis_version(c, &major, NULL);
disconnect(c, 0);
test_pubsub_handling(cfg);
test_pubsub_multiple_channels(cfg);
test_monitor(cfg);
if
(major >= 6) {
test_pubsub_handling_resp3(cfg);
test_command_timeout_during_pubsub(cfg);
}
#endif /* HIREDIS_TEST_ASYNC */
cfg.type = CONN_TCP;
printf
(
"\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n"
, cfg.tcp.host, cfg.tcp.port);
test_async_polling(cfg);
if
(test_unix_socket) {
cfg.type = CONN_UNIX;
printf
(
"\nTesting asynchronous API using polling_adapter UNIX (%s):\n"
, cfg.unix_sock.path);
test_async_polling(cfg);
}
if
(test_inherit_fd) {
printf
(
"\nTesting against inherited fd (%s): "
, cfg.unix_sock.path);
if
(test_unix_socket) {
printf
(
"\n"
);
cfg.type = CONN_FD;
test_blocking_connection(cfg);
}
else
{
test_skipped();
}
}
if
(fails || (skips_as_fails && skips)) {
printf
(
"*** %d TESTS FAILED ***\n"
, fails);
if
(skips) {
printf
(
"*** %d TESTS SKIPPED ***\n"
, skips);
}
return
1;
}
printf
(
"ALL TESTS PASSED (%d skipped)\n"
, skips);
return
0;
}