bmv2
Designing your own switch target with bmv2
Loading...
Searching...
No Matches
queueing.h
Go to the documentation of this file.
1/* Copyright 2013-present Barefoot Networks, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16/*
17 * Antonin Bas (antonin@barefootnetworks.com)
18 *
19 */
20
29
30#ifndef BM_BM_SIM_QUEUEING_H_
31#define BM_BM_SIM_QUEUEING_H_
32
#include <algorithm>  // for std::max, std::min
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>  // for size_t
#include <cstdint>  // for uint64_t
#include <deque>
#include <mutex>
#include <queue>
#include <stdexcept>  // for std::invalid_argument
#include <tuple>  // for std::forward_as_tuple
#include <unordered_map>
#include <utility>  // for std::piecewise_construct
#include <vector>
43
44namespace bm {
45
46// These queueing implementations used to have one lock for each worker, which
47// meant that as long as 2 queues were assigned to different workers, they could
48// operate (push / pop) in parallel. Since we added support for arbitrary port
49// ids (and the port id is used as the queue id), we no longer have a reasonable
50// upper bound on the maximum possible port id at construction time and we can
51// no longer use a vector indexed by the queue id to store queue
52// information. Each push / pop operation can potentially insert a new entry
53// into the map. In order to accommodate this, we had to start using a single
54// lock, shared by all the workers. It's unlikely that contention for this lock
55// will be a bottleneck.
56
82template <typename T, typename FMap>
84 using MutexType = std::mutex;
85 using LockType = std::unique_lock<MutexType>;
86
87 public:
95 QueueingLogic(size_t nb_workers, size_t capacity, FMap map_to_worker)
96 : nb_workers(nb_workers),
97 capacity(capacity),
98 workers_info(nb_workers),
99 map_to_worker(std::move(map_to_worker)) { }
100
103 void push_front(size_t queue_id, const T &item) {
104 size_t worker_id = map_to_worker(queue_id);
105 LockType lock(mutex);
106 auto &q_info = get_queue(queue_id);
107 auto &w_info = workers_info.at(worker_id);
108 while (q_info.size >= q_info.capacity) {
109 q_info.q_not_full.wait(lock);
110 }
111 w_info.queue.emplace_front(item, queue_id);
112 q_info.size++;
113 w_info.q_not_empty.notify_one();
114 }
115
117 void push_front(size_t queue_id, T &&item) {
118 size_t worker_id = map_to_worker(queue_id);
119 LockType lock(mutex);
120 auto &q_info = get_queue(queue_id);
121 auto &w_info = workers_info.at(worker_id);
122 while (q_info.size >= q_info.capacity) {
123 q_info.q_not_full.wait(lock);
124 }
125 w_info.queue.emplace_front(std::move(item), queue_id);
126 q_info.size++;
127 w_info.q_not_empty.notify_one();
128 }
129
137 void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
138 LockType lock(mutex);
139 auto &w_info = workers_info.at(worker_id);
140 auto &queue = w_info.queue;
141 while (queue.size() == 0) {
142 w_info.q_not_empty.wait(lock);
143 }
144 *queue_id = queue.back().queue_id;
145 *pItem = std::move(queue.back().e);
146 queue.pop_back();
147 auto &q_info = get_queue_or_throw(*queue_id);
148 q_info.size--;
149 q_info.q_not_full.notify_one();
150 }
151
153 size_t size(size_t queue_id) const {
154 LockType lock(mutex);
155 auto it = queues_info.find(queue_id);
156 if (it == queues_info.end()) return 0;
157 auto &q_info = it->second;
158 return q_info.size;
159 }
160
163 void set_capacity(size_t queue_id, size_t c) {
164 LockType lock(mutex);
165 auto &q_info = get_queue(queue_id);
166 q_info.capacity = c;
167 }
168
170 void set_capacity_for_all(size_t c) {
171 LockType lock(mutex);
172 for (auto &p : queues_info) p.second.capacity = c;
173 capacity = c;
174 }
175
177 QueueingLogic(const QueueingLogic &) = delete;
180
185
186 private:
187 struct QE {
188 QE(T e, size_t queue_id)
189 : e(std::move(e)), queue_id(queue_id) { }
190
191 T e;
192 size_t queue_id;
193 };
194
195 using MyQ = std::deque<QE>;
196
// Per logical queue state: current occupancy, maximum occupancy and the
// condition variable producers wait on while the queue is full.
struct QueueInfo {
  explicit QueueInfo(size_t initial_capacity)
      : capacity(initial_capacity) { }

