|
- <!DOCTYPE html>
- <html lang="en-US">
- <head>
- <meta charset="utf-8">
- <meta name="viewport" content="width=device-width,initial-scale=1">
- <title>Mini-Redis教程 | Rust训练营教程文档</title>
- <meta name="generator" content="VuePress 1.9.10">
- <link rel="icon" href="/rust_camp_tutorial/logo.png">
- <meta name="description" content="DragonOS-Rust camp">
-
- <link rel="preload" href="/rust_camp_tutorial/assets/css/0.styles.7dd9be3e.css" as="style"><link rel="preload" href="/rust_camp_tutorial/assets/js/app.d7ab8f65.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/2.3dc1b8de.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/1.7f771cfb.js" as="script"><link rel="preload" href="/rust_camp_tutorial/assets/js/23.7f3a9620.js" as="script"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/10.23a1f579.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/11.c389195a.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/12.1d996921.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/13.4d4410c4.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/14.37ef2a72.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/15.5542c093.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/16.d48fd1ce.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/17.bd8d538c.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/18.6d3b94c1.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/19.eb35cfee.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/20.c11ec329.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/21.db1b5d88.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/22.7714be7b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/24.2f88b37d.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/25.df99dd5f.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/26.606cfbc8.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/27.928f1e6b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/28.f61a69ee.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/29.802642cf.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/3.5322f14a.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/30.72f41aed.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/31.e1aa8cbc.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/32.30c1ff3b.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/33.f21667a4.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/4.84e1e480.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/5.f0541060.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/6.dfb06aa0.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/7.7551a9fb.js"><link rel="prefetch" href="/rust_camp_tutorial/assets/js/vendors~docsearch.5e19b665.js">
- <link rel="stylesheet" href="/rust_camp_tutorial/assets/css/0.styles.7dd9be3e.css">
- </head>
- <body>
- <div id="app" data-server-rendered="true"><div class="theme-container"><header class="navbar"><div class="sidebar-button"><svg xmlns="http://www.w3.org/2000/svg" aria-hidden="true" role="img" viewBox="0 0 448 512" class="icon"><path fill="currentColor" d="M436 124H12c-6.627 0-12-5.373-12-12V80c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12zm0 160H12c-6.627 0-12-5.373-12-12v-32c0-6.627 5.373-12 12-12h424c6.627 0 12 5.373 12 12v32c0 6.627-5.373 12-12 12z"></path></svg></div> <a href="/rust_camp_tutorial/" class="home-link router-link-active"><img src="logo.png" alt="Rust训练营教程文档" class="logo"> <span class="site-name can-hide">Rust训练营教程文档</span></a> <div class="links"><div class="search-box"><input aria-label="Search" autocomplete="off" spellcheck="false" value=""> <!----></div> <nav class="nav-links can-hide"><div class="nav-item"><a href="/rust_camp_tutorial/" class="nav-link">
- 首页
- </a></div> <!----></nav></div></header> <div class="sidebar-mask"></div> <aside class="sidebar"><nav class="nav-links"><div class="nav-item"><a href="/rust_camp_tutorial/" class="nav-link">
- 首页
- </a></div> <!----></nav> <ul class="sidebar-links"><li><section class="sidebar-group depth-0"><p class="sidebar-heading open"><span>Mini-Redis教程</span> <!----></p> <ul class="sidebar-links sidebar-group-items"><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#准备工作" class="sidebar-link">准备工作</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#获取代码" class="sidebar-link">获取代码</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#运行项目" class="sidebar-link">运行项目</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#module" class="sidebar-link">Module</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#client" class="sidebar-link">client</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#blocking-client" class="sidebar-link">blocking_client</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#cmd" class="sidebar-link">cmd</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#frame" class="sidebar-link">frame</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#sever" class="sidebar-link">sever</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#function-2" class="sidebar-link">Function</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#fn-buffer" class="sidebar-link">Fn Buffer</a></li></ul></li><li><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#struct-16" class="sidebar-link">Struct</a><ul class="sidebar-sub-headers"><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#buffer" class="sidebar-link">Buffer</a></li><li class="sidebar-sub-header"><a href="/rust_camp_tutorial/Rust%E6%96%87%E6%A1%A3/Mini-Redis%E6%95%99%E7%A8%8B.html#connection" class="sidebar-link">Connection</a></li></ul></li></ul></section></li></ul> </aside> <main class="page"> <div class="theme-default-content content__default"><h1 id="mini-redis教程"><a href="#mini-redis教程" class="header-anchor">#</a> Mini-Redis教程</h1> <h2 id="准备工作"><a href="#准备工作" class="header-anchor">#</a> 准备工作</h2> <h3 id="获取代码"><a href="#获取代码" class="header-anchor">#</a> 获取代码</h3> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">git</span> clone <span class="token parameter variable">--recursive</span> https://github.com/DragonOS-Community/rust_camp_code.git
- </code></pre></div><p>教程版请切换curse-mini-redis分支</p> <h3 id="运行项目"><a href="#运行项目" class="header-anchor">#</a> 运行项目</h3> <h4 id="运行服务端"><a href="#运行服务端" class="header-anchor">#</a> 运行服务端</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token assign-left variable">RUST_LOG</span><span class="token operator">=</span>debug <span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-server
- </code></pre></div><p>运行服务端之后,就可以运行redis操作的代码和redis的命令了。详情请参考example文件夹下的代码和redis的命令。</p> <h4 id="运行example的代码"><a href="#运行example的代码" class="header-anchor">#</a> 运行example的代码</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">cargo</span> run <span class="token parameter variable">--example</span> hello_world
- </code></pre></div><h4 id="运行redis命令"><a href="#运行redis命令" class="header-anchor">#</a> 运行redis命令</h4> <div class="language-shell extra-class"><pre class="language-shell"><code><span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-cli <span class="token builtin class-name">set</span> foo bar
- <span class="token function">cargo</span> run <span class="token parameter variable">--bin</span> mini-redis-cli get foo
- </code></pre></div><h2 id="module"><a href="#module" class="header-anchor">#</a> Module</h2> <h3 id="client"><a href="#client" class="header-anchor">#</a> client</h3> <blockquote><p>最小的redis会话。为所支持的命令提供异步连接。</p></blockquote> <p>代码位于src/clients/client.rs</p> <h4 id="client-2"><a href="#client-2" class="header-anchor">#</a> Client</h4> <h5 id="struct"><a href="#struct" class="header-anchor">#</a> Struct</h5> <p>用于建立和存储客户端连接</p> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Client</span><span class="token punctuation">{</span>
- <span class="token comment">/// 客户端连接</span>
- connection<span class="token punctuation">:</span><span class="token class-name">Connection</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation"><a href="#implementation" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Client</span><span class="token punctuation">{</span>
- <span class="token comment">/// 建立连接</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">connect</span><span class="token operator"><</span><span class="token class-name">T</span><span class="token punctuation">:</span> <span class="token class-name">ToSocketAddrs</span><span class="token operator">></span><span class="token punctuation">(</span>addr<span class="token punctuation">:</span> <span class="token class-name">T</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Client</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 与addr建立tcp连接并获取套接字</span>
- <span class="token keyword">let</span> socket <span class="token operator">=</span> <span class="token class-name">TcpStream</span><span class="token punctuation">::</span><span class="token function">connect</span><span class="token punctuation">(</span>addr<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 使用套接字建立连接</span>
- <span class="token keyword">let</span> connection <span class="token operator">=</span> <span class="token class-name">Connection</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回客户端</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Client</span> <span class="token punctuation">{</span> connection <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// ping客户端</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">ping</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">></span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 构建ping消息</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录ping</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 发送ping消息</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 读取响应,对Frame::Simple、Frame::Bulk、error类型进行处理</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>value<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 获取key对应的value</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>key<span class="token punctuation">:</span><span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token comment">// 创建获取'key'的值的'Get'命令,并将其转换成frame格式</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Get</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录frme</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将frame通过连接异步写给套接字</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 异步读响应,并匹配Simple、Bulk、NUll的情况</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 设置key对应的value</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 通过set_cmd异步创建过期时间为None的Set命令</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">set_cmd</span><span class="token punctuation">(</span><span class="token class-name">Set</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> <span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 设置key对应的value,value在expiration后过期</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_expires</span><span class="token punctuation">(</span>
- <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
- key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span>
- value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
- expiration<span class="token punctuation">:</span> <span class="token class-name">Duration</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span><span class="token punctuation">{</span>
- <span class="token comment">// 通过set_cmd异步创建过期时间为expiration的Set命令</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">set_cmd</span><span class="token punctuation">(</span><span class="token class-name">Set</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>expiration<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// Set命令的主要逻辑</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_cmd</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> cmd<span class="token punctuation">:</span> <span class="token class-name">Set</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 将cmd转化为frame的形式</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> cmd<span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将frame写入连接</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 读取对frame的响应,并匹配响应状态</span>
- <span class="token comment">// 执行成功响应为"OK"</span>
- <span class="token comment">// 其他响应均为失败</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span> <span class="token keyword">if</span> response <span class="token operator">==</span> <span class="token string">"OK"</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将信息推送给指定的频道</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">publish</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channel<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token keyword">u64</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 将Publish命令转成frame的格式</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Publish</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channel<span class="token punctuation">,</span> message<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录frme</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将frame通过连接异步写给套接字</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 异步读请求,并匹配请求类型</span>
- <span class="token comment">// 如果是整数帧,则返回OK(response)</span>
- <span class="token comment">// 否则返回错误</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>response<span class="token punctuation">)</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// SUBSCRIBE的主要逻辑</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe_cmd</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 将Subscribe命令转成frame的格式</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Subscribe</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录frme</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将frame通过连接异步写给套接字</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 遍历channels,服务端通过响应确认订阅每个频道</span>
- <span class="token keyword">for</span> channel <span class="token keyword">in</span> channels <span class="token punctuation">{</span>
- <span class="token comment">// 获取响应</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">read_response</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 分析不同情况的响应</span>
- <span class="token keyword">match</span> response <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token punctuation">[</span>subscribe<span class="token punctuation">,</span> schannel<span class="token punctuation">,</span> <span class="token punctuation">..</span><span class="token punctuation">]</span>
- <span class="token keyword">if</span> <span class="token operator">*</span>subscribe <span class="token operator">==</span> <span class="token string">"subscribe"</span> <span class="token operator">&&</span> <span class="token operator">*</span>schannel <span class="token operator">==</span> channel <span class="token operator">=></span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
- _ <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 监听若干个指定的频道</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Subscriber</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用subscribe_cmd,客户端状态将会转为subscriber</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">subscribe_cmd</span><span class="token punctuation">(</span><span class="token operator">&</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回Subscriber类型对象</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Subscriber</span> <span class="token punctuation">{</span>
- client<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
- subscribed_channels<span class="token punctuation">:</span> channels<span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 读取响应</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">read_response</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Frame</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步读取连接的frame</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录读取信息</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 解析响应,判断响应的类型</span>
- <span class="token comment">// Some(Frame::Error(msg))</span>
- <span class="token comment">// Some(frame)</span>
- <span class="token comment">// None:响应为None意味着服务端关闭</span>
- <span class="token keyword">match</span> response <span class="token punctuation">{</span>
- <span class="token comment">// Error frames are converted to `Err`</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>msg<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">None</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// Receiving `None` here indicates the server has closed the</span>
- <span class="token comment">// connection without sending a frame. This is unexpected and is</span>
- <span class="token comment">// represented as a "connection reset by peer" error.</span>
- <span class="token keyword">let</span> err <span class="token operator">=</span> <span class="token class-name">Error</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">ErrorKind</span><span class="token punctuation">::</span><span class="token class-name">ConnectionReset</span><span class="token punctuation">,</span> <span class="token string">"connection reset by server"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="message"><a href="#message" class="header-anchor">#</a> Message</h4> <h5 id="struct-2"><a href="#struct-2" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// A message received on a subscribed channel.</span>
- <span class="token attribute attr-name">#[derive(Debug, Clone)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Message</span> <span class="token punctuation">{</span>
- <span class="token keyword">pub</span> channel<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- <span class="token keyword">pub</span> content<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="subscriber"><a href="#subscriber" class="header-anchor">#</a> Subscriber</h4> <blockquote><p>订阅者类型,在客户端订阅若干频道后会转变成订阅者,此时只能执行publish/subscribe相关的命令</p></blockquote> <p>代码位置:src/clients/client.rs</p> <h5 id="struct-3"><a href="#struct-3" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Subscriber</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 客户端</span>
- client<span class="token punctuation">:</span> <span class="token class-name">Client</span><span class="token punctuation">,</span>
- <span class="token comment">/// 所订阅的频道集合</span>
- subscribed_channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-2"><a href="#implementation-2" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Subscriber</span><span class="token punctuation">{</span>
- <span class="token comment">/// 获取订阅的频道集合</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_subscribed</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span> <span class="token punctuation">{</span>
- <span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels
- <span class="token punctuation">}</span>
- <span class="token comment">/// 获取客户端收到的下一条消息(可能需要等待)</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">next_message</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span>mframe<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 使用日志记录mframe</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>mframe<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">match</span> mframe <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// frame分片后格式为[message,channel,content]</span>
- <span class="token comment">// 当接收到信息时,message == 'message'</span>
- <span class="token comment">// 成立则将信息转化为Message的形式返回</span>
- <span class="token comment">// 否则返回错误</span>
- <span class="token punctuation">[</span>message<span class="token punctuation">,</span> channel<span class="token punctuation">,</span> content<span class="token punctuation">]</span> <span class="token keyword">if</span> <span class="token operator">*</span>message <span class="token operator">==</span> <span class="token string">"message"</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Message</span> <span class="token punctuation">{</span>
- channel<span class="token punctuation">:</span> channel<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- content<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>content<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- _ <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>mframe<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">None</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将收到的message转换成stream形式</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_stream</span><span class="token punctuation">(</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">impl</span> <span class="token class-name">Stream</span><span class="token operator"><</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token macro property">try_stream!</span> <span class="token punctuation">{</span>
- <span class="token keyword">while</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>message<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token keyword">yield</span> message<span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 订阅新的频道集合</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 调用subscribe_cmd,订阅新的频道集合</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span><span class="token function">subscribe_cmd</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 更新当前的已订阅频道</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels
- <span class="token punctuation">.</span><span class="token function">extend</span><span class="token punctuation">(</span>channels<span class="token punctuation">.</span><span class="token function">iter</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token class-name">Clone</span><span class="token punctuation">::</span>clone<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 取消订阅指定的频道</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">unsubscribe</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 创建Unsubcribe命令,并转为frame形式</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Unsubscribe</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">into_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录请求</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span>request <span class="token operator">=</span> <span class="token operator">?</span>frame<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将请求写入连接</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>client<span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 如果channels为空,那么取消订阅所有频道</span>
- <span class="token comment">// 否则只取消channels中指定的频道</span>
- <span class="token keyword">let</span> num <span class="token operator">=</span> <span class="token keyword">if</span> channels<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
- channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 解析响应</span>
- <span class="token keyword">for</span> _ <span class="token keyword">in</span> <span class="token number">0</span><span class="token punctuation">..</span>num <span class="token punctuation">{</span>
- <span class="token comment">// 读取响应</span>
- <span class="token macro property">todo!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 判断响应类型</span>
- <span class="token keyword">match</span> response <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token keyword">ref</span> frame<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">match</span> frame<span class="token punctuation">.</span><span class="token function">as_slice</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// frame分片后格式为['unsubscribe',channel,..]</span>
- <span class="token comment">// 判断是否取消订阅</span>
- <span class="token punctuation">[</span>unsubscribe<span class="token punctuation">,</span> channel<span class="token punctuation">,</span> <span class="token punctuation">..</span><span class="token punctuation">]</span> <span class="token keyword">if</span> <span class="token operator">*</span>unsubscribe <span class="token operator">==</span> <span class="token string">"unsubscribe"</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 获取当前订阅数组的长度</span>
- <span class="token keyword">let</span> len <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 长度为0,返回错误</span>
- <span class="token keyword">if</span> len <span class="token operator">==</span> <span class="token number">0</span> <span class="token punctuation">{</span>
- <span class="token comment">// There must be at least one channel</span>
- <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 当channel存在于subscribed_channels时将其删除</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">retain</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>c<span class="token closure-punctuation punctuation">|</span></span> <span class="token operator">*</span>channel <span class="token operator">!=</span> <span class="token operator">&</span>c<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 删除数大于1则返回错误</span>
- <span class="token keyword">if</span> <span class="token keyword">self</span><span class="token punctuation">.</span>subscribed_channels<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">!=</span> len <span class="token operator">-</span> <span class="token number">1</span> <span class="token punctuation">{</span>
- <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span> <span class="token punctuation">}</span>
- _ <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>response<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">,</span>
- frame <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>frame<span class="token punctuation">.</span><span class="token function">to_error</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h3 id="blocking-client"><a href="#blocking-client" class="header-anchor">#</a> blocking_client</h3> <blockquote><p>最小的阻塞性Redis客户端</p></blockquote> <p>代码位置:src/clients/blocking_client.rs</p> <h4 id="blockingclient"><a href="#blockingclient" class="header-anchor">#</a> BlockingClient</h4> <h5 id="struct-4"><a href="#struct-4" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">BlockingClient</span><span class="token punctuation">{</span>
- <span class="token comment">/// 异步的客户端</span>
- inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Client</span><span class="token punctuation">,</span>
- <span class="token comment">/// 运行时环境,负责调度和管理异步任务的执行</span>
- rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-3"><a href="#implementation-3" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">BlockingClient</span><span class="token punctuation">{</span>
- <span class="token comment">/// 建立连接</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">connect</span><span class="token operator"><</span><span class="token class-name">T</span><span class="token punctuation">:</span> <span class="token class-name">ToSocketAddrs</span><span class="token operator">></span><span class="token punctuation">(</span>addr<span class="token punctuation">:</span> <span class="token class-name">T</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">BlockingClient</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> rt <span class="token operator">=</span> <span class="token namespace">tokio<span class="token punctuation">::</span>runtime<span class="token punctuation">::</span></span><span class="token class-name">Builder</span><span class="token punctuation">::</span><span class="token function">new_current_thread</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token function">enable_all</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token function">build</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">let</span> inner <span class="token operator">=</span> rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Client</span><span class="token punctuation">::</span><span class="token function">connect</span><span class="token punctuation">(</span>addr<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">BlockingClient</span> <span class="token punctuation">{</span> inner<span class="token punctuation">,</span> rt <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步获取指定值</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用get获取key对应的值</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span>key<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步给key赋值</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用set给key赋值</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">set</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步给key赋值,并指定过期时间</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">set_expires</span><span class="token punctuation">(</span>
- <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
- key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span>
- value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
- expiration<span class="token punctuation">:</span> <span class="token class-name">Duration</span><span class="token punctuation">,</span>
- <span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用set_expires给key赋值</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt
- <span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">set_expires</span><span class="token punctuation">(</span>key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> expiration<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token punctuation">}</span>
- <span class="token comment">/// 异步推送消息</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">publish</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channel<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token keyword">u64</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用publish给指定channel推送消息</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">publish</span><span class="token punctuation">(</span>channel<span class="token punctuation">,</span> message<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步执行订阅指定频道操作,并将Client转换为BlockingSubcriber状态</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">BlockingSubscriber</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步调用subscribe函数,转换client状态</span>
- <span class="token keyword">let</span> subscriber <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回BlockingSubscriber</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">BlockingSubscriber</span> <span class="token punctuation">{</span>
- inner<span class="token punctuation">:</span> subscriber<span class="token punctuation">,</span>
- rt<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="blockingsubscriber"><a href="#blockingsubscriber" class="header-anchor">#</a> BlockingSubscriber</h4> <h5 id="struct-5"><a href="#struct-5" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">BlockingSubscriber</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 异步的订阅者</span>
- inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Subscriber</span><span class="token punctuation">,</span>
- <span class="token comment">/// 运行时环境,负责调度和管理异步任务的执行</span>
- rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-4"><a href="#implementation-4" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">BlockingSubscriber</span><span class="token punctuation">{</span>
- <span class="token comment">/// 获取订阅的channel集合</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_subscribed</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span> <span class="token punctuation">{</span>
- <span class="token comment">// 获取订阅的channel集合,并返回</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">get_subscribed</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步接收下一条信息</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">next_message</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步执行获取下一条信息的操作</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步订阅新的频道集合</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步订阅新的频道集合</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 异步执行取消订阅指定频道集操作</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">unsubscribe</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 异步执行取消订阅指定频道集操作</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">unsubscribe</span><span class="token punctuation">(</span>channels<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 获取BlockingSubscriber的迭代器</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_iter</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">impl</span> <span class="token class-name">Iterator</span><span class="token operator"><</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
- inner<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">,</span>
- rt<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="subscriberiterator"><a href="#subscriberiterator" class="header-anchor">#</a> SubscriberIterator</h4> <h5 id="struct-6"><a href="#struct-6" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// BlockingSubscriber的迭代器</span>
- <span class="token comment">/// 通过`Subscriber::into_iter`可以获取</span>
- <span class="token keyword">struct</span> <span class="token type-definition class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
- <span class="token comment">/// BlockingSubscriber中的Subscriber</span>
- inner<span class="token punctuation">:</span> <span class="token keyword">crate</span><span class="token module-declaration namespace"><span class="token punctuation">::</span>clients<span class="token punctuation">::</span></span><span class="token class-name">Subscriber</span><span class="token punctuation">,</span>
- <span class="token comment">/// BlockingSubscriber中的Runtime</span>
- rt<span class="token punctuation">:</span> <span class="token class-name">Runtime</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-5"><a href="#implementation-5" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Iterator</span> <span class="token keyword">for</span> <span class="token class-name">SubscriberIterator</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 定义迭代器的元素类型</span>
- <span class="token keyword">type</span> <span class="token type-definition class-name">Item</span> <span class="token operator">=</span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">></span><span class="token punctuation">;</span>
- <span class="token comment">/// 获取迭代器的下一个元素</span>
- <span class="token comment">/// 返回Some(Ok(message)) 表示存在下一个元素</span>
- <span class="token comment">/// 返回Some(Err(error)) 表示获取下一个元素出错</span>
- <span class="token comment">/// 返回None 表示没有下一个元素</span>
- <span class="token keyword">fn</span> <span class="token function-definition function">next</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">>></span>
- <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>rt<span class="token punctuation">.</span><span class="token function">block_on</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>inner<span class="token punctuation">.</span><span class="token function">next_message</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token function">transpose</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h3 id="cmd"><a href="#cmd" class="header-anchor">#</a> cmd</h3> <h4 id="get"><a href="#get" class="header-anchor">#</a> Get</h4> <blockquote><p>获取key对应的value</p></blockquote> <ul><li>如果key不存在,返回nil</li> <li>如果value不是string类型,返回error</li></ul> <p>代码位置:src/cmd/get.rs</p> <h5 id="struct-7"><a href="#struct-7" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Get</span> <span class="token punctuation">{</span>
- <span class="token comment">/// Name of the key to get</span>
- key<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-6"><a href="#implementation-6" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Get</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建一个Get命令对象</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Get</span> <span class="token punctuation">{</span>
- <span class="token class-name">Get</span> <span class="token punctuation">{</span>
- key<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// Get the key</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">key</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
- <span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>key
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析frame,并获取对应Get命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Get</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> key <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Get</span> <span class="token punctuation">{</span> key <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 对数据库执行Get命令</span>
- <span class="token attribute attr-name">#[instrument(skip(self, db, dst))]</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 从数据库中获取value</span>
- <span class="token comment">// 如果值存在,则将其转成Frame::Bulk形式</span>
- <span class="token comment">// 否则转成Frame::Null</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span> <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">get</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// If a value is present, it is written to the client in "bulk"</span>
- <span class="token comment">// format.</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>value<span class="token punctuation">)</span>
- <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
- <span class="token comment">// If there is no value, `Null` is written.</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录响应</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将响应写入连接</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将命令转成Frame格式,在客户端将格式化的Get</span>
- <span class="token comment">///命令发给服务端时会调用。</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">/// 将命令以["get",key]的形式放入frame。注意每个元素都要转成字节形式</span>
- <span class="token macro property">todo!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="publish"><a href="#publish" class="header-anchor">#</a> Publish</h4> <p>代码位置:src/cmd/publish.rs</p> <h5 id="struct-8"><a href="#struct-8" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 向指定频道推送消息</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Publish</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 频道名</span>
- channel<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- <span class="token comment">/// 要发送的消息</span>
- message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-7"><a href="#implementation-7" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Publish</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建Publish命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channel<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">,</span> message<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Publish</span> <span class="token punctuation">{</span>
- <span class="token class-name">Publish</span> <span class="token punctuation">{</span>
- channel<span class="token punctuation">:</span> channel<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- message<span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析frame,并获取对应Publish命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Publish</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 获取string形式的channel</span>
- <span class="token keyword">let</span> channel <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 获取字节形式的message</span>
- <span class="token keyword">let</span> message <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回对应Publish</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Publish</span> <span class="token punctuation">{</span> channel<span class="token punctuation">,</span> message <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 对数据库执行publish命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span><span class="token punctuation">{</span>
- <span class="token comment">// 向数据库连接执行publish操作,并获取到订阅channel的subscriber的数量</span>
- <span class="token keyword">let</span> num_subscribers <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">publish</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>channel<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>message<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将subscriber_num转换成整数帧,并将其写入连接</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>num_subscribers <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将publish命令转成frame的形式</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将publish命令以["publish",channel,message]的形式写入数组。注意每个元素都要转成字节形式</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"publish"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>message<span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="set"><a href="#set" class="header-anchor">#</a> Set</h4> <p>代码位置:src/cmd/set.rs</p> <h5 id="struct-9"><a href="#struct-9" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 设置key的值</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Set</span> <span class="token punctuation">{</span>
- <span class="token comment">/// the lookup key</span>
- key<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- <span class="token comment">/// the value to be stored</span>
- value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span>
- <span class="token comment">/// When to expire the key</span>
- expire<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Duration</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-8"><a href="#implementation-8" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Set</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建一个set命令</span>
- <span class="token comment">/// </span>
- <span class="token comment">/// 如果不需要过期时间则expire为None</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">,</span> expire<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Duration</span><span class="token operator">></span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Set</span> <span class="token punctuation">{</span>
- <span class="token class-name">Set</span> <span class="token punctuation">{</span>
- key<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- value<span class="token punctuation">,</span>
- expire<span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// Get the key</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">key</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
- <span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>key
- <span class="token punctuation">}</span>
- <span class="token comment">/// Get the value</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">value</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token class-name">Bytes</span> <span class="token punctuation">{</span>
- <span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>value
- <span class="token punctuation">}</span>
- <span class="token comment">/// Get the expire</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">expire</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Duration</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>expire
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析frame</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Set</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
- <span class="token comment">// 使用 parse.next_string() 方法读取下一个字符串字段作为键(key)。</span>
- <span class="token keyword">let</span> key <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 使用 parse.next_bytes() 方法读取下一个字节字段作为值(value)。</span>
- <span class="token keyword">let</span> value <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 创建一个mut expire,用于存储过期时间。初始值为 None。</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> expire <span class="token operator">=</span> <span class="token class-name">None</span><span class="token punctuation">;</span>
- <span class="token comment">// 分析其他情况的帧</span>
- <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token keyword">if</span> s<span class="token punctuation">.</span><span class="token function">to_uppercase</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">==</span> <span class="token string">"EX"</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 如果下一个字符串字段是 "EX"(不区分大小写),则表示设置了过期时间且单位是secs</span>
- <span class="token comment">// 下一个值应该是一个整数,并转成Duration类型。</span>
- <span class="token keyword">let</span> secs <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_int</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- expire <span class="token operator">=</span> <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_secs</span><span class="token punctuation">(</span>secs<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token keyword">if</span> s<span class="token punctuation">.</span><span class="token function">to_uppercase</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">==</span> <span class="token string">"PX"</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 如果下一个字符串字段是 "PX"(不区分大小写),则表示设置了过期时间且单位是millis</span>
- <span class="token comment">// 下一个值应该是一个整数,并转成Duration类型。</span>
- <span class="token keyword">let</span> ms <span class="token operator">=</span> parse<span class="token punctuation">.</span><span class="token function">next_int</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- expire <span class="token operator">=</span> <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_millis</span><span class="token punctuation">(</span>ms<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 如果下一个字符串字段不是 "EX" 或 "PX",则表示设置了不支持的选项,将返回一个错误,指示 SET 目前仅支持过期时间选项。</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token string">"currently `SET` only supports the expiration option"</span><span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// 如果解析到达流的末尾,表示没有进一步的数据需要解析,这是正常的运行时情况,表示没有指定 SET 的选项。</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
- <span class="token comment">// 其他所有错误都会返回一个错误,导致连接被终止。</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Set</span> <span class="token punctuation">{</span> key<span class="token punctuation">,</span> value<span class="token punctuation">,</span> expire <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 对数据库连接执行set命令</span>
- <span class="token attribute attr-name">#[instrument(skip(self, db, dst))]</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> db<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Db</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 执行set</span>
- db<span class="token punctuation">.</span><span class="token function">set</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>value<span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>expire<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 创建执行成功响应,并写给连接</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token string">"OK"</span><span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将set命令转成frame形式</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将命令以["set",key,value]的形式写入frame</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"set"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>key<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>value<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 判断是否有过期时间,如果有过期时间以["px",ms]的格式写入frame</span>
- <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>ms<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>expire <span class="token punctuation">{</span>
- <span class="token comment">// 可以用以下两种方式设置过期时间,在这里使用第二种,因为精度更高</span>
- <span class="token comment">// 1. SET key value EX seconds</span>
- <span class="token comment">// 2. SET key value PX milliseconds</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"px"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_int</span><span class="token punctuation">(</span>ms<span class="token punctuation">.</span><span class="token function">as_millis</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="subscribe-message"><a href="#subscribe-message" class="header-anchor">#</a> Subscribe+Message</h4> <p>代码位置:src/cmd/subscribe.rs</p> <h5 id="struct-10"><a href="#struct-10" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 订阅命令</span>
- <span class="token comment">/// </span>
- <span class="token comment">/// 一旦客户端进入订阅状态,它只能提出 </span>
- <span class="token comment">/// SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE,</span>
- <span class="token comment">/// PUNSUBSCRIBE, PING 和 QUIT命令</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Subscribe</span> <span class="token punctuation">{</span>
- channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 信息流</span>
- <span class="token keyword">type</span> <span class="token type-definition class-name">Messages</span> <span class="token operator">=</span> <span class="token class-name">Pin</span><span class="token operator"><</span><span class="token class-name">Box</span><span class="token operator"><</span><span class="token keyword">dyn</span> <span class="token class-name">Stream</span><span class="token operator"><</span><span class="token class-name">Item</span> <span class="token operator">=</span> <span class="token class-name">Bytes</span><span class="token operator">></span> <span class="token operator">+</span> <span class="token class-name">Send</span><span class="token operator">>></span><span class="token punctuation">;</span>
- </code></pre></div><h5 id="implementation-9"><a href="#implementation-9" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Subscribe</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建一个新的订阅者以监听频道</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Subscribe</span> <span class="token punctuation">{</span>
- <span class="token class-name">Subscribe</span> <span class="token punctuation">{</span>
- channels<span class="token punctuation">:</span> channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析subscribe帧</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Subscribe</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
- <span class="token comment">// 获取第一个channel,并判断是否为none,不为none则写入数组</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> channels <span class="token operator">=</span> <span class="token macro property">vec!</span><span class="token punctuation">[</span>parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
- <span class="token comment">// 循环解析剩下的channel,并将其写入数组</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// 推送消息给频道</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token operator">=></span> channels<span class="token punctuation">.</span><span class="token function">push</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// EndOfStream说明不会再有消息了</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">break</span><span class="token punctuation">,</span>
- <span class="token comment">// 错误报告</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 返回结果</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Subscribe</span> <span class="token punctuation">{</span> channels <span class="token punctuation">}</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 数据库执行subscribe命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span>
- <span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span>
- db<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Db</span><span class="token punctuation">,</span>
- dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
- shutdown<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Shutdown</span><span class="token punctuation">,</span>
- <span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 创建一个 StreamMap 对象,用于跟踪活动的订阅。使用map可以防止订阅重复</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> subscriptions <span class="token operator">=</span> <span class="token class-name">StreamMap</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token comment">// 遍历channels,drain(..)会将管道从channels中移除</span>
- <span class="token comment">// subscribe_to_channel会订阅传入的管道</span>
- <span class="token comment">// await等待管道订阅成功</span>
- <span class="token keyword">for</span> channel_name <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">drain</span><span class="token punctuation">(</span><span class="token punctuation">..</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token function">subscribe_to_channel</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> <span class="token operator">&</span><span class="token keyword">mut</span> subscriptions<span class="token punctuation">,</span> db<span class="token punctuation">,</span> dst<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 使用 select! 宏等待以下几种情况中的任何一种发生:</span>
- <span class="token comment">// 1. 从已订阅的通道接收到消息。</span>
- <span class="token comment">// 2. 从客户端接收到订阅或取消订阅命令。</span>
- <span class="token comment">// 3. 收到服务器关闭信号。</span>
- <span class="token macro property">select!</span> <span class="token punctuation">{</span>
- <span class="token comment">// 接收来自已订阅通道的消息。</span>
- <span class="token comment">// subscriptions.next() 返回一个异步迭代器</span>
- <span class="token comment">// 当有新的消息到达时,会返回 Some((channel_name, msg))</span>
- <span class="token comment">// 其中 channel_name 是通道名称,msg 是消息内容。</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> msg<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=</span> subscriptions<span class="token punctuation">.</span><span class="token function">next</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 将收到的消息构造成消息帧写给客户端</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token function">make_message_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> msg<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// dst.read_frame() 返回一个异步操作结果,</span>
- <span class="token comment">//当有新的帧可用时,会返回 Some(frame),其中 frame 是读取到的帧。如果客户端断开连接,会返回 None。</span>
- res <span class="token operator">=</span> dst<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token keyword">match</span> res<span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=></span> frame<span class="token punctuation">,</span>
- <span class="token comment">// This happens if the remote client has disconnected.</span>
- <span class="token class-name">None</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 处理从客户端接收到的命令帧</span>
- <span class="token function">handle_command</span><span class="token punctuation">(</span>
- frame<span class="token punctuation">,</span>
- <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels<span class="token punctuation">,</span>
- <span class="token operator">&</span><span class="token keyword">mut</span> subscriptions<span class="token punctuation">,</span>
- dst<span class="token punctuation">,</span>
- <span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 接收服务器关闭信号。</span>
- _ <span class="token operator">=</span> shutdown<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将命令转为frame类型</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将命令以["subscribe",channel1,channel2...]的形</span>
- <span class="token comment">//式放入数组</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"unsubscribe"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">for</span> channel <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="function"><a href="#function" class="header-anchor">#</a> Function</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">subscribe_to_channel</span><span class="token punctuation">(</span>
- channel_name<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- subscriptions<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">StreamMap</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">Messages</span><span class="token operator">></span><span class="token punctuation">,</span>
- db<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Db</span><span class="token punctuation">,</span>
- dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
- <span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 向数据发送订阅请求</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> rx <span class="token operator">=</span> db<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 接受订阅回复</span>
- <span class="token keyword">let</span> rx <span class="token operator">=</span> <span class="token class-name">Box</span><span class="token punctuation">::</span><span class="token function">pin</span><span class="token punctuation">(</span><span class="token namespace">async_stream<span class="token punctuation">::</span></span><span class="token macro property">stream!</span> <span class="token punctuation">{</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">yield</span> msg<span class="token punctuation">,</span>
- <span class="token comment">// If we lagged in consuming messages, just resume.</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token namespace">broadcast<span class="token punctuation">::</span>error<span class="token punctuation">::</span></span><span class="token class-name">RecvError</span><span class="token punctuation">::</span><span class="token class-name">Lagged</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span><span class="token punctuation">}</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">break</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将订阅成功的管道加入到订阅map中</span>
- subscriptions<span class="token punctuation">.</span><span class="token function">insert</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> rx<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 响应成功订阅报文</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token function">make_subscribe_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> subscriptions<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 处理subscribe和unsubscibe的逻辑</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">handle_command</span><span class="token punctuation">(</span>
- frame<span class="token punctuation">:</span> <span class="token class-name">Frame</span><span class="token punctuation">,</span>
- subscribe_to<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">,</span>
- subscriptions<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">StreamMap</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token punctuation">,</span> <span class="token class-name">Messages</span><span class="token operator">></span><span class="token punctuation">,</span>
- dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
- <span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 匹配命令,判断要执行的操作</span>
- <span class="token keyword">match</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token function">from_frame</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Subscribe</span><span class="token punctuation">(</span>subscribe<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 将要订阅的频道加入到订阅map </span>
- subscribe_to<span class="token punctuation">.</span><span class="token function">extend</span><span class="token punctuation">(</span>subscribe<span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">into_iter</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Unsubscribe</span><span class="token punctuation">(</span><span class="token keyword">mut</span> unsubscribe<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 判断是否指定了要取消订阅的频道</span>
- <span class="token comment">// 如果没有则需要订阅所有channels</span>
- <span class="token keyword">if</span> unsubscribe<span class="token punctuation">.</span>channels<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- unsubscribe<span class="token punctuation">.</span>channels <span class="token operator">=</span> subscriptions
- <span class="token punctuation">.</span><span class="token function">keys</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>channel_name<span class="token closure-punctuation punctuation">|</span></span> channel_name<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token function">collect</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 将需要取消订阅的频道从map中移除</span>
- <span class="token keyword">for</span> channel_name <span class="token keyword">in</span> unsubscribe<span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
- subscriptions<span class="token punctuation">.</span><span class="token function">remove</span><span class="token punctuation">(</span><span class="token operator">&</span>channel_name<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 生成unsubscribe响应</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token function">make_unsubscribe_frame</span><span class="token punctuation">(</span>channel_name<span class="token punctuation">,</span> subscriptions<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 写入unsubscribe响应</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- command <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 其他命令都看作是unknown命令</span>
- <span class="token keyword">let</span> cmd <span class="token operator">=</span> <span class="token class-name">Unknown</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>command<span class="token punctuation">.</span><span class="token function">get_name</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- cmd<span class="token punctuation">.</span><span class="token function">apply</span><span class="token punctuation">(</span>dst<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="unsubscribe"><a href="#unsubscribe" class="header-anchor">#</a> Unsubscribe</h4> <p>代码位置:src/cmd/subcribe.rs</p> <h5 id="struct-11"><a href="#struct-11" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 取消订阅命令</span>
- <span class="token comment">///</span>
- <span class="token comment">/// 如果channels为空,则取消订阅所有频道</span>
- <span class="token attribute attr-name">#[derive(Clone, Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Unsubscribe</span> <span class="token punctuation">{</span>
- channels<span class="token punctuation">:</span> <span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">String</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-10"><a href="#implementation-10" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Unsubscribe</span><span class="token punctuation">{</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>channels<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token punctuation">[</span><span class="token class-name">String</span><span class="token punctuation">]</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Unsubscribe</span> <span class="token punctuation">{</span>
- <span class="token class-name">Unsubscribe</span> <span class="token punctuation">{</span>
- channels<span class="token punctuation">:</span> channels<span class="token punctuation">.</span><span class="token function">to_vec</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析unsubscribe帧</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Unsubscribe</span><span class="token punctuation">,</span> <span class="token class-name">ParseError</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">use</span> <span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">;</span>
- <span class="token comment">// 创建数组,用于存储channels</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> channels <span class="token operator">=</span> <span class="token macro property">vec!</span><span class="token punctuation">[</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
- <span class="token comment">// 循环解析数据,将channel放入数组</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// 将数据推送到要取消订阅的频道列表</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span> <span class="token operator">=></span> channels<span class="token punctuation">.</span><span class="token function">push</span><span class="token punctuation">(</span>s<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// EndOfStream代表数据解析完成</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">break</span><span class="token punctuation">,</span>
- <span class="token comment">// 其他错误</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 返回命令</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将unsubscribe命令转成帧</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将命令以["unsubscribe",channel1,channel2...]形</span>
- <span class="token comment">// 式放入数组</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"unsubscribe"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">for</span> channel <span class="token keyword">in</span> <span class="token keyword">self</span><span class="token punctuation">.</span>channels <span class="token punctuation">{</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span>channel<span class="token punctuation">.</span><span class="token function">into_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="unknown"><a href="#unknown" class="header-anchor">#</a> Unknown</h4> <h5 id="struct-12"><a href="#struct-12" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Unknown</span> <span class="token punctuation">{</span>
- command_name<span class="token punctuation">:</span> <span class="token class-name">String</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-11"><a href="#implementation-11" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Unknown</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建unknown命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>key<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">ToString</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Unknown</span> <span class="token punctuation">{</span>
- <span class="token class-name">Unknown</span> <span class="token punctuation">{</span>
- command_name<span class="token punctuation">:</span> key<span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 获取命令名</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">get_name</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token operator">&</span><span class="token keyword">str</span> <span class="token punctuation">{</span>
- <span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>command_name
- <span class="token punctuation">}</span>
- <span class="token comment">/// 回应客户端不支持当前命令</span>
- <span class="token attribute attr-name">#[instrument(skip(self, dst))]</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 生成错误帧响应</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span><span class="token macro property">format!</span><span class="token punctuation">(</span><span class="token string">"ERR unknown command '{}'"</span><span class="token punctuation">,</span> <span class="token keyword">self</span><span class="token punctuation">.</span>command_name<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录响应</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 将帧写入连接</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="ping"><a href="#ping" class="header-anchor">#</a> Ping</h4> <h5 id="struct-13"><a href="#struct-13" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token attribute attr-name">#[derive(Debug, Default)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Ping</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 自定义消息返回</span>
- msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implement"><a href="#implement" class="header-anchor">#</a> Implement</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Ping</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 创建一个新的携带自定信息的Ping命令</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>msg<span class="token punctuation">:</span> <span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">></span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Ping</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ping</span> <span class="token punctuation">{</span> msg <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析ping命令</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">parse_frames</span><span class="token punctuation">(</span>parse<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Parse</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Ping</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> parse<span class="token punctuation">.</span><span class="token function">next_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">ParseError</span><span class="token punctuation">::</span><span class="token class-name">EndOfStream</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Ping</span><span class="token punctuation">::</span><span class="token function">default</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// ping命令执行逻辑</span>
- <span class="token comment">///</span>
- <span class="token comment">/// 响应会被写入dst。</span>
- <span class="token comment">/// 服务端受到命令并执行时会执行。</span>
- <span class="token attribute attr-name">#[instrument(skip(self, dst))]</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">apply</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">,</span> dst<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token class-name">Connection</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> response <span class="token operator">=</span> <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>msg <span class="token punctuation">{</span>
- <span class="token class-name">None</span> <span class="token operator">=></span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token string">"PONG"</span><span class="token punctuation">.</span><span class="token function">to_string</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>response<span class="token punctuation">)</span><span class="token punctuation">;</span>
- dst<span class="token punctuation">.</span><span class="token function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span>response<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将命令转换成frame</span>
- <span class="token comment">///This is called by the client when encoding a `Ping` command to send</span>
- <span class="token comment">/// to the server.</span>
- <span class="token comment">/// 当客户端需要发送ping命令给服务端时会被调用</span>
- <span class="token keyword">pub</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">)</span> <span class="token keyword">fn</span> <span class="token function-definition function">into_frame</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">array</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">::</span><span class="token function">from</span><span class="token punctuation">(</span><span class="token string">"ping"</span><span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>msg <span class="token punctuation">{</span>
- frame<span class="token punctuation">.</span><span class="token function">push_bulk</span><span class="token punctuation">(</span>msg<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- frame
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h3 id="frame"><a href="#frame" class="header-anchor">#</a> frame</h3> <blockquote><p>redis协议帧</p></blockquote> <h4 id="enum"><a href="#enum" class="header-anchor">#</a> enum</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// A frame in the Redis protocol.</span>
- <span class="token attribute attr-name">#[derive(Clone, Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">enum</span> <span class="token type-definition class-name">Frame</span> <span class="token punctuation">{</span>
- <span class="token class-name">Simple</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Error</span><span class="token punctuation">(</span><span class="token class-name">String</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Integer</span><span class="token punctuation">(</span><span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Bulk</span><span class="token punctuation">(</span><span class="token class-name">Bytes</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Null</span><span class="token punctuation">,</span>
- <span class="token class-name">Array</span><span class="token punctuation">(</span><span class="token class-name">Vec</span><span class="token operator"><</span><span class="token class-name">Frame</span><span class="token operator">></span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">enum</span> <span class="token type-definition class-name">Error</span> <span class="token punctuation">{</span>
- <span class="token comment">/// Not enough data is available to parse a message</span>
- <span class="token class-name">Incomplete</span><span class="token punctuation">,</span>
- <span class="token comment">/// Invalid message encoding</span>
- <span class="token class-name">Other</span><span class="token punctuation">(</span><span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h3 id="sever"><a href="#sever" class="header-anchor">#</a> sever</h3> <blockquote><p>mini-redis的服务端。提供了一个异步的<code>run</code>函数,用于监听连接,并为每个连接生成一个任务</p></blockquote> <h4 id="listener"><a href="#listener" class="header-anchor">#</a> Listener</h4> <h5 id="struct-14"><a href="#struct-14" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 服务端用于监听的连接</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">struct</span> <span class="token type-definition class-name">Listener</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 可共享的数据库连接句柄(Arc形式)</span>
- db_holder<span class="token punctuation">:</span> <span class="token class-name">DbDropGuard</span><span class="token punctuation">,</span>
- <span class="token comment">/// TCP监听类,由run函数提供</span>
- listener<span class="token punctuation">:</span> <span class="token class-name">TcpListener</span><span class="token punctuation">,</span>
- <span class="token comment">/// 信号量,用于限制连接数</span>
- limit_connections<span class="token punctuation">:</span> <span class="token class-name">Arc</span><span class="token operator"><</span><span class="token class-name">Semaphore</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token comment">/// 用于广播shutdown信号</span>
- notify_shutdown<span class="token punctuation">:</span> <span class="token namespace">broadcast<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token comment">/// 用于在客户端完成所有任务后安全关闭连接。</span>
- <span class="token comment">/// 当一个连接被初始化,它会保存‵shutdown_complete_tx‵的clone,被关闭时回收</span>
- <span class="token comment">/// 当所有listener的连接都被关闭后,sender也会被释放</span>
- <span class="token comment">/// 所有任务被完成后,`shutdown_complete_rx.recv()`会返回`None`</span>
- shutdown_complete_tx<span class="token punctuation">:</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-12"><a href="#implementation-12" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 启动server监听服务</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token macro property">info!</span><span class="token punctuation">(</span><span class="token string">"accepting inbound connections"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token comment">// 等待有名额创建连接</span>
- <span class="token keyword">let</span> permit <span class="token operator">=</span> <span class="token keyword">self</span>
- <span class="token punctuation">.</span>limit_connections
- <span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token function">acquire_owned</span><span class="token punctuation">(</span><span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token keyword">await</span>
- <span class="token punctuation">.</span><span class="token function">unwrap</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 建立tcp连接,获取套接字</span>
- <span class="token keyword">let</span> socket <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">accept</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 存储连接状态</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> handler <span class="token operator">=</span> <span class="token class-name">Handler</span> <span class="token punctuation">{</span>
- <span class="token comment">// 获取数据库连接</span>
- db<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>db_holder<span class="token punctuation">.</span><span class="token function">db</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// 新建连接</span>
- connection<span class="token punctuation">:</span> <span class="token class-name">Connection</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// 用于接受shutdown信号</span>
- shutdown<span class="token punctuation">:</span> <span class="token class-name">Shutdown</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token keyword">self</span><span class="token punctuation">.</span>notify_shutdown<span class="token punctuation">.</span><span class="token function">subscribe</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// 一旦所有的clone被释放,会通过此成员来通知</span>
- _shutdown_complete<span class="token punctuation">:</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown_complete_tx<span class="token punctuation">.</span><span class="token function">clone</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 创建一个异步的任务执行连接要做的操作</span>
- <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token function">spawn</span><span class="token punctuation">(</span><span class="token keyword">async</span> <span class="token keyword">move</span> <span class="token punctuation">{</span>
- <span class="token comment">// 执行连接,并记录错误</span>
- <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=</span> handler<span class="token punctuation">.</span><span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
- <span class="token macro property">error!</span><span class="token punctuation">(</span>cause <span class="token operator">=</span> <span class="token operator">?</span>err<span class="token punctuation">,</span> <span class="token string">"connection error"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 归还连接名额</span>
- <span class="token function">drop</span><span class="token punctuation">(</span>permit<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 建立tcp连接,获取套接字</span>
- <span class="token comment">/// 如果发生错误,最大重连次数为6,最后一次等待时间为64s。重连流程参考tcp超时重发机制。</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">accept</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">TcpStream</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> backoff <span class="token operator">=</span> <span class="token number">1</span><span class="token punctuation">;</span>
- <span class="token comment">// 尝试接受连接</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token comment">// 成功直接返回tcp连接 失败返回错误信息</span>
- <span class="token keyword">match</span> <span class="token keyword">self</span><span class="token punctuation">.</span>listener<span class="token punctuation">.</span><span class="token function">accept</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span>socket<span class="token punctuation">,</span> _<span class="token punctuation">)</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">if</span> backoff <span class="token operator">></span> <span class="token number">64</span> <span class="token punctuation">{</span>
- <span class="token comment">// 等待时间过长,返回错误</span>
- <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 睡眠</span>
- <span class="token namespace">time<span class="token punctuation">::</span></span><span class="token function">sleep</span><span class="token punctuation">(</span><span class="token class-name">Duration</span><span class="token punctuation">::</span><span class="token function">from_secs</span><span class="token punctuation">(</span>backoff<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token punctuation">;</span>
- <span class="token comment">// 指数增长等待时间</span>
- backoff <span class="token operator">*=</span> <span class="token number">2</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="handler"><a href="#handler" class="header-anchor">#</a> Handler</h4> <h5 id="struct-15"><a href="#struct-15" class="header-anchor">#</a> Struct</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 服务端的具体连接</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">struct</span> <span class="token type-definition class-name">Handler</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 数据库连接</span>
- db<span class="token punctuation">:</span> <span class="token class-name">Db</span><span class="token punctuation">,</span>
- <span class="token comment">/// tcp连接</span>
- connection<span class="token punctuation">:</span> <span class="token class-name">Connection</span><span class="token punctuation">,</span>
- <span class="token comment">/// 监听shutdown信号</span>
- shutdown<span class="token punctuation">:</span> <span class="token class-name">Shutdown</span><span class="token punctuation">,</span>
- <span class="token comment">/// 不直接使用,用来判断当前类对象是否被释放</span>
- _shutdown_complete<span class="token punctuation">:</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token class-name">Sender</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h5 id="implementation-13"><a href="#implementation-13" class="header-anchor">#</a> Implementation</h5> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Handler</span> <span class="token punctuation">{</span>
- <span class="token comment">/// 处理单个连接</span>
- <span class="token comment">/// </span>
- <span class="token comment">/// 当接收到shutdown信号时,等到该连接处于安全状态时,连接会断开</span>
- <span class="token attribute attr-name">#[instrument(skip(self))]</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 循环处理帧</span>
- <span class="token keyword">while</span> <span class="token operator">!</span><span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">.</span><span class="token function">s_shutdown</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> maybe_frame <span class="token operator">=</span> <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token macro property">select!</span> <span class="token punctuation">{</span>
- res <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">.</span><span class="token function">read_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> res<span class="token operator">?</span><span class="token punctuation">,</span>
- _ <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 接收到shutdown信号,退出run函数会使任务结束</span>
- <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 判断从远端收到的帧(maybe_frame)是否有内容,收到‵None`时说明远端关闭</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token keyword">match</span> maybe_frame <span class="token punctuation">{</span>
- <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=></span> frame<span class="token punctuation">,</span>
- <span class="token class-name">None</span> <span class="token operator">=></span> <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 将frame转为命令形式</span>
- <span class="token keyword">let</span> cmd <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token function">from_frame</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 日志记录接收到的命令</span>
- <span class="token macro property">debug!</span><span class="token punctuation">(</span><span class="token operator">?</span>cmd<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 异步执行命令</span>
- cmd<span class="token punctuation">.</span><span class="token function">apply</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>db<span class="token punctuation">,</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>connection<span class="token punctuation">,</span> <span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>shutdown<span class="token punctuation">)</span>
- <span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="fn-run"><a href="#fn-run" class="header-anchor">#</a> Fn Run</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// mini-redis的服务端启动函数</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">run</span><span class="token punctuation">(</span>listener<span class="token punctuation">:</span> <span class="token class-name">TcpListener</span><span class="token punctuation">,</span> shutdown<span class="token punctuation">:</span> <span class="token keyword">impl</span> <span class="token class-name">Future</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token comment">// 初始化shutdown信号广播管道</span>
- <span class="token keyword">let</span> <span class="token punctuation">(</span>notify_shutdown<span class="token punctuation">,</span> _<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">broadcast<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 初始化shutdown关闭机制</span>
- <span class="token keyword">let</span> <span class="token punctuation">(</span>shutdown_complete_tx<span class="token punctuation">,</span> <span class="token keyword">mut</span> shutdown_complete_rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">mpsc<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">1</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 初始化监听连接</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> server <span class="token operator">=</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
- listener<span class="token punctuation">,</span>
- db_holder<span class="token punctuation">:</span> <span class="token class-name">DbDropGuard</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- limit_connections<span class="token punctuation">:</span> <span class="token class-name">Arc</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token class-name">Semaphore</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token constant">MAX_CONNECTIONS</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- notify_shutdown<span class="token punctuation">,</span>
- shutdown_complete_tx<span class="token punctuation">,</span>
- <span class="token punctuation">}</span><span class="token punctuation">;</span>
- <span class="token comment">// 监听是否有连接到达</span>
- <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token macro property">select!</span> <span class="token punctuation">{</span>
- res <span class="token operator">=</span> server<span class="token punctuation">.</span><span class="token function">run</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 处理连接失败的情况</span>
- <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=</span> res <span class="token punctuation">{</span>
- <span class="token macro property">error!</span><span class="token punctuation">(</span>cause <span class="token operator">=</span> <span class="token operator">%</span>err<span class="token punctuation">,</span> <span class="token string">"failed to accept"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- _ <span class="token operator">=</span> shutdown <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token macro property">info!</span><span class="token punctuation">(</span><span class="token string">"shutting down"</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token keyword">let</span> <span class="token class-name">Listener</span> <span class="token punctuation">{</span>
- shutdown_complete_tx<span class="token punctuation">,</span>
- notify_shutdown<span class="token punctuation">,</span>
- <span class="token punctuation">..</span>
- <span class="token punctuation">}</span> <span class="token operator">=</span> server<span class="token punctuation">;</span>
- <span class="token comment">// 所有的处在subscribe状态的任务会接收到shutdown信号并且退出</span>
- <span class="token function">drop</span><span class="token punctuation">(</span>notify_shutdown<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 回收监听者的shutdown_complete_tx,意味着只有其他连接持有shutdown_complete_tx了</span>
- <span class="token function">drop</span><span class="token punctuation">(</span>shutdown_complete_tx<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 等待所有活跃连接完成其任务</span>
- <span class="token comment">// 当所有shutdown_complete_tx都被释放,说明所有连接都断开了</span>
- <span class="token comment">// 此时`recv()`会返回`None`并且`mpsc`管道会被关闭</span>
- <span class="token keyword">let</span> _ <span class="token operator">=</span> shutdown_complete_rx<span class="token punctuation">.</span><span class="token function">recv</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h2 id="function-2"><a href="#function-2" class="header-anchor">#</a> Function</h2> <h3 id="fn-buffer"><a href="#fn-buffer" class="header-anchor">#</a> Fn Buffer</h3> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 创建一个请求缓冲区</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">buffer</span><span class="token punctuation">(</span>client<span class="token punctuation">:</span> <span class="token class-name">Client</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Buffer</span> <span class="token punctuation">{</span>
- <span class="token comment">// 创建一个容量为32的异步通道。tx为发送端,rx为接收端</span>
- <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token function">channel</span><span class="token punctuation">(</span><span class="token number">32</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 使用 tokio::spawn 函数创建一个异步任务(task)</span>
- <span class="token comment">// run 函数接受 client 和 rx(接收端)作为参数</span>
- <span class="token comment">// 并在 await 关键字处暂停执行,直到接收端接收到消息。</span>
- <span class="token namespace">tokio<span class="token punctuation">::</span></span><span class="token function">spawn</span><span class="token punctuation">(</span><span class="token keyword">async</span> <span class="token keyword">move</span> <span class="token punctuation">{</span> <span class="token function">run</span><span class="token punctuation">(</span>client<span class="token punctuation">,</span> rx<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">}</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回缓冲区</span>
- <span class="token class-name">Buffer</span> <span class="token punctuation">{</span> tx <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h2 id="struct-16"><a href="#struct-16" class="header-anchor">#</a> Struct</h2> <h3 id="buffer"><a href="#buffer" class="header-anchor">#</a> Buffer</h3> <h4 id="struct-17"><a href="#struct-17" class="header-anchor">#</a> struct</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 客户端的请求窗口</span>
- <span class="token attribute attr-name">#[derive(Clone)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Buffer</span> <span class="token punctuation">{</span>
- tx<span class="token punctuation">:</span> <span class="token class-name">Sender</span><span class="token operator"><</span><span class="token class-name">Message</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="implementation-14"><a href="#implementation-14" class="header-anchor">#</a> Implementation</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Buffer</span><span class="token punctuation">{</span>
- <span class="token comment">/// 获取key的值。请求会被缓存直到连接能够发送请求</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">get</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Bytes</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token comment">// 初始化一个get命令</span>
- <span class="token keyword">let</span> get <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Get</span><span class="token punctuation">(</span>key<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 初始化一次性的管道以接收连接的回应</span>
- <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">oneshot<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 发送请求</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>tx<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token punctuation">(</span>get<span class="token punctuation">,</span> tx<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 等待回应,并返回结果</span>
- <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>res<span class="token punctuation">)</span> <span class="token operator">=></span> res<span class="token punctuation">,</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 设置key的值</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">set</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> key<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token keyword">str</span><span class="token punctuation">,</span> value<span class="token punctuation">:</span> <span class="token class-name">Bytes</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 初始化一个set命令</span>
- <span class="token keyword">let</span> set <span class="token operator">=</span> <span class="token class-name">Command</span><span class="token punctuation">::</span><span class="token class-name">Set</span><span class="token punctuation">(</span>key<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span> value<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 初始化一次性的管道以接收连接的回应</span>
- <span class="token keyword">let</span> <span class="token punctuation">(</span>tx<span class="token punctuation">,</span> rx<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token namespace">oneshot<span class="token punctuation">::</span></span><span class="token function">channel</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 发送请求</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>tx<span class="token punctuation">.</span><span class="token function">send</span><span class="token punctuation">(</span><span class="token punctuation">(</span>set<span class="token punctuation">,</span> tx<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 等待回应,并返回结果</span>
- <span class="token keyword">match</span> rx<span class="token punctuation">.</span><span class="token keyword">await</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>res<span class="token punctuation">)</span> <span class="token operator">=></span> res<span class="token punctuation">.</span><span class="token function">map</span><span class="token punctuation">(</span><span class="token closure-params"><span class="token closure-punctuation punctuation">|</span>_<span class="token closure-punctuation punctuation">|</span></span> <span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>err<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h3 id="connection"><a href="#connection" class="header-anchor">#</a> Connection</h3> <h4 id="struct-18"><a href="#struct-18" class="header-anchor">#</a> struct</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token comment">/// 发送和接收来自远端的帧</span>
- <span class="token attribute attr-name">#[derive(Debug)]</span>
- <span class="token keyword">pub</span> <span class="token keyword">struct</span> <span class="token type-definition class-name">Connection</span> <span class="token punctuation">{</span>
- <span class="token comment">// 带有写入缓冲区的tcp流</span>
- stream<span class="token punctuation">:</span> <span class="token class-name">BufWriter</span><span class="token operator"><</span><span class="token class-name">TcpStream</span><span class="token operator">></span><span class="token punctuation">,</span>
- <span class="token comment">// 帧读缓冲区</span>
- buffer<span class="token punctuation">:</span> <span class="token class-name">BytesMut</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- </code></pre></div><h4 id="implementation-15"><a href="#implementation-15" class="header-anchor">#</a> Implementation</h4> <div class="language-rust extra-class"><pre class="language-rust"><code><span class="token keyword">impl</span> <span class="token class-name">Connection</span><span class="token punctuation">{</span>
- <span class="token comment">/// 创建新的连接并返回socket,同时初始化缓冲区</span>
- <span class="token keyword">pub</span> <span class="token keyword">fn</span> <span class="token function-definition function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">:</span> <span class="token class-name">TcpStream</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token class-name">Connection</span> <span class="token punctuation">{</span>
- <span class="token class-name">Connection</span> <span class="token punctuation">{</span>
- stream<span class="token punctuation">:</span> <span class="token class-name">BufWriter</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span>socket<span class="token punctuation">)</span><span class="token punctuation">,</span>
- buffer<span class="token punctuation">:</span> <span class="token class-name">BytesMut</span><span class="token punctuation">::</span><span class="token function">with_capacity</span><span class="token punctuation">(</span><span class="token number">4</span> <span class="token operator">*</span> <span class="token number">1024</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 从tcp流中读取一个帧</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">read_frame</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Frame</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token keyword">loop</span> <span class="token punctuation">{</span>
- <span class="token comment">// 解析并获取帧</span>
- <span class="token keyword">if</span> <span class="token keyword">let</span> <span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span> <span class="token operator">=</span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">parse_frame</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 尝试从socket中读取更多的数据,如果读到的字节数为0,说明流被关闭</span>
- <span class="token keyword">if</span> <span class="token number">0</span> <span class="token operator">==</span> <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">read_buf</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span> <span class="token punctuation">{</span>
- <span class="token comment">// 如果缓冲区为空说明远端正常关闭,否则为远端发送rst保温</span>
- <span class="token keyword">if</span> <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">.</span><span class="token function">is_empty</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token keyword">return</span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span> <span class="token keyword">else</span> <span class="token punctuation">{</span>
- <span class="token keyword">return</span> <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token string">"connection reset by peer"</span><span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 解析并获取frame。</span>
- <span class="token comment">/// 如果数据不够则返回 `Ok(None)`</span>
- <span class="token comment">/// 如果数据不合理则返回Err</span>
- <span class="token keyword">fn</span> <span class="token function-definition function">parse_frame</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token keyword">crate</span><span class="token punctuation">::</span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token class-name">Option</span><span class="token operator"><</span><span class="token class-name">Frame</span><span class="token operator">>></span> <span class="token punctuation">{</span>
- <span class="token keyword">use</span> <span class="token namespace">frame<span class="token punctuation">::</span></span><span class="token class-name">Error</span><span class="token punctuation">::</span><span class="token class-name">Incomplete</span><span class="token punctuation">;</span>
- <span class="token comment">// 创建cursor用于追踪buffer中的数据位置</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token class-name">Cursor</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 检查缓冲区中是否有足够的数据</span>
- <span class="token keyword">match</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">check</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> buf<span class="token punctuation">)</span> <span class="token punctuation">{</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span>_<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 由于Frame::check()开始时会定位到缓冲区的末尾,所以可以获取到缓冲区的数据长度</span>
- <span class="token keyword">let</span> len <span class="token operator">=</span> buf<span class="token punctuation">.</span><span class="token function">position</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">usize</span><span class="token punctuation">;</span>
- <span class="token comment">// 重置cursor位置</span>
- buf<span class="token punctuation">.</span><span class="token function">set_position</span><span class="token punctuation">(</span><span class="token number">0</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 解析frame</span>
- <span class="token keyword">let</span> frame <span class="token operator">=</span> <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token function">parse</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> buf<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 从buffer中删除所有数据(因为已经被解析了)</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>buffer<span class="token punctuation">.</span><span class="token function">advance</span><span class="token punctuation">(</span>len<span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token comment">// 返回frame</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">Some</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 数据不足以解析成单独的帧,返回None,说明需要继续等待</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span><span class="token class-name">Incomplete</span><span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token class-name">None</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token comment">// 返回错误</span>
- <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token class-name">Err</span><span class="token punctuation">(</span>e<span class="token punctuation">.</span><span class="token function">into</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 向TCP流中写数据</span>
- <span class="token keyword">pub</span> <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_frame</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> frame<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Frame</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">match</span> frame <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token comment">// 对帧类型前缀进行编码。对于数组,它是“*”。</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'*'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 写入数组长度</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 遍历并写入数组中每个元素</span>
- <span class="token keyword">for</span> entry <span class="token keyword">in</span> <span class="token operator">&</span><span class="token operator">*</span><span class="token operator">*</span>val <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_value</span><span class="token punctuation">(</span>entry<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 直接写入整个帧</span>
- _ <span class="token operator">=></span> <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_value</span><span class="token punctuation">(</span>frame<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 保证缓冲区都被写入套接字</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">flush</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将帧文字写入流</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_value</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> frame<span class="token punctuation">:</span> <span class="token operator">&</span><span class="token class-name">Frame</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token comment">// 判断帧的类型,将不同的帧按照格式写入流</span>
- <span class="token keyword">match</span> frame <span class="token punctuation">{</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Simple</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'+'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b"\r\n"</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Error</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'-'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">.</span><span class="token function">as_bytes</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b"\r\n"</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Integer</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b':'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span><span class="token operator">*</span>val<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Null</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b"$-1\r\n"</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Bulk</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token punctuation">{</span>
- <span class="token keyword">let</span> len <span class="token operator">=</span> val<span class="token punctuation">.</span><span class="token function">len</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_u8</span><span class="token punctuation">(</span><span class="token char">b'$'</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span><span class="token function">write_decimal</span><span class="token punctuation">(</span>len <span class="token keyword">as</span> <span class="token keyword">u64</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span>val<span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b"\r\n"</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token punctuation">}</span>
- <span class="token comment">// 暂时不支持解析数组类型</span>
- <span class="token class-name">Frame</span><span class="token punctuation">::</span><span class="token class-name">Array</span><span class="token punctuation">(</span>_val<span class="token punctuation">)</span> <span class="token operator">=></span> <span class="token macro property">unreachable!</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">,</span>
- <span class="token punctuation">}</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token comment">/// 将十进制帧写入流</span>
- <span class="token keyword">async</span> <span class="token keyword">fn</span> <span class="token function-definition function">write_decimal</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> <span class="token keyword">self</span><span class="token punctuation">,</span> val<span class="token punctuation">:</span> <span class="token keyword">u64</span><span class="token punctuation">)</span> <span class="token punctuation">-></span> <span class="token namespace">io<span class="token punctuation">::</span></span><span class="token class-name">Result</span><span class="token operator"><</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">></span> <span class="token punctuation">{</span>
- <span class="token keyword">use</span> <span class="token namespace">std<span class="token punctuation">::</span>io<span class="token punctuation">::</span></span><span class="token class-name">Write</span><span class="token punctuation">;</span>
- <span class="token comment">// 将数据转换成string</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token punctuation">[</span><span class="token number">0u8</span><span class="token punctuation">;</span> <span class="token number">20</span><span class="token punctuation">]</span><span class="token punctuation">;</span>
- <span class="token keyword">let</span> <span class="token keyword">mut</span> buf <span class="token operator">=</span> <span class="token class-name">Cursor</span><span class="token punctuation">::</span><span class="token function">new</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> buf<span class="token punctuation">[</span><span class="token punctuation">..</span><span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">;</span>
- <span class="token macro property">write!</span><span class="token punctuation">(</span><span class="token operator">&</span><span class="token keyword">mut</span> buf<span class="token punctuation">,</span> <span class="token string">"{}"</span><span class="token punctuation">,</span> val<span class="token punctuation">)</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token comment">// 将数据写入流</span>
- <span class="token keyword">let</span> pos <span class="token operator">=</span> buf<span class="token punctuation">.</span><span class="token function">position</span><span class="token punctuation">(</span><span class="token punctuation">)</span> <span class="token keyword">as</span> <span class="token keyword">usize</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token operator">&</span>buf<span class="token punctuation">.</span><span class="token function">get_ref</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">[</span><span class="token punctuation">..</span>pos<span class="token punctuation">]</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token keyword">self</span><span class="token punctuation">.</span>stream<span class="token punctuation">.</span><span class="token function">write_all</span><span class="token punctuation">(</span><span class="token string">b"\r\n"</span><span class="token punctuation">)</span><span class="token punctuation">.</span><span class="token keyword">await</span><span class="token operator">?</span><span class="token punctuation">;</span>
- <span class="token class-name">Ok</span><span class="token punctuation">(</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token punctuation">)</span>
- <span class="token punctuation">}</span>
- <span class="token punctuation">}</span>
- </code></pre></div></div> <footer class="page-edit"><!----> <!----></footer> <!----> </main></div><div class="global-ui"></div></div>
- <script src="/rust_camp_tutorial/assets/js/app.d7ab8f65.js" defer></script><script src="/rust_camp_tutorial/assets/js/2.3dc1b8de.js" defer></script><script src="/rust_camp_tutorial/assets/js/1.7f771cfb.js" defer></script><script src="/rust_camp_tutorial/assets/js/23.7f3a9620.js" defer></script>
- </body>
- </html>
|