You need to enable JavaScript to run this app.
导航
基于 RedisGears 实现 Chat room 即时信息同步
最近更新时间:2024.12.04 10:53:39首次发布时间:2021.11.15 16:07:16

本文以 all.chat.redis 聊天程序为例,介绍基于 RedisGears 实现 Chat room 即时信息同步的解决方案。

方案架构

在 all.chat.redis 聊天程序中,您可以通过 HTTP 请求与实时事件通信,信息会分别通过 Node.js webServer 和 Socket.IO 接收和推送,Redis Pub/Sub 用于接收、转发、同步信息。此外,您还可以通过 RedisGears 模块在 Redis 内自定义函数执行处理数据流,实现数据库中多模型高效数据处理。
All.chat.redis 聊天程序通过 RedisGears 在 Redis 中实现了JSON 快照缓存重构能力。该程序的存储设计详情如下:

  • 登录认证使用 JWT token 实现简单的账号名或哈希 map 密码(user_password_hash)。

  • 聊天信息(chat message)会被转换为 JSON 形式( room_messages:ID ),并经过排序处理后进行存储。

  • 聊天室主题及其他属性(room topics)会被存储为 Hash 数据(room_properties:ID)。

  • 主页(homepage)中的 snapshot 快照则以简单的 key 字符串(即 homepage_json)来定义。

All.chat.redis 聊天程序架构如下图所示。

方案实现

准备工作

  • 已创建 ECS 实例。具体操作步骤,请参见购买并使用云服务器实例
  • ECS 服务器上安装 Node.js 和 NPM 环境。
  • 已创建缓存数据库 Redis 企业版实例,且创建实例时已选择加载 RedisGears 和 RedisJSON 模块。具体操作步骤,请参见创建实例

    说明

    在创建 Redis 企业版实例选择规格配置时,建议选择单分片(即规格为 8GiB x 1 分片)实例。如果您选择了创建多分片(即规格为 8GiB x 2 或以上分片)实例,那么在执行本文中的代码示例时,需保证所要操作的 Key 都分布在 1 个 hash slot 中。您可以通过 hash tag 来构造相同 slot 的 Key。hash tag 的使用方法,请参见 Keys hash tags

  • 已为 Redis 企业版实例开启公网访问并使用公网地址进行连接。开启公网访问的操作步骤,请参见开启公网访问
  • 已为 Redis 实例设置访问白名单。具体操作步骤,请参见创建白名单

