|
|
@@ -0,0 +1,160 @@ |
|
|
#include <stdio.h> |
|
|
#include <stdlib.h> |
|
|
#include <string.h> |
|
|
#include <signal.h> |
|
|
#include <time.h> |
|
|
|
|
|
#include "hiredis.h" |
|
|
#include "async.h" |
|
|
#include "adapters/ae.h" |
|
|
#include "sha1.h" |
|
|
|
|
|
struct Singleton { |
|
|
int n; |
|
|
int *port; |
|
|
redisAsyncContext **servers; |
|
|
aeEventLoop *loop; |
|
|
char luasha1[48]; |
|
|
} singleton; |
|
|
|
|
|
const char *LuaCmd = |
|
|
"local res = redis.call('ZRANGEBYSCORE',KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10 ) " |
|
|
"if #res > 0 then " |
|
|
" redis.call( 'ZREMRANGEBYRANK', KEYS[1], 0, #res-1 ) " |
|
|
" return res " |
|
|
"else " |
|
|
" return false " |
|
|
"end "; |
|
|
|
|
|
void sha1hex(char *digest, const char *script, size_t len) { |
|
|
SHA1_CTX ctx; |
|
|
unsigned char hash[20]; |
|
|
char *cset = "0123456789abcdef"; |
|
|
int j; |
|
|
|
|
|
SHA1Init(&ctx); |
|
|
SHA1Update(&ctx,(unsigned char*)script,len); |
|
|
SHA1Final(hash,&ctx); |
|
|
|
|
|
for (j = 0; j < 20; j++) { |
|
|
digest[j*2] = cset[((hash[j]&0xF0)>>4)]; |
|
|
digest[j*2+1] = cset[(hash[j]&0xF)]; |
|
|
} |
|
|
digest[40] = '\0'; |
|
|
} |
|
|
|
|
|
void dequeuedItem(redisAsyncContext *c, void *r, void *privdata) { |
|
|
|
|
|
int i; |
|
|
redisReply *reply = r; |
|
|
if (reply == NULL) return; |
|
|
|
|
|
switch( reply->type ) { |
|
|
case REDIS_REPLY_ARRAY: |
|
|
for ( i=0; i<reply->elements; ++i ) { |
|
|
printf("Expired: %s\n", reply->element[i]->str ); |
|
|
redisAsyncCommand( c, NULL, NULL, "DEL %s", reply->element[i]->str ); |
|
|
} |
|
|
if ( i>0 ) |
|
|
redisAsyncCommand( c, dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, time(NULL) ); |
|
|
break; |
|
|
|
|
|
case REDIS_REPLY_ERROR: |
|
|
case REDIS_REPLY_STATUS: |
|
|
printf("ERror: %s\n",reply->str ); |
|
|
break; |
|
|
|
|
|
case REDIS_REPLY_NIL: |
|
|
break; |
|
|
default: |
|
|
printf("Error\n"); |
|
|
break; |
|
|
} |
|
|
} |
|
|
|
|
|
int mainLoop( struct aeEventLoop *loop, long long id, void *clientData) { |
|
|
|
|
|
time_t t = time(NULL); |
|
|
|
|
|
for ( int i=0; i<singleton.n; ++i ) { |
|
|
if ( singleton.servers[i] != NULL ) { |
|
|
redisAsyncCommand( singleton.servers[i], dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, t ); |
|
|
} |
|
|
} |
|
|
fflush(stdout); |
|
|
return 1000+rand()%1000; |
|
|
} |
|
|
|
|
|
void connectCallback(const redisAsyncContext *c, int status) { |
|
|
|
|
|
if ( status != REDIS_OK ) |
|
|
{ |
|
|
printf("Error: %s\n", c->errstr); |
|
|
for (int i=0; i<singleton.n; ++i ) |
|
|
if ( singleton.servers[i] == c ) |
|
|
singleton.servers[i] = NULL; |
|
|
} |
|
|
else |
|
|
printf("connected...\n"); |
|
|
} |
|
|
|
|
|
void disconnectCallback(const redisAsyncContext *c, int status) { |
|
|
if (status != REDIS_OK) { |
|
|
printf("Error: %s\n", c->errstr); |
|
|
} |
|
|
printf("disconnected...\n"); |
|
|
|
|
|
for (int i=0; i<singleton.n; ++i ) |
|
|
if ( singleton.servers[i] == c ) |
|
|
singleton.servers[i] = NULL; |
|
|
} |
|
|
|
|
|
void checkConnections() |
|
|
{ |
|
|
for ( int i=0; i<singleton.n; ++i ) { |
|
|
if ( singleton.servers[i] == NULL ) { |
|
|
printf("Connecting %d...\n",singleton.port[i] ); |
|
|
singleton.servers[i] = redisAsyncConnect("127.0.0.1", singleton.port[i] ); |
|
|
if ( singleton.servers[i]->err ) { perror("redisAsyncConnect"); exit( -1 ); } |
|
|
redisAeAttach( singleton.loop, singleton.servers[i] ); |
|
|
redisAsyncSetConnectCallback( singleton.servers[i],connectCallback); |
|
|
redisAsyncSetDisconnectCallback( singleton.servers[i],disconnectCallback); |
|
|
redisAsyncCommand( singleton.servers[i], NULL, NULL, "SCRIPT LOAD %s", LuaCmd ); |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
int reconnectIfNeeded( struct aeEventLoop *loop, long long id, void *clientData) { |
|
|
checkConnections(); |
|
|
return 1000; |
|
|
} |
|
|
|
|
|
int main ( int argc, char *argv[] ) { |
|
|
|
|
|
srand(time(NULL)); |
|
|
signal(SIGPIPE, SIG_IGN); |
|
|
memset( &singleton, '\0', sizeof(struct Singleton) ); |
|
|
|
|
|
singleton.n = argc - 1; |
|
|
singleton.servers = (redisAsyncContext **) malloc( singleton.n*sizeof( redisAsyncContext *) ); |
|
|
if ( !singleton.servers ) { perror("malloc"); exit( -1 ); } |
|
|
singleton.port = (int *) malloc( singleton.n*sizeof(int) ); |
|
|
if ( !singleton.port ) { perror("malloc"); exit( -1 ); } |
|
|
|
|
|
singleton.loop = aeCreateEventLoop(256); |
|
|
memset( singleton.servers, '\0', singleton.n*sizeof( redisAsyncContext *) ); |
|
|
memset( singleton.port, '\0', singleton.n*sizeof(int) ); |
|
|
|
|
|
for ( int i=0; i<singleton.n; ++i ) |
|
|
singleton.port[i] = atoi(argv[i+1]); |
|
|
|
|
|
sha1hex( singleton.luasha1, LuaCmd, strlen(LuaCmd) ); |
|
|
|
|
|
checkConnections(); |
|
|
aeCreateTimeEvent( singleton.loop,5,reconnectIfNeeded, NULL, NULL ); |
|
|
aeCreateTimeEvent( singleton.loop,5,mainLoop, NULL, NULL ); |
|
|
|
|
|
aeMain(singleton.loop); |
|
|
return 0; |
|
|
} |
|
|
|