本文以 all.chat.redis 聊天程序为例,介绍基于 RedisGears 实现 Chat room 即时信息同步的解决方案。
在 all.chat.redis 聊天程序中,您可以通过 HTTP 请求与实时事件通信,信息会分别通过 Node.js webServer 和 Socket.IO 接收和推送,Redis Pub/Sub 用于接收、转发、同步信息。此外,您还可以通过 RedisGears 模块在 Redis 内自定义函数执行处理数据流,实现数据库中多模型高效数据处理。
All.chat.redis 聊天程序通过 RedisGears 在 Redis 中实现了JSON 快照缓存重构能力。该程序的存储设计详情如下:
登录认证使用 JWT token 实现简单的账号名或哈希 map 密码(user_password_hash)。
聊天信息(chat message)会被转换为 JSON 形式( room_messages:ID
),并经过排序处理后进行存储。
聊天室主题及其他属性(room topics)会被存储为 Hash 数据(room_properties:ID
)。
主页(homepage)中的 snapshot 快照则以简单的 key 字符串(即 homepage_json
)来定义。
All.chat.redis 聊天程序架构如下图所示。
说明
在创建 Redis 企业版实例选择规格配置时,建议选择单分片(即规格为 8GiB x 1 分片)实例。如果您选择了创建多分片(即规格为 8GiB x 2 或以上分片)实例,那么在执行本文中的代码示例时,需保证所要操作的 Key 都分布在 1 个 hash slot 中。您可以通过 hash tag 来构造相同 slot 的 Key。hash tag 的使用方法,请参见 Keys hash tags。
登录已安装对应环境的 ECS 实例。ECS 实例登录方法,请参见登录实例。
在 ECS 的命令行窗口或命令终端工具中,执行如下命令克隆代码。
git clone https://github.com/redis-developer/all.chat.redis.git
进入 all.chat.redis 项目目录,执行如下命令进行安装。
npm install
在 all.chat.redis 项目中找到如下配置文件,并根据您 Redis 企业版实例的实际情况修改对应配置。
说明
all.chat.redis/demo_data.js
文件demo_data.js
文件中的 redis_client
位置,增加如下代码来创建 Redis 客户端,您需要在客户端设置您 Redis 企业版的连接信息(如下代码仅展示了 Redis 客户端相关部分)。代码示例如下(如下代码仅展示了 Redis 客户端相关部分)。var faker = require('faker'), redis_client = new (require('ioredis'))({ host: '<Redis 企业版实例的公网连接地址>', port: <Redis 企业版实例的端口,默认为 6379>, password: '<Redis 企业版实例 default 账号的密码>' });
var faker = require('faker'), redis_client = new (require('ioredis'))({ host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', port: 6379, password: 'Pwd_test****' });
all.chat.redis/socket.js
文件socket.js
的文件中修改 io.adapter
的配置信息。其中:
host
:需要将原来的 localhost
改为 <Redis 企业版实例的公网连接地址>
。port
:需要将原来的 6379
端口号改为 <Redis 企业版实例的端口>
,Redis 企业版实例的端口号当前默认为 6379。password
:需要增加 password
配置,即 <Redis 企业版实例 default 账号的密码>
。代码示例如下。var io = require('socket.io')(2022); io.adapter(require('socket.io-redis')({ host : '<Redis 企业版实例的公网连接地址>', port : <Redis 企业版实例的端口,默认为 6379>, password:'<Redis 企业版实例 default 账号的密码>' })); // use redis pub/sub to pass messages between socket.io nodes (or from webserver) io.on('connection', function(socket){ console.log("connecting"); });
var io = require('socket.io')(2022); io.adapter(require('socket.io-redis')({ host : 'redis-cnlfwm1ecuiqv****.redis.volces.com', port : 6379, password:'Pwd_test****' })); // use redis pub/sub to pass messages between socket.io nodes (or from webserver) io.on('connection', function(socket){ console.log("connecting"); });
all.chat.redis/src/static/all.chat.js
文件src/static/all.chat.js
文件中的 socket
位置,将应用程序的访问地址由 all.chat.redis
改为 <ECS 的公网 IP>
(如下代码仅展示了 socket 访问地址部分)。代码示例如下(如下代码仅展示了 socket 访问地址部分)。var socket = io('ws://<ECS 的公网 IP>:2022', {transports : ['websocket']});
var socket = io('ws://180.184.***.***:2022', {transports : ['websocket']});
all.chat.redis/webserver.js
文件webserver.js
文件中的原代码全部删除,然后在下述文件中的 io
、 gears_client
和 redis_client
位置,分别增加 Redis 企业版实例的连接信息,最后将下述修改后的代码覆盖粘贴到原 webserver.js
文件中。代码示例如下。var express = require('express'), app = express(), cookie_parser = require('cookie-parser'), redis = require('ioredis'), async = require('neo-async'), bcrypt = require('bcrypt'), jwt = require('jsonwebtoken'), path = require('path'); var jwt_private_key = 'my-private-key-not-secure'; var io = new (require('@socket.io/redis-emitter').Emitter)(new redis({ host: '<Redis 企业版实例的公网连接地址>', port: <Redis 企业版实例的端口,默认为 6379>, password: '<Redis 企业版实例 default 账号的密码>' })); // socket.io publisher over redis pub/sub var gears_client = new redis({ host: '<Redis 企业版实例的公网连接地址>', port: <Redis 企业版实例的端口,默认为 6379>, password: '<Redis 企业版实例 default 账号的密码>' }); var redis_client = new redis({ host: '<Redis 企业版实例的公网连接地址>', port: <Redis 企业版实例的端口,默认为 6379>, password: '<Redis 企业版实例 default 账号的密码>' }); // --- app.use(cookie_parser()); // req.cookies.abc app.use(express.urlencoded({extended : false})); // req.params.abc app.use('/static', express.static('./src/static', {fallthrough : false})); // https://expressjs.com/en/advanced/best-practice-performance.html#use-a-reverse-proxy // --- app.get('/', function(req, res){ res.sendFile(path.resolve('./src/static/index.html')); }); app.get('/homepage_json', async function(req, res){ var homepage_json=[]; redis_client.smembers('homepage_room_set', async function(err, result){ // todo: handle error for (const room_key of result) { console.log(room_key); const msg_key = 'room_messages:' + room_key.split(':')[1]; var room_messages= await redis_client.zrange(msg_key, -10, -1) console.log(room_messages); var room_properties= await redis_client.hgetall(room_key) console.log(room_properties); homepage_json.push(room_key); homepage_json.push(room_properties); homepage_json.push(room_messages); } res.json(homepage_json) }); }); // --- app.post('/register', function(req, res){ // todo: validate input requirements var wf = []; wf.push(function(acb){ bcrypt.hash(req.body.password, 12, acb); }); wf.push(function(hash, acb){ redis_client.hsetnx('user_password_hash', req.body.username.toLowerCase(), hash, acb); // todo: use lua and per user hash }); wf.push(function(result, acb){ if(result === 0){ res.end('-1'); return acb(false); }; // --- var token = jwt.sign({username : req.body.username}, jwt_private_key, {algorithm : 'HS256'}); res.cookie('jwt', token, {expires : new Date(2500, 0)}); res.end(); acb(false); }); async.waterfall(wf); // todo: handle errors }); app.post('/login', function(req, res){ res.end(); // todo }); // --- require valid jwt cookie from here on out app.use(function(req, res, next){ if(typeof req.cookies.jwt !== 'string') return res.end(); jwt.verify(req.cookies.jwt, jwt_private_key, {algorithms : ['HS256']}, function(err, decoded){ if(err !== null) return res.end(); req.jwt = decoded; next(); }); }); // --- app.post('/new_message', function(req, res){ // todo: validate data (room_id is int etc). todo: csrf checks. res.end(); io.emit('custom_event', {type : 'new_message', room_id : req.body.room_id, message : req.body.message, username : req.jwt.username}); // --- var wf = []; wf.push(function (callback) { redis_client.hincrby('counters', 'message_id', 1, callback); }); wf.push(function(message_id, acb){ redis_client.zadd('room_messages:' + req.body.room_id, +new Date(), JSON.stringify({username : req.jwt.username, message : req.body.message, message_id : message_id}), acb); }); // --- async.waterfall(wf); // todo: handle errors }); // --- app.listen(2021); // --- var gears_script = `#!js name=chat_room api_version=1.0 function updateHomepageRoomSet(client, data) { if(data.event == 'hset') { client.call('sadd', 'homepage_room_set', data.key) }else if (data.event == 'del'){ client.call('spop','homepage_room_set',data.key) } } redis.registerKeySpaceTrigger('updateHomepageRoomSet', 'room_properties:', updateHomepageRoomSet);`; var gears_loop = async function(){ await gears_client.call('tfunction','load','replace', gears_script, function(err, result){ if (err){ console.log(err) } }); }; gears_loop();
var express = require('express'), app = express(), cookie_parser = require('cookie-parser'), redis = require('ioredis'), async = require('neo-async'), bcrypt = require('bcrypt'), jwt = require('jsonwebtoken'), path = require('path'); var jwt_private_key = 'my-private-key-not-secure'; var io = new (require('@socket.io/redis-emitter').Emitter)(new redis({ host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', port: 6379, password: 'Pwd_test****' })); // socket.io publisher over redis pub/sub var gears_client = new redis({ host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', port: 6379, password: 'Pwd_test****' }); var redis_client = new redis({ host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', port: 6379, password: 'Pwd_test****' }); // --- app.use(cookie_parser()); // req.cookies.abc app.use(express.urlencoded({extended : false})); // req.params.abc app.use('/static', express.static('./src/static', {fallthrough : false})); // https://expressjs.com/en/advanced/best-practice-performance.html#use-a-reverse-proxy // --- app.get('/', function(req, res){ res.sendFile(path.resolve('./src/static/index.html')); }); app.get('/homepage_json', async function(req, res){ var homepage_json=[]; redis_client.smembers('homepage_room_set', async function(err, result){ // todo: handle error for (const room_key of result) { console.log(room_key); const msg_key = 'room_messages:' + room_key.split(':')[1]; var room_messages= await redis_client.zrange(msg_key, -10, -1) console.log(room_messages); var room_properties= await redis_client.hgetall(room_key) console.log(room_properties); homepage_json.push(room_key); homepage_json.push(room_properties); homepage_json.push(room_messages); } res.json(homepage_json) }); }); // --- app.post('/register', function(req, res){ // todo: validate input requirements var wf = []; wf.push(function(acb){ bcrypt.hash(req.body.password, 12, acb); }); wf.push(function(hash, acb){ redis_client.hsetnx('user_password_hash', req.body.username.toLowerCase(), hash, acb); // todo: use lua and per user hash }); wf.push(function(result, acb){ if(result === 0){ res.end('-1'); return acb(false); }; // --- var token = jwt.sign({username : req.body.username}, jwt_private_key, {algorithm : 'HS256'}); res.cookie('jwt', token, {expires : new Date(2500, 0)}); res.end(); acb(false); }); async.waterfall(wf); // todo: handle errors }); app.post('/login', function(req, res){ res.end(); // todo }); // --- require valid jwt cookie from here on out app.use(function(req, res, next){ if(typeof req.cookies.jwt !== 'string') return res.end(); jwt.verify(req.cookies.jwt, jwt_private_key, {algorithms : ['HS256']}, function(err, decoded){ if(err !== null) return res.end(); req.jwt = decoded; next(); }); }); // --- app.post('/new_message', function(req, res){ // todo: validate data (room_id is int etc). todo: csrf checks. res.end(); io.emit('custom_event', {type : 'new_message', room_id : req.body.room_id, message : req.body.message, username : req.jwt.username}); // --- var wf = []; wf.push(function (callback) { redis_client.hincrby('counters', 'message_id', 1, callback); }); wf.push(function(message_id, acb){ redis_client.zadd('room_messages:' + req.body.room_id, +new Date(), JSON.stringify({username : req.jwt.username, message : req.body.message, message_id : message_id}), acb); }); // --- async.waterfall(wf); // todo: handle errors }); // --- app.listen(2021); // --- var gears_script = `#!js name=chat_room api_version=1.0 function updateHomepageRoomSet(client, data) { if(data.event == 'hset') { client.call('sadd', 'homepage_room_set', data.key) }else if (data.event == 'del'){ client.call('spop','homepage_room_set',data.key) } } redis.registerKeySpaceTrigger('updateHomepageRoomSet', 'room_properties:', updateHomepageRoomSet);`; var gears_loop = async function(){ await gears_client.call('tfunction','load','replace', gears_script, function(err, result){ if (err){ console.log(err) } }); }; gears_loop();
在 all.chat.redis 项目下,执行如下命令生成有聊天记录的聊天室数据样本。
node demo_data.js
说明
仅需在第一次启动聊天室服务端前执行上述命令生成聊天室数据样本,后续再次使用该聊天室时仅需启动 Web 服务端与 Redis 服务端即可,否则您自己的聊天记录将会被 demo_data.js
中的聊天室数据所覆盖。
打开一个新的命令行窗口并进入 all.chat.redis 项目,执行如下命令启动 Web 服务端。
node webserver.js
打开一个新的命令行窗口并进入 all.chat.redis 项目,执行如下命令连接 Redis 企业版实例并启动 Redis 服务端。
node socket.js
将 2022
和 2021
端口分别添加至 ECS 实例的安全组访问规则中,需确保出方向和入方向的安全组规则中均允许访问 2022
和 2021
端口。安全组规则设置方法,请参见管理安全组规则。
打开浏览器,在 URL 地址中输入 <ECS 的公网 IP>:2021
打开 all.chat.redis 的 Web 界面。
代码示例如下。
180.184.***.***:2021