操作步骤

  1. 登录已安装对应环境的 ECS 实例。ECS 实例登录方法,请参见登录实例

  2. 在 ECS 的命令行窗口或命令终端工具中,执行如下命令克隆代码。

    git clone https://github.com/redis-developer/all.chat.redis.git
    
  3. 进入 all.chat.redis 项目目录,执行如下命令进行安装。

    npm install
    
  4. 在 all.chat.redis 项目中找到如下配置文件,并根据您 Redis 企业版实例的实际情况修改对应配置。

    说明

    • 您可以在 Redis 控制台上查看实例的连接地址域名和端口号。具体操作步骤,请参见查看连接地址
    • 若您忘记 Redis 实例 default 账号的密码,您也可以在控制台上修改或重置密码。具体操作步骤,请参见修改或重置密码
    • all.chat.redis/demo_data.js 文件
      demo_data.js 文件中的 redis_client 位置,增加如下代码来创建 Redis 客户端,您需要在客户端设置您 Redis 企业版的连接信息(如下代码仅展示了 Redis 客户端相关部分)。
      var faker = require('faker'),
          redis_client = new (require('ioredis'))({
              host: '<Redis 企业版实例的公网连接地址>',
              port: <Redis 企业版实例的端口,默认为 6379>,
              password: '<Redis 企业版实例 default 账号的密码>'
          });        
      
      代码示例如下(如下代码仅展示了 Redis 客户端相关部分)。
      var faker = require('faker'),
          redis_client = new (require('ioredis'))({
              host: 'redis-cnlfwm1ecuiqv****.redis.volces.com',
              port: 6379,
              password: 'Pwd_test****'
          });         
      
      • all.chat.redis/socket.js 文件
        socket.js 的文件中修改 io.adapter 的配置信息。其中:
        • host:需要将原来的 localhost 改为 <Redis 企业版实例的公网连接地址>
        • port:需要将原来的 6379 端口号改为 <Redis 企业版实例的端口>,Redis 企业版实例的端口号当前默认为 6379。
        • password:需要增加 password 配置,即 <Redis 企业版实例 default 账号的密码>
        var io = require('socket.io')(2022);
        
        io.adapter(require('socket.io-redis')({
             host : '<Redis 企业版实例的公网连接地址>', 
             port : <Redis 企业版实例的端口,默认为 6379>, 
             password:'<Redis 企业版实例 default 账号的密码>'
          })); // use redis pub/sub to pass messages between socket.io nodes (or from webserver)
        
        io.on('connection', function(socket){
            console.log("connecting");
        });
        
        代码示例如下。
        var io = require('socket.io')(2022);
        
        io.adapter(require('socket.io-redis')({
           host : 'redis-cnlfwm1ecuiqv****.redis.volces.com', 
           port : 6379, 
           password:'Pwd_test****'
         })); // use redis pub/sub to pass messages between socket.io nodes (or from webserver)
         
        io.on('connection', function(socket){
            console.log("connecting");
        });
                    
        
      • all.chat.redis/src/static/all.chat.js 文件
        src/static/all.chat.js 文件中的 socket 位置,将应用程序的访问地址由 all.chat.redis 改为 <ECS 的公网 IP>(如下代码仅展示了 socket 访问地址部分)。

        说明

        您可以在 ECS 控制台的列表页查看指定 ECS 实例的公网 IP 地址,具体操作步骤,请参见查看实例信息

        var socket = io('ws://<ECS 的公网 IP>:2022', {transports : ['websocket']});   
        
        代码示例如下(如下代码仅展示了 socket 访问地址部分)。
        var socket = io('ws://180.184.***.***:2022', {transports : ['websocket']});   
        
      • all.chat.redis/webserver.js 文件
        先将 webserver.js 文件中的原代码全部删除,然后在下述文件中的 iogears_clientredis_client 位置,分别增加 Redis 企业版实例的连接信息,最后将下述修改后的代码覆盖粘贴到原 webserver.js 文件中。
        var express = require('express'),
            app = express(),
            cookie_parser = require('cookie-parser'),
            redis = require('ioredis'),
            async = require('neo-async'),
            bcrypt = require('bcrypt'),
            jwt = require('jsonwebtoken'),
            path = require('path');
        
        var jwt_private_key = 'my-private-key-not-secure';
        
        var io = new (require('@socket.io/redis-emitter').Emitter)(new redis({
            host: '<Redis 企业版实例的公网连接地址>', 
            port: <Redis 企业版实例的端口,默认为 6379>, 
            password: '<Redis 企业版实例 default 账号的密码>'
            })); // socket.io publisher over redis pub/sub
        
        var gears_client = new redis({
            host: '<Redis 企业版实例的公网连接地址>', 
            port: <Redis 企业版实例的端口,默认为 6379>, 
            password: '<Redis 企业版实例 default 账号的密码>'
            });
            
         var redis_client = new redis({
            host: '<Redis 企业版实例的公网连接地址>', 
            port: <Redis 企业版实例的端口,默认为 6379>, 
            password: '<Redis 企业版实例 default 账号的密码>'
            });
        
        // ---
        
        app.use(cookie_parser()); // req.cookies.abc
        
        app.use(express.urlencoded({extended : false})); // req.params.abc
        
        app.use('/static', express.static('./src/static', {fallthrough : false})); // https://expressjs.com/en/advanced/best-practice-performance.html#use-a-reverse-proxy
        
        // ---
        
        app.get('/', function(req, res){
            res.sendFile(path.resolve('./src/static/index.html'));
        });
        
        app.get('/homepage_json', async function(req, res){
            var homepage_json=[];
            redis_client.smembers('homepage_room_set', async function(err, result){ // todo: handle error 
                for (const room_key of result) {
                    console.log(room_key);
                    const msg_key = 'room_messages:' + room_key.split(':')[1];
        
                    var room_messages= await redis_client.zrange(msg_key, -10, -1) 
                    console.log(room_messages);
                    
                    var room_properties= await redis_client.hgetall(room_key)
                    console.log(room_properties);
                            
                    homepage_json.push(room_key);
                    homepage_json.push(room_properties);
                    homepage_json.push(room_messages);
                }
        
                res.json(homepage_json)
            });
        });
        
        // ---
        
        app.post('/register', function(req, res){ // todo: validate input requirements
            var wf = [];
        
            wf.push(function(acb){
                bcrypt.hash(req.body.password, 12, acb);
            });
        
            wf.push(function(hash, acb){
                redis_client.hsetnx('user_password_hash', req.body.username.toLowerCase(), hash, acb); // todo: use lua and per user hash
            });
        
            wf.push(function(result, acb){
        
                if(result === 0){
                    res.end('-1');
                    return acb(false);
                };
        
                // ---
        
                var token = jwt.sign({username : req.body.username}, jwt_private_key, {algorithm : 'HS256'});
        
                res.cookie('jwt', token, {expires : new Date(2500, 0)});
        
                res.end();
        
                acb(false);
            });
        
            async.waterfall(wf); // todo: handle errors
        });
        
        app.post('/login', function(req, res){
            res.end(); // todo
        });
        
        // --- require valid jwt cookie from here on out
        
        app.use(function(req, res, next){
            if(typeof req.cookies.jwt !== 'string')
                return res.end();
        
            jwt.verify(req.cookies.jwt, jwt_private_key, {algorithms : ['HS256']}, function(err, decoded){
                if(err !== null)
                    return res.end();
        
                req.jwt = decoded;
                next();
            });
        
        });
        
        // ---
        
        app.post('/new_message', function(req, res){ // todo: validate data (room_id is int etc). todo: csrf checks.
        
            res.end();
        
            io.emit('custom_event', {type : 'new_message', room_id : req.body.room_id, message : req.body.message, username : req.jwt.username});
        
            // ---
        
            var wf = [];
        
            wf.push(function (callback) {
                redis_client.hincrby('counters', 'message_id', 1, callback);
            });
        
            wf.push(function(message_id, acb){
                redis_client.zadd('room_messages:' + req.body.room_id, +new Date(), 
                JSON.stringify({username : req.jwt.username, message : req.body.message, message_id : message_id}), acb);
            });
        
            // ---
        
            async.waterfall(wf); // todo: handle errors
        });
        
        // ---
        
        app.listen(2021);
        
        //  ---
        
        var gears_script = 
        `#!js name=chat_room api_version=1.0
        
        function updateHomepageRoomSet(client, data) {
            if(data.event == 'hset') {
                client.call('sadd', 'homepage_room_set', data.key)        
            }else if (data.event == 'del'){
                client.call('spop','homepage_room_set',data.key)
            }
        }
        
        redis.registerKeySpaceTrigger('updateHomepageRoomSet', 'room_properties:', updateHomepageRoomSet);`;
        
        var gears_loop = async function(){
            await gears_client.call('tfunction','load','replace', gears_script, function(err, result){
                if (err){
                    console.log(err)
                }
            });
        };
        
        gears_loop();
              
        
        代码示例如下。
        var express = require('express'),
            app = express(),
            cookie_parser = require('cookie-parser'),
            redis = require('ioredis'),
            async = require('neo-async'),
            bcrypt = require('bcrypt'),
            jwt = require('jsonwebtoken'),
            path = require('path');
        
        var jwt_private_key = 'my-private-key-not-secure';
        
        var io = new (require('@socket.io/redis-emitter').Emitter)(new redis({
            host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', 
            port: 6379, 
            password: 'Pwd_test****'
            })); // socket.io publisher over redis pub/sub
        
        var gears_client = new redis({
            host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', 
            port: 6379, 
            password: 'Pwd_test****'
            });
            
         var redis_client = new redis({
            host: 'redis-cnlfwm1ecuiqv****.redis.volces.com', 
            port: 6379, 
            password: 'Pwd_test****'
            });
        
        // ---
        
        app.use(cookie_parser()); // req.cookies.abc
        
        app.use(express.urlencoded({extended : false})); // req.params.abc
        
        app.use('/static', express.static('./src/static', {fallthrough : false})); // https://expressjs.com/en/advanced/best-practice-performance.html#use-a-reverse-proxy
        
        // ---
        
        app.get('/', function(req, res){
            res.sendFile(path.resolve('./src/static/index.html'));
        });
        
        app.get('/homepage_json', async function(req, res){
            var homepage_json=[];
            redis_client.smembers('homepage_room_set', async function(err, result){ // todo: handle error 
                for (const room_key of result) {
                    console.log(room_key);
                    const msg_key = 'room_messages:' + room_key.split(':')[1];
        
                    var room_messages= await redis_client.zrange(msg_key, -10, -1) 
                    console.log(room_messages);
                    
                    var room_properties= await redis_client.hgetall(room_key)
                    console.log(room_properties);
                            
                    homepage_json.push(room_key);
                    homepage_json.push(room_properties);
                    homepage_json.push(room_messages);
                }
        
                res.json(homepage_json)
            });
        });
        
        // ---
        
        app.post('/register', function(req, res){ // todo: validate input requirements
            var wf = [];
        
            wf.push(function(acb){
                bcrypt.hash(req.body.password, 12, acb);
            });
        
            wf.push(function(hash, acb){
                redis_client.hsetnx('user_password_hash', req.body.username.toLowerCase(), hash, acb); // todo: use lua and per user hash
            });
        
            wf.push(function(result, acb){
        
                if(result === 0){
                    res.end('-1');
                    return acb(false);
                };
        
                // ---
        
                var token = jwt.sign({username : req.body.username}, jwt_private_key, {algorithm : 'HS256'});
        
                res.cookie('jwt', token, {expires : new Date(2500, 0)});
        
                res.end();
        
                acb(false);
            });
        
            async.waterfall(wf); // todo: handle errors
        });
        
        app.post('/login', function(req, res){
            res.end(); // todo
        });
        
        // --- require valid jwt cookie from here on out
        
        app.use(function(req, res, next){
            if(typeof req.cookies.jwt !== 'string')
                return res.end();
        
            jwt.verify(req.cookies.jwt, jwt_private_key, {algorithms : ['HS256']}, function(err, decoded){
                if(err !== null)
                    return res.end();
        
                req.jwt = decoded;
                next();
            });
        
        });
        
        // ---
        
        app.post('/new_message', function(req, res){ // todo: validate data (room_id is int etc). todo: csrf checks.
        
            res.end();
        
            io.emit('custom_event', {type : 'new_message', room_id : req.body.room_id, message : req.body.message, username : req.jwt.username});
        
            // ---
        
            var wf = [];
        
            wf.push(function (callback) {
                redis_client.hincrby('counters', 'message_id', 1, callback);
            });
        
            wf.push(function(message_id, acb){
                redis_client.zadd('room_messages:' + req.body.room_id, +new Date(), 
                JSON.stringify({username : req.jwt.username, message : req.body.message, message_id : message_id}), acb);
            });
        
            // ---
        
            async.waterfall(wf); // todo: handle errors
        });
        
        // ---
        
        app.listen(2021);
        
        //  ---
        
        var gears_script = 
        `#!js name=chat_room api_version=1.0
        
        function updateHomepageRoomSet(client, data) {
            if(data.event == 'hset') {
                client.call('sadd', 'homepage_room_set', data.key)        
            }else if (data.event == 'del'){
                client.call('spop','homepage_room_set',data.key)
            }
        }
        
        redis.registerKeySpaceTrigger('updateHomepageRoomSet', 'room_properties:', updateHomepageRoomSet);`;
        
        var gears_loop = async function(){
            await gears_client.call('tfunction','load','replace', gears_script, function(err, result){
                if (err){
                    console.log(err)
                }
            });
        };
        
        gears_loop();
                
        
  5. 在 all.chat.redis 项目下,执行如下命令生成有聊天记录的聊天室数据样本。

    node demo_data.js
    

    说明

    仅需在第一次启动聊天室服务端前执行上述命令生成聊天室数据样本,后续再次使用该聊天室时仅需启动 Web 服务端与 Redis 服务端即可,否则您自己的聊天记录将会被 demo_data.js 中的聊天室数据所覆盖。

  6. 打开一个新的命令行窗口并进入 all.chat.redis 项目,执行如下命令启动 Web 服务端。

    node webserver.js
    
  7. 打开一个新的命令行窗口并进入 all.chat.redis 项目,执行如下命令连接 Redis 企业版实例并启动 Redis 服务端。

    node socket.js
    
  8. 20222021 端口分别添加至 ECS 实例的安全组访问规则中,需确保出方向和入方向的安全组规则中均允许访问 20222021 端口。安全组规则设置方法,请参见管理安全组规则

  9. 打开浏览器,在 URL 地址中输入 <ECS 的公网 IP>:2021 打开 all.chat.redis 的 Web 界面。
     代码示例如下。

    180.184.***.***:2021
    

    说明

    您可以在 ECS 控制台的列表页查看指定 ECS 实例的公网 IP 地址,具体操作步骤,请参见查看实例信息

后续操作

  1. 进入 all.chat.redis 的 Web 界面后,您可以先在页面最上方的 usernamepassword 文本框中输入用户名和密码,并单击 Register 注册聊天室的账号。
  2. 注册成功后您可以选择任意一个聊天室,并在聊天室下方的蓝色文本框中输入聊天信息,按下 Enter 键即可发送聊天信息。