1
0
mirror of https://github.com/golang/go synced 2024-11-23 00:10:07 -07:00

fix bugs in asynch select

R=r
OCL=15586
CL=15586
This commit is contained in:
Ken Thompson 2008-09-19 20:43:30 -07:00
parent d910e90edd
commit 1da727a31f

View File

@ -201,6 +201,7 @@ sendchan(Hchan *c, byte *ep, bool *pres)
return; return;
asynch: asynch:
//prints("\nasend\n");
while(c->qcount >= c->dataqsiz) { while(c->qcount >= c->dataqsiz) {
// (rsc) should check for pres != nil // (rsc) should check for pres != nil
sg = allocsg(c); sg = allocsg(c);
@ -208,6 +209,7 @@ asynch:
enqueue(&c->sendq, sg); enqueue(&c->sendq, sg);
unlock(&chanlock); unlock(&chanlock);
sys·gosched(); sys·gosched();
lock(&chanlock); lock(&chanlock);
} }
if(ep != nil) if(ep != nil)
@ -218,10 +220,12 @@ asynch:
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
gp->param = sg;
freesg(c, sg); freesg(c, sg);
unlock(&chanlock); unlock(&chanlock);
//prints("wakeup\n");
ready(gp); ready(gp);
}else } else
unlock(&chanlock); unlock(&chanlock);
} }
@ -290,10 +294,11 @@ asynch:
sg = dequeue(&c->sendq, c); sg = dequeue(&c->sendq, c);
if(sg != nil) { if(sg != nil) {
gp = sg->g; gp = sg->g;
gp->param = sg;
freesg(c, sg); freesg(c, sg);
unlock(&chanlock); unlock(&chanlock);
ready(gp); ready(gp);
}else } else
unlock(&chanlock); unlock(&chanlock);
} }
@ -523,7 +528,6 @@ sys·selectgo(Select *sel)
for(i=0; i<sel->ncase; i++) { for(i=0; i<sel->ncase; i++) {
cas = &sel->scase[o]; cas = &sel->scase[o];
c = cas->chan; c = cas->chan;
if(c->dataqsiz > 0) { if(c->dataqsiz > 0) {
if(cas->send) { if(cas->send) {
if(c->qcount < c->dataqsiz) if(c->qcount < c->dataqsiz)
@ -532,7 +536,7 @@ sys·selectgo(Select *sel)
if(c->qcount > 0) if(c->qcount > 0)
goto asynr; goto asynr;
} }
} } else
if(cas->send) { if(cas->send) {
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
@ -557,23 +561,29 @@ sys·selectgo(Select *sel)
if(c->dataqsiz > 0) { if(c->dataqsiz > 0) {
if(cas->send) { if(cas->send) {
if(c->qcount < c->dataqsiz) { if(c->qcount < c->dataqsiz) {
prints("second pass asyn send\n"); prints("selectgo: pass 2 async send\n");
goto asyns; goto asyns;
} }
sg = allocsg(c);
sg->offset = o;
enqueue(&c->sendq, sg);
} else { } else {
if(c->qcount > 0) { if(c->qcount > 0) {
prints("second pass asyn recv\n"); prints("selectgo: pass 2 async recv\n");
goto asynr; goto asynr;
} }
sg = allocsg(c);
sg->offset = o;
enqueue(&c->recvq, sg);
} }
} } else
if(cas->send) { if(cas->send) {
sg = dequeue(&c->recvq, c); sg = dequeue(&c->recvq, c);
if(sg != nil) { if(sg != nil) {
prints("second pass syn send\n"); prints("selectgo: pass 2 sync send\n");
g->selgen++; g->selgen++;
goto gots; // probably an error goto gots;
} }
sg = allocsg(c); sg = allocsg(c);
sg->offset = o; sg->offset = o;
@ -582,9 +592,9 @@ sys·selectgo(Select *sel)
} else { } else {
sg = dequeue(&c->sendq, c); sg = dequeue(&c->sendq, c);
if(sg != nil) { if(sg != nil) {
prints("second pass syn recv\n"); prints("selectgo: pass 2 sync recv\n");
g->selgen++; g->selgen++;
goto gotr; // probably an error goto gotr;
} }
sg = allocsg(c); sg = allocsg(c);
sg->offset = o; sg->offset = o;
@ -596,9 +606,6 @@ sys·selectgo(Select *sel)
o -= sel->ncase; o -= sel->ncase;
} }
// send and recv paths to sleep for a rendezvous
// (rsc) not correct to set Gwaiting after queueing;
// might already have been readied.
g->status = Gwaiting; g->status = Gwaiting;
unlock(&chanlock); unlock(&chanlock);
sys·gosched(); sys·gosched();
@ -623,6 +630,12 @@ sys·selectgo(Select *sel)
prints("\n"); prints("\n");
} }
if(c->dataqsiz > 0) {
if(cas->send)
goto asyns;
goto asynr;
}
if(!cas->send) { if(!cas->send) {
if(cas->u.elemp != nil) if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem); c->elemalg->copy(c->elemsize, cas->u.elemp, sg->elem);
@ -632,10 +645,31 @@ sys·selectgo(Select *sel)
goto retc; goto retc;
asynr: asynr:
if(cas->u.elemp != nil)
c->elemalg->copy(c->elemsize, cas->u.elemp, c->recvdataq->elem);
c->recvdataq = c->recvdataq->link;
c->qcount--;
sg = dequeue(&c->sendq, c);
if(sg != nil) {
gp = sg->g;
freesg(c, sg);
ready(gp);
}
goto retc;
asyns: asyns:
unlock(&chanlock); if(cas->u.elem != nil)
throw("asyn"); c->elemalg->copy(c->elemsize, c->senddataq->elem, cas->u.elem);
return; // compiler doesn't know throw doesn't return c->senddataq = c->senddataq->link;
c->qcount++;
sg = dequeue(&c->recvq, c);
if(sg != nil) {
gp = sg->g;
gp->param = sg;
freesg(c, sg);
ready(gp);
}
goto retc;
gotr: gotr:
// recv path to wakeup the sender (sg) // recv path to wakeup the sender (sg)