Mini-Redis教程.html 274 KB


  1. <!DOCTYPE html>
  2. <html lang="zh-CN">
  3. <head>
  4. <meta charset="utf-8">
  5. <meta name="viewport" content="width=device-width,initial-scale=1">
  6. <title>Mini-Redis教程 | Rust训练营教程文档</title>
  7. <meta name="generator" content="VuePress 1.9.10">
  8. <link rel="icon" href="/rust_camp_tutorial/logo.png">
  9. <meta name="description" content="DragonOS-Rust camp">
  10. <link rel="preload" href="/rust_camp_tutorial/assets/css/0.styles.7dd9be3e.css" as="style"><link rel="preload" href="/rust_camp_tutorial/assets/js/app.d7ab8f65.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/2.3dc1b8de.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/1.7f771cfb.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/23.7f3a9620.js" as="script"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/10.23a1f579.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/11.c389195a.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/12.1d996921.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/13.4d4410c4.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/14.37ef2a72.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/15.5542c093.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/16.d48fd1ce.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/17.bd8d538c.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/18.6d3b94c1.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/19.eb35cfee.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/20.c11ec329.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/21.db1b5d88.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/22.7714be7b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/24.2f88b37d.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/25.df99dd5f.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/26.606cfbc8.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/27.928f1e6b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/28.f61a69ee.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/29.802642cf.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/3.5322f14a.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/30.72f41aed.js"><link rel="prefetch" 
href="/rust_camp_tutorial/assets/js/31.e1aa8cbc.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/32.30c1ff3b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/33.f21667a4.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/4.84e1e480.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/5.f0541060.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/6.dfb06aa0.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/7.7551a9fb.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/vendors~docsearch.5e19b665.js">
  11. <link rel="stylesheet" href="/rust_camp_tutorial/assets/css/0.styles.7dd9be3e.css">
  12. </head>
  13. <body>
  14. <div id="app" data-server-rendered="true"><div class="theme-container"><header class="navbar"><div class="sidebar-button"><svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" role="img" viewBox="0 0 448 512" class="icon"><path fill="currentColor" d="M436 124H12c-6.627 0-12-5.373-12-12V80c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12z"></path></svg></div> <a href="/rust_camp_tutorial/" class="home-link router-link-active"><img src="logo.png" alt="Rust训练营教程文档" class="logo"> <span class="site-name can-hide">Rust训练营教程文档</span></a> <div class="links"><div class="search-box"><input aria-label="Search" autocomplete="off" spellcheck="false" value=""> <!----></div> <nav class="nav-links can-hide"><div class="nav-item"><a href="/rust_camp_tutorial/" class="nav-link">
  15. 首页
  16. </a></div> <!----></nav></div></header> <div class="sidebar-mask"></div> <aside class="sidebar"><nav class="nav-links"><div class="nav-item"><a href="/rust_camp_tutorial/" class="nav-link">
  17. 首页
  18. </a></div> <!----></nav> <ul class="sidebar-links"><li><section class="sidebar-group depth-0"><p class="sidebar-heading open"><span>Mini-Redis教程</span> <!----></p> <ul class="sidebar-links sidebar-group-items"><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#准备工作" class="sidebar-link">准备工作</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#获取代码" class="sidebar-link">获取代码</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#运行项目" class="sidebar-link">运行项目</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#module" class="sidebar-link">Module</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#client" class="sidebar-link">client</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#blocking-client" class="sidebar-link">blocking_client</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#cmd" class="sidebar-link">cmd</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#frame" class="sidebar-link">frame</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#sever" class="sidebar-link">sever</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#function-2" class="sidebar-link">Function</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#fn-buffer" class="sidebar-link">Fn 
Buffer</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#struct-16" class="sidebar-link">Struct</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#buffer" class="sidebar-link">Buffer</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#connection" class="sidebar-link">Connection</a></li></ul></li></ul></section></li></ul> </aside> <main class="page"> <div class="theme-default-content content__default"><h1 id="mini-redis教程"><a href="#mini-redis教程" class="header-anchor">#</a> Mini-Redis教程</h1> <h2 id="准备工作"><a href="#准备工作" class="header-anchor">#</a> 准备工作</h2> <h3 id="获取代码"><a href="#获取代码" class="header-anchor">#</a> 获取代码</h3> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">git</span> clone <span class="token parameter variable">--recursive</span> https://github.com/DragonOS-Community/rust_camp_code.git
  19. </code></pre></div><p>教程版请切换curse-mini-redis分支</p> <h3 id="运行项目"><a href="#运行项目" class="header-anchor">#</a> 运行项目</h3> <h4 id="运行服务端"><a href="#运行服务端" class="header-anchor">#</a> 运行服务端</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token assign-left variable">RUST_LOG</span><span class="token operator">=</span>debug <span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-server
  20. </code></pre></div><p>运行服务端之后,就可以运行redis操作的代码和redis的命令了。详情请参考example文件夹下的代码和redis的命令。</p> <h4 id="运行example的代码"><a href="#运行example的代码" class="header-anchor">#</a> 运行example的代码</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">cargo</span> run <span class="token parameter variable">--example</span> hello_world
  21. </code></pre></div><h4 id="运行redis命令"><a href="#运行redis命令" class="header-anchor">#</a> 运行redis命令</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-cli <span class="token builtin class-name">set</span> foo bar
  22. <span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-cli get foo
  23. </code></pre></div><h2 id="module"><a href="#module" class="header-anchor">#</a> Module</h2> <h3 id="client"><a href="#client" class="header-anchor">#</a> client</h3> <blockquote><p>最小的redis会话。为所支持的命令提供异步连接。</p></blockquote> <p>代码位于src/clients/client.rs</p> <h4 id="client-2"><a href="#client-2" class="header-anchor">#</a> Client</h4> <h5 id="struct"><a href="#struct" class="header-anchor">#</a> Struct</h5> <p>用于建立和存储客户端连接</p> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Client</span><span class="token punctuation">{</span>
  24. <span class="token comment">/// 客户端连接</span>
  25. connection<span class="token punctuation">:</span><span class="token class-name">Connection</span><span class="token punctuation">,</span>
  26. <span class="token punctuation">}</span>
  27. </code></pre></div><h5 id="implementation"><a href="#implementation" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Client</span><span class="token punctuation">{</span>
  28. <span class="token comment">/// 建立连接</span>
  29. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">connect</span><span class="token operator">&lt;</span><span class="token class-name">T</span><span class="token punctuation">:</span> <span class="token class-name">ToSocketAddrs</span><span class="token operator">&gt;</span><span class="token punctuation">(</span>addr<span class="token punctuation">:</span> <span class="token class-name">T</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Client</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  30. <span class="token comment">// 与addr建立tcp连接并获取套接字</span>
  31. <span class="token keyword">let</span> socket <span class="token operator">=</span> <span class="token class-name">TcpStream</span><span class="token punctuation">::</span><span class="token function">connect</span><span class="token punctuation">(</span>addr<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  32. <span class="token comment">// 使用套接字建立连接</span>
  33. <span class="token keyword">let</span> connection <span class="token operator">=</span> <span class="token class-name">Connection</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">;</span>
  34. <span class="token comment">// 返回客户端</span>
  35. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Client</span> <span class="token punctuation">{</span> connection <span class="token punctuation">}</span><span class="token punctuation">)</span>
  36. <span class="token punctuation">}</span>
  37. <span class="token comment">/// 向服务端发送ping</span>
  38. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  39. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">ping</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  40. <span class="token comment">// 构建ping消息</span>
  41. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  42. <span class="token comment">// 日志记录ping</span>
  43. <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
  44. <span class="token comment">// 发送ping消息</span>
  45. <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  46. <span class="token comment">// 读取响应,对Frame::Simple、Frame::Bulk、error类型进行处理</span>
  47. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  48. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>value<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  49. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">,</span>
  50. frame <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  51. <span class="token punctuation">}</span>
  52. <span class="token punctuation">}</span>
  53. <span class="token comment">/// 获取key对应的value</span>
  54. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  55. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>key<span class="token punctuation">:</span><span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  56. <span class="token comment">// 创建获取'key'的值的'Get'命令,并将其转换成frame格式</span>
  57. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Get</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  58. <span class="token comment">// 日志记录frame</span>
  59. <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
  60. <span class="token comment">// 将frame通过连接异步写给套接字</span>
  61. <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  62. <span class="token comment">// 异步读响应,并匹配Simple、Bulk、Null的情况</span>
  63. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  64. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  65. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  66. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  67. frame <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  68. <span class="token punctuation">}</span>
  69. <span class="token punctuation">}</span>
  70. <span class="token comment">/// 设置key对应的value</span>
  71. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  72. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  73. <span class="token comment">// 创建过期时间为None的Set命令,并交由set_cmd异步执行</span>
  74. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">set_cmd</span><span class="token punctuation">(</span><span class="token class-name">Set</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> <span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
  75. <span class="token punctuation">}</span>
  76. <span class="token comment">/// 设置key对应的value,value在expiration后过期</span>
  77. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  78. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_expires</span><span class="token punctuation">(</span>
  79. <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
  80. key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span>
  81. value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
  82. expiration<span class="token punctuation">:</span> <span class="token class-name">Duration</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span><span class="token punctuation">{</span>
  83. <span class="token comment">// 创建过期时间为expiration的Set命令,并交由set_cmd异步执行</span>
  84. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">set_cmd</span><span class="token punctuation">(</span><span class="token class-name">Set</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>expiration<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
  85. <span class="token punctuation">}</span>
  86. <span class="token comment">/// Set命令的主要逻辑</span>
  87. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_cmd</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> cmd<span class="token punctuation">:</span> <span class="token class-name">Set</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  88. <span class="token comment">// 将cmd转化为frame的形式</span>
  89. <span class="token keyword">let</span> frame <span class="token operator">=</span> cmd<span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  90. <span class="token comment">// 将frame写入连接</span>
  91. <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  92. <span class="token comment">// 读取对frame的响应,并匹配响应状态</span>
  93. <span class="token comment">// 执行成功响应为&quot;OK&quot;</span>
  94. <span class="token comment">// 其他响应均为失败</span>
  95. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  96. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span> <span class="token keyword">if</span> response <span class="token operator">==</span> <span class="token string">&quot;OK&quot;</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  97. frame <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  98. <span class="token punctuation">}</span>
  99. <span class="token punctuation">}</span>
  100. <span class="token comment">/// 将信息推送给指定的频道</span>
  101. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  102. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">publish</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channel<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token keyword">u64</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  103. <span class="token comment">// 将Publish命令转成frame的格式</span>
  104. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Publish</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channel<span class="token punctuation">,</span> message<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  105. <span class="token comment">// 日志记录frame</span>
  106. <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
  107. <span class="token comment">// 将frame通过连接异步写给套接字</span>
  108. <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  109. <span class="token comment">// 异步读取响应,并匹配响应帧的类型</span>
  110. <span class="token comment">// 如果是整数帧,则返回Ok(response)</span>
  111. <span class="token comment">// 否则返回错误</span>
  112. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  113. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span><span class="token punctuation">,</span>
  114. frame <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  115. <span class="token punctuation">}</span>
  116. <span class="token punctuation">}</span>
  117. <span class="token comment">/// SUBSCRIBE的主要逻辑</span>
  118. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe_cmd</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  119. <span class="token comment">// 将Subscribe命令转成frame的格式</span>
  120. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Subscribe</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  121. <span class="token comment">// 日志记录frame</span>
  122. <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
  123. <span class="token comment">// 将frame通过连接异步写给套接字</span>
  124. <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  125. <span class="token comment">// 遍历channels,服务端通过响应确认订阅每个频道</span>
  126. <span class="token keyword">for</span> channel <span class="token keyword">in</span> channels <span class="token punctuation">{</span>
  127. <span class="token comment">// 获取响应</span>
  128. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  129. <span class="token comment">// 分析不同情况的响应</span>
  130. <span class="token keyword">match</span> response <span class="token punctuation">{</span>
  131. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  132. <span class="token punctuation">[</span>subscribe<span class="token punctuation">,</span> schannel<span class="token punctuation">,</span> <span class="token punctuation">..</span><span class="token punctuation">]</span>
  133. <span class="token keyword">if</span> <span class="token operator">*</span>subscribe <span class="token operator">==</span> <span class="token string">&quot;subscribe&quot;</span> <span class="token operator">&amp;&amp;</span> <span class="token operator">*</span>schannel <span class="token operator">==</span> channel <span class="token operator">=&gt;</span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
  134. _ <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  135. <span class="token punctuation">}</span><span class="token punctuation">,</span>
  136. frame <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  137. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  138. <span class="token punctuation">}</span>
  139. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  140. <span class="token punctuation">}</span>
  141. <span class="token comment">/// 监听若干个指定的频道</span>
  142. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  143. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Subscriber</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  144. <span class="token comment">// 异步调用subscribe_cmd,客户端状态将会转为subscriber</span>
  145. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">subscribe_cmd</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  146. <span class="token comment">// 返回Subscriber类型对象</span>
  147. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Subscriber</span> <span class="token punctuation">{</span>
  148. client<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
  149. subscribed_channels<span class="token punctuation">:</span> channels<span class="token punctuation">,</span>
  150. <span class="token punctuation">}</span><span class="token punctuation">)</span>
  151. <span class="token punctuation">}</span>
  152. <span class="token comment">/// 读取响应</span>
  153. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">read_response</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Frame</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  154. <span class="token comment">// 异步读取连接的frame</span>
  155. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  156. <span class="token comment">// 日志记录读取信息</span>
  157. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
  158. <span class="token comment">// 解析响应,判断响应的类型</span>
  159. <span class="token comment">// Some(Frame::Error(msg))</span>
  160. <span class="token comment">// Some(frame)</span>
  161. <span class="token comment">// None:响应为None意味着服务端关闭</span>
  162. <span class="token keyword">match</span> response <span class="token punctuation">{</span>
  163. <span class="token comment">// Error frames are converted to `Err`</span>
  164. <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>msg<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  165. <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">,</span>
  166. <span class="token class-name">None</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  167. <span class="token comment">// Receiving `None` here indicates the server has closed the</span>
  168. <span class="token comment">// connection without sending a frame. This is unexpected and is</span>
  169. <span class="token comment">// represented as a &quot;connection reset by peer&quot; error.</span>
  170. <span class="token keyword">let</span> err <span class="token operator">=</span> <span class="token class-name">Error</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">ErrorKind</span><span class="token punctuation">::</span><span class="token class-name">ConnectionReset</span><span class="token punctuation">,</span> <span class="token string">&quot;connection reset by server&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  171. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  172. <span class="token punctuation">}</span>
  173. <span class="token punctuation">}</span>
  174. <span class="token punctuation">}</span>
  175. <span class="token punctuation">}</span>
  176. </code></pre></div><h4 id="message"><a href="#message" class="header-anchor">#</a> Message</h4> <h5 id="struct-2"><a href="#struct-2" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// A message received on a subscribed channel.</span>
  177. <span class="token attribute attr-name">#[derive(Debug, Clone)]</span>
  178. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Message</span> <span class="token punctuation">{</span>
  179. <span class="token keyword">pub</span> channel<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  180. <span class="token keyword">pub</span> content<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
  181. <span class="token punctuation">}</span>
  182. </code></pre></div><h4 id="subscriber"><a href="#subscriber" class="header-anchor">#</a> Subscriber</h4> <blockquote><p>订阅者类型,在客户端订阅若干频道后会转变成订阅者,此时只能执行publish/subscribe相关的命令</p></blockquote> <p>代码位置:src/clients/client.rs</p> <h5 id="struct-3"><a href="#struct-3" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Subscriber</span> <span class="token punctuation">{</span>
  183. <span class="token comment">/// 客户端</span>
  184. client<span class="token punctuation">:</span> <span class="token class-name">Client</span><span class="token punctuation">,</span>
  185. <span class="token comment">/// 所订阅的频道集合</span>
  186. subscribed_channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  187. <span class="token punctuation">}</span>
  188. </code></pre></div><h5 id="implementation-2"><a href="#implementation-2" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Subscriber</span><span class="token punctuation">{</span>
  189. <span class="token comment">/// 获取订阅的频道集合</span>
  190. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_subscribed</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span> <span class="token punctuation">{</span>
  191. <span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels
  192. <span class="token punctuation">}</span>
  193. <span class="token comment">/// 获取客户端收到的下一条消息(可能需要等待)</span>
  194. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">next_message</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  195. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  196. <span class="token class-name">Some</span><span class="token punctuation">(</span>mframe<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  197. <span class="token comment">// 使用日志记录mframe</span>
  198. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>mframe<span class="token punctuation">)</span><span class="token punctuation">;</span>
  199. <span class="token keyword">match</span> mframe <span class="token punctuation">{</span>
  200. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  201. <span class="token comment">// frame分片后格式为[message,channel,content]</span>
  202. <span class="token comment">// 当接收到信息时,message == 'message'</span>
  203. <span class="token comment">// 成立则将信息转化为Message的形式返回</span>
  204. <span class="token comment">// 否则返回错误</span>
  205. <span class="token punctuation">[</span>message<span class="token punctuation">,</span> channel<span class="token punctuation">,</span> content<span class="token punctuation">]</span> <span class="token keyword">if</span> <span class="token operator">*</span>message <span class="token operator">==</span> <span class="token string">&quot;message&quot;</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Message</span> <span class="token punctuation">{</span>
  206. channel<span class="token punctuation">:</span> channel<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  207. content<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>content<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  208. <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  209. _ <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>mframe<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  210. <span class="token punctuation">}</span><span class="token punctuation">,</span>
  211. frame <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  212. <span class="token punctuation">}</span>
  213. <span class="token punctuation">}</span>
  214. <span class="token class-name">None</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  215. <span class="token punctuation">}</span>
  216. <span class="token punctuation">}</span>
  217. <span class="token comment">/// 将收到的message转换成stream形式</span>
  218. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_stream</span><span class="token punctuation">(</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">impl</span> <span class="token class-name">Stream</span><span class="token operator">&lt;</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  219. <span class="token macro property">try_stream!</span> <span class="token punctuation">{</span>
  220. <span class="token keyword">while</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>message<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  221. <span class="token keyword">yield</span> message<span class="token punctuation">;</span>
  222. <span class="token punctuation">}</span>
  223. <span class="token punctuation">}</span>
  224. <span class="token punctuation">}</span>
  225. <span class="token comment">/// 订阅新的频道集合</span>
  226. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  227. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  228. <span class="token comment">// 调用subscribe_cmd,订阅新的频道集合</span>
  229. <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span><span class="token function">subscribe_cmd</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  230. <span class="token comment">// 更新当前的已订阅频道</span>
  231. <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels
  232. <span class="token punctuation">.</span><span class="token function">extend</span><span class="token punctuation">(</span>channels<span class="token punctuation">.</span><span class="token function">iter</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token class-name">Clone</span><span class="token punctuation">::</span>clone<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  233. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  234. <span class="token punctuation">}</span>
  235. <span class="token comment">/// 取消订阅指定的频道</span>
  236. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  237. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">unsubscribe</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  238. <span class="token comment">// 创建Unsubscribe命令,并转为frame形式</span>
  239. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Unsubscribe</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  240. <span class="token comment">// 日志记录请求</span>
  241. <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
  242. <span class="token comment">// 将请求写入连接</span>
  243. <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  244. <span class="token comment">// 如果channels为空,那么取消订阅所有频道</span>
  245. <span class="token comment">// 否则只取消channels中指定的频道</span>
  246. <span class="token keyword">let</span> num <span class="token operator">=</span> <span class="token keyword">if</span> channels<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  247. <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  248. <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
  249. channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  250. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  251. <span class="token comment">// 解析响应</span>
  252. <span class="token keyword">for</span> _ <span class="token keyword">in</span> <span class="token number">0</span><span class="token punctuation">..</span>num <span class="token punctuation">{</span>
  253. <span class="token comment">// 读取响应</span>
  254. <span class="token macro property">todo!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  255. <span class="token comment">// 判断响应类型</span>
  256. <span class="token keyword">match</span> response <span class="token punctuation">{</span>
  257. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  258. <span class="token comment">// frame分片后格式为['unsubscribe',channel,..]</span>
  259. <span class="token comment">// 判断是否取消订阅</span>
  260. <span class="token punctuation">[</span>unsubscribe<span class="token punctuation">,</span> channel<span class="token punctuation">,</span> <span class="token punctuation">..</span><span class="token punctuation">]</span> <span class="token keyword">if</span> <span class="token operator">*</span>unsubscribe <span class="token operator">==</span> <span class="token string">&quot;unsubscribe&quot;</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  261. <span class="token comment">// 获取当前订阅数组的长度</span>
  262. <span class="token keyword">let</span> len <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  263. <span class="token comment">// 长度为0,返回错误</span>
  264. <span class="token keyword">if</span> len <span class="token operator">==</span> <span class="token number">0</span> <span class="token punctuation">{</span>
  265. <span class="token comment">// There must be at least one channel</span>
  266. <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  267. <span class="token punctuation">}</span>
  268. <span class="token comment">// 当channel存在于subscribed_channels时将其删除</span>
  269. <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">retain</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>c<span class="token closure-punctuation punctuation">|</span></span> <span class="token operator">*</span>channel <span class="token operator">!=</span> <span class="token operator">&amp;</span>c<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  270. <span class="token comment">// 删除的频道数不恰好为1(未删除或删除多个)则返回错误</span>
  271. <span class="token keyword">if</span> <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">!=</span> len <span class="token operator">-</span> <span class="token number">1</span> <span class="token punctuation">{</span>
  272. <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  273. <span class="token punctuation">}</span> <span class="token punctuation">}</span>
  274. _ <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  275. <span class="token punctuation">}</span><span class="token punctuation">,</span>
  276. frame <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  277. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  278. <span class="token punctuation">}</span>
  279. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  280. <span class="token punctuation">}</span>
  281. <span class="token punctuation">}</span>
  282. </code></pre></div><h3 id="blocking-client"><a href="#blocking-client" class="header-anchor">#</a> blocking_client</h3> <blockquote><p>最小的阻塞性Redis客户端</p></blockquote> <p>代码位置:src/clients/blocking_client.rs</p> <h4 id="blockingclient"><a href="#blockingclient" class="header-anchor">#</a> BlockingClient</h4> <h5 id="struct-4"><a href="#struct-4" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">BlockingClient</span><span class="token punctuation">{</span>
  283. <span class="token comment">/// 异步的客户端</span>
  284. inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Client</span><span class="token punctuation">,</span>
  285. <span class="token comment">/// 运行时环境,负责调度和管理异步任务的执行</span>
  286. rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
  287. <span class="token punctuation">}</span>
  288. </code></pre></div><h5 id="implementation-3"><a href="#implementation-3" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">BlockingClient</span><span class="token punctuation">{</span>
  289. <span class="token comment">/// 建立连接</span>
  290. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">connect</span><span class="token operator">&lt;</span><span class="token class-name">T</span><span class="token punctuation">:</span> <span class="token class-name">ToSocketAddrs</span><span class="token operator">&gt;</span><span class="token punctuation">(</span>addr<span class="token punctuation">:</span> <span class="token class-name">T</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">BlockingClient</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  291. <span class="token keyword">let</span> rt <span class="token operator">=</span> <span class="token namespace">tokio<span class="token punctuation">::</span>runtime<span class="token punctuation">::</span></span><span class="token class-name">Builder</span><span class="token punctuation">::</span><span class="token function">new_current_thread</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  292. <span class="token punctuation">.</span><span class="token function">enable_all</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  293. <span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  294. <span class="token keyword">let</span> inner <span class="token operator">=</span> rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Client</span><span class="token punctuation">::</span><span class="token function">connect</span><span class="token punctuation">(</span>addr<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  295. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">BlockingClient</span> <span class="token punctuation">{</span> inner<span class="token punctuation">,</span> rt <span class="token punctuation">}</span><span class="token punctuation">)</span>
  296. <span class="token punctuation">}</span>
  297. <span class="token comment">/// 异步获取指定值</span>
  298. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  299. <span class="token comment">// 异步调用get获取key对应的值</span>
  300. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span>key<span class="token punctuation">)</span><span class="token punctuation">)</span>
  301. <span class="token punctuation">}</span>
  302. <span class="token comment">/// 异步给key赋值</span>
  303. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  304. <span class="token comment">// 异步调用set给key赋值</span>
  305. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">set</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">)</span><span class="token punctuation">)</span>
  306. <span class="token punctuation">}</span>
  307. <span class="token comment">/// 异步给key赋值,并指定过期时间</span>
  308. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_expires</span><span class="token punctuation">(</span>
  309. <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
  310. key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span>
  311. value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
  312. expiration<span class="token punctuation">:</span> <span class="token class-name">Duration</span><span class="token punctuation">,</span>
  313. <span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  314. <span class="token comment">// 异步调用set_expires给key赋值</span>
  315. <span class="token keyword">self</span><span class="token punctuation">.</span>rt
  316. <span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">set_expires</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> expiration<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">}</span>
  317. <span class="token comment">/// 异步推送消息</span>
  318. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">publish</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channel<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token keyword">u64</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  319. <span class="token comment">// 异步调用publish给指定channel推送消息</span>
  320. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">publish</span><span class="token punctuation">(</span>channel<span class="token punctuation">,</span> message<span class="token punctuation">)</span><span class="token punctuation">)</span>
  321. <span class="token punctuation">}</span>
  322. <span class="token comment">/// 异步执行订阅指定频道操作,并将Client转换为BlockingSubscriber状态</span>
  323. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">BlockingSubscriber</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  324. <span class="token comment">// 异步调用subscribe函数,转换client状态</span>
  325. <span class="token keyword">let</span> subscriber <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  326. <span class="token comment">// 返回BlockingSubscriber</span>
  327. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">BlockingSubscriber</span> <span class="token punctuation">{</span>
  328. inner<span class="token punctuation">:</span> subscriber<span class="token punctuation">,</span>
  329. rt<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">,</span>
  330. <span class="token punctuation">}</span><span class="token punctuation">)</span>
  331. <span class="token punctuation">}</span>
  332. <span class="token punctuation">}</span>
  333. </code></pre></div><h4 id="blockingsubscriber"><a href="#blockingsubscriber" class="header-anchor">#</a> BlockingSubscriber</h4> <h5 id="struct-5"><a href="#struct-5" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">BlockingSubscriber</span> <span class="token punctuation">{</span>
  334. <span class="token comment">/// 异步的订阅者</span>
  335. inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Subscriber</span><span class="token punctuation">,</span>
  336. <span class="token comment">/// 运行时环境,负责调度和管理异步任务的执行</span>
  337. rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
  338. <span class="token punctuation">}</span>
  339. </code></pre></div><h5 id="implementation-4"><a href="#implementation-4" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">BlockingSubscriber</span><span class="token punctuation">{</span>
  340. <span class="token comment">/// 获取订阅的channel集合</span>
  341. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_subscribed</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span> <span class="token punctuation">{</span>
  342. <span class="token comment">// 获取订阅的channel集合,并返回</span>
  343. <span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">get_subscribed</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  344. <span class="token punctuation">}</span>
  345. <span class="token comment">/// 异步接收下一条信息</span>
  346. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">next_message</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  347. <span class="token comment">// 异步执行获取下一条信息的操作</span>
  348. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  349. <span class="token punctuation">}</span>
  350. <span class="token comment">/// 异步订阅新的频道集合</span>
  351. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  352. <span class="token comment">// 异步订阅新的频道集合</span>
  353. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span>
  354. <span class="token punctuation">}</span>
  355. <span class="token comment">/// 异步执行取消订阅指定频道集操作</span>
  356. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">unsubscribe</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  357. <span class="token comment">// 异步执行取消订阅指定频道集操作</span>
  358. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">unsubscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span>
  359. <span class="token punctuation">}</span>
  360. <span class="token comment">/// 获取BlockingSubscriber的迭代器</span>
  361. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_iter</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">impl</span> <span class="token class-name">Iterator</span><span class="token operator">&lt;</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  362. <span class="token class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
  363. inner<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">,</span>
  364. rt<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">,</span>
  365. <span class="token punctuation">}</span>
  366. <span class="token punctuation">}</span>
  367. <span class="token punctuation">}</span>
  368. </code></pre></div><h4 id="subscriberiterator"><a href="#subscriberiterator" class="header-anchor">#</a> SubscriberIterator</h4> <h5 id="struct-6"><a href="#struct-6" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// BlockingSubscriber的迭代器</span>
  369. <span class="token comment">/// 通过`BlockingSubscriber::into_iter`可以获取</span>
  370. <span class="token keyword">struct</span> <span class="token type-definition class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
  371. <span class="token comment">/// BlockingSubscriber中的Subscriber</span>
  372. inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Subscriber</span><span class="token punctuation">,</span>
  373. <span class="token comment">/// BlockingSubscriber中的Runtime</span>
  374. rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
  375. <span class="token punctuation">}</span>
  376. </code></pre></div><h5 id="implementation-5"><a href="#implementation-5" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Iterator</span> <span class="token keyword">for</span> <span class="token class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
  377. <span class="token comment">/// 定义迭代器的元素类型</span>
  378. <span class="token keyword">type</span> <span class="token type-definition class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;</span><span class="token punctuation">;</span>
  379. <span class="token comment">/// 获取迭代器的下一个元素</span>
  380. <span class="token comment">/// 返回Some(Ok(message)) 表示存在下一个元素</span>
  381. <span class="token comment">/// 返回Some(Err(error)) 表示获取下一个元素出错</span>
  382. <span class="token comment">/// 返回None 表示没有下一个元素</span>
  383. <span class="token keyword">fn</span> <span class="token function-definition function">next</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;&gt;</span>
  384. <span class="token punctuation">{</span>
  385. <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">transpose</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  386. <span class="token punctuation">}</span>
  387. <span class="token punctuation">}</span>
  388. </code></pre></div><h3 id="cmd"><a href="#cmd" class="header-anchor">#</a> cmd</h3> <h4 id="get"><a href="#get" class="header-anchor">#</a> Get</h4> <blockquote><p>获取key对应的value</p></blockquote> <ul><li>如果key不存在,返回nil</li> <li>如果value不是string类型,返回error</li></ul> <p>代码位置:src/cmd/get.rs</p> <h5 id="struct-7"><a href="#struct-7" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug)]</span>
  389. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Get</span> <span class="token punctuation">{</span>
  390. <span class="token comment">/// Name of the key to get</span>
  391. key<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  392. <span class="token punctuation">}</span>
  393. </code></pre></div><h5 id="implementation-6"><a href="#implementation-6" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Get</span><span class="token punctuation">{</span>
  394. <span class="token comment">/// 创建一个Get命令对象</span>
  395. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Get</span> <span class="token punctuation">{</span>
  396. <span class="token class-name">Get</span> <span class="token punctuation">{</span>
  397. key<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  398. <span class="token punctuation">}</span>
  399. <span class="token punctuation">}</span>
  400. <span class="token comment">/// Get the key</span>
  401. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">key</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
  402. <span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>key
  403. <span class="token punctuation">}</span>
  404. <span class="token comment">/// 解析frame,并获取对应Get命令</span>
  405. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Get</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  406. <span class="token keyword">let</span> key <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  407. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Get</span> <span class="token punctuation">{</span> key <span class="token punctuation">}</span><span class="token punctuation">)</span>
  408. <span class="token punctuation">}</span>
  409. <span class="token comment">/// 对数据库执行Get命令</span>
  410. <span class="token attribute attr-name">#[instrument(skip(self, db, dst))]</span>
  411. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  412. <span class="token comment">// 从数据库中获取value</span>
  413. <span class="token comment">// 如果值存在,则将其转成Frame::Bulk形式</span>
  414. <span class="token comment">// 否则转成Frame::Null</span>
  415. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">)</span> <span class="token punctuation">{</span>
  416. <span class="token comment">// If a value is present, it is written to the client in &quot;bulk&quot;</span>
  417. <span class="token comment">// format.</span>
  418. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span>
  419. <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
  420. <span class="token comment">// If there is no value, `Null` is written.</span>
  421. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span>
  422. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  423. <span class="token comment">// 日志记录响应</span>
  424. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
  425. <span class="token comment">// 将响应写入连接</span>
  426. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  427. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  428. <span class="token punctuation">}</span>
  429. <span class="token comment">/// 将Get命令转成Frame格式。</span>
  430. <span class="token comment">/// 客户端将Get命令发给服务端时会调用此方法。</span>
  431. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  432. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  433. <span class="token comment">// 将命令以[&quot;get&quot;,key]的形式放入frame。注意每个元素都要转成字节形式</span>
  434. <span class="token macro property">todo!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  435. frame
  436. <span class="token punctuation">}</span>
  437. <span class="token punctuation">}</span>
  438. </code></pre></div><h4 id="publish"><a href="#publish" class="header-anchor">#</a> Publish</h4> <p>代码位置:src/cmd/publish.rs</p> <h5 id="struct-8"><a href="#struct-8" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 向指定频道推送消息</span>
  439. <span class="token attribute attr-name">#[derive(Debug)]</span>
  440. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Publish</span> <span class="token punctuation">{</span>
  441. <span class="token comment">/// 频道名</span>
  442. channel<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  443. <span class="token comment">/// 要发送的消息</span>
  444. message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
  445. <span class="token punctuation">}</span>
  446. </code></pre></div><h5 id="implementation-7"><a href="#implementation-7" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Publish</span><span class="token punctuation">{</span>
  447. <span class="token comment">/// 创建Publish命令</span>
  448. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channel<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Publish</span> <span class="token punctuation">{</span>
  449. <span class="token class-name">Publish</span> <span class="token punctuation">{</span>
  450. channel<span class="token punctuation">:</span> channel<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  451. message<span class="token punctuation">,</span>
  452. <span class="token punctuation">}</span>
  453. <span class="token punctuation">}</span>
  454. <span class="token comment">/// 解析frame,并获取对应Publish命令</span>
  455. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Publish</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  456. <span class="token comment">// 获取string形式的channel</span>
  457. <span class="token keyword">let</span> channel <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  458. <span class="token comment">// 获取字节形式的message</span>
  459. <span class="token keyword">let</span> message <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  460. <span class="token comment">// 返回对应Publish</span>
  461. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Publish</span> <span class="token punctuation">{</span> channel<span class="token punctuation">,</span> message <span class="token punctuation">}</span><span class="token punctuation">)</span>
  462. <span class="token punctuation">}</span>
  463. <span class="token comment">/// 对数据库执行publish命令</span>
  464. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span><span class="token punctuation">{</span>
  465. <span class="token comment">// 向数据库连接执行publish操作,并获取到订阅channel的subscriber的数量</span>
  466. <span class="token keyword">let</span> num_subscribers <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">publish</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>channel<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>message<span class="token punctuation">)</span><span class="token punctuation">;</span>
  467. <span class="token comment">// 将num_subscribers转换成整数帧,并将其写入连接</span>
  468. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>num_subscribers <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  469. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  470. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  471. <span class="token punctuation">}</span>
  472. <span class="token comment">/// 将publish命令转成frame的形式</span>
  473. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  474. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  475. <span class="token comment">// 将publish命令以[&quot;publish&quot;,channel,message]的形式写入数组。注意每个元素都要转成字节形式</span>
  476. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;publish&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  477. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  478. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>message<span class="token punctuation">)</span><span class="token punctuation">;</span>
  479. frame
  480. <span class="token punctuation">}</span>
  481. <span class="token punctuation">}</span>
  482. </code></pre></div><h4 id="set"><a href="#set" class="header-anchor">#</a> Set</h4> <p>代码位置:src/cmd/set.rs</p> <h5 id="struct-9"><a href="#struct-9" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 设置key的值</span>
  483. <span class="token attribute attr-name">#[derive(Debug)]</span>
  484. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Set</span> <span class="token punctuation">{</span>
  485. <span class="token comment">/// the lookup key</span>
  486. key<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  487. <span class="token comment">/// the value to be stored</span>
  488. value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
  489. <span class="token comment">/// When to expire the key</span>
  490. expire<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Duration</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  491. <span class="token punctuation">}</span>
  492. </code></pre></div><h5 id="implementation-8"><a href="#implementation-8" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Set</span><span class="token punctuation">{</span>
  493. <span class="token comment">/// 创建一个set命令</span>
  494. <span class="token comment">/// </span>
  495. <span class="token comment">/// 如果不需要过期时间则expire为None</span>
  496. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span> expire<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Duration</span><span class="token operator">&gt;</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Set</span> <span class="token punctuation">{</span>
  497. <span class="token class-name">Set</span> <span class="token punctuation">{</span>
  498. key<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  499. value<span class="token punctuation">,</span>
  500. expire<span class="token punctuation">,</span>
  501. <span class="token punctuation">}</span>
  502. <span class="token punctuation">}</span>
  503. <span class="token comment">/// Get the key</span>
  504. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">key</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
  505. <span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>key
  506. <span class="token punctuation">}</span>
  507. <span class="token comment">/// Get the value</span>
  508. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">value</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token class-name">Bytes</span> <span class="token punctuation">{</span>
  509. <span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>value
  510. <span class="token punctuation">}</span>
  511. <span class="token comment">/// Get the expire</span>
  512. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">expire</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Duration</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  513. <span class="token keyword">self</span><span class="token punctuation">.</span>expire
  514. <span class="token punctuation">}</span>
  515. <span class="token comment">/// 解析frame</span>
  516. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Set</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  517. <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
  518. <span class="token comment">// 使用 parse.next_string() 方法读取下一个字符串字段作为键(key)。</span>
  519. <span class="token keyword">let</span> key <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  520. <span class="token comment">// 使用 parse.next_bytes() 方法读取下一个字节字段作为值(value)。</span>
  521. <span class="token keyword">let</span> value <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  522. <span class="token comment">// 创建一个mut expire,用于存储过期时间。初始值为 None。</span>
  523. <span class="token keyword">let</span> <span class="token keyword">mut</span> expire <span class="token operator">=</span> <span class="token class-name">None</span><span class="token punctuation">;</span>
  524. <span class="token comment">// 尝试解析可选的过期时间选项(EX 或 PX)</span>
  525. <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  526. <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token keyword">if</span> s<span class="token punctuation">.</span><span class="token function">to_uppercase</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">==</span> <span class="token string">&quot;EX&quot;</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  527. <span class="token comment">// 如果下一个字符串字段是 &quot;EX&quot;(不区分大小写),则表示设置了过期时间且单位是secs</span>
  528. <span class="token comment">// 下一个值应该是一个整数,并转成Duration类型。</span>
  529. <span class="token keyword">let</span> secs <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_int</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  530. expire <span class="token operator">=</span> <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_secs</span><span class="token punctuation">(</span>secs<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  531. <span class="token punctuation">}</span>
  532. <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token keyword">if</span> s<span class="token punctuation">.</span><span class="token function">to_uppercase</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">==</span> <span class="token string">&quot;PX&quot;</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  533. <span class="token comment">// 如果下一个字符串字段是 &quot;PX&quot;(不区分大小写),则表示设置了过期时间且单位是millis</span>
  534. <span class="token comment">// 下一个值应该是一个整数,并转成Duration类型。</span>
  535. <span class="token keyword">let</span> ms <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_int</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  536. expire <span class="token operator">=</span> <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_millis</span><span class="token punctuation">(</span>ms<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  537. <span class="token punctuation">}</span>
  538. <span class="token comment">// 如果下一个字符串字段不是 &quot;EX&quot; 或 &quot;PX&quot;,则表示设置了不支持的选项,将返回一个错误,指示 SET 目前仅支持过期时间选项。</span>
  539. <span class="token class-name">Ok</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token string">&quot;currently `SET` only supports the expiration option&quot;</span><span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  540. <span class="token comment">// 如果解析到达流的末尾,表示没有进一步的数据需要解析,这是正常的运行时情况,表示没有指定 SET 的选项。</span>
  541. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
  542. <span class="token comment">// 其他所有错误都会返回一个错误,导致连接被终止。</span>
  543. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  544. <span class="token punctuation">}</span>
  545. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Set</span> <span class="token punctuation">{</span> key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> expire <span class="token punctuation">}</span><span class="token punctuation">)</span>
  546. <span class="token punctuation">}</span>
  547. <span class="token comment">/// 对数据库连接执行set命令</span>
  548. <span class="token attribute attr-name">#[instrument(skip(self, db, dst))]</span>
  549. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  550. <span class="token comment">// 执行set</span>
  551. db<span class="token punctuation">.</span><span class="token function">set</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>value<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>expire<span class="token punctuation">)</span><span class="token punctuation">;</span>
  552. <span class="token comment">// 创建执行成功响应,并写给连接</span>
  553. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token string">&quot;OK&quot;</span><span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  554. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
  555. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  556. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  557. <span class="token punctuation">}</span>
  558. <span class="token comment">/// 将set命令转成frame形式</span>
  559. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  560. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  561. <span class="token comment">// 将命令以[&quot;set&quot;,key,value]的形式写入frame</span>
  562. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;set&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  563. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  564. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>value<span class="token punctuation">)</span><span class="token punctuation">;</span>
  565. <span class="token comment">// 判断是否有过期时间,如果有过期时间以[&quot;px&quot;,ms]的格式写入frame</span>
  566. <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>ms<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>expire <span class="token punctuation">{</span>
  567. <span class="token comment">// 可以用以下两种方式设置过期时间,在这里使用第二种,因为精度更高</span>
  568. <span class="token comment">// 1. SET key value EX seconds</span>
  569. <span class="token comment">// 2. SET key value PX milliseconds</span>
  570. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;px&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  571. frame<span class="token punctuation">.</span><span class="token function">push_int</span><span class="token punctuation">(</span>ms<span class="token punctuation">.</span><span class="token function">as_millis</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  572. <span class="token punctuation">}</span>
  573. frame
  574. <span class="token punctuation">}</span>
  575. <span class="token punctuation">}</span>
  576. </code></pre></div><h4 id="subscribe-message"><a href="#subscribe-message" class="header-anchor">#</a> Subscribe+Message</h4> <p>代码位置:src/cmd/subscribe.rs</p> <h5 id="struct-10"><a href="#struct-10" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 订阅命令</span>
  577. <span class="token comment">/// </span>
  578. <span class="token comment">/// 一旦客户端进入订阅状态,它只能提出 </span>
  579. <span class="token comment">/// SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE,</span>
  580. <span class="token comment">/// PUNSUBSCRIBE, PING 和 QUIT命令</span>
  581. <span class="token attribute attr-name">#[derive(Debug)]</span>
  582. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Subscribe</span> <span class="token punctuation">{</span>
  583. channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  584. <span class="token punctuation">}</span>
  585. <span class="token comment">/// 信息流</span>
  586. <span class="token keyword">type</span> <span class="token type-definition class-name">Messages</span> <span class="token operator">=</span> <span class="token class-name">Pin</span><span class="token operator">&lt;</span><span class="token class-name">Box</span><span class="token operator">&lt;</span><span class="token keyword">dyn</span> <span class="token class-name">Stream</span><span class="token operator">&lt;</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token class-name">Bytes</span><span class="token operator">&gt;</span> <span class="token operator">+</span> <span class="token class-name">Send</span><span class="token operator">&gt;&gt;</span><span class="token punctuation">;</span>
  587. </code></pre></div><h5 id="implementation-9"><a href="#implementation-9" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Subscribe</span><span class="token punctuation">{</span>
  588. <span class="token comment">/// 创建一个新的订阅者以监听频道</span>
  589. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Subscribe</span> <span class="token punctuation">{</span>
  590. <span class="token class-name">Subscribe</span> <span class="token punctuation">{</span>
  591. channels<span class="token punctuation">:</span> channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  592. <span class="token punctuation">}</span>
  593. <span class="token punctuation">}</span>
  594. <span class="token comment">/// 解析subscribe帧</span>
  595. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Subscribe</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  596. <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
  597. <span class="token comment">// 获取第一个channel(至少需要订阅一个频道);若解析失败则通过 ? 提前返回错误,否则将其写入数组</span>
  598. <span class="token keyword">let</span> <span class="token keyword">mut</span> channels <span class="token operator">=</span> <span class="token macro property">vec!</span><span class="token punctuation">[</span>parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
  599. <span class="token comment">// 循环解析剩下的channel,并将其写入数组</span>
  600. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  601. <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  602. <span class="token comment">// 成功解析出一个频道名,将其加入channels数组</span>
  603. <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> channels<span class="token punctuation">.</span><span class="token function">push</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span><span class="token punctuation">,</span>
  604. <span class="token comment">// EndOfStream说明帧中已没有更多的频道名,结束循环</span>
  605. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">break</span><span class="token punctuation">,</span>
  606. <span class="token comment">// 错误报告</span>
  607. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  608. <span class="token punctuation">}</span>
  609. <span class="token punctuation">}</span>
  610. <span class="token comment">// 返回结果</span>
  611. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Subscribe</span> <span class="token punctuation">{</span> channels <span class="token punctuation">}</span><span class="token punctuation">)</span>
  612. <span class="token punctuation">}</span>
  613. <span class="token comment">/// 数据库执行subscribe命令</span>
  614. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span>
  615. <span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
  616. db<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Db</span><span class="token punctuation">,</span>
  617. dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
  618. shutdown<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Shutdown</span><span class="token punctuation">,</span>
  619. <span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  620. <span class="token comment">// 创建一个 StreamMap 对象,用于跟踪活动的订阅。使用map可以防止订阅重复</span>
  621. <span class="token keyword">let</span> <span class="token keyword">mut</span> subscriptions <span class="token operator">=</span> <span class="token class-name">StreamMap</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  622. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  623. <span class="token comment">// 遍历channels,drain(..)会将频道名从channels中移除</span>
  624. <span class="token comment">// subscribe_to_channel会订阅传入的频道</span>
  625. <span class="token comment">// await等待频道订阅成功</span>
  626. <span class="token keyword">for</span> channel_name <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">drain</span><span class="token punctuation">(</span><span class="token punctuation">..</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  627. <span class="token function">subscribe_to_channel</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> subscriptions<span class="token punctuation">,</span> db<span class="token punctuation">,</span> dst<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  628. <span class="token punctuation">}</span>
  629. <span class="token comment">// 使用 select! 宏等待以下几种情况中的任何一种发生:</span>
  630. <span class="token comment">// 1. 从已订阅的通道接收到消息。</span>
  631. <span class="token comment">// 2. 从客户端接收到订阅或取消订阅命令。</span>
  632. <span class="token comment">// 3. 收到服务器关闭信号。</span>
  633. <span class="token macro property">select!</span> <span class="token punctuation">{</span>
  634. <span class="token comment">// 接收来自已订阅通道的消息。</span>
  635. <span class="token comment">// subscriptions.next() 返回一个Future,用于等待任一已订阅频道的下一条消息</span>
  636. <span class="token comment">// 当有新的消息到达时,会返回 Some((channel_name, msg))</span>
  637. <span class="token comment">// 其中 channel_name 是通道名称,msg 是消息内容。</span>
  638. <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> msg<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=</span> subscriptions<span class="token punctuation">.</span><span class="token function">next</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  639. <span class="token comment">// 将收到的消息构造成消息帧写给客户端</span>
  640. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token function">make_message_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> msg<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  641. <span class="token punctuation">}</span>
  642. <span class="token comment">// dst.read_frame() 返回一个异步操作结果,</span>
  643. <span class="token comment">//当有新的帧可用时,会返回 Some(frame),其中 frame 是读取到的帧。如果客户端断开连接,会返回 None。</span>
  644. res <span class="token operator">=</span> dst<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  645. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token keyword">match</span> res<span class="token operator">?</span> <span class="token punctuation">{</span>
  646. <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> frame<span class="token punctuation">,</span>
  647. <span class="token comment">// This happens if the remote client has disconnected.</span>
  648. <span class="token class-name">None</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  649. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  650. <span class="token comment">// 处理从客户端接收到的命令帧</span>
  651. <span class="token function">handle_command</span><span class="token punctuation">(</span>
  652. frame<span class="token punctuation">,</span>
  653. <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels<span class="token punctuation">,</span>
  654. <span class="token operator">&amp;</span><span class="token keyword">mut</span> subscriptions<span class="token punctuation">,</span>
  655. dst<span class="token punctuation">,</span>
  656. <span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  657. <span class="token punctuation">}</span>
  658. <span class="token comment">// 接收服务器关闭信号。</span>
  659. _ <span class="token operator">=</span> shutdown<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  660. <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  661. <span class="token punctuation">}</span>
  662. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  663. <span class="token punctuation">}</span>
  664. <span class="token punctuation">}</span>
  665. <span class="token comment">/// 将命令转为frame类型</span>
  666. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  667. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  668. <span class="token comment">// 将命令以[&quot;subscribe&quot;,channel1,channel2...]</span>
  669. <span class="token comment">// 的形式放入数组</span>
  670. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;subscribe&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  671. <span class="token keyword">for</span> channel <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
  672. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  673. <span class="token punctuation">}</span>
  674. frame
  675. <span class="token punctuation">}</span>
  676. <span class="token punctuation">}</span>
  677. </code></pre></div><h5 id="function"><a href="#function" class="header-anchor">#</a> Function</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe_to_channel</span><span class="token punctuation">(</span>
  678. channel_name<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  679. subscriptions<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">StreamMap</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">Messages</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  680. db<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Db</span><span class="token punctuation">,</span>
  681. dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
  682. <span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  683. <span class="token comment">// 向数据库发送订阅请求,得到该频道的广播接收端</span>
  684. <span class="token keyword">let</span> <span class="token keyword">mut</span> rx <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  685. <span class="token comment">// 将接收端包装成异步消息流:收到消息就yield,消费滞后(Lagged)时继续接收,其他错误则结束流</span>
  686. <span class="token keyword">let</span> rx <span class="token operator">=</span> <span class="token class-name">Box</span><span class="token punctuation">::</span><span class="token function">pin</span><span class="token punctuation">(</span><span class="token namespace">async_stream<span class="token punctuation">::</span></span><span class="token macro property">stream!</span> <span class="token punctuation">{</span>
  687. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  688. <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
  689. <span class="token class-name">Ok</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">yield</span> msg<span class="token punctuation">,</span>
  690. <span class="token comment">// If we lagged in consuming messages, just resume.</span>
  691. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token namespace">broadcast<span class="token punctuation">::</span>error<span class="token punctuation">::</span></span><span class="token class-name">RecvError</span><span class="token punctuation">::</span><span class="token class-name">Lagged</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
  692. <span class="token class-name">Err</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">break</span><span class="token punctuation">,</span>
  693. <span class="token punctuation">}</span>
  694. <span class="token punctuation">}</span>
  695. <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  696. <span class="token comment">// 将订阅成功的管道加入到订阅map中</span>
  697. subscriptions<span class="token punctuation">.</span><span class="token function">insert</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> rx<span class="token punctuation">)</span><span class="token punctuation">;</span>
  698. <span class="token comment">// 响应成功订阅报文</span>
  699. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token function">make_subscribe_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> subscriptions<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  700. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  701. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  702. <span class="token punctuation">}</span>
  703. <span class="token comment">/// 处理subscribe和unsubscribe的逻辑</span>
  704. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">handle_command</span><span class="token punctuation">(</span>
  705. frame<span class="token punctuation">:</span> <span class="token class-name">Frame</span><span class="token punctuation">,</span>
  706. subscribe_to<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  707. subscriptions<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">StreamMap</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">Messages</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  708. dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
  709. <span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  710. <span class="token comment">// 匹配命令,判断要执行的操作</span>
  711. <span class="token keyword">match</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token function">from_frame</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  712. <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Subscribe</span><span class="token punctuation">(</span>subscribe<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  713. <span class="token comment">// 将要订阅的频道加入到待订阅列表</span>
  714. subscribe_to<span class="token punctuation">.</span><span class="token function">extend</span><span class="token punctuation">(</span>subscribe<span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">into_iter</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  715. <span class="token punctuation">}</span>
  716. <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Unsubscribe</span><span class="token punctuation">(</span><span class="token keyword">mut</span> unsubscribe<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  717. <span class="token comment">// 判断是否指定了要取消订阅的频道</span>
  718. <span class="token comment">// 如果没有则需要取消订阅所有channels</span>
  719. <span class="token keyword">if</span> unsubscribe<span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  720. unsubscribe<span class="token punctuation">.</span>channels <span class="token operator">=</span> subscriptions
  721. <span class="token punctuation">.</span><span class="token function">keys</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  722. <span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>channel_name<span class="token closure-punctuation punctuation">|</span></span> channel_name<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  723. <span class="token punctuation">.</span><span class="token function">collect</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  724. <span class="token punctuation">}</span>
  725. <span class="token comment">// 将需要取消订阅的频道从map中移除</span>
  726. <span class="token keyword">for</span> channel_name <span class="token keyword">in</span> unsubscribe<span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
  727. subscriptions<span class="token punctuation">.</span><span class="token function">remove</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>channel_name<span class="token punctuation">)</span><span class="token punctuation">;</span>
  728. <span class="token comment">// 生成unsubscribe响应</span>
  729. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token function">make_unsubscribe_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> subscriptions<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  730. <span class="token comment">// 写入unsubscribe响应</span>
  731. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  732. <span class="token punctuation">}</span>
  733. <span class="token punctuation">}</span>
  734. command <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  735. <span class="token comment">// 其他命令都看作是unknown命令</span>
  736. <span class="token keyword">let</span> cmd <span class="token operator">=</span> <span class="token class-name">Unknown</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>command<span class="token punctuation">.</span><span class="token function">get_name</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  737. cmd<span class="token punctuation">.</span><span class="token function">apply</span><span class="token punctuation">(</span>dst<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  738. <span class="token punctuation">}</span>
  739. <span class="token punctuation">}</span>
  740. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  741. <span class="token punctuation">}</span>
  742. </code></pre></div><h4 id="unsubscribe"><a href="#unsubscribe" class="header-anchor">#</a> Unsubscribe</h4> <p>代码位置:src/cmd/subscribe.rs</p> <h5 id="struct-11"><a href="#struct-11" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 取消订阅命令</span>
  743. <span class="token comment">///</span>
  744. <span class="token comment">/// 如果channels为空,则取消订阅所有频道</span>
  745. <span class="token attribute attr-name">#[derive(Clone, Debug)]</span>
  746. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Unsubscribe</span> <span class="token punctuation">{</span>
  747. channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">String</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  748. <span class="token punctuation">}</span>
  749. </code></pre></div><h5 id="implementation-10"><a href="#implementation-10" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Unsubscribe</span><span class="token punctuation">{</span>
  750. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Unsubscribe</span> <span class="token punctuation">{</span>
  751. <span class="token class-name">Unsubscribe</span> <span class="token punctuation">{</span>
  752. channels<span class="token punctuation">:</span> channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  753. <span class="token punctuation">}</span>
  754. <span class="token punctuation">}</span>
  755. <span class="token comment">/// 解析unsubscribe帧</span>
  756. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Unsubscribe</span><span class="token punctuation">,</span> <span class="token class-name">ParseError</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  757. <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
  758. <span class="token comment">// 创建数组,用于存储channels</span>
  759. <span class="token keyword">let</span> <span class="token keyword">mut</span> channels <span class="token operator">=</span> <span class="token macro property">vec!</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
  760. <span class="token comment">// 循环解析数据,将channel放入数组</span>
  761. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  762. <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  763. <span class="token comment">// 将数据推送到要取消订阅的频道列表</span>
  764. <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> channels<span class="token punctuation">.</span><span class="token function">push</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span><span class="token punctuation">,</span>
  765. <span class="token comment">// EndOfStream代表数据解析完成</span>
  766. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">break</span><span class="token punctuation">,</span>
  767. <span class="token comment">// 其他错误</span>
  768. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span><span class="token punctuation">,</span>
  769. <span class="token punctuation">}</span>
  770. <span class="token punctuation">}</span>
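  771. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Unsubscribe</span> <span class="token punctuation">{</span> channels <span class="token punctuation">}</span><span class="token punctuation">)</span> <span class="token comment">// 返回命令</span>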
  772. <span class="token punctuation">}</span>
  773. <span class="token comment">/// 将unsubscribe命令转成帧</span>
  774. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  775. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  776. <span class="token comment">// 将命令以[&quot;unsubscribe&quot;,channel1,channel2...]形</span>
  777. <span class="token comment">// 式放入数组</span>
  778. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;unsubscribe&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  779. <span class="token keyword">for</span> channel <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
  780. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  781. <span class="token punctuation">}</span>
  782. frame
  783. <span class="token punctuation">}</span>
  784. <span class="token punctuation">}</span>
  785. </code></pre></div><h4 id="unknown"><a href="#unknown" class="header-anchor">#</a> Unknown</h4> <h5 id="struct-12"><a href="#struct-12" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug)]</span>
  786. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Unknown</span> <span class="token punctuation">{</span>
  787. command_name<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
  788. <span class="token punctuation">}</span>
  789. </code></pre></div><h5 id="implementation-11"><a href="#implementation-11" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Unknown</span><span class="token punctuation">{</span>
  790. <span class="token comment">/// 创建unknown命令</span>
  791. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Unknown</span> <span class="token punctuation">{</span>
  792. <span class="token class-name">Unknown</span> <span class="token punctuation">{</span>
  793. command_name<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  794. <span class="token punctuation">}</span>
  795. <span class="token punctuation">}</span>
  796. <span class="token comment">/// 获取命令名</span>
  797. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_name</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token operator">&amp;</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
  798. <span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>command_name
  799. <span class="token punctuation">}</span>
  800. <span class="token comment">/// 回应客户端不支持当前命令</span>
  801. <span class="token attribute attr-name">#[instrument(skip(self, dst))]</span>
  802. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  803. <span class="token comment">// 生成错误帧响应</span>
  804. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span><span class="token macro property">format!</span><span class="token punctuation">(</span><span class="token string">&quot;ERR unknown command '{}'&quot;</span><span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>command_name<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  805. <span class="token comment">// 日志记录响应</span>
  806. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
  807. <span class="token comment">// 将帧写入连接</span>
  808. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  809. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  810. <span class="token punctuation">}</span>
  811. <span class="token punctuation">}</span>
  812. </code></pre></div><h4 id="ping"><a href="#ping" class="header-anchor">#</a> Ping</h4> <h5 id="struct-13"><a href="#struct-13" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug, Default)]</span>
  813. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Ping</span> <span class="token punctuation">{</span>
  814. <span class="token comment">/// 自定义消息返回</span>
  815. msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  816. <span class="token punctuation">}</span>
  817. </code></pre></div><h5 id="implement"><a href="#implement" class="header-anchor">#</a> Implement</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Ping</span> <span class="token punctuation">{</span>
  818. <span class="token comment">/// 创建一个新的携带自定义信息的Ping命令</span>
  819. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Ping</span> <span class="token punctuation">{</span>
  820. <span class="token class-name">Ping</span> <span class="token punctuation">{</span> msg <span class="token punctuation">}</span>
  821. <span class="token punctuation">}</span>
  822. <span class="token comment">/// 解析ping命令</span>
  823. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Ping</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  824. <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  825. <span class="token class-name">Ok</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  826. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">default</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  827. <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  828. <span class="token punctuation">}</span>
  829. <span class="token punctuation">}</span>
  830. <span class="token comment">/// ping命令执行逻辑</span>
  831. <span class="token comment">///</span>
  832. <span class="token comment">/// 响应会被写入dst。</span>
  833. <span class="token comment">/// 服务端收到命令后会执行此方法。</span>
  834. <span class="token attribute attr-name">#[instrument(skip(self, dst))]</span>
  835. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  836. <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>msg <span class="token punctuation">{</span>
  837. <span class="token class-name">None</span> <span class="token operator">=&gt;</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token string">&quot;PONG&quot;</span><span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  838. <span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">,</span>
  839. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  840. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
  841. dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  842. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  843. <span class="token punctuation">}</span>
  844. <span class="token comment">/// 将命令转换成frame</span>
  845. <span class="token comment">/// This is called by the client when encoding a `Ping` command to send</span>
  846. <span class="token comment">/// to the server.</span>
  847. <span class="token comment">/// 当客户端需要发送ping命令给服务端时会被调用</span>
  848. <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
  849. <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  850. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">&quot;ping&quot;</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  851. <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>msg <span class="token punctuation">{</span>
  852. frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">;</span>
  853. <span class="token punctuation">}</span>
  854. frame
  855. <span class="token punctuation">}</span>
  856. <span class="token punctuation">}</span>
  857. </code></pre></div><h3 id="frame"><a href="#frame" class="header-anchor">#</a> frame</h3> <blockquote><p>redis协议帧</p></blockquote> <h4 id="enum"><a href="#enum" class="header-anchor">#</a> enum</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// A frame in the Redis protocol.</span>
  858. <span class="token attribute attr-name">#[derive(Clone, Debug)]</span>
  859. <span class="token keyword">pub</span> <span class="token keyword">enum</span> <span class="token type-definition class-name">Frame</span> <span class="token punctuation">{</span>
  860. <span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  861. <span class="token class-name">Error</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  862. <span class="token class-name">Integer</span><span class="token punctuation">(</span><span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  863. <span class="token class-name">Bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  864. <span class="token class-name">Null</span><span class="token punctuation">,</span>
  865. <span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token class-name">Vec</span><span class="token operator">&lt;</span><span class="token class-name">Frame</span><span class="token operator">&gt;</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  866. <span class="token punctuation">}</span>
  867. <span class="token attribute attr-name">#[derive(Debug)]</span>
  868. <span class="token keyword">pub</span> <span class="token keyword">enum</span> <span class="token type-definition class-name">Error</span> <span class="token punctuation">{</span>
  869. <span class="token comment">/// Not enough data is available to parse a message</span>
  870. <span class="token class-name">Incomplete</span><span class="token punctuation">,</span>
  871. <span class="token comment">/// Invalid message encoding</span>
  872. <span class="token class-name">Other</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  873. <span class="token punctuation">}</span>
  874. </code></pre></div><h3 id="sever"><a href="#sever" class="header-anchor">#</a> server</h3> <blockquote><p>mini-redis的服务端。提供了一个异步的<code>run</code>函数,用于监听连接,并为每个连接生成一个任务</p></blockquote> <h4 id="listener"><a href="#listener" class="header-anchor">#</a> Listener</h4> <h5 id="struct-14"><a href="#struct-14" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 服务端用于监听的连接</span>
  875. <span class="token attribute attr-name">#[derive(Debug)]</span>
  876. <span class="token keyword">struct</span> <span class="token type-definition class-name">Listener</span> <span class="token punctuation">{</span>
  877. <span class="token comment">/// 可共享的数据库连接句柄(Arc形式)</span>
  878. db_holder<span class="token punctuation">:</span> <span class="token class-name">DbDropGuard</span><span class="token punctuation">,</span>
  879. <span class="token comment">/// TCP监听类,由run函数提供</span>
  880. listener<span class="token punctuation">:</span> <span class="token class-name">TcpListener</span><span class="token punctuation">,</span>
  881. <span class="token comment">/// 信号量,用于限制连接数</span>
  882. limit_connections<span class="token punctuation">:</span> <span class="token class-name">Arc</span><span class="token operator">&lt;</span><span class="token class-name">Semaphore</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  883. <span class="token comment">/// 用于广播shutdown信号</span>
  884. notify_shutdown<span class="token punctuation">:</span> <span class="token namespace">broadcast<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  885. <span class="token comment">/// 用于在客户端完成所有任务后安全关闭连接。</span>
  886. <span class="token comment">/// 当一个连接被初始化,它会保存`shutdown_complete_tx`的clone,被关闭时回收</span>
  887. <span class="token comment">/// 当所有listener的连接都被关闭后,sender也会被释放</span>
  888. <span class="token comment">/// 所有任务被完成后,`shutdown_complete_rx.recv()`会返回`None`</span>
  889. shutdown_complete_tx<span class="token punctuation">:</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  890. <span class="token punctuation">}</span>
  891. </code></pre></div><h5 id="implementation-12"><a href="#implementation-12" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
  892. <span class="token comment">/// 启动server监听服务</span>
  893. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  894. <span class="token macro property">info!</span><span class="token punctuation">(</span><span class="token string">&quot;accepting inbound connections&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  895. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  896. <span class="token comment">// 等待有名额创建连接</span>
  897. <span class="token keyword">let</span> permit <span class="token operator">=</span> <span class="token keyword">self</span>
  898. <span class="token punctuation">.</span>limit_connections
  899. <span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  900. <span class="token punctuation">.</span><span class="token function">acquire_owned</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
  901. <span class="token punctuation">.</span><span class="token keyword">await</span>
  902. <span class="token punctuation">.</span><span class="token function">unwrap</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  903. <span class="token comment">// 建立tcp连接,获取套接字</span>
  904. <span class="token keyword">let</span> socket <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">accept</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  905. <span class="token comment">// 存储连接状态</span>
  906. <span class="token keyword">let</span> <span class="token keyword">mut</span> handler <span class="token operator">=</span> <span class="token class-name">Handler</span> <span class="token punctuation">{</span>
  907. <span class="token comment">// 获取数据库连接</span>
  908. db<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>db_holder<span class="token punctuation">.</span><span class="token function">db</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  909. <span class="token comment">// 新建连接</span>
  910. connection<span class="token punctuation">:</span> <span class="token class-name">Connection</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
  911. <span class="token comment">// 用于接收shutdown信号</span>
  912. shutdown<span class="token punctuation">:</span> <span class="token class-name">Shutdown</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>notify_shutdown<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  913. <span class="token comment">// 一旦所有的clone被释放,会通过此成员来通知</span>
  914. _shutdown_complete<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown_complete_tx<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  915. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  916. <span class="token comment">// 创建一个异步的任务执行连接要做的操作</span>
  917. <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token function">spawn</span><span class="token punctuation">(</span><span class="token keyword">async</span> <span class="token keyword">move</span> <span class="token punctuation">{</span>
  918. <span class="token comment">// 执行连接,并记录错误</span>
  919. <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=</span> handler<span class="token punctuation">.</span><span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
  920. <span class="token macro property">error!</span><span class="token punctuation">(</span>cause <span class="token operator">=</span> <span class="token operator">?</span>err<span class="token punctuation">,</span> <span class="token string">&quot;connection error&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  921. <span class="token punctuation">}</span>
  922. <span class="token comment">// 归还连接名额</span>
  923. <span class="token function">drop</span><span class="token punctuation">(</span>permit<span class="token punctuation">)</span><span class="token punctuation">;</span>
  924. <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  925. <span class="token punctuation">}</span>
  926. <span class="token punctuation">}</span>
  927. <span class="token comment">/// 建立tcp连接,获取套接字</span>
  928. <span class="token comment">/// 如果发生错误,采用指数退避重试:等待时间从1s开始每次翻倍,最多重试7次,最后一次等待时间为64s。重连流程参考tcp超时重发机制。</span>
  929. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">accept</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">TcpStream</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  930. <span class="token keyword">let</span> <span class="token keyword">mut</span> backoff <span class="token operator">=</span> <span class="token number">1</span><span class="token punctuation">;</span>
  931. <span class="token comment">// 尝试接受连接</span>
  932. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  933. <span class="token comment">// 成功直接返回tcp连接 失败返回错误信息</span>
  934. <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>listener<span class="token punctuation">.</span><span class="token function">accept</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
  935. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span>socket<span class="token punctuation">,</span> _<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
  936. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  937. <span class="token keyword">if</span> backoff <span class="token operator">&gt;</span> <span class="token number">64</span> <span class="token punctuation">{</span>
  938. <span class="token comment">// 等待时间过长,返回错误</span>
  939. <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  940. <span class="token punctuation">}</span>
  941. <span class="token punctuation">}</span>
  942. <span class="token punctuation">}</span>
  943. <span class="token comment">// 睡眠</span>
  944. <span class="token namespace">time<span class="token punctuation">::</span></span><span class="token function">sleep</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_secs</span><span class="token punctuation">(</span>backoff<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token punctuation">;</span>
  945. <span class="token comment">// 指数增长等待时间</span>
  946. backoff <span class="token operator">*=</span> <span class="token number">2</span><span class="token punctuation">;</span>
  947. <span class="token punctuation">}</span>
  948. <span class="token punctuation">}</span>
  949. <span class="token punctuation">}</span>
  950. </code></pre></div><h4 id="handler"><a href="#handler" class="header-anchor">#</a> Handler</h4> <h5 id="struct-15"><a href="#struct-15" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 服务端的具体连接</span>
  951. <span class="token attribute attr-name">#[derive(Debug)]</span>
  952. <span class="token keyword">struct</span> <span class="token type-definition class-name">Handler</span> <span class="token punctuation">{</span>
  953. <span class="token comment">/// 数据库连接</span>
  954. db<span class="token punctuation">:</span> <span class="token class-name">Db</span><span class="token punctuation">,</span>
  955. <span class="token comment">/// tcp连接</span>
  956. connection<span class="token punctuation">:</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
  957. <span class="token comment">/// 监听shutdown信号</span>
  958. shutdown<span class="token punctuation">:</span> <span class="token class-name">Shutdown</span><span class="token punctuation">,</span>
  959. <span class="token comment">/// 不直接使用,用来判断当前类对象是否被释放</span>
  960. _shutdown_complete<span class="token punctuation">:</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  961. <span class="token punctuation">}</span>
  962. </code></pre></div><h5 id="implementation-13"><a href="#implementation-13" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Handler</span> <span class="token punctuation">{</span>
  963. <span class="token comment">/// 处理单个连接</span>
  964. <span class="token comment">/// </span>
  965. <span class="token comment">/// 当接收到shutdown信号时,等到该连接处于安全状态时,连接会断开</span>
  966. <span class="token attribute attr-name">#[instrument(skip(self))]</span>
  967. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  968. <span class="token comment">// 循环处理帧</span>
  969. <span class="token keyword">while</span> <span class="token operator">!</span><span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">.</span><span class="token function">is_shutdown</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  970. <span class="token keyword">let</span> maybe_frame <span class="token operator">=</span> <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token macro property">select!</span> <span class="token punctuation">{</span>
  971. res <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> res<span class="token operator">?</span><span class="token punctuation">,</span>
  972. _ <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  973. <span class="token comment">// 接收到shutdown信号,退出run函数会使任务结束</span>
  974. <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  975. <span class="token punctuation">}</span>
  976. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  977. <span class="token comment">// 判断从远端收到的帧(maybe_frame)是否有内容,收到`None`时说明远端关闭</span>
  978. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token keyword">match</span> maybe_frame <span class="token punctuation">{</span>
  979. <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> frame<span class="token punctuation">,</span>
  980. <span class="token class-name">None</span> <span class="token operator">=&gt;</span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  981. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  982. <span class="token comment">// 将frame转为命令形式</span>
  983. <span class="token keyword">let</span> cmd <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token function">from_frame</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  984. <span class="token comment">// 日志记录接收到的命令</span>
  985. <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>cmd<span class="token punctuation">)</span><span class="token punctuation">;</span>
  986. <span class="token comment">// 异步执行命令</span>
  987. cmd<span class="token punctuation">.</span><span class="token function">apply</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>db<span class="token punctuation">,</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">,</span> <span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">)</span>
  988. <span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  989. <span class="token punctuation">}</span>
  990. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  991. <span class="token punctuation">}</span>
  992. <span class="token punctuation">}</span>
  993. </code></pre></div><h4 id="fn-run"><a href="#fn-run" class="header-anchor">#</a> Fn Run</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// mini-redis的服务端启动函数</span>
  994. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span>listener<span class="token punctuation">:</span> <span class="token class-name">TcpListener</span><span class="token punctuation">,</span> shutdown<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">Future</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  995. <span class="token comment">// 初始化shutdown信号广播管道</span>
  996. <span class="token keyword">let</span> <span class="token punctuation">(</span>notify_shutdown<span class="token punctuation">,</span> _<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">broadcast<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  997. <span class="token comment">// 初始化shutdown关闭机制</span>
  998. <span class="token keyword">let</span> <span class="token punctuation">(</span>shutdown_complete_tx<span class="token punctuation">,</span> <span class="token keyword">mut</span> shutdown_complete_rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  999. <span class="token comment">// 初始化监听连接</span>
  1000. <span class="token keyword">let</span> <span class="token keyword">mut</span> server <span class="token operator">=</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
  1001. listener<span class="token punctuation">,</span>
  1002. db_holder<span class="token punctuation">:</span> <span class="token class-name">DbDropGuard</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1003. limit_connections<span class="token punctuation">:</span> <span class="token class-name">Arc</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">Semaphore</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token constant">MAX_CONNECTIONS</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1004. notify_shutdown<span class="token punctuation">,</span>
  1005. shutdown_complete_tx<span class="token punctuation">,</span>
  1006. <span class="token punctuation">}</span><span class="token punctuation">;</span>
  1007. <span class="token comment">// 监听是否有连接到达</span>
  1008. <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token macro property">select!</span> <span class="token punctuation">{</span>
  1009. res <span class="token operator">=</span> server<span class="token punctuation">.</span><span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1010. <span class="token comment">// 处理连接失败的情况</span>
  1011. <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=</span> res <span class="token punctuation">{</span>
  1012. <span class="token macro property">error!</span><span class="token punctuation">(</span>cause <span class="token operator">=</span> <span class="token operator">%</span>err<span class="token punctuation">,</span> <span class="token string">&quot;failed to accept&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1013. <span class="token punctuation">}</span>
  1014. <span class="token punctuation">}</span>
  1015. _ <span class="token operator">=</span> shutdown <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1016. <span class="token macro property">info!</span><span class="token punctuation">(</span><span class="token string">&quot;shutting down&quot;</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1017. <span class="token punctuation">}</span>
  1018. <span class="token punctuation">}</span>
  1019. <span class="token keyword">let</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
  1020. shutdown_complete_tx<span class="token punctuation">,</span>
  1021. notify_shutdown<span class="token punctuation">,</span>
  1022. <span class="token punctuation">..</span>
  1023. <span class="token punctuation">}</span> <span class="token operator">=</span> server<span class="token punctuation">;</span>
  1024. <span class="token comment">// notify_shutdown被drop后,所有subscribe了它的任务都会接收到shutdown信号并且可以退出</span>
  1025. <span class="token function">drop</span><span class="token punctuation">(</span>notify_shutdown<span class="token punctuation">)</span><span class="token punctuation">;</span>
  1026. <span class="token comment">// 回收监听者的shutdown_complete_tx,意味着只有其他连接持有shutdown_complete_tx了</span>
  1027. <span class="token function">drop</span><span class="token punctuation">(</span>shutdown_complete_tx<span class="token punctuation">)</span><span class="token punctuation">;</span>
  1028. <span class="token comment">// 等待所有活跃连接完成其任务</span>
  1029. <span class="token comment">// 当所有shutdown_complete_tx都被释放,说明所有连接都断开了</span>
  1030. <span class="token comment">// 此时`mpsc`管道会被关闭,`recv()`会返回`None`</span>
  1031. <span class="token keyword">let</span> _ <span class="token operator">=</span> shutdown_complete_rx<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token punctuation">;</span>
  1032. <span class="token punctuation">}</span>
  1033. </code></pre></div><h2 id="function-2"><a href="#function-2" class="header-anchor">#</a> Function</h2> <h3 id="fn-buffer"><a href="#fn-buffer" class="header-anchor">#</a> Fn Buffer</h3> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 创建一个请求缓冲区</span>
  1034. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">buffer</span><span class="token punctuation">(</span>client<span class="token punctuation">:</span> <span class="token class-name">Client</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Buffer</span> <span class="token punctuation">{</span>
  1035. <span class="token comment">// 创建一个容量为32的异步通道。tx为发送端,rx为接收端</span>
  1036. <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">32</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1037. <span class="token comment">// 使用 tokio::spawn 函数创建一个异步任务(task)</span>
  1038. <span class="token comment">// 该任务以 client 和 rx(接收端)作为参数调用 run 函数,并在任务内部等待 run 执行结束</span>
  1039. <span class="token comment">// buffer 函数本身不会等待该任务,spawn 之后立即继续向下执行</span>
  1040. <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token function">spawn</span><span class="token punctuation">(</span><span class="token keyword">async</span> <span class="token keyword">move</span> <span class="token punctuation">{</span> <span class="token function">run</span><span class="token punctuation">(</span>client<span class="token punctuation">,</span> rx<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1041. <span class="token comment">// 返回缓冲区</span>
  1042. <span class="token class-name">Buffer</span> <span class="token punctuation">{</span> tx <span class="token punctuation">}</span>
  1043. <span class="token punctuation">}</span>
  1044. </code></pre></div><h2 id="struct-16"><a href="#struct-16" class="header-anchor">#</a> Struct</h2> <h3 id="buffer"><a href="#buffer" class="header-anchor">#</a> Buffer</h3> <h4 id="struct-17"><a href="#struct-17" class="header-anchor">#</a> struct</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 客户端的请求窗口</span>
  1045. <span class="token attribute attr-name">#[derive(Clone)]</span>
  1046. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Buffer</span> <span class="token punctuation">{</span>
  1047. tx<span class="token punctuation">:</span> <span class="token class-name">Sender</span><span class="token operator">&lt;</span><span class="token class-name">Message</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  1048. <span class="token punctuation">}</span>
  1049. </code></pre></div><h4 id="implementation-14"><a href="#implementation-14" class="header-anchor">#</a> Implementation</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Buffer</span><span class="token punctuation">{</span>
  1050. <span class="token comment">/// 获取key的值。请求会被缓存直到连接能够发送请求</span>
  1051. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Bytes</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  1052. <span class="token comment">// 初始化一个get命令</span>
  1053. <span class="token keyword">let</span> get <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Get</span><span class="token punctuation">(</span>key<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1054. <span class="token comment">// 初始化一次性的管道以接收连接的回应</span>
  1055. <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">oneshot<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1056. <span class="token comment">// 发送请求</span>
  1057. <span class="token keyword">self</span><span class="token punctuation">.</span>tx<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token punctuation">(</span>get<span class="token punctuation">,</span> tx<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1058. <span class="token comment">// 等待回应,并返回结果</span>
  1059. <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
  1060. <span class="token class-name">Ok</span><span class="token punctuation">(</span>res<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> res<span class="token punctuation">,</span>
  1061. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1062. <span class="token punctuation">}</span>
  1063. <span class="token punctuation">}</span>
  1064. <span class="token comment">/// 设置key的值</span>
  1065. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  1066. <span class="token comment">// 初始化一个set命令</span>
  1067. <span class="token keyword">let</span> set <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Set</span><span class="token punctuation">(</span>key<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> value<span class="token punctuation">)</span><span class="token punctuation">;</span>
  1068. <span class="token comment">// 初始化一次性的管道以接收连接的回应</span>
  1069. <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">oneshot<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1070. <span class="token comment">// 发送请求</span>
  1071. <span class="token keyword">self</span><span class="token punctuation">.</span>tx<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token punctuation">(</span>set<span class="token punctuation">,</span> tx<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1072. <span class="token comment">// 等待回应,并返回结果</span>
  1073. <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
  1074. <span class="token class-name">Ok</span><span class="token punctuation">(</span>res<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> res<span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>_<span class="token closure-punctuation punctuation">|</span></span> <span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1075. <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1076. <span class="token punctuation">}</span>
  1077. <span class="token punctuation">}</span>
  1078. <span class="token punctuation">}</span>
  1079. </code></pre></div><h3 id="connection"><a href="#connection" class="header-anchor">#</a> Connection</h3> <h4 id="struct-18"><a href="#struct-18" class="header-anchor">#</a> struct</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 发送和接收来自远端的帧</span>
  1080. <span class="token attribute attr-name">#[derive(Debug)]</span>
  1081. <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Connection</span> <span class="token punctuation">{</span>
  1082. <span class="token comment">// 带有写入缓冲区的tcp流</span>
  1083. stream<span class="token punctuation">:</span> <span class="token class-name">BufWriter</span><span class="token operator">&lt;</span><span class="token class-name">TcpStream</span><span class="token operator">&gt;</span><span class="token punctuation">,</span>
  1084. <span class="token comment">// 帧读缓冲区</span>
  1085. buffer<span class="token punctuation">:</span> <span class="token class-name">BytesMut</span><span class="token punctuation">,</span>
  1086. <span class="token punctuation">}</span>
  1087. </code></pre></div><h4 id="implementation-15"><a href="#implementation-15" class="header-anchor">#</a> Implementation</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Connection</span><span class="token punctuation">{</span>
  1088. <span class="token comment">/// 基于socket创建并返回一个新的Connection,同时初始化读写缓冲区</span>
  1089. <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">:</span> <span class="token class-name">TcpStream</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token class-name">Connection</span> <span class="token punctuation">{</span>
  1090. <span class="token class-name">Connection</span> <span class="token punctuation">{</span>
  1091. stream<span class="token punctuation">:</span> <span class="token class-name">BufWriter</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
  1092. buffer<span class="token punctuation">:</span> <span class="token class-name">BytesMut</span><span class="token punctuation">::</span><span class="token function">with_capacity</span><span class="token punctuation">(</span><span class="token number">4</span> <span class="token operator">*</span> <span class="token number">1024</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1093. <span class="token punctuation">}</span>
  1094. <span class="token punctuation">}</span>
  1095. <span class="token comment">/// 从tcp流中读取一个帧</span>
  1096. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">read_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Frame</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  1097. <span class="token keyword">loop</span> <span class="token punctuation">{</span>
  1098. <span class="token comment">// 解析并获取帧</span>
  1099. <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">parse_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  1100. <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1101. <span class="token punctuation">}</span>
  1102. <span class="token comment">// 尝试从socket中读取更多的数据,如果读到的字节数为0,说明流被关闭</span>
  1103. <span class="token keyword">if</span> <span class="token number">0</span> <span class="token operator">==</span> <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">read_buf</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
  1104. <span class="token comment">// 如果缓冲区为空说明远端正常关闭,否则说明远端在帧尚未发送完整时就断开了连接(例如发送了rst报文)</span>
  1105. <span class="token keyword">if</span> <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
  1106. <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1107. <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
  1108. <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token string">&quot;connection reset by peer&quot;</span><span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1109. <span class="token punctuation">}</span>
  1110. <span class="token punctuation">}</span>
  1111. <span class="token punctuation">}</span>
  1112. <span class="token punctuation">}</span>
  1113. <span class="token comment">/// 解析并获取frame。</span>
  1114. <span class="token comment">/// 如果数据不够则返回 `Ok(None)`</span>
  1115. <span class="token comment">/// 如果数据不合理则返回Err</span>
  1116. <span class="token keyword">fn</span> <span class="token function-definition function">parse_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token class-name">Option</span><span class="token operator">&lt;</span><span class="token class-name">Frame</span><span class="token operator">&gt;&gt;</span> <span class="token punctuation">{</span>
  1117. <span class="token keyword">use</span> <span class="token namespace">frame<span class="token punctuation">::</span></span><span class="token class-name">Error</span><span class="token punctuation">::</span><span class="token class-name">Incomplete</span><span class="token punctuation">;</span>
  1118. <span class="token comment">// 创建cursor用于追踪buffer中的数据位置</span>
  1119. <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token class-name">Cursor</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1120. <span class="token comment">// 检查缓冲区中是否有足够的数据</span>
  1121. <span class="token keyword">match</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">check</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> buf<span class="token punctuation">)</span> <span class="token punctuation">{</span>
  1122. <span class="token class-name">Ok</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1123. <span class="token comment">// Frame::check()会把cursor推进到当前帧的末尾,而调用前cursor位于0,所以此时的位置就是该帧的长度</span>
  1124. <span class="token keyword">let</span> len <span class="token operator">=</span> buf<span class="token punctuation">.</span><span class="token function">position</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">usize</span><span class="token punctuation">;</span>
  1125. <span class="token comment">// 重置cursor位置</span>
  1126. buf<span class="token punctuation">.</span><span class="token function">set_position</span><span class="token punctuation">(</span><span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1127. <span class="token comment">// 解析frame</span>
  1128. <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">parse</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> buf<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1129. <span class="token comment">// 从buffer中丢弃已被解析的len个字节(其后的数据会保留)</span>
  1130. <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">.</span><span class="token function">advance</span><span class="token punctuation">(</span>len<span class="token punctuation">)</span><span class="token punctuation">;</span>
  1131. <span class="token comment">// 返回frame</span>
  1132. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">)</span>
  1133. <span class="token punctuation">}</span>
  1134. <span class="token comment">// 数据不足以解析成单独的帧,返回None,说明需要继续等待</span>
  1135. <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">Incomplete</span><span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1136. <span class="token comment">// 返回错误</span>
  1137. <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1138. <span class="token punctuation">}</span>
  1139. <span class="token punctuation">}</span>
  1140. <span class="token comment">/// 向TCP流中写数据</span>
  1141. <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> frame<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Frame</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  1142. <span class="token keyword">match</span> frame <span class="token punctuation">{</span>
  1143. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1144. <span class="token comment">// 对帧类型前缀进行编码。对于数组,它是“*”。</span>
  1145. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'*'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1146. <span class="token comment">// 写入数组长度</span>
  1147. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1148. <span class="token comment">// 遍历并写入数组中每个元素</span>
  1149. <span class="token keyword">for</span> entry <span class="token keyword">in</span> <span class="token operator">&amp;</span><span class="token operator">*</span><span class="token operator">*</span>val <span class="token punctuation">{</span>
  1150. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_value</span><span class="token punctuation">(</span>entry<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1151. <span class="token punctuation">}</span>
  1152. <span class="token punctuation">}</span>
  1153. <span class="token comment">// 直接写入整个帧</span>
  1154. _ <span class="token operator">=&gt;</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_value</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">,</span>
  1155. <span class="token punctuation">}</span>
  1156. <span class="token comment">// 保证缓冲区都被写入套接字</span>
  1157. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">flush</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
  1158. <span class="token punctuation">}</span>
  1159. <span class="token comment">/// 将帧文字写入流</span>
  1160. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_value</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> frame<span class="token punctuation">:</span> <span class="token operator">&amp;</span><span class="token class-name">Frame</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  1161. <span class="token comment">// 判断帧的类型,将不同的帧按照格式写入流</span>
  1162. <span class="token keyword">match</span> frame <span class="token punctuation">{</span>
  1163. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1164. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'+'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1165. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1166. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b&quot;\r\n&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1167. <span class="token punctuation">}</span>
  1168. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1169. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'-'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1170. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1171. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b&quot;\r\n&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1172. <span class="token punctuation">}</span>
  1173. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1174. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b':'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1175. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span><span class="token operator">*</span>val<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1176. <span class="token punctuation">}</span>
  1177. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1178. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b&quot;$-1\r\n&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1179. <span class="token punctuation">}</span>
  1180. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token punctuation">{</span>
  1181. <span class="token keyword">let</span> len <span class="token operator">=</span> val<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1182. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'$'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1183. <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span>len <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1184. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1185. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b&quot;\r\n&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1186. <span class="token punctuation">}</span>
  1187. <span class="token comment">// 此处暂不支持写入数组类型的帧</span>
  1188. <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span>_val<span class="token punctuation">)</span> <span class="token operator">=&gt;</span> <span class="token macro property">unreachable!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
  1189. <span class="token punctuation">}</span>
  1190. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  1191. <span class="token punctuation">}</span>
  1192. <span class="token comment">/// 将十进制帧写入流</span>
  1193. <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_decimal</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> val<span class="token punctuation">:</span> <span class="token keyword">u64</span><span class="token punctuation">)</span> <span class="token punctuation">-&gt;</span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator">&lt;</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">&gt;</span> <span class="token punctuation">{</span>
  1194. <span class="token keyword">use</span> <span class="token namespace">std<span class="token punctuation">::</span>io<span class="token punctuation">::</span></span><span class="token class-name">Write</span><span class="token punctuation">;</span>
  1195. <span class="token comment">// 将数值格式化为十进制字符串(u64 最多 20 位,因此 20 字节的缓冲区足够)</span>
  1196. <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token punctuation">[</span><span class="token number">0u8</span><span class="token punctuation">;</span> <span class="token number">20</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
  1197. <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token class-name">Cursor</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> buf<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
  1198. <span class="token macro property">write!</span><span class="token punctuation">(</span><span class="token operator">&amp;</span><span class="token keyword">mut</span> buf<span class="token punctuation">,</span> <span class="token string">&quot;{}&quot;</span><span class="token punctuation">,</span> val<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1199. <span class="token comment">// 将数据写入流</span>
  1200. <span class="token keyword">let</span> pos <span class="token operator">=</span> buf<span class="token punctuation">.</span><span class="token function">position</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">usize</span><span class="token punctuation">;</span>
  1201. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token operator">&amp;</span>buf<span class="token punctuation">.</span><span class="token function">get_ref</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">[</span><span class="token punctuation">..</span>pos<span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1202. <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b&quot;\r\n&quot;</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
  1203. <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
  1204. <span class="token punctuation">}</span>
  1205. <span class="token punctuation">}</span>
  1206. </code></pre></div></div> <footer class="page-edit"><!----> <!----></footer> <!----> </main></div><div class="global-ui"></div></div>
  1207. <script src="/rust_camp_tutorial/assets/js/app.d7ab8f65.js" defer></script><script src="/rust_camp_tutorial/assets/js/2.3dc1b8de.js" defer></script><script src="/rust_camp_tutorial/assets/js/1.7f771cfb.js" defer></script><script src="/rust_camp_tutorial/assets/js/23.7f3a9620.js" defer></script>
  1208. </body>
  1209. </html>