bmv2
Designing your own switch target with bmv2
queueing.h
Go to the documentation of this file.
1 /* Copyright 2013-present Barefoot Networks, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 /*
17  * Antonin Bas (antonin@barefootnetworks.com)
18  *
19  */
20 
29 
30 #ifndef BM_BM_SIM_QUEUEING_H_
31 #define BM_BM_SIM_QUEUEING_H_
32 
#include <algorithm>  // for std::max
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>  // for size_t
#include <cstdint>  // for uint64_t
#include <deque>
#include <mutex>
#include <queue>
#include <stdexcept>  // for std::invalid_argument
#include <tuple>  // for std::forward_as_tuple
#include <unordered_map>
#include <utility>  // for std::piecewise_construct
#include <vector>
43 
44 namespace bm {
45 
46 // These queueing implementations used to have one lock for each worker, which
47 // meant that as long as 2 queues were assigned to different workers, they could
48 // operate (push / pop) in parallel. Since we added support for arbitrary port
49 // ids (and the port id is used as the queue id), we no longer have a reasonable
50 // upper bound on the maximum possible port id at construction time and we can
51 // no longer use a vector indexed by the queue id to store queue
52 // information. Each push / pop operation can potentially insert a new entry
53 // into the map. In order to accommodate this, we had to start using a single
54 // lock, shared by all the workers. It's unlikely that contention for this lock
55 // will be a bottleneck.
56 
82 template <typename T, typename FMap>
84  using MutexType = std::mutex;
85  using LockType = std::unique_lock<MutexType>;
86 
87  public:
95  QueueingLogic(size_t nb_workers, size_t capacity, FMap map_to_worker)
96  : nb_workers(nb_workers),
97  capacity(capacity),
98  workers_info(nb_workers),
99  map_to_worker(std::move(map_to_worker)) { }
100 
103  void push_front(size_t queue_id, const T &item) {
104  size_t worker_id = map_to_worker(queue_id);
105  LockType lock(mutex);
106  auto &q_info = get_queue(queue_id);
107  auto &w_info = workers_info.at(worker_id);
108  while (q_info.size >= q_info.capacity) {
109  q_info.q_not_full.wait(lock);
110  }
111  w_info.queue.emplace_front(item, queue_id);
112  q_info.size++;
113  w_info.q_not_empty.notify_one();
114  }
115 
117  void push_front(size_t queue_id, T &&item) {
118  size_t worker_id = map_to_worker(queue_id);
119  LockType lock(mutex);
120  auto &q_info = get_queue(queue_id);
121  auto &w_info = workers_info.at(worker_id);
122  while (q_info.size >= q_info.capacity) {
123  q_info.q_not_full.wait(lock);
124  }
125  w_info.queue.emplace_front(std::move(item), queue_id);
126  q_info.size++;
127  w_info.q_not_empty.notify_one();
128  }
129 
137  void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
138  LockType lock(mutex);
139  auto &w_info = workers_info.at(worker_id);
140  auto &queue = w_info.queue;
141  while (queue.size() == 0) {
142  w_info.q_not_empty.wait(lock);
143  }
144  *queue_id = queue.back().queue_id;
145  *pItem = std::move(queue.back().e);
146  queue.pop_back();
147  auto &q_info = get_queue_or_throw(*queue_id);
148  q_info.size--;
149  q_info.q_not_full.notify_one();
150  }
151 
153  size_t size(size_t queue_id) const {
154  LockType lock(mutex);
155  auto it = queues_info.find(queue_id);
156  if (it == queues_info.end()) return 0;
157  auto &q_info = it->second;
158  return q_info.size;
159  }
160 
163  void set_capacity(size_t queue_id, size_t c) {
164  LockType lock(mutex);
165  auto &q_info = get_queue(queue_id);
166  q_info.capacity = c;
167  }
168 
170  void set_capacity_for_all(size_t c) {
171  LockType lock(mutex);
172  for (auto &p : queues_info) p.second.capacity = c;
173  capacity = c;
174  }
175 
177  QueueingLogic(const QueueingLogic &) = delete;
179  QueueingLogic &operator =(const QueueingLogic &) = delete;
180 
182  QueueingLogic(QueueingLogic &&) = delete;
184  QueueingLogic &&operator =(QueueingLogic &&) = delete;
185 
186  private:
187  struct QE {
188  QE(T e, size_t queue_id)
189  : e(std::move(e)), queue_id(queue_id) { }
190 
191  T e;
192  size_t queue_id;
193  };
194 
195  using MyQ = std::deque<QE>;
196 
// Book-keeping for one logical queue: its current occupancy, its capacity and
// the condition variable on which push_front() waits while the queue is full.
struct QueueInfo {
  explicit QueueInfo(size_t capacity) : capacity{capacity} {}

