fix udp listener and fix agent TX/RX

Signed-off-by: Matheus Sampaio Queiroga <srherobrine20@gmail.com>
This commit is contained in:
Matheus Sampaio Queiroga 2024-06-21 20:49:21 -03:00
parent f6985e6661
commit dc5e8a1cdf
Signed by: Sirherobrine23
GPG Key ID: 01CCABE2580AFEBC
9 changed files with 160 additions and 344 deletions

@ -1,6 +1,7 @@
package client
import (
"bufio"
"bytes"
"encoding/json"
"errors"
@ -122,24 +123,32 @@ func (tun *Client) GetTargetWrite(Proto uint8, To netip.AddrPort) io.Writer {
}
func (client *Client) handlers() {
bufioBuff := bufio.NewReader(client.Conn)
var lastPing int64 = 0
for {
if time.Now().UnixMilli() - lastPing > 3_000 {
if time.Now().UnixMilli()-lastPing > 3_000 {
var now = time.Now()
go client.Send(proto.Request{Ping: &now})
}
res, err := proto.ReaderResponse(client.Conn)
d, _ := json.Marshal(res)
fmt.Println(string(d))
res, err := proto.ReaderResponse(bufioBuff)
if err != nil {
fmt.Println(err)
if err == proto.ErrInvalidBody {
continue
}
fmt.Println(err)
panic(err) // TODO: Require fix to agent shutdown graced
} else if res.Unauthorized || res.NotListened {
}
d, _ := json.Marshal(res)
fmt.Println(string(d))
if res.Pong != nil {
lastPing = res.Pong.UnixMilli()
continue
}
if res.Unauthorized || res.NotListened {
panic(fmt.Errorf("cannot recive requests")) // TODO: Require fix to agent shutdown graced
} else if res.SendAuth {
var auth = proto.AgentAuth(client.Token)

@ -145,7 +145,7 @@ func (p *pipe) RemoteAddr() net.Addr { return p.remoteAddr }
func (p *pipe) Read(b []byte) (int, error) {
n, err := p.read(b)
if err != nil && err != io.EOF && err != io.ErrClosedPipe {
err = &net.OpError{Op: "read", Net: "pipe", Err: err}
err = &net.OpError{Op: "read", Net: "udp", Err: err}
}
return n, err
}
@ -177,7 +177,7 @@ func (p *pipe) read(b []byte) (n int, err error) {
func (p *pipe) Write(b []byte) (int, error) {
n, err := p.write(b)
if err != nil && err != io.ErrClosedPipe {
err = &net.OpError{Op: "write", Net: "pipe", Err: err}
err = &net.OpError{Op: "write", Net: "udp", Err: err}
}
return n, err
}

@ -1,140 +0,0 @@
package udplisterner
import (
"io"
"net"
"net/netip"
"sync"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/pipe"
)
type clientInfo struct {
WriteSize int // Size to Write
Conn net.Conn // Client Pipe
}
type UdpListerner struct {
readSize int // UDPConn size to reader
udpConn *net.UDPConn // UDPConn root
clientInfo *sync.Map // Storage *clientInfo
newClient chan any // Accept connection channel or error
}
func ListenUDPAddr(network string, address *net.UDPAddr) (UdpListen *UdpListerner, err error) {
UdpListen = new(UdpListerner)
if UdpListen.udpConn, err = net.ListenUDP(network, address); err != nil {
return nil, err
}
UdpListen.readSize = 1024 // Initial buffer reader
UdpListen.newClient = make(chan any)
UdpListen.clientInfo = new(sync.Map)
go UdpListen.backgroud() // Recive new requests
return UdpListen, nil
}
func ListenAddrPort(network string, address netip.AddrPort) (*UdpListerner, error) {
return ListenUDPAddr(network, net.UDPAddrFromAddrPort(address))
}
func Listen(network, address string) (*UdpListerner, error) {
local, err := net.ResolveUDPAddr(network, address)
if err != nil {
return nil, err
}
return ListenUDPAddr(network, local)
}
// Close clients and close client channel
func (udp *UdpListerner) Close() error {
close(udp.newClient) // Close channel to new accepts
var toDelete map[string]*clientInfo
udp.clientInfo.Range(func(key, value any) bool {
toDelete[key.(string)] = value.(*clientInfo)
return true
})
for key, info := range toDelete {
info.Conn.Close()
udp.clientInfo.Delete(key)
}
return nil
}
func (udp *UdpListerner) CloseClient(clientAddrPort string) {
client, ok := udp.clientInfo.LoadAndDelete(clientAddrPort)
if !ok {
return
}
agent := client.(clientInfo)
agent.Conn.Close()
}
func (udp UdpListerner) Addr() net.Addr {
if udp.udpConn == nil {
return &net.UDPAddr{}
}
return udp.udpConn.LocalAddr()
}
func (udp UdpListerner) Accept() (net.Conn, error) {
if conn, ok := <-udp.newClient; ok {
if err, isErr := conn.(error); isErr {
return nil, err
}
return conn.(*clientInfo).Conn, nil
}
return nil, net.ErrClosed
}
func (udp *UdpListerner) backgroud() {
for {
readBuffer := make([]byte, udp.readSize) // Make reader size
n, from, err := udp.udpConn.ReadFromUDP(readBuffer)
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
udp.Close() // Close clients
break
}
continue
} else if n-udp.readSize == 0 {
udp.readSize += 500 // Add 500 Bytes to reader
}
// Check if exists current connection
if client, ok := udp.clientInfo.Load(from.String()); ok {
toListener := client.(*clientInfo)
toListener.Conn.Write(readBuffer[:n]) // n size from Buffer to client
continue // Contine loop
}
go func() {
// Create new client
newClient := new(clientInfo)
newClient.WriteSize = n // Same Size from reader buffer
var agentPipe net.Conn
newClient.Conn, agentPipe = pipe.CreatePipe(udp.udpConn.LocalAddr(), from)
udp.newClient <- newClient // Send to accept
udp.clientInfo.Store(from.String(), newClient)
go agentPipe.Write(readBuffer[:n]) // n size from Buffer to client
go func() {
for {
client, ok := udp.clientInfo.Load(from.String())
if !ok {
udp.clientInfo.Delete(from.String())
agentPipe.Close()
break // bye-bye
}
newClient := client.(*clientInfo)
writeBuffer := make([]byte, newClient.WriteSize)
n, err := agentPipe.Read(writeBuffer)
if err != nil {
udp.clientInfo.Delete(from.String())
agentPipe.Close()
break
}
go udp.udpConn.WriteToUDP(writeBuffer[:n], from)
}
}()
}()
}
}

136
internal/udplisterner/v2.go Normal file

@ -0,0 +1,136 @@
package udplisterner
import (
"bufio"
"bytes"
"io"
"log"
"net"
"net/netip"
"sync"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/pipe"
)
type writeRoot struct {
conn *net.UDPConn
to *net.UDPAddr
}
func (wr *writeRoot) Write(w []byte) (int, error) {
return wr.conn.WriteToUDP(w, wr.to)
}
type client struct {
fromAgent, toClient net.Conn
bufferCache *bytes.Buffer
bufioCache *bufio.Reader
}
type UDPServer struct {
rootUdp *net.UDPConn // Root connection to read
peers map[string]*client // peers connected
newPeer chan net.Conn
peerError chan error
closed bool
rw sync.RWMutex
}
// Local address
func (udpListen *UDPServer) Addr() net.Addr {
return udpListen.rootUdp.LocalAddr()
}
// Close peers and root connection
func (udpListen *UDPServer) Close() error {
if udpListen.closed {
return io.ErrClosedPipe
}
udpListen.closed = true
for peerIndex := range udpListen.peers {
log.Printf("closing %s", peerIndex)
udpListen.peers[peerIndex].fromAgent.Close() // Close
log.Printf("clearing %s", peerIndex)
udpListen.peers[peerIndex].bufferCache.Reset()
}
log.Printf("closing udp root")
return udpListen.rootUdp.Close()
}
// Accept new client
func (udpListen *UDPServer) Accept() (peer net.Conn, err error) {
select {
case peer = <-udpListen.newPeer:
return
case err = <-udpListen.peerError:
return
}
}
func (udpListen *UDPServer) handler() {
for {
buff := make([]byte, 1480)
n, from, err := udpListen.rootUdp.ReadFromUDP(buff)
if err != nil {
return
}
udpListen.rw.Lock()
if _, exist := udpListen.peers[from.String()]; !exist {
c := new(client)
c.bufferCache = new(bytes.Buffer)
c.bufioCache = bufio.NewReader(c.bufferCache)
c.fromAgent, c.toClient = pipe.CreatePipe(from, from)
udpListen.peers[from.String()] = c
go func(){
for _, exist := udpListen.peers[from.String()]; exist; {
io.Copy(c.fromAgent, c.bufioCache)
}
}()
go func() {
udpListen.newPeer <- c.toClient
io.Copy(&writeRoot{udpListen.rootUdp, from}, c.fromAgent)
udpListen.rw.Lock()
delete(udpListen.peers, from.String())
udpListen.rw.Unlock()
}()
}
udpListen.rw.Unlock()
udpListen.rw.RLock()
udpListen.peers[from.String()].bufferCache.Write(buff[:n])
udpListen.rw.RUnlock()
}
}
func listenRoot(network string, laddr *net.UDPAddr) (net.Listener, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
var root = &UDPServer{
rootUdp: conn,
peers: make(map[string]*client),
newPeer: make(chan net.Conn),
peerError: make(chan error),
}
go root.handler()
return root, nil
}
func ListenAddrPort(Network string, address netip.AddrPort) (net.Listener, error) {
return listenRoot(Network, net.UDPAddrFromAddrPort(address))
}
func Listen(Network, address string) (net.Listener, error) {
ip, err := net.ResolveUDPAddr(Network, address)
if err != nil {
return nil, err
}
return listenRoot(Network, ip)
}

@ -1,65 +0,0 @@
package udplisterner
import (
"bytes"
"io"
"net"
"net/netip"
"time"
)
type PipeConn struct {
root *net.UDPConn
to, localAddr netip.AddrPort
closed bool
closedChan chan struct{}
buff *bytes.Buffer
}
func NewConn(root *net.UDPConn, LocalAddr, to netip.AddrPort) *PipeConn {
return &PipeConn{
root: root,
to: to,
localAddr: LocalAddr,
closedChan: make(chan struct{}),
closed: false,
buff: bytes.NewBuffer(make([]byte, 0)),
}
}
// Send close to channel
func (conn *PipeConn) Close() error {
conn.closedChan <- struct{}{}
conn.closed = true // end channel
return nil
}
func (conn PipeConn) LocalAddr() net.Addr {
return net.UDPAddrFromAddrPort(conn.localAddr)
}
func (conn PipeConn) RemoteAddr() net.Addr {
return net.UDPAddrFromAddrPort(conn.to)
}
// Write direct to root UDP Connection
func (conn PipeConn) Write(w []byte) (int, error) {
return conn.root.WriteToUDPAddrPort(w, conn.to)
}
func (PipeConn) SetDeadline(time.Time) error { return nil }
func (PipeConn) SetWriteDeadline(time.Time) error { return nil }
func (PipeConn) SetReadDeadline(time.Time) error { return nil }
func (conn PipeConn) Read(r []byte) (int, error) {
if conn.closed {
return 0, io.EOF
}
count := 50
for !conn.closed && conn.buff.Len() < len(r) {
if count--; count == 0 {
return 0, io.EOF
}
<-time.After(time.Second * 5)
}
return conn.buff.Read(r)
}

@ -1,113 +0,0 @@
/*
Lidar com pacotes de varios tamanhos
*/
package udplisterner
import (
"log"
"net"
"net/netip"
"os"
"sync"
)
type Udplisterner struct {
loger *log.Logger
conn *net.UDPConn // root listen connection
newClientErr chan error
newClient chan net.Conn
currentClients map[string]*PipeConn
locker sync.RWMutex
}
func Listen(network string, laddr netip.AddrPort) (net.Listener, error) {
return Listener(network, net.UDPAddrFromAddrPort(laddr))
}
func Listener(network string, laddr *net.UDPAddr) (net.Listener, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
listen := &Udplisterner{
loger: log.New(os.Stderr, conn.LocalAddr().String()+" ", log.Ltime),
conn: conn,
newClientErr: make(chan error),
newClient: make(chan net.Conn),
currentClients: make(map[string]*PipeConn),
}
go listen.handle()
return listen, nil
}
func (list *Udplisterner) handle() {
defer list.Close()
var readBuff int = 1024 * 8
for {
buff := make([]byte, readBuff)
list.loger.Printf("Reading from %d bytes\n", readBuff)
n, from, err := list.conn.ReadFromUDPAddrPort(buff)
if err != nil {
if opt, isOpt := err.(*net.OpError); isOpt {
if opt.Temporary() {
continue
}
}
list.loger.Printf("closing readers, err: %s", err.Error())
list.newClientErr <- err
break
} else if n == readBuff {
readBuff += 1024 // Grow buffer size
list.loger.Printf("Growing from %d to %d\n", readBuff-1024, readBuff)
}
list.locker.RLock() // Read locker
client, ok := list.currentClients[from.String()]
list.locker.RUnlock() // Unlocker
if !ok {
list.loger.Printf("New client %s with %d bytes to storage, waiting locker\n", from.String(), n)
list.loger.Println(buff[:n])
list.locker.Lock() // Locker map
list.loger.Printf("locked")
list.currentClients[from.String()] = NewConn(list.conn, from, from)
client = list.currentClients[from.String()]
list.locker.Unlock() // Unlocker map
list.loger.Printf("unlocked")
go func() {
list.newClient <- client
<-client.closedChan
list.locker.Lock()
delete(list.currentClients, from.String())
list.locker.Unlock()
list.loger.Printf("Closing client %s\n", from.String())
}()
}
list.loger.Printf("Caching %d bytes to %s\n", n, from.String())
client.buff.Write(buff[:n])
}
}
func (c *Udplisterner) Addr() net.Addr { return c.conn.LocalAddr() }
func (c *Udplisterner) Close() error {
c.locker.Lock()
defer c.locker.Unlock()
for key := range c.currentClients {
delete(c.currentClients, key)
}
c.conn.Close()
return nil
}
func (c *Udplisterner) Accept() (net.Conn, error) {
select {
case client := <-c.newClient:
return client, nil
case err := <-c.newClientErr:
return nil, err
}
}

@ -3,7 +3,6 @@ package proto
import (
"bytes"
"io"
"log"
"net/netip"
"time"
@ -110,7 +109,6 @@ func ReaderResponse(r io.Reader) (*Response, error) {
func WriteResponse(w io.Writer, res Response) error {
buff, err := res.Wbytes()
defer log.Println(buff)
if err != nil {
return err
} else if _, err := w.Write(buff); err != nil {

@ -6,7 +6,7 @@ import (
"net"
"net/netip"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner/v2"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner"
"sirherobrine23.org/Minecraft-Server/go-pproxit/proto"
)
@ -27,7 +27,7 @@ type Server struct {
}
func NewController(calls ServerCall, local netip.AddrPort) (*Server, error) {
conn, err := udplisterner.Listen("udp", local)
conn, err := udplisterner.ListenAddrPort("udp", local)
if err != nil {
return nil, err
}
@ -54,14 +54,12 @@ func (controller *Server) handler() {
func (controller *Server) handlerConn(conn net.Conn) {
defer conn.Close() // End agent accepted
fmt.Printf("Parsing connection from %s\n", conn.RemoteAddr().String())
var req *proto.Request
var tunnelInfo TunnelInfo
var err error
for {
if req, err = proto.ReaderRequest(conn); err != nil {
fmt.Println(err)
panic(err)
return
}

@ -1,20 +1,18 @@
package server
import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/netip"
"time"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner/v2"
"sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner"
"sirherobrine23.org/Minecraft-Server/go-pproxit/proto"
)
type TunnelCall interface {
BlockedAddr(AddrPort string) bool // Ignore request from this address
BlockedAddr(AddrPort string) bool // Ignore request from this address
AgentPing(agent, server time.Time) // Register ping to Agent
AgentShutdown(onTime time.Time) // Agend end connection
RegisterRX(client netip.AddrPort, Size int, Proto uint8) // Register Recived data from client
@ -60,8 +58,6 @@ func (tun *Tunnel) Close() error {
}
func (tun *Tunnel) send(res proto.Response) error {
d, _ := json.Marshal(res)
fmt.Printf("\nSending: %s\n", string(d))
return proto.WriteResponse(tun.RootConn, res)
}
@ -127,9 +123,6 @@ func (tun *Tunnel) Setup() {
return
}
d, _ := json.Marshal(req)
fmt.Printf("\nRequest: %s\n", string(d))
if req.AgentAuth != nil {
go tun.send(proto.Response{
AgentInfo: &proto.AgentInfo{
@ -195,7 +188,7 @@ func (tun *Tunnel) TCP() (err error) {
// Listen UDP
func (tun *Tunnel) UDP() (err error) {
if tun.connUDP, err = udplisterner.Listen("udp", netip.AddrPortFrom(netip.IPv4Unspecified(), tun.TunInfo.UDPPort)); err != nil {
if tun.connUDP, err = udplisterner.ListenAddrPort("udp", netip.AddrPortFrom(netip.IPv4Unspecified(), tun.TunInfo.UDPPort)); err != nil {
return
}
go func() {