diff --git a/src/pkg/runtime/export_test.go b/src/pkg/runtime/export_test.go
index c1971cd2d1..062aea2487 100644
--- a/src/pkg/runtime/export_test.go
+++ b/src/pkg/runtime/export_test.go
@@ -61,3 +61,9 @@ func ParForIters(desc *ParFor, tid uint32) (uint32, uint32) {
 	begin, end := parforiters(desc, uintptr(tid))
 	return uint32(begin), uint32(end)
 }
+
+func testSchedLocalQueue()
+func testSchedLocalQueueSteal()
+
+var TestSchedLocalQueue1 = testSchedLocalQueue
+var TestSchedLocalQueueSteal1 = testSchedLocalQueueSteal
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index f1e3ad59d7..4f02d00faa 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -154,6 +154,10 @@ static void matchmg(void);	// match m's to g's
 static void readylocked(G*);	// ready, but sched is locked
 static void mnextg(M*, G*);
 static void mcommoninit(M*);
+static void runqput(P*, G*);
+static G* runqget(P*);
+static void runqgrow(P*);
+static G* runqsteal(P*, P*);
 
 void
 setmcpumax(uint32 n)
@@ -1755,3 +1759,216 @@ runtime·setcpuprofilerate(void (*fn)(uintptr*, int32), int32 hz)
 	if(hz != 0)
 		runtime·resetcpuprofiler(hz);
 }
+
+// Put g on local runnable queue.
+// TODO(dvyukov): consider using lock-free queue.
+static void
+runqput(P *p, G *gp)
+{
+	int32 h, t, s;
+
+	runtime·lock(p);
+retry:
+	h = p->runqhead;
+	t = p->runqtail;
+	s = p->runqsize;
+	if(t == h-1 || (h == 0 && t == s-1)) {
+		runqgrow(p);
+		goto retry;
+	}
+	p->runq[t++] = gp;
+	if(t == s)
+		t = 0;
+	p->runqtail = t;
+	runtime·unlock(p);
+}
+
+// Get g from local runnable queue.
+static G*
+runqget(P *p)
+{
+	G *gp;
+	int32 t, h, s;
+
+	if(p->runqhead == p->runqtail)
+		return nil;
+	runtime·lock(p);
+	h = p->runqhead;
+	t = p->runqtail;
+	s = p->runqsize;
+	if(t == h) {
+		runtime·unlock(p);
+		return nil;
+	}
+	gp = p->runq[h++];
+	if(h == s)
+		h = 0;
+	p->runqhead = h;
+	runtime·unlock(p);
+	return gp;
+}
+
+// Grow local runnable queue.
+// TODO(dvyukov): consider using fixed-size array
+// and transfer excess to the global list (local queue can grow way too big).
+static void
+runqgrow(P *p)
+{
+	G **q;
+	int32 s, t, h, t2;
+
+	h = p->runqhead;
+	t = p->runqtail;
+	s = p->runqsize;
+	t2 = 0;
+	q = runtime·malloc(2*s*sizeof(*q));
+	while(t != h) {
+		q[t2++] = p->runq[h++];
+		if(h == s)
+			h = 0;
+	}
+	runtime·free(p->runq);
+	p->runq = q;
+	p->runqhead = 0;
+	p->runqtail = t2;
+	p->runqsize = 2*s;
+}
+
+// Steal half of elements from local runnable queue of p2
+// and put onto local runnable queue of p.
+// Returns one of the stolen elements (or nil if failed).
+static G*
+runqsteal(P *p, P *p2)
+{
+	G *gp, *gp1;
+	int32 t, h, s, t2, h2, s2, c, i;
+
+	if(p2->runqhead == p2->runqtail)
+		return nil;
+	// sort locks to prevent deadlocks
+	if(p < p2)
+		runtime·lock(p);
+	runtime·lock(p2);
+	if(p2->runqhead == p2->runqtail) {
+		runtime·unlock(p2);
+		if(p < p2)
+			runtime·unlock(p);
+		return nil;
+	}
+	if(p >= p2)
+		runtime·lock(p);
+	// now we've locked both queues and know the victim is not empty
+	h = p->runqhead;
+	t = p->runqtail;
+	s = p->runqsize;
+	h2 = p2->runqhead;
+	t2 = p2->runqtail;
+	s2 = p2->runqsize;
+	gp = p2->runq[h2++];  // return value
+	if(h2 == s2)
+		h2 = 0;
+	// steal roughly half
+	if(t2 > h2)
+		c = (t2 - h2) / 2;
+	else
+		c = (s2 - h2 + t2) / 2;
+	// copy
+	for(i = 0; i != c; i++) {
+		// the target queue is full?
+		if(t == h-1 || (h == 0 && t == s-1))
+			break;
+		// the victim queue is empty?
+		if(t2 == h2)
+			break;
+		gp1 = p2->runq[h2++];
+		if(h2 == s2)
+			h2 = 0;
+		p->runq[t++] = gp1;
+		if(t == s)
+			t = 0;
+	}
+	p->runqtail = t;
+	p2->runqhead = h2;
+	runtime·unlock(p2);
+	runtime·unlock(p);
+	return gp;
+}
+
+void
+runtime·testSchedLocalQueue(void)
+{
+	P p;
+	G gs[1000];
+	int32 i, j;
+
+	runtime·memclr((byte*)&p, sizeof(p));
+	p.runqsize = 1;
+	p.runqhead = 0;
+	p.runqtail = 0;
+	p.runq = runtime·malloc(p.runqsize*sizeof(*p.runq));
+
+	for(i = 0; i < nelem(gs); i++) {
+		if(runqget(&p) != nil)
+			runtime·throw("runq is not empty initially");
+		for(j = 0; j < i; j++)
+			runqput(&p, &gs[i]);
+		for(j = 0; j < i; j++) {
+			if(runqget(&p) != &gs[i]) {
+				runtime·printf("bad element at iter %d/%d\n", i, j);
+				runtime·throw("bad element");
+			}
+		}
+		if(runqget(&p) != nil)
+			runtime·throw("runq is not empty afterwards");
+	}
+}
+
+void
+runtime·testSchedLocalQueueSteal(void)
+{
+	P p1, p2;
+	G gs[1000], *gp;
+	int32 i, j, s;
+
+	runtime·memclr((byte*)&p1, sizeof(p1));
+	p1.runqsize = 1;
+	p1.runqhead = 0;
+	p1.runqtail = 0;
+	p1.runq = runtime·malloc(p1.runqsize*sizeof(*p1.runq));
+
+	runtime·memclr((byte*)&p2, sizeof(p2));
+	p2.runqsize = nelem(gs);
+	p2.runqhead = 0;
+	p2.runqtail = 0;
+	p2.runq = runtime·malloc(p2.runqsize*sizeof(*p2.runq));
+
+	for(i = 0; i < nelem(gs); i++) {
+		for(j = 0; j < i; j++) {
+			gs[j].sig = 0;
+			runqput(&p1, &gs[j]);
+		}
+		gp = runqsteal(&p2, &p1);
+		s = 0;
+		if(gp) {
+			s++;
+			gp->sig++;
+		}
+		while(gp = runqget(&p2)) {
+			s++;
+			gp->sig++;
+		}
+		while(gp = runqget(&p1))
+			gp->sig++;
+		for(j = 0; j < i; j++) {
+			if(gs[j].sig != 1) {
+				runtime·printf("bad element %d(%d) at iter %d\n", j, gs[j].sig, i);
+				runtime·throw("bad element");
+			}
+		}
+		if(s != i/2 && s != i/2+1) {
+			runtime·printf("bad steal %d, want %d or %d, iter %d\n",
+				s, i/2, i/2+1, i);
+			runtime·throw("bad steal");
+		}
+	}
+}
\ No newline at end of file
diff --git a/src/pkg/runtime/proc_test.go b/src/pkg/runtime/proc_test.go
index 1f727da073..b9d57a6da1 100644
--- a/src/pkg/runtime/proc_test.go
+++ b/src/pkg/runtime/proc_test.go
@@ -113,6 +113,14 @@ func stackGrowthRecursive(i int) {
 	}
 }
 
+func TestSchedLocalQueue(t *testing.T) {
+	runtime.TestSchedLocalQueue1()
+}
+
+func TestSchedLocalQueueSteal(t *testing.T) {
+	runtime.TestSchedLocalQueueSteal1()
+}
+
 func benchmarkStackGrowth(b *testing.B, rec int) {
 	const CallsPerSched = 1000
 	procs := runtime.GOMAXPROCS(-1)
diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h
index 24591995c8..61e33eb95e 100644
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -52,6 +52,7 @@ typedef struct G G;
 typedef struct Gobuf Gobuf;
 typedef union Lock Lock;
 typedef struct M M;
+typedef struct P P;
 typedef struct Mem Mem;
 typedef union Note Note;
 typedef struct Slice Slice;
@@ -312,6 +313,17 @@ struct M
 	uintptr	end[];
 };
 
+struct P
+{
+	Lock;
+
+	// Queue of runnable goroutines.
+	G**	runq;
+	int32	runqhead;
+	int32	runqtail;
+	int32	runqsize;
+};
+
 // The m->locked word holds a single bit saying whether
 // external calls to LockOSThread are in effect, and then a counter
 // of the internal nesting depth of lockOSThread / unlockOSThread.
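
The runq added above is a circular buffer: runqhead indexes the next element to dequeue, runqtail the next free slot, and one slot is always left unused so that runqhead == runqtail unambiguously means "empty" (hence the t == h-1 || (h == 0 && t == s-1) full test in runqput and runqsteal). The single-threaded Go sketch below models that index arithmetic only; the names (runq, put, get, grow, steal) are invented for illustration, and the per-P locking, including runqsteal's pointer-ordered lock acquisition, is deliberately omitted. It is not the runtime implementation.

// Illustrative single-threaded model of the per-P circular run queue.
// Hypothetical names; the real code above guards each queue with a
// per-P lock and orders lock acquisition by pointer value in runqsteal.
package main

import "fmt"

type g struct{ id int }

type runq struct {
	buf  []*g // circular buffer; one slot is always left unused
	head int  // next element to dequeue
	tail int  // next free slot
}

// put enqueues gp at the tail, doubling the buffer when only the
// sentinel empty slot remains (head == tail must keep meaning "empty").
func (q *runq) put(gp *g) {
	s := len(q.buf)
	if q.tail == q.head-1 || (q.head == 0 && q.tail == s-1) {
		q.grow()
		s = len(q.buf)
	}
	q.buf[q.tail] = gp
	q.tail = (q.tail + 1) % s
}

// get dequeues from the head; nil means the queue is empty.
func (q *runq) get() *g {
	if q.head == q.tail {
		return nil
	}
	gp := q.buf[q.head]
	q.head = (q.head + 1) % len(q.buf)
	return gp
}

// grow doubles the buffer, compacting live elements to the front,
// mirroring runqgrow.
func (q *runq) grow() {
	nq := make([]*g, 2*len(q.buf))
	n := 0
	for gp := q.get(); gp != nil; gp = q.get() {
		nq[n] = gp
		n++
	}
	q.buf, q.head, q.tail = nq, 0, n
}

// steal mirrors runqsteal's accounting: take one element as the
// return value, then move half of the remainder into q.
func (q *runq) steal(q2 *runq) *g {
	if q2.head == q2.tail {
		return nil
	}
	gp := q2.get()
	s2 := len(q2.buf)
	c := ((q2.tail - q2.head + s2) % s2) / 2
	for i := 0; i < c; i++ {
		q.put(q2.get())
	}
	return gp
}

func main() {
	q1 := &runq{buf: make([]*g, 1)}
	q2 := &runq{buf: make([]*g, 1)}
	for i := 0; i < 10; i++ {
		q1.put(&g{id: i})
	}
	gp := q2.steal(q1)
	n := 1 // count the directly returned element
	for q2.get() != nil {
		n++
	}
	fmt.Printf("stole %d first, %d total\n", gp.id, n) // "stole 0 first, 5 total"
}

With ten elements queued, steal takes one as the return value plus half of the remaining nine, five in total, which matches the s == i/2 || s == i/2+1 invariant asserted by runtime·testSchedLocalQueueSteal.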