/*
Beekeeper client (JSON-RPC over MQTT)
Copyright 2015-2021 José Micó
For protocol references see:
- https://mqtt.org/mqtt-specification
- https://www.jsonrpc.org/specification
This requires the MQTT.js library:
- https://github.com/mqttjs/MQTT.js
let bkpr = new BeekeeperClient;
bkpr.connect({
url: "ws://localhost:8000/mqtt",
username: "guest",
password: "guest",
on_connect: function() {...}
});
bkpr.send_notification({
method: "test.foo",
params: { foo: "bar" }
});
bkpr.call_remote_method({
method: "test.bar",
params: { foo: "baz" },
on_success: function(result) {...},
on_error: function(error) {...}
});
bkpr.accept_notifications({
method: "test.foo.*",
on_receive: function(params) {...}
});
bkpr.accept_remote_calls({
method: "test.bar",
on_receive: function(params) {...}
});
*/
function BeekeeperClient () { return {
mqtt: null,
host: null,
client_id: null,
response_topic: null,
request_seq: 1,
subscr_seq: 1,
pending_req: {},
subscr_cb: {},
subscr_re: {},
connect: function(args) {
const This = this;
if (!this.client_id) this._generate_client_id();
if ('debug' in args) this.debug(args.debug);
this._debug(`Connecting to MQTT broker at ${args.url}`);
// It is possible to iterate over a list of servers specifying:
// url: [{ host: 'localhost', port: 1883 }, ... ]
// Connect to MQTT broker using websockets
this.mqtt = mqtt.connect( args.url, {
username: args.username || 'guest',
password: args.password || 'guest',
clientId: this.client_id,
protocolVersion: 5,
clean: true,
keepalive: 60,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000
});
this.mqtt.on('connect', function (connack) {
This.host = This.mqtt.options.host;
This._debug("Connected to MQTT broker at " + This.host);
This._create_response_topic();
if (args.on_connect) args.on_connect(connack.properties);
});
this.mqtt.on('reconnect', function () {
// Emitted when a reconnect starts
This._debug("Reconnecting...");
});
this.mqtt.on('close', function () {
// Emitted after a disconnection
This._debug("Disconnected");
});
this.mqtt.on('disconnect', function (packet) {
// Emitted after receiving disconnect packet from broker
This._debug("Disconnected by broker");
});
this.mqtt.on('offline', function () {
// Emitted when the client goes offline
This._debug("Client offline");
});
this.mqtt.on('error', function (error) {
// Emitted when the client cannot connect
This._debug(error);
});
this.mqtt.on('message', function (topic, message, packet) {
let jsonrpc;
try { jsonrpc = JSON.parse( message.toString() ) }
catch (e) { throw `Received invalid JSON: ${e}` }
This._debug(`Got << ${message}`);
const subscr_id = packet.properties.subscriptionIdentifier;
const subscr_cb = This.subscr_cb[subscr_id];
subscr_cb(jsonrpc, packet.properties);
});
},
_generate_client_id: function() {
// Generate a random client id (128+ bits)
this.client_id = '';
const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
const random = new Uint32Array(22);
window.crypto.getRandomValues(random);
for (let i = 0; i < random.length; i++) {
this.client_id += chars.charAt(random[i] % 62);
}
},
_debug: function () {},
debug: function(enabled) {
if (enabled) {
this._debug = function (msg) { console.log(`Beekeeper: ${msg}`) };
}
else {
this._debug = function () {};
}
},
on_error: null,
_on_error: function (error) {
if (!this.on_error) return;
try { this.on_error(error) }
catch(e) { this._debug(`Uncaught exception into on_error handler: ${e}`) }
},
send_notification: function(args) {
// This is included for reference, but please note that frontend clients
// should *not* be allowed to publish to msg/frontend/*, as that would
// allow a malicious actor to inject messages to other users
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const json = JSON.stringify({
jsonrpc: "2.0",
method: args.method,
params: args.params
});
this._debug(`Sent >> ${json}`);
this.mqtt.publish(
'msg/frontend/' + args.method.replace(/\./g,'/'),
json,
{}
);
},
accept_notifications: function (args) {
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const subscr_id = this.subscr_seq++;
const on_receive = args.on_receive;
const This = this;
this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {
// Incoming notification
try { on_receive( jsonrpc.params, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_receive callback of ${jsonrpc.method}: ${e}`) }
};
this.subscr_re[subscr_id] = new RegExp('^' + args.method.replace(/\./g,'\\.').replace(/\*/g,'.+') + '$');
// Private notifications are received on response_topic subscription
if (args.private) return;
const topic = 'msg/frontend/' + args.method.replace(/\./g,'/').replace(/\*/g,'#');
this.mqtt.subscribe(
topic,
{ properties: { subscriptionIdentifier: subscr_id }},
function (err, granted) {
if (err) throw `Failed to subscribe to ${topic}: ${err}`;
}
);
},
call_remote_method: function(args) {
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const req_id = this.request_seq++;
const json = JSON.stringify({
jsonrpc: "2.0",
method: args.method,
params: args.params,
id: req_id
});
const QUEUE_LANES = 2;
const topic = 'req/backend-' + Math.floor( Math.random() * QUEUE_LANES + 1 );
const fwd_to = 'req/backend/' + args.method.replace(/\.[\w-]+$/,'').replace(/\./g,'/');
this.mqtt.publish(
topic,
json,
{ properties: {
responseTopic: this.response_topic,
userProperties: { fwd_to: fwd_to }
}}
);
this._debug("Sent >> " + json);
this.pending_req[req_id] = {
method: args.method,
on_success: args.on_success,
on_error: args.on_error,
timeout: null
};
const This = this;
this.pending_req[req_id].timeout = setTimeout( function() {
delete This.pending_req[req_id];
This._debug(`Call to ${args.method} timed out`);
const err_resp = { code: -32603, message: "Request timeout" };
if (args.on_error) {
try { args.on_error(err_resp) }
catch(e) { This._debug(`Uncaught exception into on_error callback of ${args.method}: ${e}`) }
}
else {
This._on_error(err_resp);
}
}, (args.timeout || 30) * 1000);
},
_create_response_topic: function() {
const response_topic = 'priv/' + this.client_id;
this.response_topic = response_topic;
const subscr_id = this.subscr_seq++;
const This = this;
this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {
if (!jsonrpc.id) {
// Incoming private notification
let on_receive;
for (let subscr_id in This.subscr_re) {
if (jsonrpc.method.match( This.subscr_re[subscr_id] )) {
on_receive = This.subscr_cb[subscr_id];
break;
}
}
if (on_receive) {
try { on_receive( jsonrpc, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_receive callback of ${jsonrpc.method}: ${e}`) }
}
else {
This._debug(`Received unhandled private notification ${jsonrpc.method}`);
}
return;
}
// Incoming remote call response
const resp = jsonrpc;
const req = This.pending_req[resp.id];
delete This.pending_req[resp.id];
if (!req) return;
clearTimeout(req.timeout);
if ('result' in resp) {
if (req.on_success) {
try { req.on_success( resp.result, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_success callback of ${req.method}: ${e}`) }
}
}
else {
This._debug(`Error response from ${req.method} call: ${resp.error.message}`);
if (req.on_error) {
try { req.on_error( resp.error, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_error callback of ${req.method}: ${e}`) }
}
else {
This._on_error(resp.error);
}
}
};
this.mqtt.subscribe(
response_topic,
{ properties: { subscriptionIdentifier: subscr_id }},
function (err, granted) {
if (err) throw `Failed to subscribe to ${response_topic}: ${err}`;
}
);
},
accept_remote_calls: function(args) {
// This is included for reference, but please note that frontend clients
// should *not* be allowed to even connect to the backend broker, let alone
// consume from req/backend/*, as that would allow a malicious actor to
// disrupt services or steal other users credentials
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const subscr_id = this.subscr_seq++;
const on_receive = args.on_receive;
const This = this;
this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {
// Incoming remote request
let json;
try {
let result = on_receive( jsonrpc.params, packet_prop );
json = JSON.stringify({
jsonrpc: "2.0",
result: result,
id: req.id
});
}
catch (e) {
json = JSON.stringify({
jsonrpc: "2.0",
error: { code: -32603, message: e.message },
id: req.id
});
}
This.mqtt.publish(
packet_prop.responseTopic,
json,
{}
);
This._debug(`Sent >> ${json}`);
};
const topic = '$share/BKPR/req/backend/' + args.method.replace(/\./g,'/');
this.mqtt.subscribe(
topic,
{ properties: { subscriptionIdentifier: subscr_id }},
function (err, granted) {
if (err) throw `Failed to subscribe to ${topic}: ${err}`;
}
);
},
}};