From ad221202f2ae57e83d49ee08b6423069408057a7 Mon Sep 17 00:00:00 2001 From: ppeb Date: Sun, 27 Apr 2025 15:29:09 -0500 Subject: [PATCH 1/5] Cache config, lock behind mutex --- common/config.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/common/config.go b/common/config.go index 47c925c..7cb0208 100644 --- a/common/config.go +++ b/common/config.go @@ -3,6 +3,8 @@ package common import ( "encoding/xml" "os" + + "github.com/sasha-s/go-deadlock" ) type Config struct { @@ -48,10 +50,16 @@ type Config struct { ServerName string `xml:"serverName,omitempty"` } -var config Config -var configLoaded bool +var ( + config Config + configLoaded bool + cmutex = deadlock.Mutex{} +) func GetConfig() Config { + cmutex.Lock() + defer cmutex.Unlock() + if configLoaded { return config } @@ -128,5 +136,7 @@ func GetConfig() Config { config.AllowMultipleDeviceIDs = "never" } + configLoaded = true + return config } From c67efb33a03870dd9964337fd89db86c2225dc54 Mon Sep 17 00:00:00 2001 From: ppeb Date: Sun, 27 Apr 2025 16:16:30 -0500 Subject: [PATCH 2/5] Publicize nhttp.AtomicBool for use in qr2 --- nhttp/buffer.go | 28 ++++++++++++++-------------- nhttp/request.go | 4 ++-- nhttp/response.go | 10 +++++----- nhttp/server.go | 2 +- qr2/main.go | 9 +++++---- 5 files changed, 27 insertions(+), 26 deletions(-) diff --git a/nhttp/buffer.go b/nhttp/buffer.go index 365438b..007039a 100644 --- a/nhttp/buffer.go +++ b/nhttp/buffer.go @@ -28,11 +28,11 @@ func (noBody) Read([]byte) (int, error) { return 0, io.EOF } func (noBody) Close() error { return nil } func (noBody) WriteTo(io.Writer) (int64, error) { return 0, nil } -type atomicBool int32 +type AtomicBool int32 -func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } -func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } -func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } +func (b *AtomicBool) IsSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 
} +func (b *AtomicBool) SetTrue() { atomic.StoreInt32((*int32)(b), 1) } +func (b *AtomicBool) SetFalse() { atomic.StoreInt32((*int32)(b), 0) } var textprotoReaderPool sync.Pool @@ -180,34 +180,34 @@ func (cw *chunkWriter) close() { type expectContinueReader struct { resp *response readCloser io.ReadCloser - closed atomicBool - sawEOF atomicBool + closed AtomicBool + sawEOF AtomicBool } func (ecr *expectContinueReader) Read(p []byte) (n int, err error) { - if ecr.closed.isSet() { + if ecr.closed.IsSet() { return 0, errors.New("nhttp: invalid Read on closed Body") } w := ecr.resp - if !w.wroteContinue && w.canWriteContinue.isSet() { + if !w.wroteContinue && w.canWriteContinue.IsSet() { w.wroteContinue = true w.writeContinueMu.Lock() - if w.canWriteContinue.isSet() { + if w.canWriteContinue.IsSet() { w.conn.bufw.WriteString("HTTP/1.1 100 Continue\r\n\r\n") w.conn.bufw.Flush() - w.canWriteContinue.setFalse() + w.canWriteContinue.SetFalse() } w.writeContinueMu.Unlock() } n, err = ecr.readCloser.Read(p) if err == io.EOF { - ecr.sawEOF.setTrue() + ecr.sawEOF.SetTrue() } return } func (ecr *expectContinueReader) Close() error { - ecr.closed.setTrue() + ecr.closed.SetTrue() return ecr.readCloser.Close() } @@ -288,7 +288,7 @@ func (cw *chunkWriter) writeHeader(p []byte) { // send a Content-Length header. // Further, we don't send an automatic Content-Length if they // set a Transfer-Encoding, because they're generally incompatible. 
- if w.handlerDone.isSet() && !trailers && !hasTE && bodyAllowedForStatus(w.status) && header.Get("Content-Length") == "" && (!isHEAD || len(p) > 0) { + if w.handlerDone.IsSet() && !trailers && !hasTE && bodyAllowedForStatus(w.status) && header.Get("Content-Length") == "" && (!isHEAD || len(p) > 0) { w.contentLength = int64(len(p)) setHeader.contentLength = strconv.AppendInt(cw.res.clenBuf[:0], int64(len(p)), 10) } @@ -330,7 +330,7 @@ func (cw *chunkWriter) writeHeader(p []byte) { // because we don't know if the next bytes on the wire will be // the body-following-the-timer or the subsequent request. // See Issue 11549. - if ecr, ok := w.req.Body.(*expectContinueReader); ok && !ecr.sawEOF.isSet() { + if ecr, ok := w.req.Body.(*expectContinueReader); ok && !ecr.sawEOF.IsSet() { w.closeAfterReply = true } diff --git a/nhttp/request.go b/nhttp/request.go index 68dcfa3..48b355f 100644 --- a/nhttp/request.go +++ b/nhttp/request.go @@ -218,7 +218,7 @@ type response struct { // These two fields together synchronize the body reader // (the expectContinueReader, which wants to write 100 Continue) // against the main writer. - canWriteContinue atomicBool + canWriteContinue AtomicBool writeContinueMu sync.Mutex w *bufio.Writer // buffers output in chunks to chunkWriter @@ -256,7 +256,7 @@ type response struct { // written. 
trailers []string - handlerDone atomicBool // set true when the handler exits + handlerDone AtomicBool // set true when the handler exits // Buffers for Date, Content-Length, and status code dateBuf [len(TimeFormat)]byte diff --git a/nhttp/response.go b/nhttp/response.go index 80afb08..c3f84bc 100644 --- a/nhttp/response.go +++ b/nhttp/response.go @@ -63,7 +63,7 @@ func (w *response) sendExpectationFailed() { } func (w *response) finishRequest() { - w.handlerDone.setTrue() + w.handlerDone.SetTrue() if !w.wroteHeader { w.WriteHeader(_http.StatusOK) @@ -107,9 +107,9 @@ func (w *response) WriteHeader(code int) { // Handle informational headers if code >= 100 && code <= 199 { // Prevent a potential race with an automatically-sent 100 Continue triggered by Request.Body.Read() - if code == 100 && w.canWriteContinue.isSet() { + if code == 100 && w.canWriteContinue.IsSet() { w.writeContinueMu.Lock() - w.canWriteContinue.setFalse() + w.canWriteContinue.SetFalse() w.writeContinueMu.Unlock() } @@ -207,13 +207,13 @@ func (w *response) WriteString(data string) (n int, err error) { // either dataB or dataS is non-zero. func (w *response) write(lenData int, dataB []byte, dataS string) (n int, err error) { - if w.canWriteContinue.isSet() { + if w.canWriteContinue.IsSet() { // Body reader wants to write 100 Continue but hasn't yet. // Tell it not to. The store must be done while holding the lock // because the lock makes sure that there is not an active write // this very moment. 
w.writeContinueMu.Lock() - w.canWriteContinue.setFalse() + w.canWriteContinue.SetFalse() w.writeContinueMu.Unlock() } diff --git a/nhttp/server.go b/nhttp/server.go index 5eb0ef5..3bbeb8a 100644 --- a/nhttp/server.go +++ b/nhttp/server.go @@ -480,7 +480,7 @@ func (c *conn) serve(ctx context.Context) { if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 { // Wrap the Body reader with one that replies on the connection req.Body = &expectContinueReader{readCloser: req.Body, resp: w} - w.canWriteContinue.setTrue() + w.canWriteContinue.SetTrue() } } else if req.Header.Get("Expect") != "" { w.sendExpectationFailed() diff --git a/qr2/main.go b/qr2/main.go index 050a89a..0ddd8be 100644 --- a/qr2/main.go +++ b/qr2/main.go @@ -7,6 +7,7 @@ import ( "time" "wwfc/common" "wwfc/logging" + "wwfc/nhttp" "github.com/logrusorgru/aurora/v3" ) @@ -29,7 +30,7 @@ const ( var ( masterConn net.PacketConn - inShutdown = false + inShutdown nhttp.AtomicBool waitGroup = sync.WaitGroup{} ) @@ -44,7 +45,7 @@ func StartServer(reload bool) { } masterConn = conn - inShutdown = false + inShutdown.SetFalse() if reload { err := loadSessions() @@ -79,7 +80,7 @@ func StartServer(reload bool) { logging.Notice("QR2", "Listening on", aurora.BrightCyan(address)) for { - if inShutdown { + if inShutdown.IsSet() { return } @@ -97,7 +98,7 @@ func StartServer(reload bool) { } func Shutdown() { - inShutdown = true + inShutdown.SetTrue() masterConn.Close() waitGroup.Wait() From 54ff9e8a52e181751f9b036f718c405d95099a22 Mon Sep 17 00:00:00 2001 From: ppeb Date: Sun, 27 Apr 2025 16:28:20 -0500 Subject: [PATCH 3/5] Lock mutex for backend rpcClient before checking nil during frontend shutdown --- main.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 98fec0a..9c94489 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ import ( "wwfc/logging" "wwfc/nas" "wwfc/natneg" + "wwfc/nhttp" "wwfc/qr2" "wwfc/race" "wwfc/sake" @@ -262,7 +263,8 @@ var ( rpcClient 
*rpc.Client // This mutex could be locked for a very long time, don't use deadlock detection - rpcMutex sync.Mutex + rpcMutex sync.Mutex + rpcWaiting nhttp.AtomicBool rpcBusyCount sync.WaitGroup backendReady = make(chan struct{}) @@ -275,6 +277,8 @@ var ( // frontendMain starts the backend process and communicates with it using RPC func frontendMain(noSignal, noBackend bool) { + rpcWaiting.SetFalse() + integrated = !noBackend sigExit := make(chan os.Signal, 1) @@ -319,10 +323,22 @@ func frontendMain(noSignal, noBackend bool) { select {} } - if rpcClient == nil { + // If we're waiting for the backend to connect, then don't try to lock the + // mutex because it's never going to unlock + if rpcWaiting.IsSet() { + logging.Notice("FRONTEND", "Backend rpcClient is not connected") return } + rpcMutex.Lock() + if rpcClient == nil { + logging.Notice("FRONTEND", "Backend rpcClient is not connected") + rpcMutex.Unlock() + return + } + rpcMutex.Unlock() + + logging.Notice("FRONTEND", "Sending RPCPacket.Shutdown") rpcClient.Call("RPCPacket.Shutdown", "", nil) rpcClient.Close() } @@ -387,6 +403,7 @@ func startBackendProcess(reload bool, wait bool) { // waitForBackend waits for the backend to start. // Expects the RPC mutex to be locked. 
func waitForBackend() { + rpcWaiting.SetTrue() <-backendReady backendReady = make(chan struct{}) @@ -396,6 +413,7 @@ func waitForBackend() { rpcClient = client rpcMutex.Unlock() + rpcWaiting.SetFalse() logging.Notice("FRONTEND", "Connected to backend") return From cd530ca913b138389c3a929834c9e2e8469bb58c Mon Sep 17 00:00:00 2001 From: ppeb Date: Sun, 27 Apr 2025 16:30:56 -0500 Subject: [PATCH 4/5] Change natneg inShutdown to nhttp.AtomicBool --- natneg/main.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/natneg/main.go b/natneg/main.go index b5cf22a..1aac320 100644 --- a/natneg/main.go +++ b/natneg/main.go @@ -11,6 +11,7 @@ import ( "time" "wwfc/common" "wwfc/logging" + "wwfc/nhttp" "github.com/logrusorgru/aurora/v3" ) @@ -82,7 +83,7 @@ var ( mutex = sync.RWMutex{} natnegConn net.PacketConn - inShutdown = false + inShutdown nhttp.AtomicBool waitGroup = sync.WaitGroup{} ) @@ -97,7 +98,7 @@ func StartServer(reload bool) { } natnegConn = conn - inShutdown = false + inShutdown.SetFalse() if reload { // Load state @@ -135,7 +136,7 @@ func StartServer(reload bool) { logging.Notice("NATNEG", "Listening on", aurora.BrightCyan(address)) for { - if inShutdown { + if inShutdown.IsSet() { return } @@ -153,7 +154,7 @@ func StartServer(reload bool) { } func Shutdown() { - inShutdown = true + inShutdown.SetTrue() natnegConn.Close() waitGroup.Wait() @@ -295,7 +296,7 @@ func handleConnection(conn net.PacketConn, addr net.Addr, buffer []byte) { func closeSession(moduleName string, session *NATNEGSession) { mutex.Lock() - if inShutdown { + if inShutdown.IsSet() { mutex.Unlock() return } From 7078a37f363469a26d18bbaa9239abbae7478234 Mon Sep 17 00:00:00 2001 From: ppeb Date: Sun, 27 Apr 2025 16:33:09 -0500 Subject: [PATCH 5/5] Create state folder on backend launch --- main.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/main.go b/main.go index 9c94489..a93f506 100644 --- a/main.go +++ b/main.go @@ -72,6 +72,12 @@ type RPCPacket struct { // 
backendMain starts all the servers and creates an RPC server to communicate with the frontend func backendMain(noSignal, noReload bool) { + err := os.Mkdir("state", 0755) + if err != nil && !os.IsExist(err) { + logging.Error("BACKEND", err) + os.Exit(1) + } + sigExit := make(chan os.Signal, 1) signal.Notify(sigExit, syscall.SIGINT, syscall.SIGTERM)