This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.
See Thread::Queue
for an example of how a Thread::SizedQueue
works.
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
long max;
struct rb_szqueue *sq = szqueue_ptr(self);
max = NUM2LONG(vmax);
if (max <= 0) {
rb_raise(rb_eArgError, "queue size must be positive");
}
RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
list_head_init(szqueue_waitq(sq));
list_head_init(szqueue_pushq(sq));
sq->max = max;
return self;
}
Creates a fixed-length queue with a maximum size of max
.
static VALUE
rb_szqueue_clear(VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
rb_ary_clear(check_array(self, sq->q.que));
wakeup_all(szqueue_pushq(sq));
return self;
}
Removes all objects from the queue.
static VALUE
rb_szqueue_close(VALUE self)
{
if (!queue_closed_p(self)) {
struct rb_szqueue *sq = szqueue_ptr(self);
FL_SET(self, QUEUE_CLOSED);
wakeup_all(szqueue_waitq(sq));
wakeup_all(szqueue_pushq(sq));
}
return self;
}
Similar to Thread::Queue#close
.
The difference is behavior with waiting enqueuing threads.
If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError(‘queue closed’).
static VALUE
rb_szqueue_empty_p(VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
return RBOOL(queue_length(self, &sq->q) == 0);
}
Returns true
if the queue is empty.
static VALUE
rb_szqueue_length(VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
return LONG2NUM(queue_length(self, &sq->q));
}
Returns the length of the queue.
static VALUE
rb_szqueue_max_get(VALUE self)
{
return LONG2NUM(szqueue_ptr(self)->max);
}
Returns the maximum size of the queue.
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
long max = NUM2LONG(vmax);
long diff = 0;
struct rb_szqueue *sq = szqueue_ptr(self);
if (max <= 0) {
rb_raise(rb_eArgError, "queue size must be positive");
}
if (max > sq->max) {
diff = max - sq->max;
}
sq->max = max;
sync_wakeup(szqueue_pushq(sq), diff);
return vmax;
}
Sets the maximum size of the queue to the given number
.
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}
Returns the number of threads waiting on the queue.
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
return szqueue_do_pop(self, should_block);
}
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
int should_block = szqueue_push_should_block(argc, argv);
while (queue_length(self, &sq->q) >= sq->max) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
else if (queue_closed_p(self)) {
break;
}
else {
rb_execution_context_t *ec = GET_EC();
struct queue_waiter queue_waiter = {
.w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
.as = {.sq = sq}
};
struct list_head *pushq = szqueue_pushq(sq);
list_add_tail(pushq, &queue_waiter.w.node);
sq->num_waiting_push++;
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
}
}
if (queue_closed_p(self)) {
raise_closed_queue_error(self);
}
return queue_do_push(self, &sq->q, argv[0]);
}
Pushes object
to the queue.
If there is no space left in the queue, waits until space becomes available, unless non_block
is true. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.