From fc5edaca30801f2d1acb7a9d39943638e6d4c1c1 Mon Sep 17 00:00:00 2001 From: Ben Burkert Date: Mon, 21 May 2018 12:56:02 -0700 Subject: [PATCH] net: use splice(2) on Linux when reading from UnixConn, rework splice tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rework the splice tests and benchmarks. Move the reading and writing of the spliced connections to child processes so that the I/O is not part of benchmarks or profiles. Enable the use of splice(2) when reading from a unix connection and writing to a TCP connection. The updated benchmarks show a performance gain when using splice(2) to copy large chunks of data that the original benchmark did not capture. name old time/op new time/op delta Splice/tcp-to-tcp/1024-8 5.01µs ± 2% 5.08µs ± 3% ~ (p=0.068 n=8+10) Splice/tcp-to-tcp/2048-8 4.76µs ± 5% 4.65µs ± 3% -2.36% (p=0.015 n=9+8) Splice/tcp-to-tcp/4096-8 4.91µs ± 2% 4.98µs ± 5% ~ (p=0.315 n=9+10) Splice/tcp-to-tcp/8192-8 5.50µs ± 4% 5.44µs ± 3% ~ (p=0.758 n=7+9) Splice/tcp-to-tcp/16384-8 7.65µs ± 7% 6.53µs ± 3% -14.65% (p=0.000 n=10+9) Splice/tcp-to-tcp/32768-8 15.3µs ± 7% 8.5µs ± 5% -44.21% (p=0.000 n=10+10) Splice/tcp-to-tcp/65536-8 30.0µs ± 6% 15.7µs ± 1% -47.58% (p=0.000 n=10+8) Splice/tcp-to-tcp/131072-8 59.2µs ± 2% 27.4µs ± 5% -53.75% (p=0.000 n=9+9) Splice/tcp-to-tcp/262144-8 121µs ± 4% 54µs ±19% -55.56% (p=0.000 n=9+10) Splice/tcp-to-tcp/524288-8 247µs ± 6% 108µs ±12% -56.34% (p=0.000 n=10+10) Splice/tcp-to-tcp/1048576-8 490µs ± 4% 199µs ±12% -59.31% (p=0.000 n=8+10) Splice/unix-to-tcp/1024-8 1.20µs ± 2% 1.35µs ± 7% +12.47% (p=0.000 n=10+10) Splice/unix-to-tcp/2048-8 1.33µs ±12% 1.57µs ± 4% +17.85% (p=0.000 n=9+10) Splice/unix-to-tcp/4096-8 2.24µs ± 4% 1.67µs ± 4% -25.14% (p=0.000 n=9+10) Splice/unix-to-tcp/8192-8 4.59µs ± 8% 2.20µs ±10% -52.01% (p=0.000 n=10+10) Splice/unix-to-tcp/16384-8 8.46µs ±13% 3.48µs ± 6% -58.91% (p=0.000 n=10+10) Splice/unix-to-tcp/32768-8 18.5µs ± 9% 6.1µs ± 9% -66.99% (p=0.000 n=10+10) Splice/unix-to-tcp/65536-8 35.9µs ± 7% 13.5µs ± 6% -62.40% (p=0.000 n=10+9) Splice/unix-to-tcp/131072-8 79.4µs ± 6% 25.7µs ± 4% -67.62% (p=0.000 n=10+9) Splice/unix-to-tcp/262144-8 157µs ± 4% 54µs ± 8% -65.63% (p=0.000 n=10+10) Splice/unix-to-tcp/524288-8 311µs ± 3% 107µs ± 8% -65.74% (p=0.000 n=10+10) Splice/unix-to-tcp/1048576-8 643µs ± 4% 185µs ±32% -71.21% (p=0.000 n=10+10) name old speed new speed delta Splice/tcp-to-tcp/1024-8 204MB/s ± 2% 202MB/s ± 3% ~ (p=0.068 n=8+10) Splice/tcp-to-tcp/2048-8 430MB/s ± 5% 441MB/s ± 3% +2.39% (p=0.014 n=9+8) Splice/tcp-to-tcp/4096-8 833MB/s ± 2% 823MB/s ± 5% ~ (p=0.315 n=9+10) Splice/tcp-to-tcp/8192-8 1.49GB/s ± 4% 1.51GB/s ± 3% ~ (p=0.758 n=7+9) Splice/tcp-to-tcp/16384-8 2.14GB/s ± 7% 2.51GB/s ± 3% +17.03% (p=0.000 n=10+9) Splice/tcp-to-tcp/32768-8 2.15GB/s ± 7% 3.85GB/s ± 5% +79.11% (p=0.000 n=10+10) Splice/tcp-to-tcp/65536-8 2.19GB/s ± 5% 4.17GB/s ± 1% +90.65% (p=0.000 n=10+8) Splice/tcp-to-tcp/131072-8 2.22GB/s ± 2% 4.79GB/s ± 4% +116.26% (p=0.000 n=9+9) Splice/tcp-to-tcp/262144-8 2.17GB/s ± 4% 4.93GB/s ±17% +127.25% (p=0.000 n=9+10) Splice/tcp-to-tcp/524288-8 2.13GB/s ± 6% 4.89GB/s ±13% +130.15% (p=0.000 n=10+10) Splice/tcp-to-tcp/1048576-8 2.09GB/s ±10% 5.29GB/s ±11% +153.36% (p=0.000 n=10+10) Splice/unix-to-tcp/1024-8 850MB/s ± 2% 757MB/s ± 7% -10.94% (p=0.000 n=10+10) Splice/unix-to-tcp/2048-8 1.54GB/s ±11% 1.31GB/s ± 3% -15.32% (p=0.000 n=9+10) Splice/unix-to-tcp/4096-8 1.83GB/s ± 4% 2.45GB/s ± 4% +33.59% (p=0.000 n=9+10) Splice/unix-to-tcp/8192-8 1.79GB/s ± 9% 3.73GB/s ± 9% +108.05% (p=0.000 n=10+10) Splice/unix-to-tcp/16384-8 1.95GB/s ±13% 4.68GB/s ± 3% +139.80% (p=0.000 n=10+9) Splice/unix-to-tcp/32768-8 1.78GB/s ± 9% 5.38GB/s ±10% +202.71% (p=0.000 n=10+10) Splice/unix-to-tcp/65536-8 1.83GB/s ± 8% 4.85GB/s ± 6% +165.70% (p=0.000 n=10+9) Splice/unix-to-tcp/131072-8 1.65GB/s ± 6% 5.10GB/s ± 4% +208.77% (p=0.000 n=10+9) Splice/unix-to-tcp/262144-8 1.67GB/s ± 4% 4.87GB/s ± 7% +191.19% (p=0.000 n=10+10) Splice/unix-to-tcp/524288-8 1.69GB/s ± 3% 4.93GB/s ± 7% +192.38% (p=0.000 n=10+10) Splice/unix-to-tcp/1048576-8 1.63GB/s ± 3% 5.60GB/s ±44% +243.26% (p=0.000 n=10+9) Change-Id: I1eae4c3459c918558c70fc42283db22ff7e0442c Reviewed-on: https://go-review.googlesource.com/113997 Reviewed-by: Brad Fitzpatrick --- src/net/splice_linux.go | 14 +- src/net/splice_test.go | 567 +++++++++++++++------------------------- 2 files changed, 219 insertions(+), 362 deletions(-) diff --git a/src/net/splice_linux.go b/src/net/splice_linux.go index b055f93351..8a4d55af62 100644 --- a/src/net/splice_linux.go +++ b/src/net/splice_linux.go @@ -11,7 +11,7 @@ import ( // splice transfers data from r to c using the splice system call to minimize // copies from and to userspace. c must be a TCP connection. Currently, splice -// is only enabled if r is also a TCP connection. +// is only enabled if r is a TCP or Unix connection. // // If splice returns handled == false, it has performed no work. func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { @@ -23,11 +23,17 @@ func splice(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, true } } - s, ok := r.(*TCPConn) - if !ok { + + var s *netFD + if tc, ok := r.(*TCPConn); ok { + s = tc.fd + } else if uc, ok := r.(*UnixConn); ok { + s = uc.fd + } else { return 0, nil, false } - written, handled, sc, err := poll.Splice(&c.pfd, &s.fd.pfd, remain) + + written, handled, sc, err := poll.Splice(&c.pfd, &s.pfd, remain) if lr != nil { lr.N -= written } diff --git a/src/net/splice_test.go b/src/net/splice_test.go index ffe71ae384..3e7fd8251b 100644 --- a/src/net/splice_test.go +++ b/src/net/splice_test.go @@ -7,239 +7,103 @@ package net import ( - "bytes" - "fmt" "io" "io/ioutil" + "log" + "os" + "os/exec" + "strconv" "sync" "testing" ) func TestSplice(t *testing.T) { - t.Run("simple", testSpliceSimple) - t.Run("multipleWrite", testSpliceMultipleWrite) - t.Run("big", testSpliceBig) - t.Run("honorsLimitedReader", testSpliceHonorsLimitedReader) - t.Run("readerAtEOF", testSpliceReaderAtEOF) - t.Run("issue25985", testSpliceIssue25985) + t.Run("tcp-to-tcp", func(t *testing.T) { testSplice(t, "tcp", "tcp") }) + t.Run("unix-to-tcp", func(t *testing.T) { testSplice(t, "unix", "tcp") }) } -func testSpliceSimple(t *testing.T) { - srv, err := newSpliceTestServer() +func testSplice(t *testing.T, upNet, downNet string) { + t.Run("simple", spliceTestCase{upNet, downNet, 128, 128, 0}.test) + t.Run("multipleWrite", spliceTestCase{upNet, downNet, 4096, 1 << 20, 0}.test) + t.Run("big", spliceTestCase{upNet, downNet, 5 << 20, 1 << 30, 0}.test) + t.Run("honorsLimitedReader", spliceTestCase{upNet, downNet, 4096, 1 << 20, 1 << 10}.test) + t.Run("updatesLimitedReaderN", spliceTestCase{upNet, downNet, 1024, 4096, 4096 + 100}.test) + t.Run("limitedReaderAtLimit", spliceTestCase{upNet, downNet, 32, 128, 128}.test) + t.Run("readerAtEOF", func(t *testing.T) { testSpliceReaderAtEOF(t, upNet, downNet) }) + t.Run("issue25985", func(t *testing.T) { testSpliceIssue25985(t, upNet, downNet) }) +} + +type spliceTestCase struct { + upNet, downNet string + + chunkSize, totalSize int + limitReadSize int +} + +func (tc spliceTestCase) test(t *testing.T) { + clientUp, serverUp, err := spliceTestSocketPair(tc.upNet) if err != nil { t.Fatal(err) } - defer srv.Close() - copyDone := srv.Copy() - msg := []byte("splice test") - if _, err := srv.Write(msg); err != nil { - t.Fatal(err) - } - got := make([]byte, len(msg)) - if _, err := io.ReadFull(srv, got); err != nil { - t.Fatal(err) - } - if !bytes.Equal(got, msg) { - t.Errorf("got %q, wrote %q", got, msg) - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} - -func testSpliceMultipleWrite(t *testing.T) { - srv, err := newSpliceTestServer() - if err != nil { - t.Fatal(err) - } - defer srv.Close() - copyDone := srv.Copy() - msg1 := []byte("splice test part 1 ") - msg2 := []byte(" splice test part 2") - if _, err := srv.Write(msg1); err != nil { - t.Fatalf("Write: %v", err) - } - if _, err := srv.Write(msg2); err != nil { - t.Fatal(err) - } - got := make([]byte, len(msg1)+len(msg2)) - if _, err := io.ReadFull(srv, got); err != nil { - t.Fatal(err) - } - want := append(msg1, msg2...) - if !bytes.Equal(got, want) { - t.Errorf("got %q, wrote %q", got, want) - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} - -func testSpliceBig(t *testing.T) { - // The maximum amount of data that internal/poll.Splice will use in a - // splice(2) call is 4 << 20. Use a bigger size here so that we test an - // amount that doesn't fit in a single call. - size := 5 << 20 - srv, err := newSpliceTestServer() - if err != nil { - t.Fatal(err) - } - defer srv.Close() - big := make([]byte, size) - copyDone := srv.Copy() - type readResult struct { - b []byte - err error - } - readDone := make(chan readResult) - go func() { - got := make([]byte, len(big)) - _, err := io.ReadFull(srv, got) - readDone <- readResult{got, err} - }() - if _, err := srv.Write(big); err != nil { - t.Fatal(err) - } - res := <-readDone - if res.err != nil { - t.Fatal(res.err) - } - got := res.b - if !bytes.Equal(got, big) { - t.Errorf("input and output differ") - } - srv.CloseWrite() - srv.CloseRead() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } -} - -func testSpliceHonorsLimitedReader(t *testing.T) { - t.Run("stopsAfterN", testSpliceStopsAfterN) - t.Run("updatesN", testSpliceUpdatesN) - t.Run("readerAtLimit", testSpliceReaderAtLimit) -} - -func testSpliceStopsAfterN(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientUp.Close() defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") + cleanup, err := startSpliceClient(clientUp, "w", tc.chunkSize, tc.totalSize) + if err != nil { + t.Fatal(err) + } + defer cleanup() + clientDown, serverDown, err := spliceTestSocketPair(tc.downNet) if err != nil { t.Fatal(err) } - defer clientDown.Close() defer serverDown.Close() - count := 128 - copyDone := make(chan error) - lr := &io.LimitedReader{ - N: int64(count), - R: serverUp, - } - go func() { - _, err := io.Copy(serverDown, lr) - serverDown.Close() - copyDone <- err - }() - msg := make([]byte, 2*count) - if _, err := clientUp.Write(msg); err != nil { + cleanup, err = startSpliceClient(clientDown, "r", tc.chunkSize, tc.totalSize) + if err != nil { t.Fatal(err) } - clientUp.Close() - var buf bytes.Buffer - if _, err := io.Copy(&buf, clientDown); err != nil { + defer cleanup() + var ( + r io.Reader = serverUp + size = tc.totalSize + ) + if tc.limitReadSize > 0 { + if tc.limitReadSize < size { + size = tc.limitReadSize + } + + r = &io.LimitedReader{ + N: int64(tc.limitReadSize), + R: serverUp, + } + defer serverUp.Close() + } + n, err := io.Copy(serverDown, r) + serverDown.Close() + if err != nil { t.Fatal(err) } - if buf.Len() != count { - t.Errorf("splice transferred %d bytes, want to stop after %d", buf.Len(), count) + if want := int64(size); want != n { + t.Errorf("want %d bytes spliced, got %d", want, n) } - clientDown.Close() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) + + if tc.limitReadSize > 0 { + wantN := 0 + if tc.limitReadSize > size { + wantN = tc.limitReadSize - size + } + + if n := r.(*io.LimitedReader).N; n != int64(wantN) { + t.Errorf("r.N = %d, want %d", n, wantN) + } } } -func testSpliceUpdatesN(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") +func testSpliceReaderAtEOF(t *testing.T, upNet, downNet string) { + clientUp, serverUp, err := spliceTestSocketPair(upNet) if err != nil { t.Fatal(err) } defer clientUp.Close() - defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientDown.Close() - defer serverDown.Close() - count := 128 - copyDone := make(chan error) - lr := &io.LimitedReader{ - N: int64(100 + count), - R: serverUp, - } - go func() { - _, err := io.Copy(serverDown, lr) - copyDone <- err - }() - msg := make([]byte, count) - if _, err := clientUp.Write(msg); err != nil { - t.Fatal(err) - } - clientUp.Close() - got := make([]byte, count) - if _, err := io.ReadFull(clientDown, got); err != nil { - t.Fatal(err) - } - clientDown.Close() - if err := <-copyDone; err != nil { - t.Errorf("splice: %v", err) - } - wantN := int64(100) - if lr.N != wantN { - t.Errorf("lr.N = %d, want %d", lr.N, wantN) - } -} - -func testSpliceReaderAtLimit(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientUp.Close() - defer serverUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientDown.Close() - defer serverDown.Close() - - lr := &io.LimitedReader{ - N: 0, - R: serverUp, - } - _, err, handled := splice(serverDown.(*TCPConn).fd, lr) - if !handled { - t.Errorf("exhausted LimitedReader: got err = %v, handled = %t, want handled = true", err, handled) - } -} - -func testSpliceReaderAtEOF(t *testing.T) { - clientUp, serverUp, err := spliceTestSocketPair("tcp") - if err != nil { - t.Fatal(err) - } - defer clientUp.Close() - clientDown, serverDown, err := spliceTestSocketPair("tcp") + clientDown, serverDown, err := spliceTestSocketPair(downNet) if err != nil { t.Fatal(err) } @@ -265,7 +129,7 @@ func testSpliceReaderAtEOF(t *testing.T) { // get a goodbye signal. Test for the goodbye signal. msg := "bye" go func() { - serverDown.(*TCPConn).ReadFrom(serverUp) + serverDown.(io.ReaderFrom).ReadFrom(serverUp) io.WriteString(serverDown, msg) serverDown.Close() }() @@ -280,13 +144,13 @@ func testSpliceReaderAtEOF(t *testing.T) { } } -func testSpliceIssue25985(t *testing.T) { - front, err := newLocalListener("tcp") +func testSpliceIssue25985(t *testing.T, upNet, downNet string) { + front, err := newLocalListener(upNet) if err != nil { t.Fatal(err) } defer front.Close() - back, err := newLocalListener("tcp") + back, err := newLocalListener(downNet) if err != nil { t.Fatal(err) } @@ -300,7 +164,7 @@ func testSpliceIssue25985(t *testing.T) { if err != nil { return } - dst, err := Dial("tcp", back.Addr().String()) + dst, err := Dial(downNet, back.Addr().String()) if err != nil { return } @@ -318,7 +182,7 @@ func testSpliceIssue25985(t *testing.T) { go proxy() - toFront, err := Dial("tcp", front.Addr().String()) + toFront, err := Dial(upNet, front.Addr().String()) if err != nil { t.Fatal(err) } @@ -340,166 +204,71 @@ func testSpliceIssue25985(t *testing.T) { wg.Wait() } -func BenchmarkTCPReadFrom(b *testing.B) { +func BenchmarkSplice(b *testing.B) { testHookUninstaller.Do(uninstallTestHooks) - var chunkSizes []int - for i := uint(10); i <= 20; i++ { - chunkSizes = append(chunkSizes, 1< totalSize { + buf = buf[:totalSize-count] + } + + var err error + if n, err = fn(buf); err != nil { + return + } + } +}