main.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. extern crate libc;
  2. extern crate syscalls;
  3. use std::{
  4. ffi::c_void,
  5. mem::{self, size_of},
  6. process,
  7. ptr::{self, NonNull},
  8. sync::atomic::{AtomicI32, Ordering},
  9. thread,
  10. time::Duration,
  11. };
  12. use syscalls::{
  13. syscall0, syscall2, syscall3, syscall6,
  14. Sysno::{futex, get_robust_list, gettid, set_robust_list},
  15. };
  16. use libc::{
  17. c_int, mmap, perror, EXIT_FAILURE, MAP_ANONYMOUS, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE,
  18. };
  /// `futex` operation code passed by `futex_wait`: sleep while the futex word
  /// still holds the expected value.
  const FUTEX_WAIT: usize = 0;
  /// `futex` operation code passed by `futex_wake`: wake waiters sleeping on the
  /// futex word.
  const FUTEX_WAKE: usize = 1;
  /// Thin, copyable handle to a run of 32-bit futex words in shared memory.
  ///
  /// The handle stores only the base pointer. Every accessor takes an `offset`
  /// counted in `u32` elements (not bytes) from that base. The handle does not
  /// own the memory: whoever created it must keep the memory alive, 4-byte
  /// aligned, and large enough for every offset that is later used.
  #[derive(Clone, Copy, Debug)]
  struct Futex {
      addr: *mut u32,
  }

  impl Futex {
      /// Wraps the base pointer `addr` without taking ownership of the memory.
      pub fn new(addr: *mut u32) -> Self {
          Futex { addr }
      }

      /// Returns the address of the word `offset` elements past the base.
      pub fn get_addr(&self, offset: isize) -> *mut u32 {
          // SAFETY: the creator of this handle guarantees that `offset` stays
          // inside the memory region that `addr` points into.
          unsafe { self.addr.offset(offset) }
      }

      /// Atomically loads the word `offset` elements past the base.
      pub fn get_val(&self, offset: isize) -> u32 {
          self.word(offset).load(Ordering::SeqCst)
      }

      /// Atomically stores `val` into the word `offset` elements past the base.
      pub fn set_val(&self, val: u32, offset: isize) {
          self.word(offset).store(val, Ordering::SeqCst);
      }

      /// Views the word at `offset` as an atomic.
      ///
      /// The words are touched concurrently by several threads, so plain
      /// `read()`/`write()` accesses would be a data race.
      fn word(&self, offset: isize) -> &std::sync::atomic::AtomicU32 {
          // SAFETY: `AtomicU32` has the same size and alignment as `u32`, and
          // the creator of this handle guarantees the word is valid and
          // in bounds for as long as the handle is used.
          unsafe { &*(self.addr.offset(offset) as *const std::sync::atomic::AtomicU32) }
      }
  }

  // SAFETY: the handle is only a pointer to shared memory, and every access to
  // that memory made through it is atomic.
  unsafe impl Send for Futex {}
  unsafe impl Sync for Futex {}
  /// Thin, copyable handle to a run of `i32` counters in shared memory.
  ///
  /// Every accessor takes an `offset` counted in `i32` elements (not bytes)
  /// from the base pointer. The handle does not own the memory: whoever
  /// created it must keep the memory alive, 4-byte aligned, and large enough
  /// for every offset that is later used.
  #[derive(Clone, Copy, Debug)]
  struct Lock {
      addr: *mut i32,
  }

  impl Lock {
      /// Wraps the base pointer `addr` without taking ownership of the memory.
      pub fn new(addr: *mut i32) -> Self {
          Lock { addr }
      }

      /// Atomically loads the counter `offset` elements past the base.
      pub fn get_val(&self, offset: isize) -> i32 {
          self.cell(offset).load(Ordering::SeqCst)
      }

      /// Atomically stores `val` into the counter `offset` elements past the base.
      pub fn set_val(&self, val: i32, offset: isize) {
          self.cell(offset).store(val, Ordering::SeqCst);
      }

      /// Views the counter at `offset` as an atomic.
      ///
      /// The counters are touched concurrently by several threads, so plain
      /// `read()`/`write()` accesses would be a data race.
      fn cell(&self, offset: isize) -> &AtomicI32 {
          // SAFETY: `AtomicI32` has the same size and alignment as `i32`, and
          // the creator of this handle guarantees the counter is valid and
          // in bounds for as long as the handle is used.
          unsafe { &*(self.addr.offset(offset) as *const AtomicI32) }
      }
  }

  // SAFETY: the handle is only a pointer to shared memory, and every access to
  // that memory made through it is atomic.
  unsafe impl Send for Lock {}
  unsafe impl Sync for Lock {}
  /// One link of the robust list: just a pointer to the next entry.
  ///
  /// `#[repr(C)]` because the kernel reads this memory directly.
  #[repr(C)]
  #[derive(Debug, Clone, Copy)]
  struct RobustList {
      next: *const RobustList,
  }

  /// Head of the per-thread robust list handed to `set_robust_list`.
  ///
  /// `#[repr(C)]` pins the field order (`list`, `futex_offset`,
  /// `list_op_pending`). Without it the compiler is free to reorder the fields,
  /// and the kernel would then read them from the wrong positions.
  #[repr(C)]
  #[derive(Debug, Clone, Copy)]
  struct RobustListHead {
      list: RobustList,
      /// Tells the kernel the relative position of the futex field to check.
      /// This keeps user space flexible: it can rearrange its data structures
      /// freely without hard-coding any particular offset into the kernel.
      /// In `futexes`, the leading addresses are used as robust-list entries
      /// (`list.next`) and the later addresses hold the actual futex values.
      /// This field is the offset from an entry address to the address that
      /// holds its futex value.
      #[allow(dead_code)]
      futex_offset: isize,
      /// Potential race: adding to and removing from the list happens after
      /// the lock is acquired. That leaves a small window in which a thread can
      /// die abnormally and leave the lock dangling. To guard against this,
      /// user space also maintains this `list_op_pending` field, which allows
      /// cleanup when a thread dies after acquiring the lock but before
      /// adding it to the list. It is cleared once the list insertion or
      /// removal has completed.
      /// This is not tested here. In the kernel the implementation effectively
      /// performs one wake on the `list_op_pending` address (if there are
      /// waiters).
      #[allow(dead_code)]
      list_op_pending: *const RobustList,
  }
  83. fn error_handle(msg: &str) -> ! {
  84. unsafe { perror(msg.as_ptr() as *const i8) };
  85. process::exit(EXIT_FAILURE)
  86. }
  87. fn futex_wait(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) {
  88. loop {
  89. let atomic_count = AtomicI32::new(lock.get_val(offset_count));
  90. if atomic_count
  91. .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
  92. .is_ok()
  93. {
  94. lock.set_val(0, offset_count);
  95. // 设置futex锁当前被哪个线程占用
  96. let tid = unsafe { syscall0(gettid).unwrap() as u32 };
  97. futexes.set_val(futexes.get_val(offset_futex) | tid, offset_futex);
  98. break;
  99. }
  100. println!("{} wating...", thread);
  101. let futex_val = futexes.get_val(offset_futex);
  102. futexes.set_val(futex_val | 0x8000_0000, offset_futex);
  103. let ret = unsafe {
  104. syscall6(
  105. futex,
  106. futexes.get_addr(offset_futex) as usize,
  107. FUTEX_WAIT,
  108. futexes.get_val(offset_futex) as usize,
  109. 0,
  110. 0,
  111. 0,
  112. )
  113. };
  114. if ret.is_err() {
  115. error_handle("futex_wait failed");
  116. }
  117. // 被唤醒后释放锁
  118. let atomic_count = AtomicI32::new(lock.get_val(offset_count));
  119. if atomic_count
  120. .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
  121. .is_ok()
  122. {
  123. lock.set_val(1, offset_count);
  124. // 释放futex锁,不被任何线程占用
  125. futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex);
  126. break;
  127. }
  128. }
  129. }
  130. fn futex_wake(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) {
  131. let atomic_count = AtomicI32::new(lock.get_val(offset_count));
  132. if atomic_count
  133. .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
  134. .is_ok()
  135. {
  136. lock.set_val(1, offset_count);
  137. // 释放futex锁,不被任何线程占用
  138. futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex);
  139. // 如果没有线程/进程在等这个futex,则不必唤醒, 释放改锁即可
  140. let futex_val = futexes.get_val(offset_futex);
  141. if futex_val & 0x8000_0000 == 0 {
  142. return;
  143. }
  144. futexes.set_val(futex_val & !(1 << 31), offset_futex);
  145. let ret = unsafe {
  146. syscall6(
  147. futex,
  148. futexes.get_addr(offset_futex) as usize,
  149. FUTEX_WAKE,
  150. 1,
  151. 0,
  152. 0,
  153. 0,
  154. )
  155. };
  156. if ret.is_err() {
  157. error_handle("futex wake failed");
  158. }
  159. println!("{} waked", thread);
  160. }
  161. }
  162. fn set_list(futexes: Futex) {
  163. let head = RobustListHead {
  164. list: RobustList { next: ptr::null() },
  165. futex_offset: 44,
  166. list_op_pending: ptr::null(),
  167. };
  168. let head = NonNull::from(&head).as_ptr();
  169. unsafe {
  170. // 加入第一个futex
  171. let head_ref_mut = &mut *head;
  172. head_ref_mut.list.next = futexes.get_addr(0) as *const RobustList;
  173. // 加入第二个futex
  174. let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr();
  175. let list_2_ref_mut = &mut *list_2;
  176. list_2_ref_mut.next = futexes.get_addr(1) as *const RobustList;
  177. //println!("robust list next: {:?}", (*head).list.next );
  178. //println!("robust list next next: {:?}", (*(*head).list.next).next );
  179. // 向内核注册robust list
  180. let len = mem::size_of::<*mut RobustListHead>();
  181. let ret = syscall2(set_robust_list, head as usize, len);
  182. if ret.is_err() {
  183. println!("failed to set_robust_list, ret = {:?}", ret);
  184. }
  185. }
  186. }
  187. fn main() {
  188. test01();
  189. println!("-------------");
  190. test02();
  191. println!("-------------");
  192. }
  193. //测试set_robust_list和get_robust_list两个系统调用是否能正常使用
  194. fn test01() {
  195. // 创建robust list 头指针
  196. let head = RobustListHead {
  197. list: RobustList { next: ptr::null() },
  198. futex_offset: 8,
  199. list_op_pending: ptr::null(),
  200. };
  201. let head = NonNull::from(&head).as_ptr();
  202. let futexes = unsafe {
  203. mmap(
  204. ptr::null_mut::<c_void>(),
  205. (size_of::<c_int>() * 2) as libc::size_t,
  206. PROT_READ | PROT_WRITE,
  207. MAP_ANONYMOUS | MAP_SHARED,
  208. -1,
  209. 0,
  210. ) as *mut u32
  211. };
  212. if futexes == MAP_FAILED as *mut u32 {
  213. error_handle("futexes_addr mmap failed");
  214. }
  215. unsafe {
  216. futexes.offset(11).write(0x0000_0000);
  217. futexes.offset(12).write(0x8000_0000);
  218. println!("futex1 next addr: {:?}", futexes.offset(0));
  219. println!("futex2 next addr: {:?}", futexes.offset(1));
  220. println!("futex1 val addr: {:?}", futexes.offset(11));
  221. println!("futex2 val addr: {:?}", futexes.offset(12));
  222. println!("futex1 val: {:#x?}", futexes.offset(11).read());
  223. println!("futex2 val: {:#x?}", futexes.offset(12).read());
  224. }
  225. // 打印注册之前的robust list
  226. println!("robust list next(get behind): {:?}", &unsafe { *head });
  227. unsafe {
  228. let head_ref_mut = &mut *head;
  229. head_ref_mut.list.next = futexes.offset(0) as *const RobustList;
  230. let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr();
  231. let list_2_ref_mut = &mut *list_2;
  232. list_2_ref_mut.next = futexes.offset(1) as *const RobustList;
  233. println!("robust list next addr: {:?}", (*head).list.next);
  234. println!(
  235. "robust list next next addr: {:?}",
  236. (*(*head).list.next).next
  237. );
  238. }
  239. unsafe {
  240. let len = mem::size_of::<*mut RobustListHead>();
  241. let ret = syscall2(set_robust_list, head as usize, len);
  242. if ret.is_err() {
  243. println!("failed to set_robust_list, ret = {:?}", ret);
  244. }
  245. }
  246. println!("get before, set after: {:?}", head);
  247. println!("get before, set after: {:?}", &unsafe { *head });
  248. unsafe {
  249. let len: usize = 0;
  250. println!("len = {}", len);
  251. let len_ptr = NonNull::from(&len).as_ptr();
  252. let ret = syscall3(get_robust_list, 0, head as usize, len_ptr as usize);
  253. println!("get len = {}", len);
  254. if ret.is_err() {
  255. println!("failed to get_robust_list, ret = {:?}", ret);
  256. }
  257. println!("futex1 val: {:#x}", futexes.offset(11).read());
  258. println!("futex2 val: {:#x}", futexes.offset(12).read());
  259. println!("robust list next: {:?}", futexes.offset(0));
  260. println!("robust list next next: {:#x?}", futexes.offset(0).read());
  261. }
  262. println!("robust list head(get after): {:?}", head);
  263. println!("robust list next(get after): {:?}", &unsafe { *head });
  264. }
  265. //测试一个线程异常退出时futex的robustness(多线程测试,目前futex还不支持多进程)
  266. fn test02() {
  267. let futexes = unsafe {
  268. mmap(
  269. ptr::null_mut::<c_void>(),
  270. (size_of::<c_int>() * 2) as libc::size_t,
  271. PROT_READ | PROT_WRITE,
  272. MAP_ANONYMOUS | MAP_SHARED,
  273. -1,
  274. 0,
  275. ) as *mut u32
  276. };
  277. if futexes == MAP_FAILED as *mut u32 {
  278. error_handle("mmap failed");
  279. }
  280. let count = unsafe {
  281. mmap(
  282. ptr::null_mut::<c_void>(),
  283. (size_of::<c_int>() * 2) as libc::size_t,
  284. PROT_READ | PROT_WRITE,
  285. MAP_ANONYMOUS | MAP_SHARED,
  286. -1,
  287. 0,
  288. ) as *mut i32
  289. };
  290. if count == MAP_FAILED as *mut i32 {
  291. error_handle("mmap failed");
  292. }
  293. unsafe {
  294. // 在这个示例中,第一段和第二段地址放入robust list,第11段地址和第12段地址存放futex val
  295. futexes.offset(11).write(0x0000_0000);
  296. futexes.offset(12).write(0x0000_0000);
  297. println!("futex1 next addr: {:?}", futexes.offset(0));
  298. println!("futex2 next addr: {:?}", futexes.offset(1));
  299. println!("futex1 val addr: {:?}", futexes.offset(11));
  300. println!("futex2 val addr: {:?}", futexes.offset(12));
  301. println!("futex1 val: {:#x?}", futexes.offset(11).read());
  302. println!("futex2 val: {:#x?}", futexes.offset(12).read());
  303. count.offset(0).write(1);
  304. count.offset(1).write(0);
  305. println!("count1 val: {:?}", count.offset(0).read());
  306. println!("count2 val: {:?}", count.offset(1).read());
  307. }
  308. let futexes = Futex::new(futexes);
  309. let locks = Lock::new(count);
  310. // tid1 = 7
  311. let thread1 = thread::spawn(move || {
  312. set_list(futexes);
  313. thread::sleep(Duration::from_secs(2));
  314. for i in 0..2 {
  315. futex_wait(futexes, "thread1", 11, locks, 0);
  316. println!("thread1 times: {}", i);
  317. thread::sleep(Duration::from_secs(3));
  318. let tid = unsafe { syscall0(gettid).unwrap() as u32 };
  319. futexes.set_val(futexes.get_val(12) | tid, 12);
  320. if i == 1 {
  321. // 让thread1异常退出,从而无法唤醒thread2,检测robustness
  322. println!("Thread1 exiting early due to simulated error.");
  323. return;
  324. }
  325. futex_wake(futexes, "thread2", 12, locks, 1);
  326. }
  327. });
  328. // tid2 = 6
  329. set_list(futexes);
  330. for i in 0..2 {
  331. futex_wait(futexes, "thread2", 12, locks, 1);
  332. println!("thread2 times: {}", i);
  333. let tid = unsafe { syscall0(gettid).unwrap() as u32 };
  334. futexes.set_val(futexes.get_val(11) | tid, 11);
  335. futex_wake(futexes, "thread1", 11, locks, 0);
  336. }
  337. thread1.join().unwrap();
  338. }