From 581ed296acdaf3139525e142bc85c4bf5cc79122 Mon Sep 17 00:00:00 2001 From: AJ ONeal Date: Sat, 16 May 2020 03:09:47 -0600 Subject: [PATCH] wip: new client --- .gitignore | 1 + mplexer/cmd/client/client.go | 62 +++++++++++++++ mplexer/listener.go | 68 ++++++++++++++++ mplexer/mplexer.go | 148 +++++++++++++++++++++++++++++++++++ mplexer/packer/addr.go | 50 ++++++++++++ mplexer/packer/conn.go | 117 +++++++++++++++++++++++++++ mplexer/sortinghat.go | 10 +++ 7 files changed, 456 insertions(+) create mode 100644 mplexer/cmd/client/client.go create mode 100644 mplexer/listener.go create mode 100644 mplexer/mplexer.go create mode 100644 mplexer/packer/addr.go create mode 100644 mplexer/packer/conn.go create mode 100644 mplexer/sortinghat.go diff --git a/.gitignore b/.gitignore index 1f6690d..ca937f6 100644 --- a/.gitignore +++ b/.gitignore @@ -11,5 +11,6 @@ acme.d /cmd/telebit-relay/telebit-relay *.exe +.*.sw* log.txt *.log diff --git a/mplexer/cmd/client/client.go b/mplexer/cmd/client/client.go new file mode 100644 index 0000000..a60b129 --- /dev/null +++ b/mplexer/cmd/client/client.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "net" + + "git.coolaj86.com/coolaj86/go-telebitd/mplexer" +) + +func main() { + r := &Router{ + secret: os.Getenv("SECRET"), + } + m := &mplexer.MultiplexLocal{ + Relay: os.Getenv("RELAY"), + SortingHat: r, + } + + ctx := context.Background() + + // TODO more m.ListenAndServe(mux) style? + m.ListenAndServe(ctx) +} + +type Router struct { + secret string +} + +func (r *Router) Authz() (string, error) { + return r.secret, nil +} + +// this function is very client-specific logic +func (r *Router) LookupTarget(paddr packer.Addr) (net.Conn, error) { + //if target := LookupPort(paddr.Servername()); nil != target { } + if target := r.LookupServername(paddr.Port()); nil != target { + tconn, err := net.Dial(target.Network(), target.Hostname()) + if nil != err { + return nil, err + } + /* + // TODO for http proxy + return mplexer.TargetOptions { + Hostname // default localhost + Termination // default TLS + XFWD // default... no? + Port // default 0 + Conn // should be dialed beforehand + }, nil + */ + return tconn, nil + } +} + +func (r *Router) LookupServername(servername string) mplexer.Addr { + return &mplexer.NewAddr( + mplexer.HTTPS, + mplexer.TCP, // TCP -> termination.None? / Plain? + "localhost", + 3000, + ) +} diff --git a/mplexer/listener.go b/mplexer/listener.go new file mode 100644 index 0000000..34a1bf2 --- /dev/null +++ b/mplexer/listener.go @@ -0,0 +1,68 @@ +package mplexer + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + + "github.com/gorilla/websocket" +) + +// Listener defines a listener for use with http servers +type Listener struct { + //ParentAddr net.Addr + Conns chan *Conn + ws *websocket.Conn +} + +// NewListener creates a channel for connections and returns the listener +func (m *MultiplexLocal) Listen(ctx context.Context) (*Listener, error) { + authz, err := m.SortingHat.Authz() + if nil != err { + return nil, err + } + + wsd := websocket.Dialer{} + headers := http.Header{} + headers.Set("Authorization", fmt.Sprintf("Bearer %s", authz)) + // *http.Response + wsconn, _, err := wsd.DialContext(ctx, m.Relay, headers) + if nil != err { + return nil, err + } + listener := &Listener{ + Conns: make(chan *Conn), + } + return listener, nil +} + +// Feed will block while pushing a net.Conn onto Conns +func (l *Listener) Feed(conn *Conn) { + l.Conns <- conn +} + +// net.Listener interface + +// Accept will block and wait for a new net.Conn +func (l *Listener) Accept() (*Conn, error) { + conn, ok := <-l.Conns + if ok { + return conn, nil + } + return nil, io.EOF +} + +// Close will close the Conns channel +func (l *Listener) Close() error { + close(l.Conns) + return nil +} + +// Addr returns nil to fulfill the net.Listener interface +func (l *Listener) Addr() net.Addr { + // Addr may (or may not) return the original TCP or TLS listener's address + //return l.ParentAddr + return nil +} diff --git a/mplexer/mplexer.go b/mplexer/mplexer.go new file mode 100644 index 0000000..b0141c5 --- /dev/null +++ b/mplexer/mplexer.go @@ -0,0 +1,148 @@ +package mplexer + +import ( + "context" + "fmt" + "io" + "net" + "os" + "time" +) + +type MultiplexLocal struct { + Relay string + SortingHat SortingHat + Timeout time.Duration +} + +func New(relay string, hat SortingHat) *MultiplexLocal { + return &MultiplexLocal{ + Relay: relay, + SortingHat: hat, + Timeout: 30 * time.Second, + } +} + +func (m *MultiplexLocal) ListenAndServe(ctx context.Context) error { + listener, err := m.Listen(ctx) + if nil != err { + return err + } + + for { + pconn, err := listener.Accept() // packer.Conn + if nil != err { + return err + } + + go m.serve(ctx, pconn) + } +} + +func (m *MultiplexLocal) serve(ctx context.Context, pconn *Conn) { + //paddr := pconn.LocalAddr().(*Addr) // packer.Addr + paddr := pconn.LocalAddr() + //addr.Network() + //addr.String() + paddr.Scheme() + //paddr.Encrypted() + //paddr.Servername() + + // todo: some sort of logic to avoid infinite loop to self? + // (that's probably not possible since the connection could + // route several layers deep) + if target, err := m.SortingHat.LookupTarget(paddr); nil != target { + if nil != err { + // TODO get a log channel or some such + fmt.Fprintf(os.Stderr, "lookup failed for tunneled client: %s", err) + err := pconn.Error(err) + if nil != err { + fmt.Fprintf(os.Stderr, "failed to signal error back to relay: %s", err) + } + return + } + pipePacker(ctx, pconn, target, m.Timeout) + } +} + +func pipePacker(ctx context.Context, pconn *Conn, target net.Conn, timeout time.Duration) { + // how can this be done so that target errors are + // sent back to the relay server? + + // Also something like ReadAhead(size) should signal + // to read and send up to `size` bytes without waiting + // for a response - since we can't signal 'non-read' as + // is the normal operation of tcp... or can we? + // And how do we distinguish idle from dropped? + // Maybe this should have been a udp protocol??? + + defer pconn.Close() + defer target.Close() + + srcCh := make(chan []byte) + dstCh := make(chan []byte) + errCh := make(chan error) + + // Source (Relay) Read Channel + go func() { + // TODO what's the optimal size to buffer? + // TODO user buffered reader + b := make([]byte, 128*1024) + for { + pconn.SetDeadline(time.Now().Add(timeout)) + n, err := pconn.Read(b) + if n > 0 { + srcCh <- b + } + if nil != err { + // TODO let client log this server-side error (unless EOF) + // (nil here because we probably can't send the error to the relay) + errCh <- nil + break + } + } + }() + + // Target (Local) Read Channel + go func() { + // TODO what's the optimal size to buffer? + // TODO user buffered reader + b := make([]byte, 128*1024) + for { + target.SetDeadline(time.Now().Add(timeout)) + n, err := target.Read(b) + if n > 0 { + dstCh <- b + } + if nil != err { + if io.EOF == err { + err = nil + } + errCh <- err + break + } + } + }() + + for { + select { + case <-ctx.Done(): + break + case b := <-srcCh: + target.SetDeadline(time.Now().Add(timeout)) + _, err := target.Write(b) + if nil != err { + // TODO log error locally + pconn.Error(err) + break + } + case b := <-dstCh: + pconn.SetDeadline(time.Now().Add(timeout)) + _, err := pconn.Write(b) + if nil != err { + // TODO log error locally + break + } + } + } +} diff --git a/mplexer/packer/addr.go b/mplexer/packer/addr.go new file mode 100644 index 0000000..e1d5f89 --- /dev/null +++ b/mplexer/packer/addr.go @@ -0,0 +1,50 @@ +package mplexer + +import ( + "fmt" + "strconv" +) + +type Scheme string + +const ( + HTTPS = Scheme("https") + HTTP = Scheme("http") + SSH = Scheme("ssh") + OpenVPN = Scheme("openvpn") +) + +type Termination string + +const ( + TCP = Termination("none") + TLS = Termination("tls") +) + +type Addr struct { + scheme Scheme + termination Termination + addr string + port int +} + +func NewAddr(s Scheme, t Termination, a string, p int) *Addr { + return &Addr{ + scheme: s, + termination: t, + addr: a, + port: p, + } +} + +func (a *Addr) String() string { + return fmt.Sprintf("%s:%s:%s:%d", a.Network(), a.Scheme(), a.addr, a.port) +} + +func (a *Addr) Network() string { + return a.addr + ":" + strconv.Itoa(a.port) +} + +func (a *Addr) Scheme() Scheme { + return a.scheme +} diff --git a/mplexer/packer/conn.go b/mplexer/packer/conn.go new file mode 100644 index 0000000..f0e0764 --- /dev/null +++ b/mplexer/packer/conn.go @@ -0,0 +1,117 @@ +package mplexer + +import ( + "errors" + "net" + "time" +) + +type Conn struct { + // TODO + relayRemoteAddr string + relayRemotePort int + relaySourceProto string + relaySourceAddr string + relaySourcePort int +} + +// TODO conn.go -> conn/conn.go +// TODO NewConn -> New + +func NewConn() *Conn { + return nil +} + +// Read reads data from the connection. +// Read can be made to time out and return an Error with Timeout() == true +// after a fixed time limit; see SetDeadline and SetReadDeadline. +func (c *Conn) Read(b []byte) (n int, err error) { + panic(errors.New("not implemented")) + return 0, nil +} + +// Write writes data to the connection. +// Write can be made to time out and return an Error with Timeout() == true +// after a fixed time limit; see SetDeadline and SetWriteDeadline. +func (c *Conn) Write(b []byte) (n int, err error) { + panic(errors.New("not implemented")) + return 0, nil +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (c *Conn) Close() error { + panic(errors.New("not implemented")) + return nil +} + +// Error signals an error back to the relay +func (c *Conn) Error(err error) error { + panic(errors.New("not implemented")) + return nil +} + +/* +// LocalAddr returns the local network address. +func (c *Conn) LocalAddr() net.Addr { + panic(errors.New("not implemented")) + return &net.IPAddr{} +} +*/ + +// LocalAddr returns the local network address. +func (c *Conn) LocalAddr() *Addr { + panic(errors.New("not implemented")) + return &Addr{} +} + +// RemoteAddr returns the remote network address. +func (c *Conn) RemoteAddr() net.Addr { + panic(errors.New("not implemented")) + return &net.IPAddr{} +} + +// SetDeadline sets the read and write deadlines associated +// with the connection. It is equivalent to calling both +// SetReadDeadline and SetWriteDeadline. +// +// A deadline is an absolute time after which I/O operations +// fail with a timeout (see type Error) instead of +// blocking. The deadline applies to all future and pending +// I/O, not just the immediately following call to Read or +// Write. After a deadline has been exceeded, the connection +// can be refreshed by setting a deadline in the future. +// +// An idle timeout can be implemented by repeatedly extending +// the deadline after successful Read or Write calls. +// +// A zero value for t means I/O operations will not time out. +// +// Note that if a TCP connection has keep-alive turned on, +// which is the default unless overridden by Dialer.KeepAlive +// or ListenConfig.KeepAlive, then a keep-alive failure may +// also return a timeout error. On Unix systems a keep-alive +// failure on I/O can be detected using +// errors.Is(err, syscall.ETIMEDOUT). +func (c *Conn) SetDeadline(t time.Time) error { + panic(errors.New("not implemented")) + return nil +} + +// SetReadDeadline sets the deadline for future Read calls +// and any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (c *Conn) SetReadDeadline(t time.Time) error { + panic(errors.New("not implemented")) + return nil +} + +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. +// Even if write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +func (c *Conn) SetWriteDeadline(t time.Time) error { + panic(errors.New("not implemented")) + return nil +} diff --git a/mplexer/sortinghat.go b/mplexer/sortinghat.go new file mode 100644 index 0000000..e1cf327 --- /dev/null +++ b/mplexer/sortinghat.go @@ -0,0 +1,10 @@ +package mplexer + +import ( + "net" +) + +type SortingHat interface { + LookupTarget(*Addr) (net.Conn, error) + Authz() (string, error) +}