exp/ssh: introduce Session to replace Cmd for interactive commands

This CL replaces the Cmd type with a Session type representing
interactive channels. This lays the foundation for supporting
other kinds of channels like direct-tcpip or x11.

client.go:
* replace chanlist map with slice.
* generalize stdout and stderr into a single type.
* unexport ClientChan to clientChan.

doc.go:
* update ServerConfig/ServerConn documentation.
* update Client example for Session.

message.go:
* make channelExtendedData more like channelData.

session.go:
* added Session which replaces Cmd.

R=agl, rsc, n13m3y3r, gustavo
CC=golang-dev
https://golang.org/cl/5302054
This commit is contained in:
Dave Cheney 2011-10-24 19:13:55 -04:00 committed by Adam Langley
parent 2f3f3aa2ed
commit 5791233461
5 changed files with 230 additions and 231 deletions

View file

@ -13,5 +13,6 @@ GOFILES=\
transport.go\
server.go\
server_shell.go\
session.go\
include ../../../Make.pkg

View file

@ -8,7 +8,6 @@ import (
"big"
"crypto"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"os"
@ -31,10 +30,6 @@ func Client(c net.Conn, config *ClientConfig) (*ClientConn, os.Error) {
conn := &ClientConn{
transport: newTransport(c, config.rand()),
config: config,
chanlist: chanlist{
Mutex: new(sync.Mutex),
chans: make(map[uint32]*ClientChan),
},
}
if err := conn.handshake(); err != nil {
conn.Close()
@ -233,18 +228,17 @@ func (c *ClientConn) kexDH(group *dhGroup, hashFunc crypto.Hash, magics *handsha
return H, K, nil
}
// OpenChan opens a new client channel. The most common session type is "session".
// openChan opens a new client channel. The most common session type is "session".
// The full set of valid session types are listed in RFC 4250 4.9.1.
func (c *ClientConn) OpenChan(typ string) (*ClientChan, os.Error) {
ch, id := c.newChan(c.transport)
func (c *ClientConn) openChan(typ string) (*clientChan, os.Error) {
ch := c.newChan(c.transport)
if err := c.writePacket(marshal(msgChannelOpen, channelOpenMsg{
ChanType: typ,
PeersId: id,
PeersWindow: 8192,
MaxPacketSize: 16384,
PeersId: ch.id,
PeersWindow: 1 << 14,
MaxPacketSize: 1 << 15, // RFC 4253 6.1
})); err != nil {
// remove channel reference
c.chanlist.remove(id)
c.chanlist.remove(ch.id)
return nil, err
}
// wait for response
@ -252,10 +246,10 @@ func (c *ClientConn) OpenChan(typ string) (*ClientChan, os.Error) {
case *channelOpenConfirmMsg:
ch.peersId = msg.MyId
case *channelOpenFailureMsg:
c.chanlist.remove(id)
c.chanlist.remove(ch.id)
return nil, os.NewError(msg.Message)
default:
c.chanlist.remove(id)
c.chanlist.remove(ch.id)
return nil, os.NewError("Unexpected packet")
}
return ch, nil
@ -271,6 +265,10 @@ func (c *ClientConn) mainLoop() {
c.Close()
return
}
// TODO(dfc) A note on blocking channel use.
// The msg, win, data and dataExt channels of a clientChan can
// cause this loop to block indefinately if the consumer does
// not service them.
switch msg := decode(packet).(type) {
case *channelOpenMsg:
c.getChan(msg.PeersId).msg <- msg
@ -280,9 +278,9 @@ func (c *ClientConn) mainLoop() {
c.getChan(msg.PeersId).msg <- msg
case *channelCloseMsg:
ch := c.getChan(msg.PeersId)
close(ch.stdinWriter.win)
close(ch.stdoutReader.data)
close(ch.stderrReader.dataExt)
close(ch.win)
close(ch.data)
close(ch.dataExt)
c.chanlist.remove(msg.PeersId)
case *channelEOFMsg:
c.getChan(msg.PeersId).msg <- msg
@ -293,13 +291,16 @@ func (c *ClientConn) mainLoop() {
case *channelRequestMsg:
c.getChan(msg.PeersId).msg <- msg
case *windowAdjustMsg:
c.getChan(msg.PeersId).stdinWriter.win <- int(msg.AdditionalBytes)
c.getChan(msg.PeersId).win <- int(msg.AdditionalBytes)
case *channelData:
c.getChan(msg.PeersId).stdoutReader.data <- msg.Payload
c.getChan(msg.PeersId).data <- msg.Payload
case *channelExtendedData:
// TODO(dfc) should this send be non blocking. RFC 4254 5.2 suggests
// ext data consumes window size, does that need to be handled as well ?
c.getChan(msg.PeersId).stderrReader.dataExt <- msg.Data
// RFC 4254 5.2 defines data_type_code 1 to be data destined
// for stderr on interactive sessions. Other data types are
// silently discarded.
if msg.Datatype == 1 {
c.getChan(msg.PeersId).dataExt <- msg.Payload
}
default:
fmt.Printf("mainLoop: unhandled %#v\n", msg)
}
@ -338,207 +339,95 @@ func (c *ClientConfig) rand() io.Reader {
return c.Rand
}
// A ClientChan represents a single RFC 4254 channel that is multiplexed
// A clientChan represents a single RFC 4254 channel that is multiplexed
// over a single SSH connection.
type ClientChan struct {
type clientChan struct {
packetWriter
*stdinWriter // used by Exec and Shell
*stdoutReader // used by Exec and Shell
*stderrReader // used by Exec and Shell
id, peersId uint32
msg chan interface{} // incoming messages
id, peersId uint32
data chan []byte // receives the payload of channelData messages
dataExt chan []byte // receives the payload of channelExtendedData messages
win chan int // receives window adjustments
msg chan interface{} // incoming messages
}
func newClientChan(t *transport, id uint32) *ClientChan {
// TODO(DFC) allocating stdin/out/err on ClientChan creation is
// wasteful, but ClientConn.mainLoop() needs a way of finding
// those channels before Exec/Shell is called because the remote
// may send window adjustments at any time.
return &ClientChan{
func newClientChan(t *transport, id uint32) *clientChan {
return &clientChan{
packetWriter: t,
stdinWriter: &stdinWriter{
packetWriter: t,
id: id,
win: make(chan int, 16),
},
stdoutReader: &stdoutReader{
packetWriter: t,
id: id,
win: 8192,
data: make(chan []byte, 16),
},
stderrReader: &stderrReader{
dataExt: make(chan string, 16),
},
id: id,
msg: make(chan interface{}, 16),
id: id,
data: make(chan []byte, 16),
dataExt: make(chan []byte, 16),
win: make(chan int, 16),
msg: make(chan interface{}, 16),
}
}
// Close closes the channel. This does not close the underlying connection.
func (c *ClientChan) Close() os.Error {
func (c *clientChan) Close() os.Error {
return c.writePacket(marshal(msgChannelClose, channelCloseMsg{
PeersId: c.id,
}))
}
// Setenv sets an environment variable that will be applied to any
// command executed by Shell or Exec.
func (c *ClientChan) Setenv(name, value string) os.Error {
namLen := stringLength([]byte(name))
valLen := stringLength([]byte(value))
payload := make([]byte, namLen+valLen)
marshalString(payload[:namLen], []byte(name))
marshalString(payload[namLen:], []byte(value))
return c.sendChanReq(channelRequestMsg{
PeersId: c.id,
Request: "env",
WantReply: true,
RequestSpecificData: payload,
})
}
func (c *ClientChan) sendChanReq(req channelRequestMsg) os.Error {
func (c *clientChan) sendChanReq(req channelRequestMsg) os.Error {
if err := c.writePacket(marshal(msgChannelRequest, req)); err != nil {
return err
}
for {
switch msg := (<-c.msg).(type) {
case *channelRequestSuccessMsg:
return nil
case *channelRequestFailureMsg:
return os.NewError(req.Request)
default:
return fmt.Errorf("%#v", msg)
}
msg := <-c.msg
if _, ok := msg.(*channelRequestSuccessMsg); ok {
return nil
}
panic("unreachable")
}
// An empty mode list (a string of 1 character, opcode 0), see RFC 4254 Section 8.
var emptyModeList = []byte{0, 0, 0, 1, 0}
// RequstPty requests a pty to be allocated on the remote side of this channel.
func (c *ClientChan) RequestPty(term string, h, w int) os.Error {
buf := make([]byte, 4+len(term)+16+len(emptyModeList))
b := marshalString(buf, []byte(term))
binary.BigEndian.PutUint32(b, uint32(h))
binary.BigEndian.PutUint32(b[4:], uint32(w))
binary.BigEndian.PutUint32(b[8:], uint32(h*8))
binary.BigEndian.PutUint32(b[12:], uint32(w*8))
copy(b[16:], emptyModeList)
return c.sendChanReq(channelRequestMsg{
PeersId: c.id,
Request: "pty-req",
WantReply: true,
RequestSpecificData: buf,
})
}
// Exec runs cmd on the remote host.
// Typically, the remote server passes cmd to the shell for interpretation.
func (c *ClientChan) Exec(cmd string) (*Cmd, os.Error) {
cmdLen := stringLength([]byte(cmd))
payload := make([]byte, cmdLen)
marshalString(payload, []byte(cmd))
err := c.sendChanReq(channelRequestMsg{
PeersId: c.id,
Request: "exec",
WantReply: true,
RequestSpecificData: payload,
})
return &Cmd{
c.stdinWriter,
c.stdoutReader,
c.stderrReader,
}, err
}
// Shell starts a login shell on the remote host.
func (c *ClientChan) Shell() (*Cmd, os.Error) {
err := c.sendChanReq(channelRequestMsg{
PeersId: c.id,
Request: "shell",
WantReply: true,
})
return &Cmd{
c.stdinWriter,
c.stdoutReader,
c.stderrReader,
}, err
return fmt.Errorf("failed to complete request: %s, %#v", req.Request, msg)
}
// Thread safe channel list.
type chanlist struct {
*sync.Mutex
// TODO(dfc) should could be converted to a slice
chans map[uint32]*ClientChan
// protects concurrent access to chans
sync.Mutex
// chans are indexed by the local id of the channel, clientChan.id.
// The PeersId value of messages received by ClientConn.mainloop is
// used to locate the right local clientChan in this slice.
chans []*clientChan
}
// Allocate a new ClientChan with the next avail local id.
func (c *chanlist) newChan(t *transport) (*ClientChan, uint32) {
func (c *chanlist) newChan(t *transport) *clientChan {
c.Lock()
defer c.Unlock()
for i := uint32(0); i < 1<<31; i++ {
if _, ok := c.chans[i]; !ok {
ch := newClientChan(t, i)
for i := range c.chans {
if c.chans[i] == nil {
ch := newClientChan(t, uint32(i))
c.chans[i] = ch
return ch, uint32(i)
return ch
}
}
panic("unable to find free channel")
i := len(c.chans)
ch := newClientChan(t, uint32(i))
c.chans = append(c.chans, ch)
return ch
}
func (c *chanlist) getChan(id uint32) *ClientChan {
func (c *chanlist) getChan(id uint32) *clientChan {
c.Lock()
defer c.Unlock()
return c.chans[id]
return c.chans[int(id)]
}
func (c *chanlist) remove(id uint32) {
c.Lock()
defer c.Unlock()
delete(c.chans, id)
c.chans[int(id)] = nil
}
// A Cmd represents a connection to a remote command or shell
// Closing Cmd.Stdin will be observed by the remote process.
type Cmd struct {
// Writes to Stdin are made available to the command's standard input.
// Closing Stdin causes the command to observe an EOF on its standard input.
Stdin io.WriteCloser
// Reads from Stdout consume the command's standard output.
// There is a fixed amount of buffering of the command's standard output.
// Failing to read from Stdout will eventually cause the command to block
// when writing to its standard output. Closing Stdout unblocks any
// such writes and makes them return errors.
Stdout io.ReadCloser
// Reads from Stderr consume the command's standard error.
// The SSH protocol assumes it can always send standard error;
// the command will never block writing to its standard error.
// However, failure to read from Stderr will eventually cause the
// SSH protocol to jam, so it is important to arrange for reading
// from Stderr, even if by
// go io.Copy(ioutil.Discard, cmd.Stderr)
Stderr io.Reader
}
// A stdinWriter represents the stdin of a remote process.
type stdinWriter struct {
// A chanWriter represents the stdin of a remote process.
type chanWriter struct {
win chan int // receives window adjustments
id uint32
rwin int // current rwin size
packetWriter // for sending channelDataMsg
id uint32 // this channel's id
rwin int // current rwin size
packetWriter // for sending channelDataMsg
}
// Write writes data to the remote process's standard input.
func (w *stdinWriter) Write(data []byte) (n int, err os.Error) {
func (w *chanWriter) Write(data []byte) (n int, err os.Error) {
for {
if w.rwin == 0 {
win, ok := <-w.win
@ -560,69 +449,42 @@ func (w *stdinWriter) Write(data []byte) (n int, err os.Error) {
panic("unreachable")
}
func (w *stdinWriter) Close() os.Error {
func (w *chanWriter) Close() os.Error {
return w.writePacket(marshal(msgChannelEOF, channelEOFMsg{w.id}))
}
// A stdoutReader represents the stdout of a remote process.
type stdoutReader struct {
// A chanReader represents stdout or stderr of a remote process.
type chanReader struct {
// TODO(dfc) a fixed size channel may not be the right data structure.
// If writes to this channel block, they will block mainLoop, making
// it unable to receive new messages from the remote side.
data chan []byte // receives data from remote
id uint32
win int // current win size
packetWriter // for sending windowAdjustMsg
packetWriter // for sending windowAdjustMsg
buf []byte
}
// Read reads data from the remote process's standard output.
func (r *stdoutReader) Read(data []byte) (int, os.Error) {
// Read reads data from the remote process's stdout or stderr.
func (r *chanReader) Read(data []byte) (int, os.Error) {
var ok bool
for {
if len(r.buf) > 0 {
n := copy(data, r.buf)
r.buf = r.buf[n:]
r.win += n
msg := windowAdjustMsg{
PeersId: r.id,
AdditionalBytes: uint32(n),
}
err := r.writePacket(marshal(msgChannelWindowAdjust, msg))
return n, err
return n, r.writePacket(marshal(msgChannelWindowAdjust, msg))
}
r.buf, ok = <-r.data
if !ok {
return 0, os.EOF
}
r.win -= len(r.buf)
}
panic("unreachable")
}
func (r *stdoutReader) Close() os.Error {
func (r *chanReader) Close() os.Error {
return r.writePacket(marshal(msgChannelEOF, channelEOFMsg{r.id}))
}
// A stderrReader represents the stderr of a remote process.
type stderrReader struct {
dataExt chan string // receives dataExt from remote
buf []byte // buffer current dataExt
}
// Read reads a line of data from the remote process's stderr.
func (r *stderrReader) Read(data []byte) (int, os.Error) {
for {
if len(r.buf) > 0 {
n := copy(data, r.buf)
r.buf = r.buf[n:]
return n, nil
}
buf, ok := <-r.dataExt
if !ok {
return 0, os.EOF
}
r.buf = []byte(buf)
}
panic("unreachable")
}

View file

@ -11,26 +11,29 @@ protocol is a remote shell and this is specifically implemented. However,
the multiplexed nature of SSH is exposed to users that wish to support
others.
An SSH server is represented by a Server, which manages a number of
ServerConnections and handles authentication.
An SSH server is represented by a ServerConfig, which holds certificate
details and handles authentication of ServerConns.
var s Server
s.PubKeyCallback = pubKeyAuth
s.PasswordCallback = passwordAuth
config := new(ServerConfig)
config.PubKeyCallback = pubKeyAuth
config.PasswordCallback = passwordAuth
pemBytes, err := ioutil.ReadFile("id_rsa")
if err != nil {
panic("Failed to load private key")
}
err = s.SetRSAPrivateKey(pemBytes)
err = config.SetRSAPrivateKey(pemBytes)
if err != nil {
panic("Failed to parse private key")
}
Once a Server has been set up, connections can be attached.
Once a ServerConfig has been configured, connections can be accepted.
var sConn ServerConnection
sConn.Server = &s
listener := Listen("tcp", "0.0.0.0:2022", config)
sConn, err := listener.Accept()
if err != nil {
panic("failed to accept incoming connection")
}
err = sConn.Handshake(conn)
if err != nil {
panic("failed to handshake")
@ -38,7 +41,6 @@ Once a Server has been set up, connections can be attached.
An SSH connection multiplexes several channels, which must be accepted themselves:
for {
channel, err := sConn.Accept()
if err != nil {
@ -85,17 +87,19 @@ authentication method is supported.
}
client, err := Dial("yourserver.com:22", config)
Each ClientConn can support multiple channels, represented by ClientChan. Each
channel should be of a type specified in rfc4250, 4.9.1.
Each ClientConn can support multiple interactive sessions, represented by a Session.
ch, err := client.OpenChan("session")
session, err := client.NewSession()
Once the ClientChan is opened, you can execute a single command on the remote side
Once a Session is created, you can execute a single command on the remote side
using the Exec method.
cmd, err := ch.Exec("/usr/bin/whoami")
reader := bufio.NewReader(cmd.Stdin)
if err := session.Exec("/usr/bin/whoami"); err != nil {
panic("Failed to exec: " + err.String())
}
reader := bufio.NewReader(session.Stdin)
line, _, _ := reader.ReadLine()
fmt.Println(line)
session.Close()
*/
package ssh

View file

@ -154,7 +154,7 @@ type channelData struct {
type channelExtendedData struct {
PeersId uint32
Datatype uint32
Data string
Payload []byte `ssh:"rest"`
}
type channelRequestMsg struct {

132
src/pkg/exp/ssh/session.go Normal file
View file

@ -0,0 +1,132 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package ssh
// Session implements an interactive session described in
// "RFC 4254, section 6".
import (
"encoding/binary"
"io"
"os"
)
// A Session represents a connection to a remote command or shell.
type Session struct {
// Writes to Stdin are made available to the remote command's standard input.
// Closing Stdin causes the command to observe an EOF on its standard input.
Stdin io.WriteCloser
// Reads from Stdout and Stderr consume from the remote command's standard
// output and error streams, respectively.
// There is a fixed amount of buffering that is shared for the two streams.
// Failing to read from either may eventually cause the command to block.
// Closing Stdout unblocks such writes and causes them to return errors.
Stdout io.ReadCloser
Stderr io.Reader
*clientChan // the channel backing this session
started bool // started is set to true once a Shell or Exec is invoked.
}
// Setenv sets an environment variable that will be applied to any
// command executed by Shell or Exec.
func (s *Session) Setenv(name, value string) os.Error {
n, v := []byte(name), []byte(value)
nlen, vlen := stringLength(n), stringLength(v)
payload := make([]byte, nlen+vlen)
marshalString(payload[:nlen], n)
marshalString(payload[nlen:], v)
return s.sendChanReq(channelRequestMsg{
PeersId: s.id,
Request: "env",
WantReply: true,
RequestSpecificData: payload,
})
}
// An empty mode list (a string of 1 character, opcode 0), see RFC 4254 Section 8.
var emptyModeList = []byte{0, 0, 0, 1, 0}
// RequestPty requests the association of a pty with the session on the remote host.
func (s *Session) RequestPty(term string, h, w int) os.Error {
buf := make([]byte, 4+len(term)+16+len(emptyModeList))
b := marshalString(buf, []byte(term))
binary.BigEndian.PutUint32(b, uint32(h))
binary.BigEndian.PutUint32(b[4:], uint32(w))
binary.BigEndian.PutUint32(b[8:], uint32(h*8))
binary.BigEndian.PutUint32(b[12:], uint32(w*8))
copy(b[16:], emptyModeList)
return s.sendChanReq(channelRequestMsg{
PeersId: s.id,
Request: "pty-req",
WantReply: true,
RequestSpecificData: buf,
})
}
// Exec runs cmd on the remote host. Typically, the remote
// server passes cmd to the shell for interpretation.
// A Session only accepts one call to Exec or Shell.
func (s *Session) Exec(cmd string) os.Error {
if s.started {
return os.NewError("session already started")
}
cmdLen := stringLength([]byte(cmd))
payload := make([]byte, cmdLen)
marshalString(payload, []byte(cmd))
s.started = true
return s.sendChanReq(channelRequestMsg{
PeersId: s.id,
Request: "exec",
WantReply: true,
RequestSpecificData: payload,
})
}
// Shell starts a login shell on the remote host. A Session only
// accepts one call to Exec or Shell.
func (s *Session) Shell() os.Error {
if s.started {
return os.NewError("session already started")
}
s.started = true
return s.sendChanReq(channelRequestMsg{
PeersId: s.id,
Request: "shell",
WantReply: true,
})
}
// NewSession returns a new interactive session on the remote host.
func (c *ClientConn) NewSession() (*Session, os.Error) {
ch, err := c.openChan("session")
if err != nil {
return nil, err
}
return &Session{
Stdin: &chanWriter{
packetWriter: ch,
id: ch.id,
win: ch.win,
},
Stdout: &chanReader{
packetWriter: ch,
id: ch.id,
data: ch.data,
},
Stderr: &chanReader{
packetWriter: ch,
id: ch.id,
data: ch.dataExt,
},
clientChan: ch,
}, nil
}