30 #ifndef BM_BM_SIM_QUEUEING_H_
31 #define BM_BM_SIM_QUEUEING_H_
35 #include <condition_variable>
40 #include <unordered_map>
82 template <
typename T,
typename FMap>
84 using MutexType = std::mutex;
85 using LockType = std::unique_lock<MutexType>;
96 : nb_workers(nb_workers),
98 workers_info(nb_workers),
99 map_to_worker(std::move(map_to_worker)) { }
104 size_t worker_id = map_to_worker(queue_id);
105 LockType lock(mutex);
106 auto &q_info = get_queue(queue_id);
107 auto &w_info = workers_info.at(worker_id);
108 while (q_info.size >= q_info.capacity) {
109 q_info.q_not_full.wait(lock);
111 w_info.queue.emplace_front(item, queue_id);
113 w_info.q_not_empty.notify_one();
118 size_t worker_id = map_to_worker(queue_id);
119 LockType lock(mutex);
120 auto &q_info = get_queue(queue_id);
121 auto &w_info = workers_info.at(worker_id);
122 while (q_info.size >= q_info.capacity) {
123 q_info.q_not_full.wait(lock);
125 w_info.queue.emplace_front(std::move(item), queue_id);
127 w_info.q_not_empty.notify_one();
137 void pop_back(
size_t worker_id,
size_t *queue_id, T *pItem) {
138 LockType lock(mutex);
139 auto &w_info = workers_info.at(worker_id);
140 auto &queue = w_info.queue;
141 while (queue.size() == 0) {
142 w_info.q_not_empty.wait(lock);
144 *queue_id = queue.back().queue_id;
145 *pItem = std::move(queue.back().e);
147 auto &q_info = get_queue_or_throw(*queue_id);
149 q_info.q_not_full.notify_one();
153 size_t size(
size_t queue_id)
const {
154 LockType lock(mutex);
155 auto it = queues_info.find(queue_id);
156 if (it == queues_info.end())
return 0;
157 auto &q_info = it->second;
164 LockType lock(mutex);
165 auto &q_info = get_queue(queue_id);
171 LockType lock(mutex);
172 for (
auto &p : queues_info) p.second.capacity = c;
188 QE(T e,
size_t queue_id)
189 : e(std::move(e)), queue_id(queue_id) { }
195 using MyQ = std::deque<QE>;
198 explicit QueueInfo(
size_t capacity)
199 : capacity(capacity) { }
203 mutable std::condition_variable q_not_full{};
208 mutable std::condition_variable q_not_empty{};
211 QueueInfo &get_queue(
size_t queue_id) {
212 auto it = queues_info.find(queue_id);
213 if (it != queues_info.end())
return it->second;
216 auto p = queues_info.emplace(
217 std::piecewise_construct,
218 std::forward_as_tuple(queue_id),
219 std::forward_as_tuple(capacity));
220 return p.first->second;
223 const QueueInfo &get_queue_or_throw(
size_t queue_id)
const {
224 return queues_info.at(queue_id);
227 QueueInfo &get_queue_or_throw(
size_t queue_id) {
228 return queues_info.at(queue_id);
231 mutable MutexType mutex{};
234 std::unordered_map<size_t, QueueInfo> queues_info;
235 std::vector<WorkerInfo> workers_info;
251 template <
typename T,
typename FMap>
253 using MutexType = std::mutex;
254 using LockType = std::unique_lock<MutexType>;
262 : nb_workers(nb_workers),
264 workers_info(nb_workers),
265 map_to_worker(std::move(map_to_worker)) { }
271 size_t worker_id = map_to_worker(queue_id);
272 LockType lock(mutex);
273 auto &q_info = get_queue(queue_id);
274 auto &w_info = workers_info.at(worker_id);
275 if (q_info.size >= q_info.capacity)
return 0;
276 q_info.last_sent = get_next_tp(q_info);
277 w_info.queue.emplace(
278 item, queue_id, q_info.last_sent, w_info.wrapping_counter++);
280 w_info.q_not_empty.notify_one();
287 size_t worker_id = map_to_worker(queue_id);
288 LockType lock(mutex);
289 auto &q_info = get_queue(queue_id);
290 auto &w_info = workers_info.at(worker_id);
291 if (q_info.size >= q_info.capacity)
return 0;
292 q_info.last_sent = get_next_tp(q_info);
293 w_info.queue.emplace(
294 std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++);
296 w_info.q_not_empty.notify_one();
305 void pop_back(
size_t worker_id,
size_t *queue_id, T *pItem) {
306 LockType lock(mutex);
307 auto &w_info = workers_info.at(worker_id);
308 auto &queue = w_info.queue;
310 if (queue.size() == 0) {
311 w_info.q_not_empty.wait(lock);
313 if (queue.top().send <= clock::now())
break;
314 w_info.q_not_empty.wait_until(lock, queue.top().send);
317 *queue_id = queue.top().queue_id;
320 *pItem = std::move(
const_cast<QE &
>(queue.top()).e);
322 auto &q_info = get_queue_or_throw(*queue_id);
327 size_t size(
size_t queue_id)
const {
328 LockType lock(mutex);
329 auto it = queues_info.find(queue_id);
330 if (it == queues_info.end())
return 0;
331 auto &q_info = it->second;
338 LockType lock(mutex);
339 auto &q_info = get_queue(queue_id);
345 LockType lock(mutex);
346 for (
auto &p : queues_info) p.second.capacity = c;
356 LockType lock(mutex);
357 auto &q_info = get_queue(queue_id);
358 q_info.queue_rate_pps = pps;
359 q_info.pkt_delay_ticks = rate_to_ticks(pps);
364 using std::chrono::duration;
365 using std::chrono::duration_cast;
366 LockType lock(mutex);
367 for (
auto &p : queues_info) {
368 auto &q_info = p.second;
369 q_info.queue_rate_pps = pps;
370 q_info.pkt_delay_ticks = rate_to_ticks(pps);
372 queue_rate_pps = pps;
386 using ticks = std::chrono::nanoseconds;
389 using clock = std::chrono::high_resolution_clock;
391 static constexpr ticks rate_to_ticks(uint64_t pps) {
392 using std::chrono::duration;
393 using std::chrono::duration_cast;
395 ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
401 QE(T e,
size_t queue_id,
const clock::time_point &send,
size_t id)
402 : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
406 clock::time_point send;
411 bool operator()(
const QE &lhs,
const QE &rhs)
const {
416 return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
421 using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
425 QueueInfo(
size_t capacity, uint64_t queue_rate_pps)
426 : capacity(capacity),
427 queue_rate_pps(queue_rate_pps),
428 pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
429 last_sent(clock::now()) { }
433 uint64_t queue_rate_pps;
434 ticks pkt_delay_ticks;
435 clock::time_point last_sent;
440 mutable std::condition_variable q_not_empty{};
441 size_t wrapping_counter{0};
444 QueueInfo &get_queue(
size_t queue_id) {
445 auto it = queues_info.find(queue_id);
446 if (it != queues_info.end())
return it->second;
447 auto p = queues_info.emplace(queue_id, QueueInfo(capacity, queue_rate_pps));
448 return p.first->second;
451 const QueueInfo &get_queue_or_throw(
size_t queue_id)
const {
452 return queues_info.at(queue_id);
455 QueueInfo &get_queue_or_throw(
size_t queue_id) {
456 return queues_info.at(queue_id);
459 clock::time_point get_next_tp(
const QueueInfo &q_info) {
460 return std::max(clock::now(), q_info.last_sent + q_info.pkt_delay_ticks);
463 mutable MutexType mutex{};
466 uint64_t queue_rate_pps{0};
467 std::unordered_map<size_t, QueueInfo> queues_info;
468 std::vector<WorkerInfo> workers_info;
488 template <
typename T,
typename FMap>
490 using MutexType = std::mutex;
491 using LockType = std::unique_lock<MutexType>;
501 FMap map_to_worker,
size_t nb_priorities = 2)
502 : nb_workers(nb_workers),
504 workers_info(nb_workers),
505 map_to_worker(std::move(map_to_worker)),
506 nb_priorities(nb_priorities) { }
513 int push_front(
size_t queue_id,
size_t priority,
const T &item) {
514 size_t worker_id = map_to_worker(queue_id);
515 LockType lock(mutex);
516 auto &q_info = get_queue(queue_id);
517 auto &w_info = workers_info.at(worker_id);
518 auto &q_info_pri = q_info.at(priority);
519 if (q_info_pri.size >= q_info_pri.capacity)
return 0;
520 q_info_pri.last_sent = get_next_tp(q_info_pri);
521 w_info.queues[priority].emplace(
522 item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++);
526 w_info.q_not_empty.notify_one();
530 int push_front(
size_t queue_id,
const T &item) {
537 size_t worker_id = map_to_worker(queue_id);
538 LockType lock(mutex);
539 auto &q_info = get_queue(queue_id);
540 auto &w_info = workers_info.at(worker_id);
541 auto &q_info_pri = q_info.at(priority);
542 if (q_info_pri.size >= q_info_pri.capacity)
return 0;
543 q_info_pri.last_sent = get_next_tp(q_info_pri);
544 w_info.queues[priority].emplace(
547 q_info_pri.last_sent,
548 w_info.wrapping_counter++);
552 w_info.q_not_empty.notify_one();
557 return push_front(queue_id, 0, std::move(item));
569 void pop_back(
size_t worker_id,
size_t *queue_id,
size_t *priority,
571 LockType lock(mutex);
572 auto &w_info = workers_info.at(worker_id);
573 MyQ *queue =
nullptr;
576 if (w_info.size == 0) {
577 w_info.q_not_empty.wait(lock);
579 auto now = clock::now();
580 auto next = clock::time_point::max();
582 for (pri = nb_priorities ; pri-- > 0;) {
583 auto &q = w_info.queues[pri];
584 if (q.size() == 0)
continue;
585 if (q.top().send <= now) {
589 next = std::min(next, q.top().send);
592 w_info.q_not_empty.wait_until(lock, next);
595 *queue_id = queue->top().queue_id;
599 *pItem = std::move(
const_cast<QE &
>(queue->top()).e);
601 auto &q_info = get_queue_or_throw(*queue_id);
602 auto &q_info_pri = q_info.at(*priority);
611 void pop_back(
size_t worker_id,
size_t *queue_id, T *pItem) {
613 return pop_back(worker_id, queue_id, &priority, pItem);
619 size_t size(
size_t queue_id)
const {
620 LockType lock(mutex);
621 auto it = queues_info.find(queue_id);
622 if (it == queues_info.end())
return 0;
623 auto &q_info = it->second;
629 size_t size(
size_t queue_id,
size_t priority)
const {
630 LockType lock(mutex);
631 auto it = queues_info.find(queue_id);
632 if (it == queues_info.end())
return 0;
633 auto &q_info = it->second;
634 auto &q_info_pri = q_info.at(priority);
635 return q_info_pri.size;
641 LockType lock(mutex);
642 for_each_q(queue_id, SetCapacityFn(c));
648 LockType lock(mutex);
649 for_one_q(queue_id, priority, SetCapacityFn(c));
655 LockType lock(mutex);
656 for (
auto &p : queues_info) for_each_q(p.first, SetCapacityFn(c));
666 LockType lock(mutex);
667 for_each_q(queue_id, SetRateFn(pps));
672 void set_rate(
size_t queue_id,
size_t priority, uint64_t pps) {
673 LockType lock(mutex);
674 for_one_q(queue_id, priority, SetRateFn(pps));
679 LockType lock(mutex);
680 for (
auto &p : queues_info) for_each_q(p.first, SetRateFn(pps));
681 queue_rate_pps = pps;
695 using ticks = std::chrono::nanoseconds;
700 using clock = std::chrono::high_resolution_clock;
702 static constexpr ticks rate_to_ticks(uint64_t pps) {
703 using std::chrono::duration;
704 using std::chrono::duration_cast;
706 ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
710 QE(T e,
size_t queue_id,
const clock::time_point &send,
size_t id)
711 : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
715 clock::time_point send;
720 bool operator()(
const QE &lhs,
const QE &rhs)
const {
721 return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
725 using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
727 struct QueueInfoPri {
728 QueueInfoPri(
size_t capacity, uint64_t queue_rate_pps)
729 : capacity(capacity),
730 queue_rate_pps(queue_rate_pps),
731 pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
732 last_sent(clock::now()) { }
736 uint64_t queue_rate_pps;
737 ticks pkt_delay_ticks;
738 clock::time_point last_sent;
741 struct QueueInfo :
public std::vector<QueueInfoPri> {
742 QueueInfo(
size_t capacity, uint64_t queue_rate_pps,
size_t nb_priorities)
743 : std::vector<QueueInfoPri>(
744 nb_priorities, QueueInfoPri(capacity, queue_rate_pps)) { }
750 mutable std::condition_variable q_not_empty{};
752 std::array<MyQ, 32> queues;
753 size_t wrapping_counter{0};
756 QueueInfo &get_queue(
size_t queue_id) {
757 auto it = queues_info.find(queue_id);
758 if (it != queues_info.end())
return it->second;
759 auto p = queues_info.emplace(
760 queue_id, QueueInfo(capacity, queue_rate_pps, nb_priorities));
761 return p.first->second;
764 const QueueInfo &get_queue_or_throw(
size_t queue_id)
const {
765 return queues_info.at(queue_id);
768 QueueInfo &get_queue_or_throw(
size_t queue_id) {
769 return queues_info.at(queue_id);
772 clock::time_point get_next_tp(
const QueueInfoPri &q_info_pri) {
773 return std::max(clock::now(),
774 q_info_pri.last_sent + q_info_pri.pkt_delay_ticks);
777 template <
typename Function>
778 Function for_each_q(
size_t queue_id, Function fn) {
779 auto &q_info = get_queue(queue_id);
780 for (
auto &q_info_pri : q_info) fn(q_info_pri);
784 template <
typename Function>
785 Function for_one_q(
size_t queue_id,
size_t priority, Function fn) {
786 auto &q_info = get_queue(queue_id);
787 auto &q_info_pri = q_info.at(priority);
792 struct SetCapacityFn {
793 explicit SetCapacityFn(
size_t c)
796 void operator ()(QueueInfoPri &info)
const {
804 explicit SetRateFn(uint64_t pps)
806 using std::chrono::duration;
807 using std::chrono::duration_cast;
808 pkt_delay_ticks = rate_to_ticks(pps);
811 void operator ()(QueueInfoPri &info)
const {
812 info.queue_rate_pps = pps;
813 info.pkt_delay_ticks = pkt_delay_ticks;
817 ticks pkt_delay_ticks;
820 mutable MutexType mutex;
823 uint64_t queue_rate_pps{0};
824 std::unordered_map<size_t, QueueInfo> queues_info{};
825 std::vector<WorkerInfo> workers_info{};
826 std::vector<MyQ> queues{};
828 size_t nb_priorities;
833 #endif // BM_BM_SIM_QUEUEING_H_