Class
Instance Methods
ractor.c
View on GitHub
/*
 * Core wait routine of the selector.
 *
 * Blocks until one of the enabled events happens and returns a 2-element
 * array [source, value]:
 *   - [:receive, msg]  when do_receivev is truthy and a message could be
 *                      received from the current ractor's recv_queue,
 *   - [:yield, nil]    when do_yieldv is truthy and yield_value could be
 *                      yielded through the current ractor's takers_queue,
 *   - [sender, value]  when something arrived in the selector's take_basket;
 *                      value is whatever ractor_basket_accept() returns.
 *
 * yield_value and move are only forwarded to ractor_try_yield().
 *
 * Raises rb_eRactorError ("no taking ractors") when there is nothing to wait
 * for: no registered ractors, no receive and no yield.
 */
static VALUE
ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
{
rb_execution_context_t *ec = GET_EC();
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
// tb aliases the selector's own basket; taken_basket is a local snapshot of it
struct rb_ractor_basket *tb = &s->take_basket;
struct rb_ractor_basket taken_basket;
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
bool do_receive = !!RTEST(do_receivev);
bool do_yield = !!RTEST(do_yieldv);
VALUE ret_v, ret_r;
enum rb_ractor_wait_status wait_status;
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
RUBY_DEBUG_LOG("start");
// Re-entered whenever a wake-up turned out not to produce a result
retry:
RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
// setup wait_status: one flag per kind of event we are willing to wait for
wait_status = wait_none;
if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
if (do_receive) wait_status |= wait_receiving;
if (do_yield) wait_status |= wait_yielding;
RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
if (wait_status == wait_none) {
rb_raise(rb_eRactorError, "no taking ractors");
}
// check recv_queue: fast path, a message may already be available
if (do_receive && (ret_v = ractor_try_receive(ec, cr, rq)) != Qundef) {
ret_r = ID2SYM(rb_intern("receive"));
goto success;
}
// check takers: fast path, the yield may already be possible
if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
ret_v = Qnil;
ret_r = ID2SYM(rb_intern("yield"));
goto success;
}
// check take_basket: it is expected to be reserved here and is switched to
// "none" before the target ractors are kicked
// NOTE(review): presumably "none" marks the basket as free to be filled -- confirm
VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
s->take_basket.type.e = basket_type_none;
// kick all take target ractors
st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
RACTOR_LOCK_SELF(cr);
{
retry_waiting:
// Sleep until the basket is no longer "none", or receive/yield became possible
while (1) {
if (!basket_none_p(tb)) {
RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
break;
}
if (do_receive && !ractor_queue_empty_p(cr, rq)) {
RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
break;
}
if (do_yield && ractor_check_take_basket(cr, ts)) {
RUBY_DEBUG_LOG("can yield");
break;
}
ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
}
// Snapshot the basket, then try to flip the shared one back to "reserved"
taken_basket = *tb;
// ensure
// The reset to basket_type_reserved is done atomically by the CAS below.
// If the snapshot is "yielding", or the CAS fails because the type changed
// after the snapshot was taken, go back to waiting.
if (taken_basket.type.e == basket_type_yielding ||
RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
if (basket_type_p(tb, basket_type_yielding)) {
// Still "yielding": release our lock around rb_thread_sleep(0) before re-checking
RACTOR_UNLOCK_SELF(cr);
{
rb_thread_sleep(0);
}
RACTOR_LOCK_SELF(cr);
}
goto retry_waiting;
}
}
RACTOR_UNLOCK_SELF(cr);
// check the taken result
switch (taken_basket.type.e) {
case basket_type_none:
// Nothing was taken: the loop was left because receive or yield became
// possible, so start over and take the fast paths above
VM_ASSERT(do_receive || do_yield);
goto retry;
case basket_type_yielding:
// Filtered out by the retry_waiting check above
rb_bug("unreachable");
case basket_type_deleted: {
// The sender is dropped from the selector first; afterwards either its
// "will" is taken into taken_basket or the wait is restarted
ractor_selector_remove(selv, taken_basket.sender);
rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
if (ractor_take_will_lock(r, &taken_basket)) {
RUBY_DEBUG_LOG("has_will");
}
else {
RUBY_DEBUG_LOG("no will");
// rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
// remove and retry wait
goto retry;
}
break;
}
case basket_type_will:
// no more messages
ractor_selector_remove(selv, taken_basket.sender);
break;
default:
break;
}
RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
ret_v = ractor_basket_accept(&taken_basket);
ret_r = taken_basket.sender;
success:
return rb_ary_new_from_args(2, ret_r, ret_v);
}
No documentation available
#
ractor.c
View on GitHub
/*
 * Adds the ractor rv to the set of ractors the selector selv takes from.
 *
 * Raises ArgumentError ("Not a ractor object") when rv is not a ractor and
 * ArgumentError ("already added") when rv is already present in the
 * selector's take_ractors table.  Returns rv.
 *
 * The ractor is only recorded in the table when ractor_register_take()
 * reports success; otherwise the selector is left unchanged.
 */
