Преглед изворни кода

feat(main): 引入多线程读取 ctl 命名管道,用信号机制改写 service 和 task 的状态检查 (#47)

* feat: 多线程读取ctl命名管道,信号机制改写service和task的状态检查
Val213 пре 4 месеци
родитељ
комит
32c2329fca
6 измењених фајлова са 131 додато и 88 уклоњено
  1. 2 1
      Cargo.toml
  2. 10 5
      src/main.rs
  3. 77 63
      src/manager/mod.rs
  4. 26 18
      src/systemctl/listener/mod.rs
  5. 1 1
      src/unit/mod.rs
  6. 15 0
      src/unit/signal.rs

+ 2 - 1
Cargo.toml

@@ -14,12 +14,13 @@ path = "systemctl/src/main.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+nix = "0.23"
 hashbrown = "0.11"
 cfg-if = { version = "1.0" }
 lazy_static = { version = "1.4.0" }
 libc = "0.2"
 humantime = "2.1"
-
+tokio = { version = "1.25", features = ["full"] }
 [profile.release]
 panic = 'abort'
 

+ 10 - 5
src/main.rs

@@ -7,13 +7,13 @@ mod systemctl;
 mod task;
 mod time;
 mod unit;
-
+use crate::executor::Executor;
 use error::ErrorFormat;
 use manager::{timer_manager::TimerManager, Manager};
 use parse::UnitParser;
+use std::thread;
 use systemctl::listener::Systemctl;
-
-use crate::executor::Executor;
+use unit::signal::init_signal_handler;
 
 pub struct FileDescriptor(usize);
 
@@ -54,6 +54,13 @@ fn main() {
         println!("Parse {} success!", path);
     }
 
+    // 初始化信号处理程序
+    init_signal_handler();
+    // 监听systemctl
+    thread::spawn(move || {
+        Systemctl::ctl_listen();
+    });
+
     // 启动完服务后进入主循环
     loop {
         // 检查各服务运行状态
@@ -62,7 +69,5 @@ fn main() {
         Manager::check_cmd_proc();
         // 检查计时器任务
         TimerManager::check_timer();
-        // 监听systemctl
-        Systemctl::ctl_listen();
     }
 }

+ 77 - 63
src/manager/mod.rs

@@ -7,92 +7,106 @@ pub use unit_manager::*;
 use crate::executor::ExitStatus;
 
 use self::timer_manager::TimerManager;
-
+use crate::unit::signal::SIGCHILD_SIGNAL_RECEIVED;
+use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
+use nix::unistd::Pid;
+use std::sync::atomic::Ordering;
 pub struct Manager;
 
 impl Manager {
-    /// ## 检查当前DragonReach运行的项目状态,并对其分发处理
+    /// ## 检查当前 DragonReach 运行的项目状态,并对其分发处理
     pub fn check_running_status() {
-        // 检查正在运行的Unit
-        let mut running_manager = RUNNING_TABLE.write().unwrap();
-        let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
-        for unit in running_manager.mut_running_table() {
-            let proc = unit.1;
-            match proc.try_wait() {
-                //进程正常退出
-                Ok(Some(status)) => {
-                    exited_unit.push((
-                        *unit.0,
-                        ExitStatus::from_exit_code(status.code().unwrap_or(0)),
-                    ));
-                }
-                //进程错误退出(或启动失败)
-                Err(e) => {
-                    eprintln!("unit error: {}", e);
-
-                    //test
-                    exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
+        if SIGCHILD_SIGNAL_RECEIVED
+            .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
+            .is_ok()
+        {
+            let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
+            let mut running_manager = RUNNING_TABLE.write().unwrap();
+            // 检查所有运行中的 Unit
+            for unit in running_manager.mut_running_table() {
+                let pid = Pid::from_raw(unit.1.id() as i32);
+                // 检查 Unit 的运行状态
+                match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
+                    // 若 Unit 为正常退出,则将其加入退出列表
+                    Ok(WaitStatus::Exited(_, status)) => {
+                        exited_unit.push((*unit.0, ExitStatus::from_exit_code(status)));
+                    }
+                    // 若 Unit 为被信号终止,则将其加入退出列表,并输出日志
+                    Ok(WaitStatus::Signaled(_, signal, _)) => {
+                        eprintln!("unit terminated by signal: {}", signal);
+                        exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
+                    }
+                    // 其他错误情况
+                    Err(_) => {
+                        eprintln!("unit waitpid error");
+                    }
+                    // 若 Unit 正常运行,则不做处理
+                    Ok(_) => {}
                 }
-                //进程处于正常运行状态
-                _ => {}
             }
-        }
-        //释放锁,以便后续删除操作能拿到锁
-        drop(running_manager);
 
-        // 处理退出的Unit
-        for tmp in exited_unit {
-            // 从运行表中擦除该unit
-            UnitManager::remove_running(tmp.0);
+            drop(running_manager);
 
-            // 取消该任务的定时器任务
-            TimerManager::cancel_timer(tmp.0);
+            // 处理退出的 Unit
+            for tmp in exited_unit {
+                // 将该任务从运行表中移除
+                UnitManager::remove_running(tmp.0);
 
-            let _ = UnitManager::get_unit_with_id(&tmp.0)
-                .unwrap()
-                .lock()
-                .unwrap()
-                .exit(); //交付给相应类型的Unit类型去执行退出后的逻辑
+                // 取消该任务的定时器任务
+                TimerManager::cancel_timer(tmp.0);
 
-            TimerManager::update_next_trigger(tmp.0, false); //更新所有归属于此unit的计时器
+                // 交付处理子进程退出逻辑
+                let _ = UnitManager::get_unit_with_id(&tmp.0)
+                    .unwrap()
+                    .lock()
+                    .unwrap()
+                    .exit();
 
-            // 交付处理子进程退出逻辑
-            let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
-            unit.lock().unwrap().after_exit(tmp.1);
-        }
+                // 更新属于该 Unit 的定时器任务
+                TimerManager::update_next_trigger(tmp.0, false);
 
-        // 若无运行中任务,则取出IDLE任务运行
-        if UnitManager::running_count() == 0 {
-            let unit = UnitManager::pop_a_idle_service();
-            match unit {
-                Some(unit) => {
+                // 交付处理子进程退出后逻辑
+                let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
+                unit.lock().unwrap().after_exit(tmp.1);
+            }
+            // 若无运行中任务,则取出 IDLE 任务运行
+            if UnitManager::running_count() == 0 {
+                if let Some(unit) = UnitManager::pop_a_idle_service() {
                     let _ = unit.lock().unwrap().run();
                 }
-                None => {}
             }
         }
     }
 
     /// ## 检查当前所有cmd进程的运行状态
     pub fn check_cmd_proc() {
-        let mut exited = Vec::new();
-        let mut table = CMD_PROCESS_TABLE.write().unwrap();
-        for tuple in table.iter_mut() {
-            let mut proc = tuple.1.lock().unwrap();
-            match proc.try_wait() {
-                // 正常运行
-                Ok(None) => {}
-                // 停止运行,从表中删除数据
-                _ => {
-                    // TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
-                    // 后续应该添加机制来执行服务相关命令启动失败的回调
-                    exited.push(*tuple.0);
+        if SIGCHILD_SIGNAL_RECEIVED
+            .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
+            .is_ok()
+        {
+            let mut exited = Vec::new();
+            let mut table = CMD_PROCESS_TABLE.write().unwrap();
+
+            for tuple in table.iter_mut() {
+                let pid = Pid::from_raw(tuple.1.lock().unwrap().id() as i32);
+                match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
+                    // 若 cmd 停止运行,则将其加入退出列表
+                    Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
+                        eprintln!("cmd exited");
+                        exited.push(*tuple.0);
+                    }
+                    Ok(_) => {}
+                    Err(_) => {
+                        // TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
+                        // 后续应该添加机制来执行服务相关命令启动失败的回调
+                        eprintln!("cmd waitpid error");
+                    }
                 }
             }
-        }
 
-        for id in exited {
-            table.remove(&id);
+            for id in exited {
+                table.remove(&id);
+            }
         }
     }
 }

+ 26 - 18
src/systemctl/listener/mod.rs

@@ -1,15 +1,14 @@
+use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
+use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
+use crate::error::ErrorFormat;
+use crate::manager::ctl_manager::CtlManager;
+use lazy_static::lazy_static;
 use std::fs::{self, File};
 use std::io::Read;
 use std::os::fd::FromRawFd;
 use std::sync::{Arc, Mutex};
-
-use lazy_static::lazy_static;
-
-use crate::error::ErrorFormat;
-use crate::manager::ctl_manager::CtlManager;
-
-use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
-use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
+use std::thread;
+use std::time::Duration;
 
 lazy_static! {
     static ref CTL_READER: Mutex<Arc<File>> = {
@@ -17,7 +16,7 @@ lazy_static! {
         Mutex::new(Arc::new(file))
     };
 }
-
+#[derive(Debug)]
 pub struct Command {
     pub(crate) operation: CommandOperation,
     pub(crate) args: Option<Vec<String>>,
@@ -68,18 +67,27 @@ impl Systemctl {
     /// 持续从系统服务控制管道中读取命令。
     ///
     pub fn ctl_listen() {
+        println!("ctl listen");
         let mut guard = CTL_READER.lock().unwrap();
         let mut s = String::new();
-        if let Ok(size) = guard.read_to_string(&mut s) {
-            if size == 0 {
-                return;
-            }
-            match CtlParser::parse_ctl(&s) {
-                Ok(cmd) => {
-                    let _ = CtlManager::exec_ctl(cmd);
+        loop {
+            s.clear();
+            match guard.read_to_string(&mut s) {
+                Ok(size) if size > 0 => match CtlParser::parse_ctl(&s) {
+                    Ok(cmd) => {
+                        let _ = CtlManager::exec_ctl(cmd);
+                    }
+                    Err(e) => {
+                        eprintln!("Failed to parse command: {}", e.error_format());
+                    }
+                },
+                Ok(_) => {
+                    // 如果读取到的大小为0,说明没有数据可读,适当休眠
+                    thread::sleep(Duration::from_millis(100));
                 }
-                Err(err) => {
-                    eprintln!("parse tcl command error: {}", err.error_format());
+                Err(e) => {
+                    eprintln!("Failed to read from pipe: {}", e);
+                    break;
                 }
             }
         }

+ 1 - 1
src/unit/mod.rs

@@ -12,9 +12,9 @@ use crate::parse::parse_util::UnitParseUtil;
 use crate::parse::Segment;
 
 pub mod service;
+pub mod signal;
 pub mod target;
 pub mod timer;
-
 use self::target::TargetUnit;
 
 pub fn generate_unit_id() -> usize {

+ 15 - 0
src/unit/signal.rs

@@ -0,0 +1,15 @@
+use nix::sys::signal::{self, SigHandler, Signal};
+use std::sync::atomic::{AtomicBool, Ordering};
+
+pub static SIGCHILD_SIGNAL_RECEIVED: AtomicBool = AtomicBool::new(false);
+
+extern "C" fn handle_sigchld(_: libc::c_int) {
+    SIGCHILD_SIGNAL_RECEIVED.store(true, Ordering::SeqCst);
+}
+
+pub fn init_signal_handler() {
+    unsafe {
+        signal::signal(Signal::SIGCHLD, SigHandler::Handler(handle_sigchld))
+            .expect("Error setting SIGUSR1 handler");
+    }
+}