You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

33 satır
1.2KB

  1. from queue import Full, Queue
  2. from time import monotonic as time
  3. # add implementation of putleft to Queue
  4. class Dequeue(Queue):
  5. def putleft(self, item, block=True, timeout=None):
  6. with self.not_full:
  7. if self.maxsize > 0:
  8. if not block:
  9. if self._qsize() >= self.maxsize:
  10. raise Full
  11. elif timeout is None:
  12. while self._qsize() >= self.maxsize:
  13. self.not_full.wait()
  14. elif timeout < 0:
  15. raise ValueError("'timeout' must be a non-negative number")
  16. else:
  17. endtime = time() + timeout
  18. while self._qsize() >= self.maxsize:
  19. remaining = endtime - time()
  20. if remaining <= 0.0:
  21. raise Full
  22. self.not_full.wait(remaining)
  23. self._putleft(item)
  24. self.unfinished_tasks += 1
  25. self.not_empty.notify()
  26. def putleft_nowait(self, item):
  27. return self.putleft(item, block=False)
  28. def _putleft(self, item):
  29. self.queue.appendleft(item)