  size_t size{0};
  size_t capacity{0};
  mutable std::condition_variable q_not_full{};
};
205
206 struct WorkerInfo {
207 MyQ queue{};
208 mutable std::condition_variable q_not_empty{};
209 };
210
211 QueueInfo &get_queue(size_t queue_id) {
212 auto it = queues_info.find(queue_id);
213 if (it != queues_info.end()) return it->second;
214 // piecewise_construct because QueueInfo is not copyable (because of mutex
215 // member)
216 auto p = queues_info.emplace(
217 std::piecewise_construct,
218 std::forward_as_tuple(queue_id),
219 std::forward_as_tuple(capacity));
220 return p.first->second;
221 }
222
223 const QueueInfo &get_queue_or_throw(size_t queue_id) const {
224 return queues_info.at(queue_id);
225 }
226
227 QueueInfo &get_queue_or_throw(size_t queue_id) {
228 return queues_info.at(queue_id);
229 }
230
231 mutable MutexType mutex{};
232 size_t nb_workers;
233 size_t capacity; // default capacity
234 std::unordered_map<size_t, QueueInfo> queues_info;
235 std::vector<WorkerInfo> workers_info;
236 FMap map_to_worker;
237};
238
239
251template <typename T, typename FMap>
253 using MutexType = std::mutex;
254 using LockType = std::unique_lock<MutexType>;
255
256 public:
261 QueueingLogicRL(size_t nb_workers, size_t capacity, FMap map_to_worker)
262 : nb_workers(nb_workers),
263 capacity(capacity),
264 workers_info(nb_workers),
265 map_to_worker(std::move(map_to_worker)) { }
266
270 int push_front(size_t queue_id, const T &item) {
271 size_t worker_id = map_to_worker(queue_id);
272 LockType lock(mutex);
273 auto &q_info = get_queue(queue_id);
274 auto &w_info = workers_info.at(worker_id);
275 if (q_info.size >= q_info.capacity) return 0;
276 q_info.last_sent = get_next_tp(q_info);
277 w_info.queue.emplace(
278 item, queue_id, q_info.last_sent, w_info.wrapping_counter++);
279 q_info.size++;
280 w_info.q_not_empty.notify_one();
281 return 1;
282 }
283
286 int push_front(size_t queue_id, T &&item) {
287 size_t worker_id = map_to_worker(queue_id);
288 LockType lock(mutex);
289 auto &q_info = get_queue(queue_id);
290 auto &w_info = workers_info.at(worker_id);
291 if (q_info.size >= q_info.capacity) return 0;
292 q_info.last_sent = get_next_tp(q_info);
293 w_info.queue.emplace(
294 std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++);
295 q_info.size++;
296 w_info.q_not_empty.notify_one();
297 return 1;
298 }
299
305 void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
306 LockType lock(mutex);
307 auto &w_info = workers_info.at(worker_id);
308 auto &queue = w_info.queue;
309 while (true) {
310 if (queue.size() == 0) {
311 w_info.q_not_empty.wait(lock);
312 } else {
313 if (queue.top().send <= clock::now()) break;
314 w_info.q_not_empty.wait_until(lock, queue.top().send);
315 }
316 }
317 *queue_id = queue.top().queue_id;
318 // TODO(antonin): improve / document this
319 // http://stackoverflow.com/questions/20149471/move-out-element-of-std-priority-queue-in-c11
320 *pItem = std::move(const_cast<QE &>(queue.top()).e);
321 queue.pop();
322 auto &q_info = get_queue_or_throw(*queue_id);
323 q_info.size--;
324 }
325
327 size_t size(size_t queue_id) const {
328 LockType lock(mutex);
329 auto it = queues_info.find(queue_id);
330 if (it == queues_info.end()) return 0;
331 auto &q_info = it->second;
332 return q_info.size;
333 }
334
337 void set_capacity(size_t queue_id, size_t c) {
338 LockType lock(mutex);
339 auto &q_info = get_queue(queue_id);
340 q_info.capacity = c;
341 }
342
344 void set_capacity_for_all(size_t c) {
345 LockType lock(mutex);
346 for (auto &p : queues_info) p.second.capacity = c;
347 capacity = c;
348 }
349
355 void set_rate(size_t queue_id, uint64_t pps) {
356 LockType lock(mutex);
357 auto &q_info = get_queue(queue_id);
358 q_info.queue_rate_pps = pps;
359 q_info.pkt_delay_ticks = rate_to_ticks(pps);
360 }
361
363 void set_rate_for_all(uint64_t pps) {
364 using std::chrono::duration;
365 using std::chrono::duration_cast;
366 LockType lock(mutex);
367 for (auto &p : queues_info) {
368 auto &q_info = p.second;
369 q_info.queue_rate_pps = pps;
370 q_info.pkt_delay_ticks = rate_to_ticks(pps);
371 }
372 queue_rate_pps = pps;
373 }
374
379
384
385 private:
386 using ticks = std::chrono::nanoseconds;
387 // clock choice? switch to steady if observing re-ordering
388 // using clock = std::chrono::steady_clock;
389 using clock = std::chrono::high_resolution_clock;
390
391 static constexpr ticks rate_to_ticks(uint64_t pps) {
392 using std::chrono::duration;
393 using std::chrono::duration_cast;
394 return (pps == 0) ?
395 ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
396 }
397
398 struct QE {
399 // QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
400 // : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
401 QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
402 : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
403
404 T e;
405 size_t queue_id;
406 clock::time_point send;
407 size_t id;
408 };
409
410 struct QEComp {
411 bool operator()(const QE &lhs, const QE &rhs) const {
412 // the point of the id is to avoid re-orderings when the send timestamp is
413 // the same for 2 items, which seems to happen (when the pps rate is 0)
414 // with both the steady_clock and the high_resolution_clock on my Linux
415 // VM.
416 return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
417 }
418 };
419
420 // performance seems to be roughly the same for deque vs vector
421 using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
422 // using MyQ = std::priority_queue<QE, std::vector<QE>, QEComp>;
423
424 struct QueueInfo {
425 QueueInfo(size_t capacity, uint64_t queue_rate_pps)
426 : capacity(capacity),
427 queue_rate_pps(queue_rate_pps),
428 pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
429 last_sent(clock::now()) { }
430
431 size_t size{0};
432 size_t capacity;
433 uint64_t queue_rate_pps;
434 ticks pkt_delay_ticks;
435 clock::time_point last_sent;
436 };
437
438 struct WorkerInfo {
439 MyQ queue{};
440 mutable std::condition_variable q_not_empty{};
441 size_t wrapping_counter{0};
442 };
443
444 QueueInfo &get_queue(size_t queue_id) {
445 auto it = queues_info.find(queue_id);
446 if (it != queues_info.end()) return it->second;
447 auto p = queues_info.emplace(queue_id, QueueInfo(capacity, queue_rate_pps));
448 return p.first->second;
449 }
450
451 const QueueInfo &get_queue_or_throw(size_t queue_id) const {
452 return queues_info.at(queue_id);
453 }
454
455 QueueInfo &get_queue_or_throw(size_t queue_id) {
456 return queues_info.at(queue_id);
457 }
458
459 clock::time_point get_next_tp(const QueueInfo &q_info) {
460 return std::max(clock::now(), q_info.last_sent + q_info.pkt_delay_ticks);
461 }
462
463 mutable MutexType mutex{};
464 size_t nb_workers;
465 size_t capacity; // default capacity
466 uint64_t queue_rate_pps{0}; // default rate
467 std::unordered_map<size_t, QueueInfo> queues_info;
468 std::vector<WorkerInfo> workers_info;
469 FMap map_to_worker;
470};
471
472
488template <typename T, typename FMap>
490 using MutexType = std::mutex;
491 using LockType = std::unique_lock<MutexType>;
492
493 public:
500 QueueingLogicPriRL(size_t nb_workers, size_t capacity,
501 FMap map_to_worker, size_t nb_priorities = 2)
502 : nb_workers(nb_workers),
503 capacity(capacity),
504 workers_info(nb_workers),
505 map_to_worker(std::move(map_to_worker)),
506 nb_priorities(nb_priorities) { }
507
513 int push_front(size_t queue_id, size_t priority, const T &item) {
514 size_t worker_id = map_to_worker(queue_id);
515 LockType lock(mutex);
516 auto &q_info = get_queue(queue_id);
517 auto &w_info = workers_info.at(worker_id);
518 auto &q_info_pri = q_info.at(priority);
519 if (q_info_pri.size >= q_info_pri.capacity) return 0;
520 q_info_pri.last_sent = get_next_tp(q_info_pri);
521 w_info.queues[priority].emplace(
522 item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++);
523 q_info_pri.size++;
524 q_info.size++;
525 w_info.size++;
526 w_info.q_not_empty.notify_one();
527 return 1;
528 }
529
530 int push_front(size_t queue_id, const T &item) {
531 return push_front(queue_id, 0, item);
532 }
533
536 int push_front(size_t queue_id, size_t priority, T &&item) {
537 size_t worker_id = map_to_worker(queue_id);
538 LockType lock(mutex);
539 auto &q_info = get_queue(queue_id);
540 auto &w_info = workers_info.at(worker_id);
541 auto &q_info_pri = q_info.at(priority);
542 if (q_info_pri.size >= q_info_pri.capacity) return 0;
543 q_info_pri.last_sent = get_next_tp(q_info_pri);
544 w_info.queues[priority].emplace(
545 std::move(item),
546 queue_id,
547 q_info_pri.last_sent,
548 w_info.wrapping_counter++);
549 q_info_pri.size++;
550 q_info.size++;
551 w_info.size++;
552 w_info.q_not_empty.notify_one();
553 return 1;
554 }
555
556 int push_front(size_t queue_id, T &&item) {
557 return push_front(queue_id, 0, std::move(item));
558 }
559
569 void pop_back(size_t worker_id, size_t *queue_id, size_t *priority,
570 T *pItem) {
571 LockType lock(mutex);
572 auto &w_info = workers_info.at(worker_id);
573 MyQ *queue = nullptr;
574 size_t pri;
575 while (true) {
576 if (w_info.size == 0) {
577 w_info.q_not_empty.wait(lock);
578 } else {
579 auto now = clock::now();
580 auto next = clock::time_point::max();
581 // This will iterate from nb_priorities-1 to 0
582 for (pri = nb_priorities ; pri-- > 0;) {
583 auto &q = w_info.queues[pri];
584 if (q.size() == 0) continue;
585 if (q.top().send <= now) {
586 queue = &q;
587 break;
588 }
589 next = std::min(next, q.top().send);
590 }
591 if (queue) break;
592 w_info.q_not_empty.wait_until(lock, next);
593 }
594 }
595 *queue_id = queue->top().queue_id;
596 *priority = pri;
597 // TODO(antonin): improve / document this
598 // http://stackoverflow.com/questions/20149471/move-out-element-of-std-priority-queue-in-c11
599 *pItem = std::move(const_cast<QE &>(queue->top()).e);
600 queue->pop();
601 auto &q_info = get_queue_or_throw(*queue_id);
602 auto &q_info_pri = q_info.at(*priority);
603 q_info_pri.size--;
604 q_info.size--;
605 w_info.size--;
606 }
607
611 void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
612 size_t priority;
613 return pop_back(worker_id, queue_id, &priority, pItem);
614 }
615
619 size_t size(size_t queue_id) const {
620 LockType lock(mutex);
621 auto it = queues_info.find(queue_id);
622 if (it == queues_info.end()) return 0;
623 auto &q_info = it->second;
624 return q_info.size;
625 }
626
629 size_t size(size_t queue_id, size_t priority) const {
630 LockType lock(mutex);
631 auto it = queues_info.find(queue_id);
632 if (it == queues_info.end()) return 0;
633 auto &q_info = it->second;
634 auto &q_info_pri = q_info.at(priority);
635 return q_info_pri.size;
636 }
637
640 void set_capacity(size_t queue_id, size_t c) {
641 LockType lock(mutex);
642 for_each_q(queue_id, SetCapacityFn(c));
643 }
644
647 void set_capacity(size_t queue_id, size_t priority, size_t c) {
648 LockType lock(mutex);
649 for_one_q(queue_id, priority, SetCapacityFn(c));
650 }
651
654 void set_capacity_for_all(size_t c) {
655 LockType lock(mutex);
656 for (auto &p : queues_info) for_each_q(p.first, SetCapacityFn(c));
657 capacity = c;
658 }
659
665 void set_rate(size_t queue_id, uint64_t pps) {
666 LockType lock(mutex);
667 for_each_q(queue_id, SetRateFn(pps));
668 }
669
672 void set_rate(size_t queue_id, size_t priority, uint64_t pps) {
673 LockType lock(mutex);
674 for_one_q(queue_id, priority, SetRateFn(pps));
675 }
676
678 void set_rate_for_all(uint64_t pps) {
679 LockType lock(mutex);
680 for (auto &p : queues_info) for_each_q(p.first, SetRateFn(pps));
681 queue_rate_pps = pps;
682 }
683
688
693
694 private:
695 using ticks = std::chrono::nanoseconds;
696 // clock choice? switch to steady if observing re-ordering
697 // in my Linux VM, it seems that both clocks behave the same (can sometimes
698 // stop increasing for a bit but do not go backwards).
699 // using clock = std::chrono::steady_clock;
700 using clock = std::chrono::high_resolution_clock;
701
702 static constexpr ticks rate_to_ticks(uint64_t pps) {
703 using std::chrono::duration;
704 using std::chrono::duration_cast;
705 return (pps == 0) ?
706 ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
707 }
708
709 struct QE {
710 QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
711 : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
712
713 T e;
714 size_t queue_id;
715 clock::time_point send;
716 size_t id;
717 };
718
719 struct QEComp {
720 bool operator()(const QE &lhs, const QE &rhs) const {
721 return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
722 }
723 };
724
725 using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
726
727 struct QueueInfoPri {
728 QueueInfoPri(size_t capacity, uint64_t queue_rate_pps)
729 : capacity(capacity),
730 queue_rate_pps(queue_rate_pps),
731 pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
732 last_sent(clock::now()) { }
733
734 size_t size{0};
735 size_t capacity;
736 uint64_t queue_rate_pps;
737 ticks pkt_delay_ticks;
738 clock::time_point last_sent;
739 };
740
741 struct QueueInfo : public std::vector<QueueInfoPri> {
742 QueueInfo(size_t capacity, uint64_t queue_rate_pps, size_t nb_priorities)
743 : std::vector<QueueInfoPri>(
744 nb_priorities, QueueInfoPri(capacity, queue_rate_pps)) { }
745
746 size_t size{0};
747 };
748
749 struct WorkerInfo {
750 mutable std::condition_variable q_not_empty{};
751 size_t size{0};
752 std::array<MyQ, 32> queues;
753 size_t wrapping_counter{0};
754 };
755
756 QueueInfo &get_queue(size_t queue_id) {
757 auto it = queues_info.find(queue_id);
758 if (it != queues_info.end()) return it->second;
759 auto p = queues_info.emplace(
760 queue_id, QueueInfo(capacity, queue_rate_pps, nb_priorities));
761 return p.first->second;
762 }
763
764 const QueueInfo &get_queue_or_throw(size_t queue_id) const {
765 return queues_info.at(queue_id);
766 }
767
768 QueueInfo &get_queue_or_throw(size_t queue_id) {
769 return queues_info.at(queue_id);
770 }
771
772 clock::time_point get_next_tp(const QueueInfoPri &q_info_pri) {
773 return std::max(clock::now(),
774 q_info_pri.last_sent + q_info_pri.pkt_delay_ticks);
775 }
776
777 template <typename Function>
778 Function for_each_q(size_t queue_id, Function fn) {
779 auto &q_info = get_queue(queue_id);
780 for (auto &q_info_pri : q_info) fn(q_info_pri);
781 return fn;
782 }
783
784 template <typename Function>
785 Function for_one_q(size_t queue_id, size_t priority, Function fn) {
786 auto &q_info = get_queue(queue_id);
787 auto &q_info_pri = q_info.at(priority);
788 fn(q_info_pri);
789 return fn;
790 }
791
792 struct SetCapacityFn {
793 explicit SetCapacityFn(size_t c)
794 : c(c) { }
795
796 void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
797 info.capacity = c;
798 }
799
800 size_t c;
801 };
802
803 struct SetRateFn {
804 explicit SetRateFn(uint64_t pps)
805 : pps(pps) {
806 using std::chrono::duration;
807 using std::chrono::duration_cast;
808 pkt_delay_ticks = rate_to_ticks(pps);
809 }
810
811 void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
812 info.queue_rate_pps = pps;
813 info.pkt_delay_ticks = pkt_delay_ticks;
814 }
815
816 uint64_t pps;
817 ticks pkt_delay_ticks;
818 };
819
820 mutable MutexType mutex;
821 size_t nb_workers;
822 size_t capacity; // default capacity
823 uint64_t queue_rate_pps{0}; // default rate
824 std::unordered_map<size_t, QueueInfo> queues_info{};
825 std::vector<WorkerInfo> workers_info{};
826 std::vector<MyQ> queues{};
827 FMap map_to_worker;
828 size_t nb_priorities;
829};
830
831} // namespace bm
832
833#endif // BM_BM_SIM_QUEUEING_H_
Definition queueing.h:489
void set_rate_for_all(uint64_t pps)
Set the rate of all the priority queues of all logical queues to pps.
Definition queueing.h:678
QueueingLogicPriRL & operator=(const QueueingLogicPriRL &)=delete
Deleted copy assignment operator.
void set_rate(size_t queue_id, uint64_t pps)
Definition queueing.h:665
void set_capacity(size_t queue_id, size_t priority, size_t c)
Definition queueing.h:647
int push_front(size_t queue_id, size_t priority, T &&item)
Definition queueing.h:536
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition queueing.h:619
QueueingLogicPriRL(const QueueingLogicPriRL &)=delete
Deleted copy constructor.
QueueingLogicPriRL(size_t nb_workers, size_t capacity, FMap map_to_worker, size_t nb_priorities=2)
Definition queueing.h:500
void set_capacity(size_t queue_id, size_t c)
Definition queueing.h:640
void pop_back(size_t worker_id, size_t *queue_id, size_t *priority, T *pItem)
Definition queueing.h:569
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition queueing.h:611
QueueingLogicPriRL(QueueingLogicPriRL &&)=delete
Deleted move constructor.
void set_capacity_for_all(size_t c)
Definition queueing.h:654
void set_rate(size_t queue_id, size_t priority, uint64_t pps)
Definition queueing.h:672
size_t size(size_t queue_id, size_t priority) const
Definition queueing.h:629
int push_front(size_t queue_id, size_t priority, const T &item)
Definition queueing.h:513
Definition queueing.h:252
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition queueing.h:305
void set_rate_for_all(uint64_t pps)
Set the maximum rate of all logical queues to pps.
Definition queueing.h:363
QueueingLogicRL(QueueingLogicRL &&)=delete
Deleted move constructor.
int push_front(size_t queue_id, T &&item)
Definition queueing.h:286
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition queueing.h:327
void set_capacity_for_all(size_t c)
Set the capacity of all logical queues to c elements.
Definition queueing.h:344
QueueingLogicRL(const QueueingLogicRL &)=delete
Deleted copy constructor.
QueueingLogicRL(size_t nb_workers, size_t capacity, FMap map_to_worker)
Definition queueing.h:261
int push_front(size_t queue_id, const T &item)
Definition queueing.h:270
QueueingLogicRL & operator=(const QueueingLogicRL &)=delete
Deleted copy assignment operator.
void set_rate(size_t queue_id, uint64_t pps)
Definition queueing.h:355
void set_capacity(size_t queue_id, size_t c)
Definition queueing.h:337
Definition queueing.h:83
QueueingLogic(size_t nb_workers, size_t capacity, FMap map_to_worker)
Definition queueing.h:95
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition queueing.h:153
void set_capacity_for_all(size_t c)
Set the capacity of all logical queues to c elements.
Definition queueing.h:170
QueueingLogic(QueueingLogic &&)=delete
Deleted move constructor.
QueueingLogic & operator=(const QueueingLogic &)=delete
Deleted copy assignment operator.
void push_front(size_t queue_id, const T &item)
Definition queueing.h:103
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition queueing.h:137
QueueingLogic(const QueueingLogic &)=delete
Deleted copy constructor.
void push_front(size_t queue_id, T &&item)
Moves item to the front of the logical queue with id queue_id.
Definition queueing.h:117
void set_capacity(size_t queue_id, size_t c)
Definition queueing.h:163