/*
* Simple example how to enable client tracking to implement client side caching.
* Tracking can be enabled via a registered connect callback and invalidation
* messages are received via the registered push callback.
* The disconnect callback should also be used as an indication of invalidation.
*/
#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CLUSTER_NODE "127.0.0.1:7000"
#define KEY "key:1"
void pushCallback(redisAsyncContext *ac, void *r);
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata);
void modifyKey(const char *key, const char *value);
/* The connect callback enables RESP3 and client tracking.
The non-const connect callback is used since we want to
set the push callback in the hiredis context. */
void connectCallbackNC(redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
redisAsyncSetPushCallback(ac, pushCallback);
redisAsyncCommand(ac, NULL, NULL, "HELLO 3");
redisAsyncCommand(ac, NULL, NULL, "CLIENT TRACKING ON");
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}
/* The event callback issues a 'SET' command when the client is ready to accept
commands. A reply is expected via a call to 'setCallback()' */
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
(void)cc;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;
/* We send our commands when the client is ready to accept commands. */
if (event == HIRCLUSTER_EVENT_READY) {
printf("Client is ready to accept commands\n");
int status =
redisClusterAsyncCommand(acc, setCallback, NULL, "SET %s 1", KEY);
assert(status == REDIS_OK);
}
}
/* Message callback for 'SET' commands. Issues a 'GET' command and a reply is
expected as a call to 'getCallback1()' */
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);
printf("Callback for 'SET', reply: %s\n", reply->str);
int status =
redisClusterAsyncCommand(acc, getCallback1, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}
/* Message callback for the first 'GET' command. Modifies the key to
trigger Redis to send a key invalidation message and then sends another
'GET' command. The invalidation message is received via the registered
push callback. */
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);
printf("Callback for first 'GET', reply: %s\n", reply->str);
/* Modify the key from another client which will invalidate a cached value.
Redis will send an invalidation message via a push message. */
modifyKey(KEY, "99");
int status =
redisClusterAsyncCommand(acc, getCallback2, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}
/* Push message callback handling invalidation messages. */
void pushCallback(redisAsyncContext *ac, void *r) {
redisReply *reply = r;
if (!(reply->type == REDIS_REPLY_PUSH && reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_STRING &&
!strncmp(reply->element[0]->str, "invalidate", 10) &&
reply->element[1]->type == REDIS_REPLY_ARRAY)) {
/* Not an 'invalidate' message. Ignore. */
return;
}
redisReply *payload = reply->element[1];
size_t i;
for (i = 0; i < payload->elements; i++) {
redisReply *key = payload->element[i];
if (key->type == REDIS_REPLY_STRING)
printf("Invalidate key '%.*s'\n", (int)key->len, key->str);
else if (key->type == REDIS_REPLY_NIL)
printf("Invalidate all\n");
}
}
/* Message callback for 'GET' commands. Exits program. */
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);
printf("Callback for second 'GET', reply: %s\n", reply->str);
/* Exit the eventloop after a couple of sent commands. */
redisClusterAsyncDisconnect(acc);
}
/* A disconnect callback should invalidate all cached keys. */
void disconnectCallback(const redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
printf("Invalidate all\n");
}
/* Helper to modify keys using a separate client. */
void modifyKey(const char *key, const char *value) {
printf("Modify key: '%s'\n", key);
redisClusterContext *cc = redisClusterContextInit();
int status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
assert(status == REDIS_OK);
status = redisClusterConnect2(cc);
assert(status == REDIS_OK);
redisReply *reply = redisClusterCommand(cc, "SET %s %s", key, value);
assert(reply != NULL);
freeReplyObject(reply);
redisClusterFree(cc);
}
int main(int argc, char **argv) {
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
int status;
status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == REDIS_OK);
status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == REDIS_OK);
status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == REDIS_OK);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_OK);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
return 0;
}