// Copyright 2012 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Parallel for algorithm. package runtime // A parfor holds state for the parallel for operation. type parfor struct { body func(*parfor, uint32) // executed for each element done uint32 // number of idle threads nthr uint32 // total number of threads thrseq uint32 // thread id sequencer cnt uint32 // iteration space [0, cnt) wait bool // if true, wait while all threads finish processing, // otherwise parfor may return while other threads are still working thr []parforthread // thread descriptors // stats nsteal uint64 nstealcnt uint64 nprocyield uint64 nosyield uint64 nsleep uint64 } // A parforthread holds state for a single thread in the parallel for. type parforthread struct { // the thread's iteration space [32lsb, 32msb) pos uint64 // stats nsteal uint64 nstealcnt uint64 nprocyield uint64 nosyield uint64 nsleep uint64 pad [_CacheLineSize]byte } func parforalloc(nthrmax uint32) *parfor { return &parfor{ thr: make([]parforthread, nthrmax), } } // Parforsetup initializes desc for a parallel for operation with nthr // threads executing n jobs. // // On return the nthr threads are each expected to call parfordo(desc) // to run the operation. During those calls, for each i in [0, n), one // thread will be used invoke body(desc, i). // If wait is true, no parfordo will return until all work has been completed. // If wait is false, parfordo may return when there is a small amount // of work left, under the assumption that another thread has that // work well in hand. 
func parforsetup(desc *parfor, nthr, n uint32, wait bool, body func(*parfor, uint32)) {
	// Reject arguments that would later make parfordo index desc.thr out of
	// range or call a nil body.
	if desc == nil || nthr == 0 || nthr > uint32(len(desc.thr)) || body == nil {
		print("desc=", desc, " nthr=", nthr, " count=", n, " body=", body, "\n")
		throw("parfor: invalid args")
	}
	desc.body = body
	desc.done = 0
	desc.nthr = nthr
	desc.thrseq = 0
	desc.cnt = n
	desc.wait = wait
	desc.nsteal = 0
	desc.nstealcnt = 0
	desc.nprocyield = 0
	desc.nosyield = 0
	desc.nsleep = 0

	// Split [0, n) into nthr contiguous, near-equal ranges. Each range is
	// packed into one word as begin (low 32 bits) | end (high 32 bits), so
	// that both bounds can be read and updated with a single 64-bit atomic.
	// The products are computed in 64 bits so n*(i+1) does not overflow.
	// NOTE(review): this loop initializes every descriptor in desc.thr, not
	// just the first nthr. parfordo only ever touches indices < nthr (tid is
	// checked against nthr and victim is drawn from [0, nthr)), so the extra
	// entries appear to be unused — confirm before relying on them.
	for i := range desc.thr {
		begin := uint32(uint64(n) * uint64(i) / uint64(nthr))
		end := uint32(uint64(n) * uint64(i+1) / uint64(nthr))
		desc.thr[i].pos = uint64(begin) | uint64(end)<<32
	}
}

