mirror of
https://github.com/golang/go
synced 2024-11-19 23:14:47 -07:00
428afae027
Yet another leftover from C: parfor took a func value for the callback, cast it to an unsafe.Pointer for storage, and then cast it back to a func value to call it. This is unnecessary, so just store the body as a func value. Beyond general cleanup, this also eliminates the last use of unsafe in parfor. Change-Id: Ia904af7c6c443ba75e2699835aee8e9a39b26dd8 Reviewed-on: https://go-review.googlesource.com/3396 Reviewed-by: Russ Cox <rsc@golang.org> Reviewed-by: Dmitry Vyukov <dvyukov@google.com>
213 lines
5.1 KiB
Go
213 lines
5.1 KiB
Go
// Copyright 2012 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
// Parallel for algorithm.
|
|
|
|
package runtime
|
|
|
|
// A parfor holds state for the parallel for operation.
|
|
type parfor struct {
|
|
body func(*parfor, uint32) // executed for each element
|
|
done uint32 // number of idle threads
|
|
nthr uint32 // total number of threads
|
|
thrseq uint32 // thread id sequencer
|
|
cnt uint32 // iteration space [0, cnt)
|
|
wait bool // if true, wait while all threads finish processing,
|
|
// otherwise parfor may return while other threads are still working
|
|
|
|
thr []parforthread // thread descriptors
|
|
|
|
// stats
|
|
nsteal uint64
|
|
nstealcnt uint64
|
|
nprocyield uint64
|
|
nosyield uint64
|
|
nsleep uint64
|
|
}
|
|
|
|
// A parforthread holds state for a single thread in the parallel for.
// During parfordo, the thread with index tid owns desc.thr[tid]. It consumes
// iterations from its own pos, and other threads may steal part of that
// range.
type parforthread struct {
	// the thread's iteration space [32lsb, 32msb)
	//
	// pos packs the half-open range [begin, end) with begin in the low
	// 32 bits and end in the high 32 bits. parforsetup writes it directly.
	// Inside parfordo it is shared between the owner and stealing threads
	// and is only accessed with 64-bit atomics (xadd64, atomicload64,
	// cas64, atomicstore64).
	pos uint64

	// stats
	//
	// Per-thread counters. Only the owning thread increments them (without
	// atomics) in parfordo. It folds them into the parfor totals and resets
	// them when it leaves parfordo.
	nsteal     uint64 // successful steals
	nstealcnt  uint64 // iterations obtained by stealing
	nprocyield uint64 // backoff rounds spent in procyield
	nosyield   uint64 // backoff rounds spent in osyield
	nsleep     uint64 // backoff rounds spent in usleep

	// Presumably pads each element of desc.thr past a cache line so that
	// different threads' pos words do not share one (avoiding false
	// sharing). NOTE(review): confirm intent.
	pad [_CacheLineSize]byte
}
|
|
|
|
func parforalloc(nthrmax uint32) *parfor {
|
|
return &parfor{
|
|
thr: make([]parforthread, nthrmax),
|
|
}
|
|
}
|
|
|
|
// Parforsetup initializes desc for a parallel for operation with nthr
|
|
// threads executing n jobs.
|
|
//
|
|
// On return the nthr threads are each expected to call parfordo(desc)
|
|
// to run the operation. During those calls, for each i in [0, n), one
|
|
// thread will be used invoke body(desc, i).
|
|
// If wait is true, no parfordo will return until all work has been completed.
|
|
// If wait is false, parfordo may return when there is a small amount
|
|
// of work left, under the assumption that another thread has that
|
|
// work well in hand.
|
|
func parforsetup(desc *parfor, nthr, n uint32, wait bool, body func(*parfor, uint32)) {
|
|
if desc == nil || nthr == 0 || nthr > uint32(len(desc.thr)) || body == nil {
|
|
print("desc=", desc, " nthr=", nthr, " count=", n, " body=", body, "\n")
|
|
throw("parfor: invalid args")
|
|
}
|
|
|
|
desc.body = body
|
|
desc.done = 0
|
|
desc.nthr = nthr
|
|
desc.thrseq = 0
|
|
desc.cnt = n
|
|
desc.wait = wait
|
|
desc.nsteal = 0
|
|
desc.nstealcnt = 0
|
|
desc.nprocyield = 0
|
|
desc.nosyield = 0
|
|
desc.nsleep = 0
|
|
|
|
for i := range desc.thr {
|
|
begin := uint32(uint64(n) * uint64(i) / uint64(nthr))
|
|
end := uint32(uint64(n) * uint64(i+1) / uint64(nthr))
|
|
desc.thr[i].pos = uint64(begin) | uint64(end)<<32
|
|
}
|
|
}
|
|
|
|
// parfordo runs the calling thread's share of the parallel for described by
// desc, which must have been initialized by parforsetup. Each of the
// desc.nthr participating threads calls it once, and the order of the calls
// assigns 0-based thread indices. Extra callers throw.
//
// With a single thread, parfordo runs every iteration serially and returns.
// Otherwise each thread first drains its own range. It then repeatedly tries
// to steal half of the remaining range of a pseudo-randomly chosen other
// thread, backing off progressively while it finds nothing. A thread that
// sees no work for long enough counts itself in desc.done.
//
// parfordo returns once all nthr threads are counted as done. If desc.wait
// is false, it may instead return after an extended run of failed steal
// attempts. Before returning, the multi-threaded path adds this thread's
// stats into the desc totals and resets them.
func parfordo(desc *parfor) {
	// Obtain 0-based thread index.
	tid := xadd(&desc.thrseq, 1) - 1
	if tid >= desc.nthr {
		print("tid=", tid, " nthr=", desc.nthr, "\n")
		throw("parfor: invalid tid")
	}

	// If single-threaded, just execute the for serially.
	body := desc.body
	if desc.nthr == 1 {
		for i := uint32(0); i < desc.cnt; i++ {
			body(desc, i)
		}
		return
	}

	me := &desc.thr[tid]
	mypos := &me.pos
	for {
		for {
			// While there is local work,
			// bump low index and execute the iteration.
			// The low 32 bits of pos are begin and the high 32 bits are end.
			// xadd64 claims one index atomically, racing with thieves that
			// CAS this word. Subtracting 1 from the returned value recovers
			// the index that was claimed.
			pos := xadd64(mypos, 1)
			begin := uint32(pos) - 1
			end := uint32(pos >> 32)
			if begin < end {
				body(desc, begin)
				continue
			}
			break
		}

		// Out of work, need to steal something.
		// idle records whether this thread is currently counted in desc.done.
		idle := false
		for try := uint32(0); ; try++ {
			// If we don't see any work for long enough,
			// increment the done counter...
			if try > desc.nthr*4 && !idle {
				idle = true
				xadd(&desc.done, 1)
			}

			// ...if all threads have incremented the counter,
			// we are done.
			// extra counts this thread when it has not yet added itself to
			// done. The condition therefore means "every other thread is done".
			extra := uint32(0)
			if !idle {
				extra = 1
			}
			if desc.done+extra == desc.nthr {
				if !idle {
					xadd(&desc.done, 1)
				}
				goto exit
			}

			// Choose a random victim for stealing.
			// The victim is drawn from [0, nthr-1) and shifted past tid, so it
			// is any thread other than this one (nthr >= 2 here).
			var begin, end uint32
			victim := fastrand1() % (desc.nthr - 1)
			if victim >= tid {
				victim++
			}
			victimpos := &desc.thr[victim].pos
			for {
				// See if it has any work.
				pos := atomicload64(victimpos)
				begin = uint32(pos)
				end = uint32(pos >> 32)
				if begin+1 >= end {
					// Fewer than two iterations left (or already exhausted):
					// nothing worth splitting. Setting begin == end makes the
					// "stolen" check below fail.
					end = 0
					begin = end
					break
				}
				// The victim has work, so this thread is about to be busy.
				// Withdraw from the done count first, so that other threads
				// cannot see done reach nthr while this thread holds work.
				if idle {
					xadd(&desc.done, -1)
					idle = false
				}
				// Split the victim's range: the victim keeps [begin, begin2)
				// and this thread takes [begin2, end). If the CAS fails, the
				// victim or another thief changed pos, so reload and retry.
				begin2 := begin + (end-begin)/2
				newpos := uint64(begin) | uint64(begin2)<<32
				if cas64(victimpos, pos, newpos) {
					begin = begin2
					break
				}
			}
			if begin < end {
				// Has successfully stolen some work.
				if idle {
					throw("parfor: should not be idle")
				}
				// Install the stolen range as this thread's own range. It is
				// stored atomically because other threads may concurrently
				// read it with atomicload64 when picking a victim.
				atomicstore64(mypos, uint64(begin)|uint64(end)<<32)
				me.nsteal++
				me.nstealcnt += uint64(end) - uint64(begin)
				break
			}

			// Backoff.
			// Escalates with the attempt count: spin for the first nthr tries,
			// then procyield. After 4*nthr tries, either leave (if not waiting)
			// or move on to osyield and finally usleep.
			if try < desc.nthr {
				// nothing
			} else if try < 4*desc.nthr {
				me.nprocyield++
				procyield(20)
			} else if !desc.wait {
				// If a caller asked not to wait for the others, exit now
				// (assume that most work is already done at this point).
				if !idle {
					xadd(&desc.done, 1)
				}
				goto exit
			} else if try < 6*desc.nthr {
				me.nosyield++
				osyield()
			} else {
				me.nsleep++
				usleep(1)
			}
		}
	}

exit:
	// Fold this thread's private counters into the shared totals. Other
	// threads may be doing the same, hence xadd64. Then reset the counters
	// for the next operation.
	xadd64(&desc.nsteal, int64(me.nsteal))
	xadd64(&desc.nstealcnt, int64(me.nstealcnt))
	xadd64(&desc.nprocyield, int64(me.nprocyield))
	xadd64(&desc.nosyield, int64(me.nosyield))
	xadd64(&desc.nsleep, int64(me.nsleep))
	me.nsteal = 0
	me.nstealcnt = 0
	me.nprocyield = 0
	me.nosyield = 0
	me.nsleep = 0
}
|