1
0
mirror of https://github.com/golang/go synced 2024-11-19 15:54:46 -07:00
go/src/runtime/chan.c

718 lines
12 KiB
C
Raw Normal View History

2008-07-13 15:29:46 -06:00
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
#include "runtime.h"
2008-07-14 15:33:39 -06:00
// debug, when nonzero, turns on the diagnostic traces guarded by
// if(debug) in the channel and select entry points of this file.
static int32 debug = 0;
2008-07-13 15:29:46 -06:00
// Forward declarations for the structures defined in this file, so
// they can refer to one another by their short names.
typedef	struct	Hchan	Hchan;
typedef	struct	Link	Link;
typedef	struct	WaitQ	WaitQ;
typedef	struct	SudoG	SudoG;
typedef	struct	Select	Select;
typedef	struct	Scase	Scase;
// SudoG represents one goroutine waiting on a channel.  It is linked
// into a WaitQ while waiting and into Hchan.free when recycled.
// The pair (g, selgen) acts as a weak pointer to g: dequeue() treats
// the entry as stale when selgen no longer equals g->selgen.
struct SudoG
{
G* g; // waiting goroutine (see selgen)
byte elem[8]; // synch data element
int16 offset; // index of the select case that queued this entry
int32 selgen; // value of g->selgen when the entry was allocated
SudoG* link; // next entry on the wait queue or freelist
};
// WaitQ is a singly linked FIFO of SudoGs: enqueue() appends at last,
// dequeue() removes from first.  last is only meaningful while first
// is non-nil.
struct WaitQ
{
SudoG* first; // head of the queue, nil when empty
SudoG* last; // tail of the queue
};
2008-07-13 15:29:46 -06:00
struct Hchan
{
uint32 elemsize;
2008-07-14 15:33:39 -06:00
uint32 dataqsiz; // size of the circular q
uint32 qcount; // total data in the q
uint16 eo; // vararg of element
uint16 po; // vararg of present bool
2008-07-14 15:33:39 -06:00
Alg* elemalg; // interface for element type
Link* senddataq; // pointer for sender
Link* recvdataq; // pointer for receiver
WaitQ recvq; // list of recv waiters
WaitQ sendq; // list of send waiters
SudoG* free; // freelist
2008-07-14 15:33:39 -06:00
};
// Link is one slot of a buffered channel's circular data queue.
struct	Link
{
	Link*	link;			// asynch queue circular linked list
	byte	elem[8];		// asynch queue data element
};
// Scase describes one case of a select statement, as registered by
// sys·selectsend or sys·selectrecv.
struct Scase
{
Hchan* chan; // chan
byte* pc; // return pc
uint16 send; // 0-recv 1-send
uint16 so; // offset of the "selected" bool in the registering call's argument frame
union {
byte elem[8]; // element (send)
byte* elemp; // pointer to element (recv)
} u;
};
// Select is the state of one select statement.  sys·newselect
// allocates it with room for tcase entries in scase[]; the declared
// length of 1 is only the minimum.
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
Scase scase[1]; // one per case
};
// Helpers private to this file; all are defined after sys·selectgo.
static SudoG* dequeue(WaitQ*, Hchan*);
static void enqueue(WaitQ*, SudoG*);
static SudoG* allocsg(Hchan*);
static void freesg(Hchan*, SudoG*);
static uint32 gcd(uint32, uint32);
static uint32 fastrand1(void);
static uint32 fastrand2(void);
2008-07-13 15:29:46 -06:00
// newchan(elemsize uint32, elemalg uint32, hint uint32) (hchan *chan any);
//
// Allocates a channel for elements of elemsize bytes that are handled
// through algarray[elemalg].  A nonzero hint makes the channel
// asynchronous with a circular queue of hint slots; hint == 0 leaves
// dataqsiz at the value mal() returned it with.
// Throws if elemalg does not index algarray.
void
sys·newchan(uint32 elemsize, uint32 elemalg, uint32 hint,
	Hchan* ret)
{
	Hchan *c;
	Link *d, *b, *e;
	uint32 i;

	if(elemalg >= nelem(algarray)) {
		prints("0<=");
		sys·printint(elemalg);
		prints("<");
		sys·printint(nelem(algarray));
		prints("\n");

		throw("sys·newchan: elem algorithm out of range");
	}

	c = mal(sizeof(*c));

	c->elemsize = elemsize;
	c->elemalg = &algarray[elemalg];

	if(hint > 0) {
		// make a circular q:
		// each new link is pushed in front of the previous ones,
		// then the first link ever made (e) is pointed back at
		// the most recent one (b) to close the ring.
		b = nil;
		e = nil;
		for(i=0; i<hint; i++) {
			d = mal(sizeof(*d));
			if(e == nil)
				e = d;
			d->link = b;
			b = d;
		}
		e->link = b;
		c->recvdataq = b;
		c->senddataq = b;
		c->qcount = 0;
		c->dataqsiz = hint;
	}

	// these calculations are compiler dependent:
	// eo/po are offsets from the address of the first (chan pointer)
	// argument of the send/recv entry points, hence sizeof(c).
	c->eo = rnd(sizeof(c), elemsize);
	c->po = rnd(c->eo+elemsize, 1);

	ret = c;
	FLUSH(&ret);

	if(debug) {
		prints("newchan: chan=");
		sys·printpointer(c);
		prints("; elemsize=");
		sys·printint(elemsize);
		prints("; elemalg=");
		sys·printint(elemalg);
		prints("; dataqsiz=");
		sys·printint(c->dataqsiz);
		prints("\n");
	}
}
2008-07-15 22:07:59 -06:00
// chansend1(hchan *chan any, elem any);
2008-07-13 15:29:46 -06:00
void
2008-07-15 22:07:59 -06:00
sys·chansend1(Hchan* c, ...)
2008-07-13 15:29:46 -06:00
{
byte *ae;
SudoG *sgr;
G* gr;
2008-07-13 15:29:46 -06:00
ae = (byte*)&c + c->eo;
if(debug) {
prints("chansend: chan=");
sys·printpointer(c);
prints("; elem=");
c->elemalg->print(c->elemsize, ae);
prints("\n");
}
2008-07-14 15:33:39 -06:00
if(c->dataqsiz > 0)
goto asynch;
sgr = dequeue(&c->recvq, c);
if(sgr != nil) {
c->elemalg->copy(c->elemsize, sgr->elem, ae);
gr = sgr->g;
gr->param = sgr;
2008-07-14 15:33:39 -06:00
gr->status = Grunnable;
return;
}
sgr = allocsg(c);
c->elemalg->copy(c->elemsize, sgr->elem, ae);
2008-07-14 15:33:39 -06:00
g->status = Gwaiting;
enqueue(&c->sendq, sgr);
2008-07-14 15:33:39 -06:00
sys·gosched();
return;
asynch:
2008-07-14 18:41:38 -06:00
while(c->qcount >= c->dataqsiz) {
sgr = allocsg(c);
2008-07-14 18:41:38 -06:00
g->status = Gwaiting;
enqueue(&c->sendq, sgr);
2008-07-14 18:41:38 -06:00
sys·gosched();
}
c->elemalg->copy(c->elemsize, c->senddataq->elem, ae);
c->senddataq = c->senddataq->link;
c->qcount++;
sgr = dequeue(&c->recvq, c);
if(sgr != nil) {
gr = sgr->g;
freesg(c, sgr);
2008-07-14 18:41:38 -06:00
gr->status = Grunnable;
}
2008-07-13 15:29:46 -06:00
}
2008-07-13 17:20:27 -06:00
2008-07-15 22:07:59 -06:00
// chansend2(hchan *chan any, elem any) (pres bool);
//
// Non-blocking send.  The element is read at offset c->eo from &c and
// the result bool is written at offset c->po.
//   synchronous channel: succeeds only if a receiver is already
//     waiting on recvq.
//   asynchronous channel: succeeds only if the queue has a free slot;
//     one waiting receiver, if any, is then made runnable.
void
sys·chansend2(Hchan* c, ...)
{
	byte *ae, *ap;
	SudoG *sgr;
	G *gr;

	ae = (byte*)&c + c->eo;
	ap = (byte*)&c + c->po;

	if(debug) {
		prints("chansend: chan=");
		sys·printpointer(c);
		prints("; elem=");
		c->elemalg->print(c->elemsize, ae);
		prints("\n");
	}
	if(c->dataqsiz > 0)
		goto asynch;

	sgr = dequeue(&c->recvq, c);
	if(sgr != nil) {
		gr = sgr->g;
		c->elemalg->copy(c->elemsize, sgr->elem, ae);

		gr->param = sgr;
		gr->status = Grunnable;
		*ap = true;
		return;
	}
	*ap = false;
	return;

asynch:
	if(c->qcount >= c->dataqsiz) {
		*ap = false;
		return;
	}
	c->elemalg->copy(c->elemsize, c->senddataq->elem, ae);
	c->senddataq = c->senddataq->link;
	c->qcount++;

	// test the dequeued SudoG itself: gr has not been assigned on
	// this path, and sgr is nil when no receiver is waiting.
	sgr = dequeue(&c->recvq, c);
	if(sgr != nil) {
		gr = sgr->g;
		freesg(c, sgr);
		gr->status = Grunnable;
	}
	*ap = true;
}
2008-07-13 17:20:27 -06:00
// chanrecv1(hchan *chan any) (elem any);
//
// Blocking receive.  The element is written to the argument frame at
// offset c->eo from &c.
//   synchronous channel: the element is taken from a waiting sender's
//     SudoG and that sender is made runnable; with no sender, the
//     receiver parks a SudoG on recvq, yields, and copies the element
//     out of that SudoG once it runs again.
//   asynchronous channel: the receiver yields while the queue is
//     empty, then takes the next slot and wakes one waiting sender,
//     if any.
void
sys·chanrecv1(Hchan* c, ...)
{
	byte *ae;
	SudoG *sgs;
	G *gs;

	ae = (byte*)&c + c->eo;
	if(debug) {
		prints("chanrecv1: chan=");
		sys·printpointer(c);
		prints("\n");
	}
	if(c->dataqsiz > 0)
		goto asynch;

	sgs = dequeue(&c->sendq, c);
	if(sgs != nil) {
		c->elemalg->copy(c->elemsize, ae, sgs->elem);

		gs = sgs->g;
		gs->param = sgs;
		gs->status = Grunnable;

		freesg(c, sgs);
		return;
	}

	sgs = allocsg(c);
	g->status = Gwaiting;
	enqueue(&c->recvq, sgs);
	sys·gosched();
	// a sender filled sgs->elem before making this goroutine runnable
	c->elemalg->copy(c->elemsize, ae, sgs->elem);
	freesg(c, sgs);
	return;

asynch:
	while(c->qcount <= 0) {
		sgs = allocsg(c);
		g->status = Gwaiting;
		enqueue(&c->recvq, sgs);
		sys·gosched();
	}
	c->elemalg->copy(c->elemsize, ae, c->recvdataq->elem);
	c->recvdataq = c->recvdataq->link;
	c->qcount--;

	// test the dequeued SudoG itself: gs has not been assigned on
	// this path, and sgs is nil when no sender is waiting.
	sgs = dequeue(&c->sendq, c);
	if(sgs != nil) {
		gs = sgs->g;
		freesg(c, sgs);
		gs->status = Grunnable;
	}
}
// chanrecv2(hchan *chan any) (elem any, pres bool);
//
// Non-blocking receive.  The element is written at offset c->eo from
// &c and the result bool at offset c->po.
//   synchronous channel: succeeds only if a sender is already waiting
//     on sendq.
//   asynchronous channel: succeeds only if the queue is non-empty;
//     one waiting sender, if any, is then made runnable.
void
sys·chanrecv2(Hchan* c, ...)
{
	byte *ae, *ap;
	SudoG *sgs;
	G *gs;

	ae = (byte*)&c + c->eo;
	ap = (byte*)&c + c->po;

	if(debug) {
		prints("chanrecv2: chan=");
		sys·printpointer(c);
		prints("\n");
	}
	if(c->dataqsiz > 0)
		goto asynch;

	sgs = dequeue(&c->sendq, c);
	if(sgs != nil) {
		c->elemalg->copy(c->elemsize, ae, sgs->elem);

		gs = sgs->g;
		gs->param = sgs;
		gs->status = Grunnable;

		freesg(c, sgs);
		*ap = true;
		return;
	}
	*ap = false;
	return;

asynch:
	if(c->qcount <= 0) {
		*ap = false;
		return;
	}
	c->elemalg->copy(c->elemsize, ae, c->recvdataq->elem);
	c->recvdataq = c->recvdataq->link;
	c->qcount--;

	sgs = dequeue(&c->sendq, c);
	if(sgs != nil) {
		gs = sgs->g;
		freesg(c, sgs);
		gs->status = Grunnable;
	}
	*ap = true;
}
// newselect(size uint32) (sel *byte);
//
// Allocates a Select with room for size cases (never fewer than the
// one slot declared in the struct) and returns it with no cases
// registered yet.
void
sys·newselect(int32 size, Select *sel)
{
	int32 extra;

	// Select already contains scase[0]; only the cases beyond the
	// first need additional space.
	extra = size > 1 ? size-1 : 0;

	sel = mal(sizeof(*sel) + extra*sizeof(sel->scase[0]));
	sel->ncase = 0;
	sel->tcase = size;

	FLUSH(&sel);
	if(debug) {
		prints("newselect s=");
		sys·printpointer(sel);
		prints("\n");
	}
}
// selectsend(sel *byte, hchan *chan any, elem any) (selected bool);
//
// Registers a send case on sel: records the channel, the caller's pc,
// the frame offset of the "selected" result, and a copy of the element
// to send.  A nil channel registers nothing.
// Throws if sel already holds tcase cases.
void
sys·selectsend(Select *sel, Hchan *c, ...)
{
	int32 i, eo;
	Scase *cas;
	byte *ae;

	// nil cases do not compete
	if(c == nil)
		return;

	i = sel->ncase;
	if(i >= sel->tcase)
		throw("selectsend: too many cases");
	sel->ncase = i+1;
	cas = &sel->scase[i];

	cas->pc = sys·getcallerpc(&sel);
	cas->chan = c;

	// argument frame layout, as offsets from &sel:
	// sel, then c, then the element, then the selected bool.
	eo = rnd(sizeof(sel), sizeof(c));
	eo = rnd(eo+sizeof(c), c->elemsize);
	cas->so = rnd(eo+c->elemsize, 1);
	cas->send = 1;

	ae = (byte*)&sel + eo;
	c->elemalg->copy(c->elemsize, cas->u.elem, ae);

	if(debug) {
		prints("selectsend s=");
		sys·printpointer(sel);
		prints(" pc=");
		sys·printpointer(cas->pc);
		prints(" chan=");
		sys·printpointer(cas->chan);
		prints(" so=");
		sys·printint(cas->so);
		prints(" send=");
		sys·printint(cas->send);
		prints("\n");
	}
}
// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
//
// Registers a receive case on sel: records the channel, the caller's
// pc, the frame offset of the "selected" result, and the pointer the
// received element is to be stored through.  A nil channel registers
// nothing.
// Throws if sel already holds tcase cases.
void
sys·selectrecv(Select *sel, Hchan *c, ...)
{
	int32 i, epo;
	Scase *cas;

	// nil cases do not compete
	if(c == nil)
		return;

	i = sel->ncase;
	if(i >= sel->tcase)
		throw("selectrecv: too many cases");
	sel->ncase = i+1;
	cas = &sel->scase[i];

	cas->pc = sys·getcallerpc(&sel);
	cas->chan = c;

	// argument frame layout, as offsets from &sel:
	// sel, then c, then the element pointer, then the selected bool.
	epo = rnd(sizeof(sel), sizeof(c));
	epo = rnd(epo+sizeof(c), sizeof(byte*));
	cas->so = rnd(epo+sizeof(byte*), 1);
	cas->send = 0;
	cas->u.elemp = *(byte**)((byte*)&sel + epo);

	if(debug) {
		prints("selectrecv s=");
		sys·printpointer(sel);
		prints(" pc=");
		sys·printpointer(cas->pc);
		prints(" chan=");
		sys·printpointer(cas->chan);
		prints(" so=");
		sys·printint(cas->so);
		prints(" send=");
		sys·printint(cas->send);
		prints("\n");
	}
}
// selectgo(sel *byte);
//
// Executes the select whose cases were registered on sel.
// Pass 1 walks the cases in a pseudo-random order looking for a
// counterpart that is already waiting; pass 2 queues a SudoG on every
// case's channel, after which the goroutine yields until it is made
// runnable again.  On completion the caller's pc is set to the pc
// recorded for the chosen case and that case's "selected" bool is set.
// Throws if no case is registered, if no stride coprime to ncase is
// found, or if any case's channel is asynchronous (dataqsiz > 0).
void
sys·selectgo(Select *sel)
{
uint32 p, o, i;
Scase *cas;
Hchan *c;
SudoG *sg;
G *gp;
byte *ae, *as;
// the if(0) blocks in this function are disabled debug traces
if(0) {
prints("selectgo: sel=");
sys·printpointer(sel);
prints("\n");
}
if(sel->ncase < 2) {
if(sel->ncase < 1)
throw("selectgo: no cases");
// make special case of one.
}
// select a (relative) prime
// p is coprime to ncase, so stepping o by p modulo ncase visits
// every case exactly once in ncase steps.
for(i=0;; i++) {
p = fastrand1();
if(gcd(p, sel->ncase) == 1)
break;
if(i > 1000) {
throw("selectgo: failed to select prime");
}
}
// select an initial offset
o = fastrand2();
p %= sel->ncase;
o %= sel->ncase;
// pass 1 - look for something already waiting
for(i=0; i<sel->ncase; i++) {
cas = &sel->scase[o];
c = cas->chan;
// select on asynchronous channels is rejected outright here
if(c->dataqsiz > 0) {
if(cas->send)
throw("selectgo: send asynch");
else
throw("selectgo: recv asynch");
}
if(cas->send) {
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto gotr;
} else {
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gots;
}
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
}
// pass 2 - enqueue on all chans
// each SudoG records the index of its case in offset; the ones left
// queued on other channels are later skipped by dequeue(), which
// discards entries whose selgen no longer matches their goroutine's.
for(i=0; i<sel->ncase; i++) {
cas = &sel->scase[o];
c = cas->chan;
if(cas->send) {
sg = dequeue(&c->recvq, c);
if(sg != nil)
goto gotr; // probably an error
sg = allocsg(c);
sg->offset = o;
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
enqueue(&c->sendq, sg);
} else {
sg = dequeue(&c->sendq, c);
if(sg != nil)
goto gots; // probably an error
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg);
}
o += p;
if(o >= sel->ncase)
o -= sel->ncase;
}
if(0) {
prints("wait: sel=");
sys·printpointer(sel);
prints("\n");
}
g->status = Gwaiting;
sys·gosched();
if(0) {
prints("wait-return: sel=");
sys·printpointer(sel);
prints("\n");
}
// g->param is read as the SudoG that was completed; its offset
// identifies the case that fired.
sg = g->param;
o = sg->offset;
cas = &sel->scase[o];
c = cas->chan;
if(0) {
prints("wake: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" o=");
sys·printint(o);
prints("\n");
}
// NOTE(review): this sends a send case to gots, which reads cas->u as
// the receive pointer u.elemp, and lets a receive case fall into gotr,
// which reads cas->u.elem.  That is the opposite pairing from pass 1
// and pass 2 above -- confirm whether the test is inverted.
// NOTE(review): on this path sg was presumably allocated by this
// goroutine in pass 2, so sg->g below would be the current goroutine
// rather than a counterpart -- confirm.
if(cas->send)
goto gots;
// gotr: sg is a receiver's SudoG; give it the element of this send
// case and make the receiver runnable.
gotr:
if(0) {
prints("gotr: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" o=");
sys·printint(o);
prints("\n");
}
c->elemalg->copy(c->elemsize, sg->elem, cas->u.elem);
gp = sg->g;
gp->param = sg;
gp->status = Grunnable;
goto retc;
// gots: sg is a sender's SudoG; copy its element through the receive
// pointer (when one was supplied), make the sender runnable and
// recycle the SudoG.
gots:
if(0) {
prints("gots: sel=");
sys·printpointer(sel);
prints(" c=");
sys·printpointer(c);
prints(" o=");
sys·printint(o);
prints("\n");
}
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
gp = sg->g;
gp->param = sg;
gp->status = Grunnable;
freesg(c, sg);
// retc: install the pc recorded when the chosen case was registered
// and store true at offset cas->so from &sel (presumably the
// "selected" result slot of that registering call -- confirm).
retc:
sys·setcallerpc(&sel, cas->pc);
as = (byte*)&sel + cas->so;
*as = true;
}
// dequeue removes and returns the first live SudoG on q, or nil if
// none is left.  Entries whose selgen differs from their goroutine's
// current selgen are stale: they are unlinked and returned to c's
// freelist.  Before returning a live entry, its goroutine's selgen is
// incremented so that any other SudoG still queued for that goroutine
// becomes stale.
static SudoG*
dequeue(WaitQ *q, Hchan *c)
{
	SudoG *head;

	for(;;) {
		head = q->first;
		if(head == nil)
			return nil;
		q->first = head->link;

		if(head->selgen == head->g->selgen)
			break;

		// stale entry, recycle it and keep looking
		freesg(c, head);
	}

	// invalidate any others
	head->g->selgen++;
	return head;
}
// enqueue appends sgp to the tail of q.
static void
enqueue(WaitQ *q, SudoG *sgp)
{
	sgp->link = nil;

	// an empty queue is recognised by first alone; last may hold a
	// leftover pointer and must not be followed in that case.
	if(q->first != nil)
		q->last->link = sgp;
	else
		q->first = sgp;
	q->last = sgp;
}
// allocsg returns a SudoG bound to the current goroutine g, stamped
// with g's current selgen.  It reuses the head of c's freelist when
// there is one and allocates with mal() otherwise.  Other fields of a
// reused SudoG are left as they were.
static SudoG*
allocsg(Hchan *c)
{
	SudoG *fresh;

	fresh = c->free;
	if(fresh == nil)
		fresh = mal(sizeof(*fresh));
	else
		c->free = fresh->link;

	fresh->g = g;
	fresh->selgen = g->selgen;
	return fresh;
}
// freesg pushes sg onto the front of c's SudoG freelist.
static void
freesg(Hchan *c, SudoG *sg)
{
	SudoG *oldhead;

	oldhead = c->free;
	c->free = sg;
	sg->link = oldhead;
}
// gcd returns the greatest common divisor of u and v, computed by
// repeatedly reducing the larger value modulo the smaller.
// If either argument is zero the other one is returned, so
// gcd(0, 0) is 0.
static uint32
gcd(uint32 u, uint32 v)
{
	while(u != 0 && v != 0) {
		if(u > v)
			u %= v;
		else
			v %= u;
	}
	// at most one of the two is still nonzero
	return u != 0 ? u : v;
}
// fastrand1 returns the next value of a small pseudo-random sequence
// kept in function-local static state: the state is doubled and, when
// the top bit of the result is set, xored with a fixed constant.
static uint32
fastrand1(void)
{
	static uint32 state = 0x49f6428aUL;
	uint32 next;

	next = state << 1;
	if((next >> 31) != 0)
		next ^= 0x88888eefUL;
	state = next;
	return next;
}
// fastrand2 is a second pseudo-random sequence with its own static
// state.  It starts from the same seed as fastrand1 but xors in a
// different constant, so the two sequences differ.
static uint32
fastrand2(void)
{
	static uint32 state = 0x49f6428aUL;
	uint32 next;

	next = state << 1;
	if((next >> 31) != 0)
		next ^= 0xfafd871bUL;
	state = next;
	return next;
}