// parfordo executes the calling thread's share of the parallel for set up
// by parforsetup. Each call claims the next thread index from desc.thrseq;
// more than desc.nthr calls per setup throws.
//
// The thread first drains its own range. It then repeatedly tries to steal
// the upper half of a randomly chosen other thread's remaining range, backing
// off between attempts. It returns once desc.done shows every thread idle or,
// when desc.wait is false, once the backoff counter reaches 4*nthr. Before
// returning it folds its per-thread statistics into desc.
func parfordo(desc *parfor) {
	// Obtain 0-based thread index.
	tid := xadd(&desc.thrseq, 1) - 1
	if tid >= desc.nthr {
		print("tid=", tid, " nthr=", desc.nthr, "\n")
		throw("parfor: invalid tid")
	}

	// If single-threaded, just execute the for serially.
	body := desc.body
	if desc.nthr == 1 {
		for i := uint32(0); i < desc.cnt; i++ {
			body(desc, i)
		}
		return
	}

	me := &desc.thr[tid]
	mypos := &me.pos
	for {
		for {
			// While there is local work,
			// bump low index and execute the iteration.
			// Thieves may concurrently lower end via cas64 on this same
			// word, so begin and end are both taken from the single value
			// returned by the atomic add rather than read separately.
			pos := xadd64(mypos, 1)
			begin := uint32(pos) - 1
			end := uint32(pos >> 32)
			if begin < end {
				body(desc, begin)
				continue
			}
			break
		}

		// Out of work, need to steal something.
		idle := false
		for try := uint32(0); ; try++ {
			// If we don't see any work for long enough,
			// increment the done counter...
			if try > desc.nthr*4 && !idle {
				idle = true
				xadd(&desc.done, 1)
			}

			// ...if all threads have incremented the counter,
			// we are done.
			// extra accounts for this thread when it has not yet counted
			// itself in desc.done; in that case it does so before leaving.
			// NOTE(review): desc.done is read here with a plain load while
			// other threads update it with xadd — confirm this is acceptable
			// on every supported architecture.
			extra := uint32(0)
			if !idle {
				extra = 1
			}
			if desc.done+extra == desc.nthr {
				if !idle {
					xadd(&desc.done, 1)
				}
				goto exit
			}

			// Choose a random victim for stealing.
			// The modulus yields nthr-1 choices; bumping values >= tid maps
			// them onto every thread index except our own. nthr >= 2 here
			// because the nthr == 1 case returned above.
			var begin, end uint32
			victim := fastrand1() % (desc.nthr - 1)
			if victim >= tid {
				victim++
			}
			victimpos := &desc.thr[victim].pos
			for {
				// See if it has any work.
				pos := atomicload64(victimpos)
				begin = uint32(pos)
				end = uint32(pos >> 32)
				// At most one iteration remains (or begin has already moved
				// past end): nothing worth splitting. Zero both bounds so the
				// begin < end test after this loop fails.
				if begin+1 >= end {
					end = 0
					begin = end
					break
				}
				// Work exists somewhere, so this thread is no longer idle;
				// take back its earlier increment of desc.done.
				if idle {
					xadd(&desc.done, -1)
					idle = false
				}
				// Leave the victim [begin, begin2) and take [begin2, end).
				// The CAS fails, and the load is retried, if pos changed since
				// it was read (the owner claimed an index or another thief
				// stole first).
				begin2 := begin + (end-begin)/2
				newpos := uint64(begin) | uint64(begin2)<<32
				if cas64(victimpos, pos, newpos) {
					begin = begin2
					break
				}
			}
			if begin < end {
				// Has successfully stolen some work.
				if idle {
					throw("parfor: should not be idle")
				}
				// Install the stolen range as our own and go back to the
				// local-work loop.
				atomicstore64(mypos, uint64(begin)|uint64(end)<<32)
				me.nsteal++
				me.nstealcnt += uint64(end) - uint64(begin)
				break
			}

			// Backoff.
			// try in [0, nthr): retry at once; [nthr, 4*nthr): procyield;
			// from 4*nthr on, either leave (wait == false) or osyield until
			// 6*nthr and then usleep(1) on each further attempt.
			if try < desc.nthr {
				// nothing
			} else if try < 4*desc.nthr {
				me.nprocyield++
				procyield(20)
			} else if !desc.wait {
				// If a caller asked not to wait for the others, exit now
				// (assume that most work is already done at this point).
				if !idle {
					xadd(&desc.done, 1)
				}
				goto exit
			} else if try < 6*desc.nthr {
				me.nosyield++
				osyield()
			} else {
				me.nsleep++
				usleep(1)
			}
		}
	}

exit:
	// Fold this thread's statistics into the shared totals, then clear them
	// so the descriptor starts from zero on the next run.
	xadd64(&desc.nsteal, int64(me.nsteal))
	xadd64(&desc.nstealcnt, int64(me.nstealcnt))
	xadd64(&desc.nprocyield, int64(me.nprocyield))
	xadd64(&desc.nosyield, int64(me.nosyield))
	xadd64(&desc.nsleep, int64(me.nsleep))
	me.nsteal = 0
	me.nstealcnt = 0
	me.nprocyield = 0
	me.nosyield = 0
	me.nsleep = 0
}