#include #include #include #include #include #include "hiredis.h" #include "async.h" #include "adapters/ae.h" #include "sha1.h" struct Singleton { int n; int *port; redisAsyncContext **servers; aeEventLoop *loop; char luasha1[48]; } singleton; const char *LuaCmd = "local res = redis.call('ZRANGEBYSCORE',KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10 ) " "if #res > 0 then " " redis.call( 'ZREMRANGEBYRANK', KEYS[1], 0, #res-1 ) " " return res " "else " " return false " "end "; void sha1hex(char *digest, const char *script, size_t len) { SHA1_CTX ctx; unsigned char hash[20]; char *cset = "0123456789abcdef"; int j; SHA1Init(&ctx); SHA1Update(&ctx,(unsigned char*)script,len); SHA1Final(hash,&ctx); for (j = 0; j < 20; j++) { digest[j*2] = cset[((hash[j]&0xF0)>>4)]; digest[j*2+1] = cset[(hash[j]&0xF)]; } digest[40] = '\0'; } void dequeuedItem(redisAsyncContext *c, void *r, void *privdata) { int i; redisReply *reply = r; if (reply == NULL) return; switch( reply->type ) { case REDIS_REPLY_ARRAY: for ( i=0; ielements; ++i ) { printf("Expired: %s\n", reply->element[i]->str ); redisAsyncCommand( c, NULL, NULL, "DEL %s", reply->element[i]->str ); } if ( i>0 ) redisAsyncCommand( c, dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, time(NULL) ); break; case REDIS_REPLY_ERROR: case REDIS_REPLY_STATUS: printf("ERror: %s\n",reply->str ); break; case REDIS_REPLY_NIL: break; default: printf("Error\n"); break; } } int mainLoop( struct aeEventLoop *loop, long long id, void *clientData) { time_t t = time(NULL); for ( int i=0; ierrstr); for (int i=0; ierrstr); } printf("disconnected...\n"); for (int i=0; ierr ) { perror("redisAsyncConnect"); exit( -1 ); } redisAeAttach( singleton.loop, singleton.servers[i] ); redisAsyncSetConnectCallback( singleton.servers[i],connectCallback); redisAsyncSetDisconnectCallback( singleton.servers[i],disconnectCallback); redisAsyncCommand( singleton.servers[i], NULL, NULL, "SCRIPT LOAD %s", LuaCmd ); } } } int reconnectIfNeeded( struct aeEventLoop *loop, long long id, void *clientData) { checkConnections(); return 1000; } int main ( int argc, char *argv[] ) { srand(time(NULL)); signal(SIGPIPE, SIG_IGN); memset( &singleton, '\0', sizeof(struct Singleton) ); singleton.n = argc - 1; singleton.servers = (redisAsyncContext **) malloc( singleton.n*sizeof( redisAsyncContext *) ); if ( !singleton.servers ) { perror("malloc"); exit( -1 ); } singleton.port = (int *) malloc( singleton.n*sizeof(int) ); if ( !singleton.port ) { perror("malloc"); exit( -1 ); } singleton.loop = aeCreateEventLoop(256); memset( singleton.servers, '\0', singleton.n*sizeof( redisAsyncContext *) ); memset( singleton.port, '\0', singleton.n*sizeof(int) ); for ( int i=0; i