图片来源:截图自 https://blog.csdn.net/WH_Crx/article/details/119116325
先看代码
demo代码背景:通过Socket,实时同步笔记内容
const interviewDict: Record<string, InterviewDetail> = {};
export const interviewWebSocket: expressWs.WebsocketRequestHandler = (ws: IWebSocket, req) => { const id = req.query.id as string; ws.interviewId = id; ws.uuid = uuidv4(); ws.on('message', msg => { if (!ArrayBuffer.isView(msg)) { return; } const [type] = msg; const valueBytes = msg.slice(1); const value = decode.decode(valueBytes); switch (type) { case encode.encode(WSTypes.heartbeat)[0]: ws.send(getBuffer(WSTypes.heartbeat, id)); return; case encode.encode(WSTypes.setValue)[0]: if (value && interviewDict[id]) { interviewDict[id].value = value; getWsInstance() ?.getWss() ?.clients?.forEach((wsItem: IWebSocket) => { console.log('ws', wsItem.interviewId, wsItem.uuid, ws.uuid); if (wsItem.interviewId === id && wsItem.uuid !== ws.uuid) { wsItem.send(getBuffer(WSTypes.getValue, value)); } }); } return; case encode.encode(WSTypes.getValue)[0]: if (interviewDict[id]) { ws.send(getBuffer(WSTypes.getValue, interviewDict[id].value)); } return; } }); };
会不会出现问题?## 什么时候出现问题?### 场景一:为了充分利用cpu的多核,需要通过fork方式进行多进程运行。例:Egg默认执行start会创建和 CPU 核数相当的 app worker进程。#### 解决方法:进程间通信Egg中的Master、Agent和Worker当一个应用启动时,会同时启动这三类进程。| **类型** | **进程数量** | **作用** | **稳定性** | **是否运行业务代码** || ---------- | ------------ | -------------- | ------- | ------------ || **Master** | 1 | 进程管理,进程间消息转发 | 非常高 | 否 || **Agent** | 1 | 后台运行工作(长连接客户端) | 高 | 少量 || **Worker** | 一般设置为 CPU 核数 | 执行业务代码 | 一般 | 是 |interviewDict需要放在Agent中```{TypeScript}// 发送app.messenger.once('egg-ready', () => { app.messenger.sendToAgent('agent-event', { foo: 'bar' }); app.messenger.sendToApp('app-event', { foo: 'bar' }); });};// 接收app.messenger.on(action, (data) => { // process data});app.messenger.once(action, (data) => { // process data});
扩展:为什么Node.js不适合cpu密集型任务
进程间通信与线程间通讯:
node是单线程应用(一个进程一个线程),单线程的进程只能运行在一个 CPU内核 上,node通过创建多进程来进行实现多线程。
线程:由于同一进程中的多个线程具有相同的地址空间,线程间可以直接读写进程数据段(如全局变量)来进行通信只需要进程同步和互斥手段的辅助,保证数据的一致性。
进程:进程通讯需要通过系统IPC等方式进行通信,远比线程间通讯开销大。
场景二:
生产环境部署服务的应用,一般会部署到两台以上(可以是实体机也可以是虚拟机)。然后nginx负载均衡到每台机器上。
解决方法:redis、其它数据库。
这种情况下更适合redis
Redis数据全部存在内存,定期写入磁盘,当内存不够时,通过LRU算法删除数据。
Egg中redis的使用
// 配置redisconfig.redis = { client: { host: 'www.yzapp.cn', port: 6379, password: process.env.NESTOR_REDIS, db: 0 }};config.websocket = { // 配置 websocket 使用 redis 作消息广播 redis: config.redis,};
import { Service } from 'egg';const prefix = '00353:';/** * 调用redis的服务 */export default class RedisService extends Service { /** * 根据key获得值 * @param key key */ public async get(key: string) { const { redis, logger } = this.app; const t = Date.now(); let data = await redis.getBuffer(prefix + key); if (!data) return; const duration = Date.now() - t; logger.debug('Cache', 'get', key, duration + 'ms'); return data; } /** * 根据key存值 * @param key key * @param value value */ public async set(key: string, value: any) { const { redis, logger } = this.app; const t = Date.now(); await redis.set(prefix + key, value); const duration = Date.now() - t; logger.debug('Cache', 'set', key, duration + 'ms'); } /** * 根据key存值并设置过期时间 * @param key key * @param value value * @param seconds 过期时间 */ public async setex(key: string, value: any, seconds: number) { const { redis, logger } = this.app; const t = Date.now(); await redis.set(prefix + key, value, 'EX', seconds); const duration = Date.now() - t; logger.debug('Cache', 'set', key, value, duration + 'ms'); } /** * 根据key删除缓存 * @param key key */ public async del(key: string) { const { redis, logger } = this.app; const t = Date.now(); await redis.del(prefix + key); const duration = Date.now() - t; logger.debug('Cache', 'del', key, duration + 'ms'); } /** * 递增值并设定过期时间 * @param key * @param seconds */ public async incr(key: string, seconds: number) { const { redis, logger } = this.app; const t = Date.now(); const result = await redis .multi() .incr(prefix + key) .expire(key, seconds) .exec(); const duration = Date.now() - t; logger.debug('Cache', 'set', key, duration + 'ms'); return result[0][1]; }}
/** * 同步答题内容 * @param websocket * @param id */public async syncValue(websocket: EggWsClient, id: string): Promise<string | null> { const uuid = uuidV4(); // 根据interview id创建房间 websocket.room.join(id, ({message}) => { const data = JSON.parse(message.toString()); if (!data || !data.value || data.from === uuid) { return; } // 将消息发送到除自己外的其它房间成员 websocket.send(encode.encode(data.value)); }); // 从数据库里读取 const info = await this.findInterviewById(Number(id)); if (info === null) { return `没有找到${id}的数据`; } // 从redis读取最新的笔试信息 const value = await this.service.redis.get("room" + id); websocket .on('message', async (msg) => { // console.log('receive', msg); if (!ArrayBuffer.isView(msg)) { return; } const type = msg[0]; switch (type) { case encode.encode(WSTypes.heartbeat)[0]: websocket.send(this.getBuffer(WSTypes.heartbeat, encode.encode(id))); return; case encode.encode(WSTypes.setValue)[0]: // 存入redis最新的笔试信息 await this.service.redis.set("room" + id, msg.slice(1)); if (msg.slice(1)) { this.app.ws.sendJsonTo(id, { from: uuid, value: decode.decode(this.getBuffer(WSTypes.getValue, msg.slice(1))) }); } return; case encode.encode(WSTypes.getValue)[0]: if (!value) { return; } websocket.send(this.getBuffer(WSTypes.getValue, value)); return; } }) .on('close', (code, reason) => { console.log('websocket closed', code, reason); }); return null;}
具体代码见:https://github.com/nesror/bikini_service
提问:哪种数据库无法解决或者说不建议在这种情况下使用?
关于进程与线程需要详细了解可以参考下面
node的进程与线程:https://blog.csdn.net/WH_Crx/article/details/119116325
linux-进程与线程:https://segmentfault.com/a/1190000039297639
本文作者:大风车无限-Nestor_Gu
原文:https://juejin.cn/post/7098161598304354312