mirror of
https://github.com/golang/go
synced 2024-11-25 05:38:01 -07:00
exp/ssh: improve client channel close behavior
R=gustav.paul CC=golang-dev https://golang.org/cl/5480062
This commit is contained in:
parent
7cf4825425
commit
2b600f77dd
@ -200,7 +200,7 @@ func (c *ClientConn) mainLoop() {
|
|||||||
peersId := uint32(packet[1])<<24 | uint32(packet[2])<<16 | uint32(packet[3])<<8 | uint32(packet[4])
|
peersId := uint32(packet[1])<<24 | uint32(packet[2])<<16 | uint32(packet[3])<<8 | uint32(packet[4])
|
||||||
if length := int(packet[5])<<24 | int(packet[6])<<16 | int(packet[7])<<8 | int(packet[8]); length > 0 {
|
if length := int(packet[5])<<24 | int(packet[6])<<16 | int(packet[7])<<8 | int(packet[8]); length > 0 {
|
||||||
packet = packet[9:]
|
packet = packet[9:]
|
||||||
c.getChan(peersId).stdout.data <- packet[:length]
|
c.getChan(peersId).stdout.handleData(packet[:length])
|
||||||
}
|
}
|
||||||
case msgChannelExtendedData:
|
case msgChannelExtendedData:
|
||||||
if len(packet) < 13 {
|
if len(packet) < 13 {
|
||||||
@ -215,7 +215,7 @@ func (c *ClientConn) mainLoop() {
|
|||||||
// for stderr on interactive sessions. Other data types are
|
// for stderr on interactive sessions. Other data types are
|
||||||
// silently discarded.
|
// silently discarded.
|
||||||
if datatype == 1 {
|
if datatype == 1 {
|
||||||
c.getChan(peersId).stderr.data <- packet[:length]
|
c.getChan(peersId).stderr.handleData(packet[:length])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -228,13 +228,22 @@ func (c *ClientConn) mainLoop() {
|
|||||||
c.getChan(msg.PeersId).msg <- msg
|
c.getChan(msg.PeersId).msg <- msg
|
||||||
case *channelCloseMsg:
|
case *channelCloseMsg:
|
||||||
ch := c.getChan(msg.PeersId)
|
ch := c.getChan(msg.PeersId)
|
||||||
|
ch.theyClosed = true
|
||||||
close(ch.stdin.win)
|
close(ch.stdin.win)
|
||||||
close(ch.stdout.data)
|
ch.stdout.eof()
|
||||||
close(ch.stderr.data)
|
ch.stderr.eof()
|
||||||
close(ch.msg)
|
close(ch.msg)
|
||||||
|
if !ch.weClosed {
|
||||||
|
ch.weClosed = true
|
||||||
|
ch.sendClose()
|
||||||
|
}
|
||||||
c.chanlist.remove(msg.PeersId)
|
c.chanlist.remove(msg.PeersId)
|
||||||
case *channelEOFMsg:
|
case *channelEOFMsg:
|
||||||
c.getChan(msg.PeersId).sendEOF()
|
ch := c.getChan(msg.PeersId)
|
||||||
|
ch.stdout.eof()
|
||||||
|
// RFC 4254 is mute on how EOF affects dataExt messages but
|
||||||
|
// it is logical to signal EOF at the same time.
|
||||||
|
ch.stderr.eof()
|
||||||
case *channelRequestSuccessMsg:
|
case *channelRequestSuccessMsg:
|
||||||
c.getChan(msg.PeersId).msg <- msg
|
c.getChan(msg.PeersId).msg <- msg
|
||||||
case *channelRequestFailureMsg:
|
case *channelRequestFailureMsg:
|
||||||
@ -243,6 +252,8 @@ func (c *ClientConn) mainLoop() {
|
|||||||
c.getChan(msg.PeersId).msg <- msg
|
c.getChan(msg.PeersId).msg <- msg
|
||||||
case *windowAdjustMsg:
|
case *windowAdjustMsg:
|
||||||
c.getChan(msg.PeersId).stdin.win <- int(msg.AdditionalBytes)
|
c.getChan(msg.PeersId).stdin.win <- int(msg.AdditionalBytes)
|
||||||
|
case *disconnectMsg:
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
fmt.Printf("mainLoop: unhandled message %T: %v\n", msg, msg)
|
fmt.Printf("mainLoop: unhandled message %T: %v\n", msg, msg)
|
||||||
}
|
}
|
||||||
@ -295,6 +306,9 @@ type clientChan struct {
|
|||||||
stdout *chanReader // receives the payload of channelData messages
|
stdout *chanReader // receives the payload of channelData messages
|
||||||
stderr *chanReader // receives the payload of channelExtendedData messages
|
stderr *chanReader // receives the payload of channelExtendedData messages
|
||||||
msg chan interface{} // incoming messages
|
msg chan interface{} // incoming messages
|
||||||
|
|
||||||
|
theyClosed bool // indicates the close msg has been received from the remote side
|
||||||
|
weClosed bool // incidates the close msg has been sent from our side
|
||||||
}
|
}
|
||||||
|
|
||||||
// newClientChan returns a partially constructed *clientChan
|
// newClientChan returns a partially constructed *clientChan
|
||||||
@ -336,20 +350,29 @@ func (c *clientChan) waitForChannelOpenResponse() error {
|
|||||||
return errors.New("unexpected packet")
|
return errors.New("unexpected packet")
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendEOF Sends EOF to the server. RFC 4254 Section 5.3
|
// sendEOF sends EOF to the server. RFC 4254 Section 5.3
|
||||||
func (c *clientChan) sendEOF() error {
|
func (c *clientChan) sendEOF() error {
|
||||||
return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{
|
return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{
|
||||||
PeersId: c.peersId,
|
PeersId: c.peersId,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the channel. This does not close the underlying connection.
|
// sendClose signals the intent to close the channel.
|
||||||
func (c *clientChan) Close() error {
|
func (c *clientChan) sendClose() error {
|
||||||
return c.writePacket(marshal(msgChannelClose, channelCloseMsg{
|
return c.writePacket(marshal(msgChannelClose, channelCloseMsg{
|
||||||
PeersId: c.peersId,
|
PeersId: c.peersId,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes the channel. This does not close the underlying connection.
|
||||||
|
func (c *clientChan) Close() error {
|
||||||
|
if !c.weClosed {
|
||||||
|
c.weClosed = true
|
||||||
|
return c.sendClose()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Thread safe channel list.
|
// Thread safe channel list.
|
||||||
type chanlist struct {
|
type chanlist struct {
|
||||||
// protects concurrent access to chans
|
// protects concurrent access to chans
|
||||||
@ -421,7 +444,7 @@ func (w *chanWriter) Write(data []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *chanWriter) Close() error {
|
func (w *chanWriter) Close() error {
|
||||||
return w.clientChan.writePacket(marshal(msgChannelEOF, channelEOFMsg{w.clientChan.peersId}))
|
return w.clientChan.sendEOF()
|
||||||
}
|
}
|
||||||
|
|
||||||
// A chanReader represents stdout or stderr of a remote process.
|
// A chanReader represents stdout or stderr of a remote process.
|
||||||
@ -430,10 +453,27 @@ type chanReader struct {
|
|||||||
// If writes to this channel block, they will block mainLoop, making
|
// If writes to this channel block, they will block mainLoop, making
|
||||||
// it unable to receive new messages from the remote side.
|
// it unable to receive new messages from the remote side.
|
||||||
data chan []byte // receives data from remote
|
data chan []byte // receives data from remote
|
||||||
|
dataClosed bool // protects data from being closed twice
|
||||||
clientChan *clientChan // the channel backing this reader
|
clientChan *clientChan // the channel backing this reader
|
||||||
buf []byte
|
buf []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eof signals to the consumer that there is no more data to be received.
|
||||||
|
func (r *chanReader) eof() {
|
||||||
|
if !r.dataClosed {
|
||||||
|
r.dataClosed = true
|
||||||
|
close(r.data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleData sends buf to the reader's consumer. If r.data is closed
|
||||||
|
// the data will be silently discarded
|
||||||
|
func (r *chanReader) handleData(buf []byte) {
|
||||||
|
if !r.dataClosed {
|
||||||
|
r.data <- buf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Read reads data from the remote process's stdout or stderr.
|
// Read reads data from the remote process's stdout or stderr.
|
||||||
func (r *chanReader) Read(data []byte) (int, error) {
|
func (r *chanReader) Read(data []byte) (int, error) {
|
||||||
var ok bool
|
var ok bool
|
||||||
|
Loading…
Reference in New Issue
Block a user