  size_t size{0};
  size_t capacity{0};
  mutable std::condition_variable q_not_full{};
};
205 
206  struct WorkerInfo {
207  MyQ queue{};
208  mutable std::condition_variable q_not_empty{};
209  };
210 
211  QueueInfo &get_queue(size_t queue_id) {
212  auto it = queues_info.find(queue_id);
213  if (it != queues_info.end()) return it->second;
214  // piecewise_construct because QueueInfo is not copyable (because of mutex
215  // member)
216  auto p = queues_info.emplace(
217  std::piecewise_construct,
218  std::forward_as_tuple(queue_id),
219  std::forward_as_tuple(capacity));
220  return p.first->second;
221  }
222 
223  const QueueInfo &get_queue_or_throw(size_t queue_id) const {
224  return queues_info.at(queue_id);
225  }
226 
227  QueueInfo &get_queue_or_throw(size_t queue_id) {
228  return queues_info.at(queue_id);
229  }
230 
231  mutable MutexType mutex{};
232  size_t nb_workers;
233  size_t capacity; // default capacity
234  std::unordered_map<size_t, QueueInfo> queues_info;
235  std::vector<WorkerInfo> workers_info;
236  FMap map_to_worker;
237 };
238 
239 
251 template <typename T, typename FMap>
253  using MutexType = std::mutex;
254  using LockType = std::unique_lock<MutexType>;
255 
256  public:
261  QueueingLogicRL(size_t nb_workers, size_t capacity, FMap map_to_worker)
262  : nb_workers(nb_workers),
263  capacity(capacity),
264  workers_info(nb_workers),
265  map_to_worker(std::move(map_to_worker)) { }
266 
270  int push_front(size_t queue_id, const T &item) {
271  size_t worker_id = map_to_worker(queue_id);
272  LockType lock(mutex);
273  auto &q_info = get_queue(queue_id);
274  auto &w_info = workers_info.at(worker_id);
275  if (q_info.size >= q_info.capacity) return 0;
276  q_info.last_sent = get_next_tp(q_info);
277  w_info.queue.emplace(
278  item, queue_id, q_info.last_sent, w_info.wrapping_counter++);
279  q_info.size++;
280  w_info.q_not_empty.notify_one();
281  return 1;
282  }
283 
286  int push_front(size_t queue_id, T &&item) {
287  size_t worker_id = map_to_worker(queue_id);
288  LockType lock(mutex);
289  auto &q_info = get_queue(queue_id);
290  auto &w_info = workers_info.at(worker_id);
291  if (q_info.size >= q_info.capacity) return 0;
292  q_info.last_sent = get_next_tp(q_info);
293  w_info.queue.emplace(
294  std::move(item), queue_id, q_info.last_sent, w_info.wrapping_counter++);
295  q_info.size++;
296  w_info.q_not_empty.notify_one();
297  return 1;
298  }
299 
305  void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
306  LockType lock(mutex);
307  auto &w_info = workers_info.at(worker_id);
308  auto &queue = w_info.queue;
309  while (true) {
310  if (queue.size() == 0) {
311  w_info.q_not_empty.wait(lock);
312  } else {
313  if (queue.top().send <= clock::now()) break;
314  w_info.q_not_empty.wait_until(lock, queue.top().send);
315  }
316  }
317  *queue_id = queue.top().queue_id;
318  // TODO(antonin): improve / document this
319  // http://stackoverflow.com/questions/20149471/move-out-element-of-std-priority-queue-in-c11
320  *pItem = std::move(const_cast<QE &>(queue.top()).e);
321  queue.pop();
322  auto &q_info = get_queue_or_throw(*queue_id);
323  q_info.size--;
324  }
325 
327  size_t size(size_t queue_id) const {
328  LockType lock(mutex);
329  auto it = queues_info.find(queue_id);
330  if (it == queues_info.end()) return 0;
331  auto &q_info = it->second;
332  return q_info.size;
333  }
334 
337  void set_capacity(size_t queue_id, size_t c) {
338  LockType lock(mutex);
339  auto &q_info = get_queue(queue_id);
340  q_info.capacity = c;
341  }
342 
344  void set_capacity_for_all(size_t c) {
345  LockType lock(mutex);
346  for (auto &p : queues_info) p.second.capacity = c;
347  capacity = c;
348  }
349 
355  void set_rate(size_t queue_id, uint64_t pps) {
356  LockType lock(mutex);
357  auto &q_info = get_queue(queue_id);
358  q_info.queue_rate_pps = pps;
359  q_info.pkt_delay_ticks = rate_to_ticks(pps);
360  }
361 
363  void set_rate_for_all(uint64_t pps) {
364  using std::chrono::duration;
365  using std::chrono::duration_cast;
366  LockType lock(mutex);
367  for (auto &p : queues_info) {
368  auto &q_info = p.second;
369  q_info.queue_rate_pps = pps;
370  q_info.pkt_delay_ticks = rate_to_ticks(pps);
371  }
372  queue_rate_pps = pps;
373  }
374 
376  QueueingLogicRL(const QueueingLogicRL &) = delete;
378  QueueingLogicRL &operator =(const QueueingLogicRL &) = delete;
379 
381  QueueingLogicRL(QueueingLogicRL &&) = delete;
384 
385  private:
386  using ticks = std::chrono::nanoseconds;
387  // clock choice? switch to steady if observing re-ordering
388  // using clock = std::chrono::steady_clock;
389  using clock = std::chrono::high_resolution_clock;
390 
391  static constexpr ticks rate_to_ticks(uint64_t pps) {
392  using std::chrono::duration;
393  using std::chrono::duration_cast;
394  return (pps == 0) ?
395  ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
396  }
397 
398  struct QE {
399  // QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
400  // : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
401  QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
402  : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
403 
404  T e;
405  size_t queue_id;
406  clock::time_point send;
407  size_t id;
408  };
409 
410  struct QEComp {
411  bool operator()(const QE &lhs, const QE &rhs) const {
412  // the point of the id is to avoid re-orderings when the send timestamp is
413  // the same for 2 items, which seems to happen (when the pps rate is 0)
414  // with both the steady_clock and the high_resolution_clock on my Linux
415  // VM.
416  return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
417  }
418  };
419 
420  // performance seems to be roughly the same for deque vs vector
421  using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
422  // using MyQ = std::priority_queue<QE, std::vector<QE>, QEComp>;
423 
424  struct QueueInfo {
425  QueueInfo(size_t capacity, uint64_t queue_rate_pps)
426  : capacity(capacity),
427  queue_rate_pps(queue_rate_pps),
428  pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
429  last_sent(clock::now()) { }
430 
431  size_t size{0};
432  size_t capacity;
433  uint64_t queue_rate_pps;
434  ticks pkt_delay_ticks;
435  clock::time_point last_sent;
436  };
437 
438  struct WorkerInfo {
439  MyQ queue{};
440  mutable std::condition_variable q_not_empty{};
441  size_t wrapping_counter{0};
442  };
443 
444  QueueInfo &get_queue(size_t queue_id) {
445  auto it = queues_info.find(queue_id);
446  if (it != queues_info.end()) return it->second;
447  auto p = queues_info.emplace(queue_id, QueueInfo(capacity, queue_rate_pps));
448  return p.first->second;
449  }
450 
451  const QueueInfo &get_queue_or_throw(size_t queue_id) const {
452  return queues_info.at(queue_id);
453  }
454 
455  QueueInfo &get_queue_or_throw(size_t queue_id) {
456  return queues_info.at(queue_id);
457  }
458 
459  clock::time_point get_next_tp(const QueueInfo &q_info) {
460  return std::max(clock::now(), q_info.last_sent + q_info.pkt_delay_ticks);
461  }
462 
463  mutable MutexType mutex{};
464  size_t nb_workers;
465  size_t capacity; // default capacity
466  uint64_t queue_rate_pps{0}; // default rate
467  std::unordered_map<size_t, QueueInfo> queues_info;
468  std::vector<WorkerInfo> workers_info;
469  FMap map_to_worker;
470 };
471 
472 
488 template <typename T, typename FMap>
490  using MutexType = std::mutex;
491  using LockType = std::unique_lock<MutexType>;
492 
493  public:
500  QueueingLogicPriRL(size_t nb_workers, size_t capacity,
501  FMap map_to_worker, size_t nb_priorities = 2)
502  : nb_workers(nb_workers),
503  capacity(capacity),
504  workers_info(nb_workers),
505  map_to_worker(std::move(map_to_worker)),
506  nb_priorities(nb_priorities) { }
507 
513  int push_front(size_t queue_id, size_t priority, const T &item) {
514  size_t worker_id = map_to_worker(queue_id);
515  LockType lock(mutex);
516  auto &q_info = get_queue(queue_id);
517  auto &w_info = workers_info.at(worker_id);
518  auto &q_info_pri = q_info.at(priority);
519  if (q_info_pri.size >= q_info_pri.capacity) return 0;
520  q_info_pri.last_sent = get_next_tp(q_info_pri);
521  w_info.queues[priority].emplace(
522  item, queue_id, q_info_pri.last_sent, w_info.wrapping_counter++);
523  q_info_pri.size++;
524  q_info.size++;
525  w_info.size++;
526  w_info.q_not_empty.notify_one();
527  return 1;
528  }
529 
530  int push_front(size_t queue_id, const T &item) {
531  return push_front(queue_id, 0, item);
532  }
533 
536  int push_front(size_t queue_id, size_t priority, T &&item) {
537  size_t worker_id = map_to_worker(queue_id);
538  LockType lock(mutex);
539  auto &q_info = get_queue(queue_id);
540  auto &w_info = workers_info.at(worker_id);
541  auto &q_info_pri = q_info.at(priority);
542  if (q_info_pri.size >= q_info_pri.capacity) return 0;
543  q_info_pri.last_sent = get_next_tp(q_info_pri);
544  w_info.queues[priority].emplace(
545  std::move(item),
546  queue_id,
547  q_info_pri.last_sent,
548  w_info.wrapping_counter++);
549  q_info_pri.size++;
550  q_info.size++;
551  w_info.size++;
552  w_info.q_not_empty.notify_one();
553  return 1;
554  }
555 
556  int push_front(size_t queue_id, T &&item) {
557  return push_front(queue_id, 0, std::move(item));
558  }
559 
569  void pop_back(size_t worker_id, size_t *queue_id, size_t *priority,
570  T *pItem) {
571  LockType lock(mutex);
572  auto &w_info = workers_info.at(worker_id);
573  MyQ *queue = nullptr;
574  size_t pri;
575  while (true) {
576  if (w_info.size == 0) {
577  w_info.q_not_empty.wait(lock);
578  } else {
579  auto now = clock::now();
580  auto next = clock::time_point::max();
581  // This will iterate from nb_priorities-1 to 0
582  for (pri = nb_priorities ; pri-- > 0;) {
583  auto &q = w_info.queues[pri];
584  if (q.size() == 0) continue;
585  if (q.top().send <= now) {
586  queue = &q;
587  break;
588  }
589  next = std::min(next, q.top().send);
590  }
591  if (queue) break;
592  w_info.q_not_empty.wait_until(lock, next);
593  }
594  }
595  *queue_id = queue->top().queue_id;
596  *priority = pri;
597  // TODO(antonin): improve / document this
598  // http://stackoverflow.com/questions/20149471/move-out-element-of-std-priority-queue-in-c11
599  *pItem = std::move(const_cast<QE &>(queue->top()).e);
600  queue->pop();
601  auto &q_info = get_queue_or_throw(*queue_id);
602  auto &q_info_pri = q_info.at(*priority);
603  q_info_pri.size--;
604  q_info.size--;
605  w_info.size--;
606  }
607 
611  void pop_back(size_t worker_id, size_t *queue_id, T *pItem) {
612  size_t priority;
613  return pop_back(worker_id, queue_id, &priority, pItem);
614  }
615 
619  size_t size(size_t queue_id) const {
620  LockType lock(mutex);
621  auto it = queues_info.find(queue_id);
622  if (it == queues_info.end()) return 0;
623  auto &q_info = it->second;
624  return q_info.size;
625  }
626 
629  size_t size(size_t queue_id, size_t priority) const {
630  LockType lock(mutex);
631  auto it = queues_info.find(queue_id);
632  if (it == queues_info.end()) return 0;
633  auto &q_info = it->second;
634  auto &q_info_pri = q_info.at(priority);
635  return q_info_pri.size;
636  }
637 
640  void set_capacity(size_t queue_id, size_t c) {
641  LockType lock(mutex);
642  for_each_q(queue_id, SetCapacityFn(c));
643  }
644 
647  void set_capacity(size_t queue_id, size_t priority, size_t c) {
648  LockType lock(mutex);
649  for_one_q(queue_id, priority, SetCapacityFn(c));
650  }
651 
654  void set_capacity_for_all(size_t c) {
655  LockType lock(mutex);
656  for (auto &p : queues_info) for_each_q(p.first, SetCapacityFn(c));
657  capacity = c;
658  }
659 
665  void set_rate(size_t queue_id, uint64_t pps) {
666  LockType lock(mutex);
667  for_each_q(queue_id, SetRateFn(pps));
668  }
669 
672  void set_rate(size_t queue_id, size_t priority, uint64_t pps) {
673  LockType lock(mutex);
674  for_one_q(queue_id, priority, SetRateFn(pps));
675  }
676 
678  void set_rate_for_all(uint64_t pps) {
679  LockType lock(mutex);
680  for (auto &p : queues_info) for_each_q(p.first, SetRateFn(pps));
681  queue_rate_pps = pps;
682  }
683 
685  QueueingLogicPriRL(const QueueingLogicPriRL &) = delete;
688 
693 
694  private:
695  using ticks = std::chrono::nanoseconds;
696  // clock choice? switch to steady if observing re-ordering
697  // in my Linux VM, it seems that both clocks behave the same (can sometimes
698  // stop increasing for a bit but do not go backwards).
699  // using clock = std::chrono::steady_clock;
700  using clock = std::chrono::high_resolution_clock;
701 
702  static constexpr ticks rate_to_ticks(uint64_t pps) {
703  using std::chrono::duration;
704  using std::chrono::duration_cast;
705  return (pps == 0) ?
706  ticks(0) : duration_cast<ticks>(duration<double>(1. / pps));
707  }
708 
709  struct QE {
710  QE(T e, size_t queue_id, const clock::time_point &send, size_t id)
711  : e(std::move(e)), queue_id(queue_id), send(send), id(id) { }
712 
713  T e;
714  size_t queue_id;
715  clock::time_point send;
716  size_t id;
717  };
718 
719  struct QEComp {
720  bool operator()(const QE &lhs, const QE &rhs) const {
721  return (lhs.send == rhs.send) ? lhs.id > rhs.id : lhs.send > rhs.send;
722  }
723  };
724 
725  using MyQ = std::priority_queue<QE, std::deque<QE>, QEComp>;
726 
727  struct QueueInfoPri {
728  QueueInfoPri(size_t capacity, uint64_t queue_rate_pps)
729  : capacity(capacity),
730  queue_rate_pps(queue_rate_pps),
731  pkt_delay_ticks(rate_to_ticks(queue_rate_pps)),
732  last_sent(clock::now()) { }
733 
734  size_t size{0};
735  size_t capacity;
736  uint64_t queue_rate_pps;
737  ticks pkt_delay_ticks;
738  clock::time_point last_sent;
739  };
740 
741  struct QueueInfo : public std::vector<QueueInfoPri> {
742  QueueInfo(size_t capacity, uint64_t queue_rate_pps, size_t nb_priorities)
743  : std::vector<QueueInfoPri>(
744  nb_priorities, QueueInfoPri(capacity, queue_rate_pps)) { }
745 
746  size_t size{0};
747  };
748 
749  struct WorkerInfo {
750  mutable std::condition_variable q_not_empty{};
751  size_t size{0};
752  std::array<MyQ, 32> queues;
753  size_t wrapping_counter{0};
754  };
755 
756  QueueInfo &get_queue(size_t queue_id) {
757  auto it = queues_info.find(queue_id);
758  if (it != queues_info.end()) return it->second;
759  auto p = queues_info.emplace(
760  queue_id, QueueInfo(capacity, queue_rate_pps, nb_priorities));
761  return p.first->second;
762  }
763 
764  const QueueInfo &get_queue_or_throw(size_t queue_id) const {
765  return queues_info.at(queue_id);
766  }
767 
768  QueueInfo &get_queue_or_throw(size_t queue_id) {
769  return queues_info.at(queue_id);
770  }
771 
772  clock::time_point get_next_tp(const QueueInfoPri &q_info_pri) {
773  return std::max(clock::now(),
774  q_info_pri.last_sent + q_info_pri.pkt_delay_ticks);
775  }
776 
777  template <typename Function>
778  Function for_each_q(size_t queue_id, Function fn) {
779  auto &q_info = get_queue(queue_id);
780  for (auto &q_info_pri : q_info) fn(q_info_pri);
781  return fn;
782  }
783 
784  template <typename Function>
785  Function for_one_q(size_t queue_id, size_t priority, Function fn) {
786  auto &q_info = get_queue(queue_id);
787  auto &q_info_pri = q_info.at(priority);
788  fn(q_info_pri);
789  return fn;
790  }
791 
792  struct SetCapacityFn {
793  explicit SetCapacityFn(size_t c)
794  : c(c) { }
795 
796  void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
797  info.capacity = c;
798  }
799 
800  size_t c;
801  };
802 
803  struct SetRateFn {
804  explicit SetRateFn(uint64_t pps)
805  : pps(pps) {
806  using std::chrono::duration;
807  using std::chrono::duration_cast;
808  pkt_delay_ticks = rate_to_ticks(pps);
809  }
810 
811  void operator ()(QueueInfoPri &info) const { // NOLINT(runtime/references)
812  info.queue_rate_pps = pps;
813  info.pkt_delay_ticks = pkt_delay_ticks;
814  }
815 
816  uint64_t pps;
817  ticks pkt_delay_ticks;
818  };
819 
820  mutable MutexType mutex;
821  size_t nb_workers;
822  size_t capacity; // default capacity
823  uint64_t queue_rate_pps{0}; // default rate
824  std::unordered_map<size_t, QueueInfo> queues_info{};
825  std::vector<WorkerInfo> workers_info{};
826  std::vector<MyQ> queues{};
827  FMap map_to_worker;
828  size_t nb_priorities;
829 };
830 
831 } // namespace bm
832 
833 #endif // BM_BM_SIM_QUEUEING_H_
bm::QueueingLogicRL::set_capacity
void set_capacity(size_t queue_id, size_t c)
Definition: queueing.h:337
bm::QueueingLogic::operator=
QueueingLogic & operator=(const QueueingLogic &)=delete
Deleted copy assignment operator.
bm::QueueingLogicPriRL::operator=
QueueingLogicPriRL & operator=(const QueueingLogicPriRL &)=delete
Deleted copy assignment operator.
bm::QueueingLogicPriRL
Definition: queueing.h:489
bm::QueueingLogicRL::operator=
QueueingLogicRL & operator=(const QueueingLogicRL &)=delete
Deleted copy assignment operator.
bm::QueueingLogic
Definition: queueing.h:83
bm::QueueingLogicRL::set_capacity_for_all
void set_capacity_for_all(size_t c)
Set the capacity of all logical queues to c elements.
Definition: queueing.h:344
bm::QueueingLogicPriRL::push_front
int push_front(size_t queue_id, size_t priority, T &&item)
Definition: queueing.h:536
bm::QueueingLogicRL::set_rate_for_all
void set_rate_for_all(uint64_t pps)
Set the maximum rate of all logical queues to pps.
Definition: queueing.h:363
bm::QueueingLogicRL::QueueingLogicRL
QueueingLogicRL(size_t nb_workers, size_t capacity, FMap map_to_worker)
Definition: queueing.h:261
bm::QueueingLogicPriRL::set_rate_for_all
void set_rate_for_all(uint64_t pps)
Set the rate of all the priority queues of all logical queues to pps.
Definition: queueing.h:678
bm::QueueingLogicRL::pop_back
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition: queueing.h:305
bm::QueueingLogicPriRL::set_capacity
void set_capacity(size_t queue_id, size_t c)
Definition: queueing.h:640
bm::QueueingLogicRL::push_front
int push_front(size_t queue_id, T &&item)
Definition: queueing.h:286
bm::QueueingLogicRL
Definition: queueing.h:252
bm::QueueingLogicRL::set_rate
void set_rate(size_t queue_id, uint64_t pps)
Definition: queueing.h:355
bm::QueueingLogicPriRL::set_capacity_for_all
void set_capacity_for_all(size_t c)
Definition: queueing.h:654
bm::QueueingLogicRL::push_front
int push_front(size_t queue_id, const T &item)
Definition: queueing.h:270
bm::QueueingLogic::push_front
void push_front(size_t queue_id, T &&item)
Moves item to the front of the logical queue with id queue_id.
Definition: queueing.h:117
bm::QueueingLogicRL::size
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition: queueing.h:327
bm::QueueingLogic::set_capacity
void set_capacity(size_t queue_id, size_t c)
Definition: queueing.h:163
bm::QueueingLogicPriRL::set_rate
void set_rate(size_t queue_id, uint64_t pps)
Definition: queueing.h:665
bm::QueueingLogicPriRL::pop_back
void pop_back(size_t worker_id, size_t *queue_id, size_t *priority, T *pItem)
Definition: queueing.h:569
bm::QueueingLogic::set_capacity_for_all
void set_capacity_for_all(size_t c)
Set the capacity of all logical queues to c elements.
Definition: queueing.h:170
bm::QueueingLogic::pop_back
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition: queueing.h:137
bm::QueueingLogicPriRL::push_front
int push_front(size_t queue_id, size_t priority, const T &item)
Definition: queueing.h:513
bm::QueueingLogic::size
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition: queueing.h:153
bm::QueueingLogicPriRL::pop_back
void pop_back(size_t worker_id, size_t *queue_id, T *pItem)
Definition: queueing.h:611
bm::QueueingLogic::QueueingLogic
QueueingLogic(size_t nb_workers, size_t capacity, FMap map_to_worker)
Definition: queueing.h:95
bm::QueueingLogicPriRL::set_rate
void set_rate(size_t queue_id, size_t priority, uint64_t pps)
Definition: queueing.h:672
bm::QueueingLogicPriRL::QueueingLogicPriRL
QueueingLogicPriRL(size_t nb_workers, size_t capacity, FMap map_to_worker, size_t nb_priorities=2)
Definition: queueing.h:500
bm::QueueingLogic::push_front
void push_front(size_t queue_id, const T &item)
Definition: queueing.h:103
bm::QueueingLogicPriRL::size
size_t size(size_t queue_id) const
Get the occupancy of the logical queue with id queue_id.
Definition: queueing.h:619
bm::QueueingLogicPriRL::set_capacity
void set_capacity(size_t queue_id, size_t priority, size_t c)
Definition: queueing.h:647
bm::QueueingLogicPriRL::size
size_t size(size_t queue_id, size_t priority) const
Definition: queueing.h:629