博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redux应用多人协作的思路和实现
阅读量:7072 次
发布时间:2019-06-28

本文共 8269 字,大约阅读时间需要 27 分钟。

先上Demo动图,效果如下:

基本思路

由于redux更改数据是dispatch(action),所以很自然而然想到以action作为基本单位在服务端和客户端进行传送,在客户端和服务端用数组来存放action,那么只要当客户端和服务端的action队列的顺序保持一样,reducer是纯函数的特性可以知道计算得到的state是一样的。

一些约定

本文中C1,C2...Cn表示客户端,S表示服务端,a1,a2,a3...an表示aciton,服务端是使用koa + socket.io来编写的(作为一个前端,服务端的知识几乎为0勿喷)。

整体思路

当客户端发起一个action的时候,服务端接收到这个action,服务端做了3件事:

  1. 把action推进栈中
  2. 把该客户端a1之前的action发送该客户端(类似git push之前有个git pull的过程)
  3. 将a1发送给其他的客户端

不过上面的思路是比较笼统的,细想会发现许多问题:

  1. 如果a1到达S端的时候,C2、C3还有action在派发怎么处理?
  2. 如果a1到达S端的时候,C1正在接收其他客户端发送的action怎么处理?
  3. 如果a1发送之前,C1正在发送前一个action怎么处理? 后面我们一一解决。

服务端派发action机制

服务端设立了2个概念:target、member指编辑对象(可能是报告、流程图等)和编辑用户,当你要发送两个action:a1、a2的时候,因为网络发送先后的不确定性,所以应该是先发送a1,然后等待客户端接收到,再发送a2,这样才能在客户端中保证a1和a2的顺序。因此,每个member会有变量pending表示在不在发送,index表示发送的最新的action在action队列中的索引。

当服务端接收到客户端的aciton的时候

