Skip to content

Instantly share code, notes, and snippets.

@manavid
Forked from dspezia/example.c
Created March 9, 2013 06:02
Show Gist options
  • Save manavid/5122952 to your computer and use it in GitHub Desktop.
Save manavid/5122952 to your computer and use it in GitHub Desktop.

Revisions

  1. @dspezia dspezia revised this gist Nov 26, 2012. 1 changed file with 9 additions and 0 deletions.
    9 changes: 9 additions & 0 deletions gen.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,9 @@
    #!/usr/bin/env python

    import time

    for x in range(0,1000):
    print "multi"
    print 'set c%06d dummydata%d' % ( x,x )
    print 'zadd to_be_expired %ld c%06d' % ( time.time()+20,x)
    print "exec"
  2. @dspezia dspezia created this gist Nov 26, 2012.
    160 changes: 160 additions & 0 deletions example.c
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,160 @@
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <signal.h>
    #include <time.h>

    #include "hiredis.h"
    #include "async.h"
    #include "adapters/ae.h"
    #include "sha1.h"

    struct Singleton {
    int n;
    int *port;
    redisAsyncContext **servers;
    aeEventLoop *loop;
    char luasha1[48];
    } singleton;

    const char *LuaCmd =
    "local res = redis.call('ZRANGEBYSCORE',KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10 ) "
    "if #res > 0 then "
    " redis.call( 'ZREMRANGEBYRANK', KEYS[1], 0, #res-1 ) "
    " return res "
    "else "
    " return false "
    "end ";

    void sha1hex(char *digest, const char *script, size_t len) {
    SHA1_CTX ctx;
    unsigned char hash[20];
    char *cset = "0123456789abcdef";
    int j;

    SHA1Init(&ctx);
    SHA1Update(&ctx,(unsigned char*)script,len);
    SHA1Final(hash,&ctx);

    for (j = 0; j < 20; j++) {
    digest[j*2] = cset[((hash[j]&0xF0)>>4)];
    digest[j*2+1] = cset[(hash[j]&0xF)];
    }
    digest[40] = '\0';
    }

    void dequeuedItem(redisAsyncContext *c, void *r, void *privdata) {

    int i;
    redisReply *reply = r;
    if (reply == NULL) return;

    switch( reply->type ) {
    case REDIS_REPLY_ARRAY:
    for ( i=0; i<reply->elements; ++i ) {
    printf("Expired: %s\n", reply->element[i]->str );
    redisAsyncCommand( c, NULL, NULL, "DEL %s", reply->element[i]->str );
    }
    if ( i>0 )
    redisAsyncCommand( c, dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, time(NULL) );
    break;

    case REDIS_REPLY_ERROR:
    case REDIS_REPLY_STATUS:
    printf("ERror: %s\n",reply->str );
    break;

    case REDIS_REPLY_NIL:
    break;
    default:
    printf("Error\n");
    break;
    }
    }

    int mainLoop( struct aeEventLoop *loop, long long id, void *clientData) {

    time_t t = time(NULL);

    for ( int i=0; i<singleton.n; ++i ) {
    if ( singleton.servers[i] != NULL ) {
    redisAsyncCommand( singleton.servers[i], dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, t );
    }
    }
    fflush(stdout);
    return 1000+rand()%1000;
    }

    void connectCallback(const redisAsyncContext *c, int status) {

    if ( status != REDIS_OK )
    {
    printf("Error: %s\n", c->errstr);
    for (int i=0; i<singleton.n; ++i )
    if ( singleton.servers[i] == c )
    singleton.servers[i] = NULL;
    }
    else
    printf("connected...\n");
    }

    void disconnectCallback(const redisAsyncContext *c, int status) {
    if (status != REDIS_OK) {
    printf("Error: %s\n", c->errstr);
    }
    printf("disconnected...\n");

    for (int i=0; i<singleton.n; ++i )
    if ( singleton.servers[i] == c )
    singleton.servers[i] = NULL;
    }

    void checkConnections()
    {
    for ( int i=0; i<singleton.n; ++i ) {
    if ( singleton.servers[i] == NULL ) {
    printf("Connecting %d...\n",singleton.port[i] );
    singleton.servers[i] = redisAsyncConnect("127.0.0.1", singleton.port[i] );
    if ( singleton.servers[i]->err ) { perror("redisAsyncConnect"); exit( -1 ); }
    redisAeAttach( singleton.loop, singleton.servers[i] );
    redisAsyncSetConnectCallback( singleton.servers[i],connectCallback);
    redisAsyncSetDisconnectCallback( singleton.servers[i],disconnectCallback);
    redisAsyncCommand( singleton.servers[i], NULL, NULL, "SCRIPT LOAD %s", LuaCmd );
    }
    }
    }

    int reconnectIfNeeded( struct aeEventLoop *loop, long long id, void *clientData) {
    checkConnections();
    return 1000;
    }

    int main ( int argc, char *argv[] ) {

    srand(time(NULL));
    signal(SIGPIPE, SIG_IGN);
    memset( &singleton, '\0', sizeof(struct Singleton) );

    singleton.n = argc - 1;
    singleton.servers = (redisAsyncContext **) malloc( singleton.n*sizeof( redisAsyncContext *) );
    if ( !singleton.servers ) { perror("malloc"); exit( -1 ); }
    singleton.port = (int *) malloc( singleton.n*sizeof(int) );
    if ( !singleton.port ) { perror("malloc"); exit( -1 ); }

    singleton.loop = aeCreateEventLoop(256);
    memset( singleton.servers, '\0', singleton.n*sizeof( redisAsyncContext *) );
    memset( singleton.port, '\0', singleton.n*sizeof(int) );

    for ( int i=0; i<singleton.n; ++i )
    singleton.port[i] = atoi(argv[i+1]);

    sha1hex( singleton.luasha1, LuaCmd, strlen(LuaCmd) );

    checkConnections();
    aeCreateTimeEvent( singleton.loop,5,reconnectIfNeeded, NULL, NULL );
    aeCreateTimeEvent( singleton.loop,5,mainLoop, NULL, NULL );

    aeMain(singleton.loop);
    return 0;
    }