static VALUE
ractor_selector_add(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "already added");
    }

    // Plain malloc() on purpose: ractor_selector_remove() releases the config with free()
    struct rb_ractor_selector_take_config *config = malloc(sizeof(*config));
    // NOTE(review): VM_ASSERT is presumably a debug-only check, so an allocation
    // failure may go unnoticed in other builds -- confirm and handle if needed
    VM_ASSERT(config != NULL);
    config->closed = false;
    config->oneshot = false;

    if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
        // The table now owns config; it is freed when the entry is removed
        st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
    }
    else {
        // Registration failed, so nothing in the table refers to config and it
        // would otherwise leak.
        // NOTE(review): assumes ractor_register_take() does not keep the
        // pointer when it returns false -- confirm
        free(config);
    }

    return rv;
}
No documentation available
#
ractor.c
View on GitHub
/*
 * Empties the selector selv.
 *
 * Calls ractor_selector_clear_i for every entry of the selector's
 * take_ractors table, passing the selector itself as the callback argument,
 * and then clears the table.  Returns selv.
 */
static VALUE
ractor_selector_clear(VALUE selv)
{
    struct rb_ractor_selector *sel = RACTOR_SELECTOR_PTR(selv);
    const st_data_t callback_arg = (st_data_t)selv;

    // Let the per-entry callback run first, then drop all entries at once
    st_foreach(sel->take_ractors, ractor_selector_clear_i, callback_arg);
    st_clear(sel->take_ractors);

    return selv;
}
No documentation available
#
ractor.c
View on GitHub
/*
 * Returns Qtrue when the selector selv has no entries in its take_ractors
 * table, Qfalse otherwise.
 */
static VALUE
ractor_selector_empty_p(VALUE selv)
{
    struct rb_ractor_selector *sel = RACTOR_SELECTOR_PTR(selv);

    if (sel->take_ractors->num_entries != 0) {
        return Qfalse;
    }
    return Qtrue;
}
No documentation available
ractor.c
View on GitHub
/*
 * Removes the ractor rv from the selector selv.
 *
 * Raises ArgumentError ("Not a ractor object") when rv is not a ractor and
 * ArgumentError ("not added yet") when rv is not in the selector's
 * take_ractors table.  Otherwise deregisters the selector's take_basket from
 * the ractor, deletes the table entry and frees the take config stored as
 * the entry's value.  Returns rv.
 */
static VALUE
ractor_selector_remove(VALUE selv, VALUE rv)
{
    if (!rb_ractor_p(rv)) {
        rb_raise(rb_eArgError, "Not a ractor object");
    }

    rb_ractor_t *r = RACTOR_PTR(rv);
    struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);

    RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));

    if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
        rb_raise(rb_eArgError, "not added yet");
    }

    ractor_deregister_take(r, &s->take_basket);

    // st_delete() writes through st_data_t pointers, so hand it real
    // st_data_t objects instead of casting the addresses of differently
    // typed locals.
    st_data_t key = (st_data_t)r;
    st_data_t value = 0;

    // Only free what the table actually handed back
    if (st_delete(s->take_ractors, &key, &value)) {
        free((struct rb_ractor_selector_take_config *)value);
    }

    return rv;
}
No documentation available
ractor.c
View on GitHub
/*
 * Method entry point for waiting on the selector.
 *
 * Accepts no positional arguments and three optional keywords:
 *   receive:     truthy to also wait for a message to receive
 *   yield_value: when given, also try to yield this value
 *   move:        forwarded together with yield_value (defaults to false)
 *
 * Returns whatever ractor_selector__wait() returns.
 */
static VALUE
ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
{
    VALUE options;
    ID keywords[3];
    VALUE values[3];

    keywords[0] = rb_intern("receive");
    keywords[1] = rb_intern("yield_value");
    keywords[2] = rb_intern("move");

    rb_scan_args(argc, argv, "0:", &options);
    rb_get_kwargs(options, keywords, 0, numberof(values), values);

    // Absent keywords come back as Qundef.  That sentinel must not be handed
    // on as if it were a Ruby value: a truthiness test on it would treat an
    // omitted `move:` as true.  Normalize the optional flags to Qfalse.
    VALUE receive = values[0] == Qundef ? Qfalse : values[0];
    VALUE move = values[2] == Qundef ? Qfalse : values[2];

    // NOTE(review): the third argument is still the C result of the
    // comparison (0 or 1) used as a VALUE; the callee only applies RTEST to
    // it.  values[1] stays Qundef when no yield_value was given, in which
    // case the callee does not attempt to yield.
    return ractor_selector__wait(selector, receive,
                                 values[1] != Qundef, values[1], move);
}
No documentation available