this.socket.on('client send action', (action) => {    //目标将action放入队列中,并返回该action的索引    let index = this.target.addAction(action);      if (this.pending) {        this.queue.push({          method: 'sendBeforeActions',          args: [index]        })      } else {        this.sendBeforeActions(index);      }      this.target.receiveAction({        member: this,        index      })    })复制代码

这就是上面讲的当服务端收到a1的时候做的3件事情。只是这里会去判断该member是不是正在执行发送任务,如果是,那么就将发送a1前面的aciton这个动作存入到一个动作队列中,并告知target,我这个member发送了一个action。

sendBeforeActions

sendBeforeActions(refIndex) {    let {actions} = this.getCurrent(refIndex);    actions = actions.slice(0, -1);    this.pending = true;    this.socket.emit('server send before actions', { actions }, () => {      this.pending = false;      this.target.setIndex(this, refIndex);      this.sendProcess();    });  }复制代码

这个函数接收一个索引,这个索引在上面的代码中是这个member接收到的action在队列中的索引,所以getCurrent(refIndex)指到refIndex为止,还没发送给这个member的所有的action(可能为空),所以要剔除本身后actions.slice(0, -1)发送给客户端。 回调中终止发送状态,设置member最新的action的index,然后执行sendProcess函数去看看,在自己本身发送的过程中,是不是有后续的动作存入到发送队列中了

sendProcess() {    if (this.queue.length > 0 && !this.pending) {      let current = this.queue.shift();      let method = this[current.method];      method.apply(this, current.args);    }  }复制代码

如果你注意到刚才的:

if (this.pending) {    this.queue.push({        method: 'sendBeforeActions',        args: [index]    })}复制代码

你就会发现,如果刚才想发送before action的时候这个member在发送其他action,那么会等待这个action发送完后才触发sendProcess去执行这个发送。

还要将这个action发送给其他用户

在刚才的代码中

//this指某个member对象this.target.receiveAction({    member: this,    index})复制代码

就是这个触发了其他客户端的发送

//this指某个target对象receiveAction({member, index}) {    this.members.forEach(m => {      if (m.id !== member.id) {        m.queue.push({          method: 'sendActions',          args: [index]        });        m.sendProcess();      }    })  }复制代码

如果members中存在发送方的member,那么会将发送动作存入member的发送队列中,执行sendProcess

sendActions

sendActions(refIndex) {    let {actions} = this.getCurrent(refIndex);    if (actions.length) {      this.pending = true;      this.socket.emit('server send actions', {actions}, () => {        this.pending = false;        this.target.setIndex(this, refIndex);        this.sendProcess();      })    }  }复制代码

这个函数和sendBeforeActions几乎一样,只差要不要剔除最新的action,这样,就保证了服务端的发送action顺序

客户端IO中间件

在客户端中,将io有关的操作都封装在一个中间件中

module.exports = store => next => action => {    if (action.type === 'connection') {        //连接初始化一些事件        return initIo(action.payload)    }      if (action.type === 'disconnection') {        return socket.disconnect(action.payload)    }    if (['@replace/state'].indexOf(action.type.toLowerCase()) === -1 && !action.escapeServer && !action.temporary) {        //将action给定userId、targetId        action = actionCreator(action);        //得到新的action队列,并计算actions,然后更新到state上        let newCacheActions = [...cacheActions, action];        mapActionsToState(newCacheActions);        //发送给服务端        return delieverAction(action);    }    //这样就只允许replace state 的action进入到store里面,这个是我这个思路在实现undo、redo的一个要求,后面会讲到    next();}复制代码

一些全局变量

具体作用后面会用到

let cacheActions = [];   //action队列,这个和服务端的action队列保持一致let currentActions = []; //根据cacheActions计算的actionlet redoActions = {};    //缓存每个用户的undo后拿掉的actionlet pending = false;     //是否在发送请求let actionsToPend = [];  //缓存发送队列let beforeActions = [];  //缓存pull下来的actionslet currentAction = null;//当前发送的actionlet user, tid;           //用户名和targetIdlet initialState;        //初始的statelet timeline = [];       //缓存state复制代码

客户端整体思路图

主要讲两个地方:

(1)在computeActions的时候,碰到undo拿掉该用户的最后一个action,并把倒数第二个action提升到最后的原因是因为假如在该用户倒数第二个action之后还有其他用户的action发生,那么可能其他用户会覆盖掉这个用户action的设定值,那么这个用户undo的时候就无法回到之前的状态了,这时候提升相当于是undo后做了新的action,这个action就是前一次的action。这个算法是有bug的,当一个用户undo的时候,由于我们会提升他倒数第二的action,这样会导致与这个action冲突的action的修改被覆盖。这个解决冲突的策略有点问题。如果没有提升,那么如果该用户undo的时候,如果他上一个action被其他用户的action覆盖了,那么他就无法undo回去了。这个是个痛点,我还在持续探索中,欢迎大神指教。

(2)在用户pending的时候收到了actions,这个时候相当于是before actions。 下面贴几个主要函数的代码

initIo

function initIo(payload, dispatch) {  user = payload.user;  tid = parseInt(payload.tid, 10);  //初始化socket  let socket = cache.socket = io(location.protocol + '//' + location.host, {    query: {      user: JSON.stringify(user),      tid    }  });  //获取初始数据  socket.on('deliver initial data', (params) => {    ...获取初始的state,actions  })  //发送action会等待pull之前的actions  socket.on('server send before actions', (payload, callback) => {    pending = false;    callback && callback();    let {actions} = payload;    actions = [...actions, ...beforeActions, currentAction];    cacheActions = [...cacheActions, ...actions];    if (actions.length > 1) {      //证明有前面的action,需要根据actions重新计算state      mapActionsToState();    }    if (actionsToPend.length) {      let action = actionsToPend.shift();      sendAction(action);    }  })  //接收actions  socket.on('server send actions', (payload, callback) => {    let {actions} = payload;    callback && callback();    if (pending) {      beforeActions = [...beforeActions, ...actions];    } else {      cacheActions = [...cacheActions, ...actions];      mapActionsToState();    }  })}复制代码

mapActionsToState

function mapActionsToState(actions) {  actions = actions || cacheActions;  if (actions.length === 0) {    return replaceState(dispatch)(initialState);  }  let {newCurrentActions, newRedoActions} = computeActions(actions);  let {same} = diff(newCurrentActions);  let state = initialState;  if (timeline[same]) {    state = timeline[same];    timeline = timeline.slice(0, same + 1);  }  if (same === -1) {    timeline = [];  }  let differentActions = newCurrentActions.slice(same + 1);  differentActions.forEach(action => {    state = store.reducer(state, action);    timeline.push(state);  });  currentActions = newCurrentActions;  redoActions = newRedoActions;  store.canUndo = () => currentActions.some(action => action.userId === user.id);  store.canRedo = () => !!(redoActions[user.id] || []).length;  return replaceState(dispatch)(state);}复制代码

computeActions

function computeActions(actions) {  let newCurrentActions = [];  let newRedoActions = {};  actions.forEach(action => {    let type = action.type.toLowerCase();    newRedoActions[action.userId] = newRedoActions[action.userId] || [];    if (type !== 'redo' && type !== 'undo') {      newCurrentActions.push(action);      newRedoActions[action.userId] = [];    }    if (type === 'undo') {      let indexes = [];      for (let i = newCurrentActions.length - 1; i >= 0; i--) {        if (newCurrentActions[i].userId === action.userId) {          indexes.push(i);        }        if (indexes.length === 2) {          break;        }      }      if (indexes.length > 0) {        let redo = newCurrentActions.splice(indexes[0], 1)[0];        newRedoActions[action.userId].push(redo);      }      if (indexes.length > 1) {        let temp = newCurrentActions.splice(indexes[1], 1);        newCurrentActions.push(temp[0]);      }    }    if (type === 'redo') {      let redo = newRedoActions[action.userId].pop();      newCurrentActions.push(redo);    }  });  return {    newCurrentActions,    newRedoActions  }}复制代码

diff

function diff(newCurrentActions) {  let same = -1;  newCurrentActions.some((action, index) => {    let currentAction = currentActions[index];    if (currentAction && action.id === currentAction.id) {      same = index;      return false;    }    return true;  });  return {    same  }}复制代码

结束语

讲了一堆,不知道有没有将自己的思路讲清楚,自己的demo也运行了起来,测试只用了两个浏览器来模拟测试,总感觉一些并发延时出现还会有bug,后面会持续优化这个想法,添加一些自动化测试来验证,另外,对于服务端的存储也还没考虑,先在只在内存中跑,会思考保存方案。希望对这方面有兴趣的大神可以指导一下

转载地址:http://bokml.baihongyu.com/

你可能感兴趣的文章
PowerShell应用之-批量执行SQL脚本
查看>>
职场加薪步步高升的五大法则
查看>>
增删主键及修改表名
查看>>
Gson库使用-排序字段(ExclusionStrategy)或者修改(FieldNamingStrategy)字段
查看>>
NTFS For Mac 的特点有哪些
查看>>
新技能,利用Reflector来修改dll引用
查看>>
Java编程的逻辑 (1) - 数据和变量
查看>>
我的屌丝giser成长记-研一篇(下)
查看>>
raft 分布式协议 -- mongodb
查看>>
[TypeScript] Using Lodash in TypeScript with Typings and SystemJS
查看>>
ASP.Net MVC开发基础学习笔记(1):走向MVC模式
查看>>
虚函数可不可以是内联函数
查看>>
据说看完这21个故事的人,30岁前都成了亿万富翁
查看>>
HDOJ-4505 小Q系列故事——电梯里的爱情
查看>>
【转】Navigation Drawer(导航抽屉)
查看>>
Linux Shell常用技巧(十)
查看>>
【从零之三(更)】自定义类中调用讯飞语音包错误解决办法
查看>>
【源代码】LinkedHashMap源代码剖析
查看>>
Android InputStream转Bitmap
查看>>
记录水电系统开发的心理【1】
查看>>