1
0
mirror of https://github.com/golang/go synced 2024-11-23 09:20:05 -07:00

os: use poll.fdMutex for Plan 9 files

This permits us to safely support concurrent access to files on Plan 9.
Concurrent access was already safe on other systems.

This does introduce a change: if one goroutine calls a blocking read
on a pipe, and another goroutine closes the pipe, then before this CL
the close would occur. Now the close will be delayed until the blocking
read completes.

Also add tests that concurrent I/O and Close on a pipe are OK.

For #50436
For #56043

Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5
Reviewed-on: https://go-review.googlesource.com/c/go/+/438347
Reviewed-by: Bryan Mills <bcmills@google.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
Reviewed-by: David du Colombier <0intro@gmail.com>
Run-TryBot: Ian Lance Taylor <iant@google.com>
Auto-Submit: Ian Lance Taylor <iant@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Rob Pike <r@golang.org>
This commit is contained in:
Ian Lance Taylor 2022-10-04 14:35:39 -07:00 committed by Gopher Robot
parent 669ec549b5
commit 4fe1971b2d
8 changed files with 306 additions and 28 deletions

View File

@ -10,26 +10,26 @@ package poll
var Consume = consume
type FDMutex struct {
type XFDMutex struct {
fdMutex
}
func (mu *FDMutex) Incref() bool {
func (mu *XFDMutex) Incref() bool {
return mu.incref()
}
func (mu *FDMutex) IncrefAndClose() bool {
func (mu *XFDMutex) IncrefAndClose() bool {
return mu.increfAndClose()
}
func (mu *FDMutex) Decref() bool {
func (mu *XFDMutex) Decref() bool {
return mu.decref()
}
func (mu *FDMutex) RWLock(read bool) bool {
func (mu *XFDMutex) RWLock(read bool) bool {
return mu.rwlock(read)
}
func (mu *FDMutex) RWUnlock(read bool) bool {
func (mu *XFDMutex) RWUnlock(read bool) bool {
return mu.rwunlock(read)
}

View File

@ -14,7 +14,7 @@ import (
)
func TestMutexLock(t *testing.T) {
var mu FDMutex
var mu XFDMutex
if !mu.Incref() {
t.Fatal("broken")
@ -39,7 +39,7 @@ func TestMutexLock(t *testing.T) {
}
func TestMutexClose(t *testing.T) {
var mu FDMutex
var mu XFDMutex
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
@ -60,7 +60,7 @@ func TestMutexClose(t *testing.T) {
func TestMutexCloseUnblock(t *testing.T) {
c := make(chan bool, 4)
var mu FDMutex
var mu XFDMutex
mu.RWLock(true)
for i := 0; i < 4; i++ {
go func() {
@ -104,7 +104,7 @@ func TestMutexPanic(t *testing.T) {
f()
}
var mu FDMutex
var mu XFDMutex
ensurePanics(func() { mu.Decref() })
ensurePanics(func() { mu.RWUnlock(true) })
ensurePanics(func() { mu.RWUnlock(false) })
@ -137,7 +137,7 @@ func TestMutexOverflowPanic(t *testing.T) {
}
}()
var mu1 FDMutex
var mu1 XFDMutex
for i := 0; i < 1<<21; i++ {
mu1.Incref()
}
@ -152,7 +152,7 @@ func TestMutexStress(t *testing.T) {
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
done := make(chan bool, P)
var mu FDMutex
var mu XFDMutex
var readState [2]uint64
var writeState [2]uint64
for p := 0; p < P; p++ {

View File

@ -0,0 +1,42 @@
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll
// Expose fdMutex for use by the os package on Plan 9.
// On Plan 9 we don't want to use async I/O for file operations,
// but we still want the locking semantics that fdMutex provides.
// FDMutex is an exported fdMutex, only for Plan 9.
type FDMutex struct {
fdmu fdMutex
}
func (fdmu *FDMutex) Incref() bool {
return fdmu.fdmu.incref()
}
func (fdmu *FDMutex) Decref() bool {
return fdmu.fdmu.decref()
}
func (fdmu *FDMutex) IncrefAndClose() bool {
return fdmu.fdmu.increfAndClose()
}
func (fdmu *FDMutex) ReadLock() bool {
return fdmu.fdmu.rwlock(true)
}
func (fdmu *FDMutex) ReadUnlock() bool {
return fdmu.fdmu.rwunlock(true)
}
func (fdmu *FDMutex) WriteLock() bool {
return fdmu.fdmu.rwlock(false)
}
func (fdmu *FDMutex) WriteUnlock() bool {
return fdmu.fdmu.rwunlock(false)
}

View File

@ -0,0 +1,70 @@
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package os
// File locking support for Plan 9. This uses fdMutex from the
// internal/poll package.
// incref adds a reference to the file. It returns an error if the file
// is already closed. This method is on File so that we can incorporate
// a nil test.
func (f *File) incref(op string) (err error) {
if f == nil {
return ErrInvalid
}
if !f.fdmu.Incref() {
err = ErrClosed
if op != "" {
err = &PathError{Op: op, Path: f.name, Err: err}
}
}
return err
}
// decref removes a reference to the file. If this is the last
// remaining reference, and the file has been marked to be closed,
// then actually close it.
func (file *file) decref() error {
if file.fdmu.Decref() {
return file.destroy()
}
return nil
}
// readLock adds a reference to the file and locks it for reading.
// It returns an error if the file is already closed.
func (file *file) readLock() error {
if !file.fdmu.ReadLock() {
return ErrClosed
}
return nil
}
// readUnlock removes a reference from the file and unlocks it for reading.
// It also closes the file if it marked as closed and there is no remaining
// reference.
func (file *file) readUnlock() {
if file.fdmu.ReadUnlock() {
file.destroy()
}
}
// writeLock adds a reference to the file and locks it for writing.
// It returns an error if the file is already closed.
func (file *file) writeLock() error {
if !file.fdmu.WriteLock() {
return ErrClosed
}
return nil
}
// writeUnlock removes a reference from the file and unlocks it for writing.
// It also closes the file if it is marked as closed and there is no remaining
// reference.
func (file *file) writeUnlock() {
if file.fdmu.WriteUnlock() {
file.destroy()
}
}

View File

@ -22,6 +22,7 @@ func fixLongPath(path string) string {
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
fdmu poll.FDMutex
fd int
name string
dirinfo *dirInfo // nil unless directory being read
@ -142,24 +143,35 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// be canceled and return immediately with an ErrClosed error.
// Close will return an error if it has already been called.
func (f *File) Close() error {
if err := f.checkValid("close"); err != nil {
return err
if f == nil {
return ErrInvalid
}
return f.file.close()
}
func (file *file) close() error {
if file == nil || file.fd == badFd {
return ErrInvalid
if !file.fdmu.IncrefAndClose() {
return &PathError{Op: "close", Path: file.name, Err: ErrClosed}
}
// At this point we should cancel any pending I/O.
// How do we do that on Plan 9?
err := file.decref()
// no need for a finalizer anymore
runtime.SetFinalizer(file, nil)
return err
}
// destroy actually closes the descriptor. This is called when
// there are no remaining references, by the decref, readUnlock,
// and writeUnlock methods.
func (file *file) destroy() error {
var err error
if e := syscall.Close(file.fd); e != nil {
err = &PathError{Op: "close", Path: file.name, Err: e}
}
file.fd = badFd // so it can't be closed again
// no need for a finalizer anymore
runtime.SetFinalizer(file, nil)
return err
}
@ -193,6 +205,12 @@ func (f *File) Truncate(size int64) error {
if err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
if err := f.incref("truncate"); err != nil {
return err
}
defer f.decref()
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
@ -219,6 +237,12 @@ func (f *File) chmod(mode FileMode) error {
if err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
if err := f.incref("chmod"); err != nil {
return err
}
defer f.decref()
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
@ -240,6 +264,12 @@ func (f *File) Sync() error {
if err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
if err := f.incref("sync"); err != nil {
return err
}
defer f.decref()
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
@ -249,6 +279,10 @@ func (f *File) Sync() error {
// read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) {
if err := f.readLock(); err != nil {
return 0, err
}
defer f.readUnlock()
n, e := fixCount(syscall.Read(f.fd, b))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
@ -260,6 +294,10 @@ func (f *File) read(b []byte) (n int, err error) {
// It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) {
if err := f.readLock(); err != nil {
return 0, err
}
defer f.readUnlock()
n, e := fixCount(syscall.Pread(f.fd, b, off))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
@ -272,6 +310,10 @@ func (f *File) pread(b []byte, off int64) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) write(b []byte) (n int, err error) {
if err := f.writeLock(); err != nil {
return 0, err
}
defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
@ -283,6 +325,10 @@ func (f *File) write(b []byte) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) pwrite(b []byte, off int64) (n int, err error) {
if err := f.writeLock(); err != nil {
return 0, err
}
defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
@ -294,6 +340,10 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
// relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) {
if err := f.incref(""); err != nil {
return 0, err
}
defer f.decref()
if f.dirinfo != nil {
// Free cached dirinfo, so we allocate a new one if we
// access this file as a directory again. See #35767 and #37161.
@ -493,9 +543,10 @@ func tempDir() string {
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
if err := f.checkValid("chdir"); err != nil {
if err := f.incref("chdir"); err != nil {
return err
}
defer f.decref()
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{Op: "chdir", Path: f.name, Err: e}
}
@ -526,16 +577,17 @@ func (f *File) setWriteDeadline(time.Time) error {
return poll.ErrNoDeadline
}
// checkValid checks whether f is valid for use.
// If not, it returns an appropriate error, perhaps incorporating the operation name op.
// checkValid checks whether f is valid for use, but does not prepare
// to actually use it. If f is not ready checkValid returns an appropriate
// error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
if f.fd == badFd {
return &PathError{Op: op, Path: f.name, Err: ErrClosed}
if err := f.incref(op); err != nil {
return err
}
return nil
return f.decref()
}
type rawConn struct{}

View File

@ -2799,3 +2799,115 @@ func TestWriteStringAlloc(t *testing.T) {
t.Errorf("expected 0 allocs for File.WriteString, got %v", allocs)
}
}
// Test that it's OK to have parallel I/O and Close on a pipe.
func TestPipeIOCloseRace(t *testing.T) {
// Skip on wasm, which doesn't have pipes.
if runtime.GOOS == "js" {
t.Skip("skipping on js: no pipes")
}
r, w, err := Pipe()
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
for {
n, err := w.Write([]byte("hi"))
if err != nil {
// We look at error strings as the
// expected errors are OS-specific.
switch {
case errors.Is(err, ErrClosed),
strings.Contains(err.Error(), "broken pipe"),
strings.Contains(err.Error(), "pipe is being closed"),
strings.Contains(err.Error(), "hungup channel"):
// Ignore an expected error.
default:
// Unexpected error.
t.Error(err)
}
return
}
if n != 2 {
t.Errorf("wrote %d bytes, expected 2", n)
return
}
}
}()
go func() {
defer wg.Done()
for {
var buf [2]byte
n, err := r.Read(buf[:])
if err != nil {
if err != io.EOF && !errors.Is(err, ErrClosed) {
t.Error(err)
}
return
}
if n != 2 {
t.Errorf("read %d bytes, want 2", n)
}
}
}()
go func() {
defer wg.Done()
// Let the other goroutines start. This is just to get
// a better test, the test will still pass if they
// don't start.
time.Sleep(time.Millisecond)
if err := r.Close(); err != nil {
t.Error(err)
}
if err := w.Close(); err != nil {
t.Error(err)
}
}()
wg.Wait()
}
// Test that it's OK to call Close concurrently on a pipe.
func TestPipeCloseRace(t *testing.T) {
// Skip on wasm, which doesn't have pipes.
if runtime.GOOS == "js" {
t.Skip("skipping on js: no pipes")
}
r, w, err := Pipe()
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
c := make(chan error, 4)
f := func() {
defer wg.Done()
c <- r.Close()
c <- w.Close()
}
wg.Add(2)
go f()
go f()
nils, errs := 0, 0
for i := 0; i < 4; i++ {
err := <-c
if err == nil {
nils++
} else {
errs++
}
}
if nils != 2 || errs != 2 {
t.Errorf("got nils %d errs %d, want 2 2", nils, errs)
}
}

View File

@ -56,7 +56,11 @@ func dirstat(arg any) (*syscall.Dir, error) {
switch a := arg.(type) {
case *File:
name = a.name
if err := a.incref("fstat"); err != nil {
return nil, err
}
n, err = syscall.Fstat(a.fd, buf)
a.decref()
case string:
name = a
n, err = syscall.Stat(a, buf)

View File

@ -28,5 +28,3 @@ func sameFile(fs1, fs2 *fileStat) bool {
b := fs2.sys.(*syscall.Dir)
return a.Qid.Path == b.Qid.Path && a.Type == b.Type && a.Dev == b.Dev
}
const badFd = -1