completion.c 9.6 KB


  1. #include "common/completion.h"
  2. #include "common/kthread.h"
  3. /**
  4. * @brief 初始化一个completion变量
  5. *
  6. * @param x completion
  7. */
  8. void completion_init(struct completion *x)
  9. {
  10. x->done = 0;
  11. wait_queue_head_init(&x->wait_queue);
  12. }
  13. /**
  14. * @brief 唤醒一个wait_queue中的节点
  15. *
  16. * @param x completion
  17. */
  18. void complete(struct completion *x)
  19. {
  20. spin_lock(&x->wait_queue.lock);
  21. if (x->done != COMPLETE_ALL)
  22. ++(x->done);
  23. wait_queue_wakeup_on_stack(&x->wait_queue, -1UL); // -1UL代表所有节点都满足条件,暂时这么写
  24. spin_unlock(&x->wait_queue.lock);
  25. }
  26. /**
  27. * @brief 永久标记done为Complete_All, 并从wait_queue中删除所有节点
  28. *
  29. * @param x completion
  30. */
  31. void complete_all(struct completion *x)
  32. {
  33. spin_lock(&x->wait_queue.lock);
  34. x->done = COMPLETE_ALL; // 永久赋值
  35. while (!list_empty(&x->wait_queue.wait_list))
  36. wait_queue_wakeup_on_stack(&x->wait_queue, -1UL); // -1UL代表所有节点都满足条件,暂时这么写
  37. spin_unlock(&x->wait_queue.lock);
  38. }
  39. /**
  40. * @brief 辅助函数:通用的处理wait命令的函数(即所有wait_for_completion函数最核心部分在这里)
  41. *
  42. * @param x completion
  43. * @param action 函数指针
  44. * @param timeout 一个非负整数
  45. * @param state 你要设置进程的状态为state
  46. * @return long - 返回剩余的timeout
  47. */
  48. static long __wait_for_common(struct completion *x, long (*action)(long), long timeout, int state)
  49. {
  50. if (!x->done)
  51. {
  52. DECLARE_WAIT_ON_STACK_SELF(wait);
  53. while (!x->done && timeout > 0)
  54. {
  55. // 加入等待队列, 但是不会调度走
  56. if (list_empty(&wait.wait_list))
  57. list_append(&x->wait_queue.wait_list, &wait.wait_list);
  58. wait.pcb->state = state; // 清除运行位, 并设置为interuptible/uninteruptible
  59. spin_unlock(&x->wait_queue.lock);
  60. timeout = action(timeout);
  61. spin_lock(&x->wait_queue.lock);
  62. }
  63. if (!x->done)
  64. return timeout; // 仍然没有complete, 但是被其他进程唤醒
  65. wait.pcb->state = PROC_RUNNING; // 设置为运行, 并清空state, 所以使用等号赋值
  66. if (!list_empty(&wait.wait_list))
  67. list_del_init(&wait.wait_list); // 必须使用del_init
  68. }
  69. if (x->done != COMPLETE_ALL)
  70. --(x->done);
  71. return timeout ? timeout : 1; // 这里linux返回1,不知道为啥
  72. }
  73. /**
  74. * @brief 等待completion命令唤醒进程, 同时设置pcb->state为uninteruptible.
  75. *
  76. * @param x completion
  77. */
  78. void wait_for_completion(struct completion *x)
  79. {
  80. spin_lock(&x->wait_queue.lock);
  81. __wait_for_common(x, &schedule_timeout_ms, MAX_TIMEOUT, PROC_UNINTERRUPTIBLE);
  82. spin_unlock(&x->wait_queue.lock);
  83. }
  84. /**
  85. * @brief 等待指定时间,超时后就返回, 同时设置pcb->state为uninteruptible.
  86. *
  87. * @param x completion
  88. * @param timeout 非负整数,等待指定时间,超时后就返回/ 或者提前done,则返回剩余timeout时间
  89. * @return long - 返回剩余的timeout
  90. */
  91. long wait_for_completion_timeout(struct completion *x, long timeout)
  92. {
  93. BUG_ON(timeout < 0);
  94. spin_lock(&x->wait_queue.lock);
  95. timeout = __wait_for_common(x, &schedule_timeout_ms, timeout, PROC_UNINTERRUPTIBLE);
  96. spin_unlock(&x->wait_queue.lock);
  97. return timeout;
  98. }
  99. /**
  100. * @brief 等待completion的完成,但是可以被中断(我也不太懂可以被中断是什么意思,就是pcb->state=interuptible)
  101. *
  102. * @param x completion
  103. */
  104. void wait_for_completion_interruptible(struct completion *x)
  105. {
  106. spin_lock(&x->wait_queue.lock);
  107. __wait_for_common(x, &schedule_timeout_ms, MAX_TIMEOUT, PROC_INTERRUPTIBLE);
  108. spin_unlock(&x->wait_queue.lock);
  109. }
  110. /**
  111. * @brief 等待指定时间,超时后就返回, 等待completion的完成,但是可以被中断.
  112. *
  113. * @param x completion
  114. * @param timeout 非负整数,等待指定时间,超时后就返回/ 或者提前done,则返回剩余timeout时间
  115. * @return long - 返回剩余的timeout
  116. */
  117. long wait_for_completion_interruptible_timeout(struct completion *x, long timeout)
  118. {
  119. BUG_ON(timeout < 0);
  120. spin_lock(&x->wait_queue.lock);
  121. timeout = __wait_for_common(x, &schedule_timeout_ms, timeout, PROC_INTERRUPTIBLE);
  122. spin_unlock(&x->wait_queue.lock);
  123. return timeout;
  124. }
  125. /**
  126. * @brief 尝试获取completion的一个done!如果您在wait之前加上这个函数作为判断,说不定会加快运行速度。
  127. *
  128. * @param x completion
  129. * @return true - 表示不需要wait_for_completion,并且已经获取到了一个completion(即返回true意味着done已经被 减1 ) \
  130. * @return false - 表示当前done=0,您需要进入等待,即wait_for_completion
  131. */
  132. bool try_wait_for_completion(struct completion *x)
  133. {
  134. if (!READ_ONCE(x->done))
  135. return false;
  136. bool ret = true;
  137. spin_lock(&x->wait_queue.lock);
  138. if (!x->done)
  139. ret = false;
  140. else if (x->done != COMPLETE_ALL)
  141. --(x->done);
  142. spin_unlock(&x->wait_queue.lock);
  143. return ret;
  144. }
  145. /**
  146. * @brief 测试一个completion是否有waiter。(即done是不是等于0)
  147. *
  148. * @param x completion
  149. * @return true
  150. * @return false
  151. */
  152. bool completion_done(struct completion *x)
  153. {
  154. if (!READ_ONCE(x->done))
  155. return false;
  156. // 这里的意义是: 如果是多线程的情况下,您有可能需要等待另一个进程的complete操作, 才算真正意义上的completed!
  157. spin_lock(&x->wait_queue.lock);
  158. if (!READ_ONCE(x->done))
  159. {
  160. spin_unlock(&x->wait_queue.lock);
  161. return false;
  162. }
  163. spin_unlock(&x->wait_queue.lock);
  164. return true;
  165. }
  166. /**
  167. * @brief 对completion数组进行wait操作
  168. *
  169. * @param x completion array
  170. * @param n len of the array
  171. */
  172. void wait_for_multicompletion(struct completion x[], int n)
  173. {
  174. for (int i = 0; i < n; i++) // 对每一个completion都等一遍
  175. {
  176. if (!completion_done(&x[i])) // 如果没有done,直接wait
  177. {
  178. wait_for_completion(&x[i]);
  179. }
  180. else if (!try_wait_for_completion(&x[i])) //上面测试过done>0,那么这里尝试去获取一个done,如果失败了,就继续wait
  181. {
  182. wait_for_completion(&x[i]);
  183. }
  184. }
  185. }
  186. /**
  187. * @brief 等待者, 等待wait_for_completion
  188. *
  189. * @param one_to_one
  190. * @param one_to_many
  191. * @param many_to_one
  192. */
  193. int __test_completion_waiter(void *input_data)
  194. {
  195. struct __test_data *data = (struct __test_data *)input_data;
  196. // kdebug("THE %d WAITER BEGIN", -data->id);
  197. // 测试一对多能不能实现等待 - 由外部统一放闸一起跑
  198. if (!try_wait_for_completion(data->one_to_many))
  199. {
  200. wait_for_completion(data->one_to_many);
  201. }
  202. // 测试一对一能不能实现等待
  203. if (!try_wait_for_completion(data->one_to_many))
  204. {
  205. wait_for_completion(data->one_to_many);
  206. }
  207. // 完成上面两个等待, 执行complete声明自己已经完成
  208. complete(data->many_to_one);
  209. // kdebug("THE %d WAITER SOLVED", -data->id);
  210. return true;
  211. }
  212. /**
  213. * @brief 执行者,执行complete
  214. *
  215. * @param one_to_one
  216. * @param one_to_many
  217. * @param many_to_one
  218. */
  219. int __test_completion_worker(void *input_data)
  220. {
  221. struct __test_data *data = (struct __test_data *)input_data;
  222. // kdebug("THE %d WORKER BEGIN", data->id);
  223. // 测试一对多能不能实现等待 - 由外部统一放闸一起跑
  224. if (!try_wait_for_completion(data->one_to_many))
  225. {
  226. wait_for_completion(data->one_to_many);
  227. }
  228. schedule_timeout_ms(50);
  229. // for(uint64_t i=0;i<1e7;++i)
  230. // pause();
  231. complete(data->one_to_one);
  232. // 完成上面两个等待, 执行complete声明自己已经完成
  233. complete(data->many_to_one);
  234. // kdebug("THE %d WORKER SOLVED", data->id);
  235. return true;
  236. }
  237. /**
  238. * @brief 测试函数
  239. *
  240. */
  241. void __test_completion()
  242. {
  243. // kdebug("BEGIN COMPLETION TEST");
  244. const int N = 100;
  245. struct completion *one_to_one = kzalloc(sizeof(struct completion) * N, 0);
  246. struct completion *one_to_many = kzalloc(sizeof(struct completion), 0);
  247. struct completion *waiter_many_to_one = kzalloc(sizeof(struct completion) * N, 0);
  248. struct completion *worker_many_to_one = kzalloc(sizeof(struct completion) * N, 0);
  249. struct __test_data *waiter_data = kzalloc(sizeof(struct __test_data) * N, 0);
  250. struct __test_data *worker_data = kzalloc(sizeof(struct __test_data) * N, 0);
  251. completion_init(one_to_many);
  252. for (int i = 0; i < N; i++)
  253. {
  254. completion_init(&one_to_one[i]);
  255. completion_init(&waiter_many_to_one[i]);
  256. completion_init(&worker_many_to_one[i]);
  257. }
  258. for (int i = 0; i < N; i++)
  259. {
  260. waiter_data[i].id = -i; // waiter
  261. waiter_data[i].many_to_one = &waiter_many_to_one[i];
  262. waiter_data[i].one_to_one = &one_to_one[i];
  263. waiter_data[i].one_to_many = one_to_many;
  264. kthread_run(__test_completion_waiter, &waiter_data[i], "the %dth waiter", i);
  265. }
  266. for (int i = 0; i < N; i++)
  267. {
  268. worker_data[i].id = i; // worker
  269. worker_data[i].many_to_one = &worker_many_to_one[i];
  270. worker_data[i].one_to_one = &one_to_one[i];
  271. worker_data[i].one_to_many = one_to_many;
  272. kthread_run(__test_completion_worker, &worker_data[i], "the %dth worker", i);
  273. }
  274. complete_all(one_to_many);
  275. // kdebug("all of the waiters and workers begin running");
  276. // kdebug("BEGIN COUNTING");
  277. wait_for_multicompletion(waiter_many_to_one, N);
  278. wait_for_multicompletion(worker_many_to_one, N);
  279. // kdebug("all of the waiters and workers complete");
  280. kfree(one_to_one);
  281. kfree(one_to_many);
  282. kfree(waiter_many_to_one);
  283. kfree(worker_many_to_one);
  284. kfree(waiter_data);
  285. kfree(worker_data);
  286. // kdebug("completion test done.");
  287. }