mirror of
https://github.com/WiiLink24/wfc-server.git
synced 2026-03-21 17:44:58 -05:00
Merge pull request #84 from Retro-Rewind-Team/threading
Minor threading fixes and improvements
This commit is contained in:
commit
bca085f2bd
|
|
@ -3,6 +3,8 @@ package common
|
|||
import (
|
||||
"encoding/xml"
|
||||
"os"
|
||||
|
||||
"github.com/sasha-s/go-deadlock"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
|
|
@ -48,10 +50,16 @@ type Config struct {
|
|||
ServerName string `xml:"serverName,omitempty"`
|
||||
}
|
||||
|
||||
var config Config
|
||||
var configLoaded bool
|
||||
var (
|
||||
config Config
|
||||
configLoaded bool
|
||||
cmutex = deadlock.Mutex{}
|
||||
)
|
||||
|
||||
func GetConfig() Config {
|
||||
cmutex.Lock()
|
||||
defer cmutex.Unlock()
|
||||
|
||||
if configLoaded {
|
||||
return config
|
||||
}
|
||||
|
|
@ -128,5 +136,7 @@ func GetConfig() Config {
|
|||
config.AllowMultipleDeviceIDs = "never"
|
||||
}
|
||||
|
||||
configLoaded = true
|
||||
|
||||
return config
|
||||
}
|
||||
|
|
|
|||
28
main.go
28
main.go
|
|
@ -21,6 +21,7 @@ import (
|
|||
"wwfc/logging"
|
||||
"wwfc/nas"
|
||||
"wwfc/natneg"
|
||||
"wwfc/nhttp"
|
||||
"wwfc/qr2"
|
||||
"wwfc/race"
|
||||
"wwfc/sake"
|
||||
|
|
@ -71,6 +72,12 @@ type RPCPacket struct {
|
|||
|
||||
// backendMain starts all the servers and creates an RPC server to communicate with the frontend
|
||||
func backendMain(noSignal, noReload bool) {
|
||||
err := os.Mkdir("state", 0755)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
logging.Error("BACKEN", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
sigExit := make(chan os.Signal, 1)
|
||||
signal.Notify(sigExit, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
|
|
@ -262,7 +269,8 @@ var (
|
|||
rpcClient *rpc.Client
|
||||
|
||||
// This mutex could be locked for a very long time, don't use deadlock detection
|
||||
rpcMutex sync.Mutex
|
||||
rpcMutex sync.Mutex
|
||||
rpcWaiting nhttp.AtomicBool
|
||||
|
||||
rpcBusyCount sync.WaitGroup
|
||||
backendReady = make(chan struct{})
|
||||
|
|
@ -275,6 +283,8 @@ var (
|
|||
|
||||
// frontendMain starts the backend process and communicates with it using RPC
|
||||
func frontendMain(noSignal, noBackend bool) {
|
||||
rpcWaiting.SetFalse()
|
||||
|
||||
integrated = !noBackend
|
||||
|
||||
sigExit := make(chan os.Signal, 1)
|
||||
|
|
@ -319,10 +329,22 @@ func frontendMain(noSignal, noBackend bool) {
|
|||
select {}
|
||||
}
|
||||
|
||||
if rpcClient == nil {
|
||||
// If we're waiting for the backend to connect, then don't try to lock the
|
||||
// mutex because it's never going to unlock
|
||||
if rpcWaiting.IsSet() {
|
||||
logging.Notice("FRONTEND", "Backend rpcClient is not connected")
|
||||
return
|
||||
}
|
||||
|
||||
rpcMutex.Lock()
|
||||
if rpcClient == nil {
|
||||
logging.Notice("FRONTEND", "Backend rpcClient is not connected")
|
||||
rpcMutex.Unlock()
|
||||
return
|
||||
}
|
||||
rpcMutex.Unlock()
|
||||
|
||||
logging.Notice("FRONTEND", "Sending RPCPacket.Shutdown")
|
||||
rpcClient.Call("RPCPacket.Shutdown", "", nil)
|
||||
rpcClient.Close()
|
||||
}
|
||||
|
|
@ -387,6 +409,7 @@ func startBackendProcess(reload bool, wait bool) {
|
|||
// waitForBackend waits for the backend to start.
|
||||
// Expects the RPC mutex to be locked.
|
||||
func waitForBackend() {
|
||||
rpcWaiting.SetTrue()
|
||||
<-backendReady
|
||||
backendReady = make(chan struct{})
|
||||
|
||||
|
|
@ -396,6 +419,7 @@ func waitForBackend() {
|
|||
rpcClient = client
|
||||
rpcMutex.Unlock()
|
||||
|
||||
rpcWaiting.SetFalse()
|
||||
logging.Notice("FRONTEND", "Connected to backend")
|
||||
|
||||
return
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
"wwfc/common"
|
||||
"wwfc/logging"
|
||||
"wwfc/nhttp"
|
||||
|
||||
"github.com/logrusorgru/aurora/v3"
|
||||
)
|
||||
|
|
@ -82,7 +83,7 @@ var (
|
|||
mutex = sync.RWMutex{}
|
||||
natnegConn net.PacketConn
|
||||
|
||||
inShutdown = false
|
||||
inShutdown nhttp.AtomicBool
|
||||
waitGroup = sync.WaitGroup{}
|
||||
)
|
||||
|
||||
|
|
@ -97,7 +98,7 @@ func StartServer(reload bool) {
|
|||
}
|
||||
|
||||
natnegConn = conn
|
||||
inShutdown = false
|
||||
inShutdown.SetFalse()
|
||||
|
||||
if reload {
|
||||
// Load state
|
||||
|
|
@ -135,7 +136,7 @@ func StartServer(reload bool) {
|
|||
logging.Notice("NATNEG", "Listening on", aurora.BrightCyan(address))
|
||||
|
||||
for {
|
||||
if inShutdown {
|
||||
if inShutdown.IsSet() {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -153,7 +154,7 @@ func StartServer(reload bool) {
|
|||
}
|
||||
|
||||
func Shutdown() {
|
||||
inShutdown = true
|
||||
inShutdown.SetTrue()
|
||||
natnegConn.Close()
|
||||
waitGroup.Wait()
|
||||
|
||||
|
|
@ -295,7 +296,7 @@ func handleConnection(conn net.PacketConn, addr net.Addr, buffer []byte) {
|
|||
|
||||
func closeSession(moduleName string, session *NATNEGSession) {
|
||||
mutex.Lock()
|
||||
if inShutdown {
|
||||
if inShutdown.IsSet() {
|
||||
mutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,11 +28,11 @@ func (noBody) Read([]byte) (int, error) { return 0, io.EOF }
|
|||
func (noBody) Close() error { return nil }
|
||||
func (noBody) WriteTo(io.Writer) (int64, error) { return 0, nil }
|
||||
|
||||
type atomicBool int32
|
||||
type AtomicBool int32
|
||||
|
||||
func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
|
||||
func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
|
||||
func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
|
||||
func (b *AtomicBool) IsSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
|
||||
func (b *AtomicBool) SetTrue() { atomic.StoreInt32((*int32)(b), 1) }
|
||||
func (b *AtomicBool) SetFalse() { atomic.StoreInt32((*int32)(b), 0) }
|
||||
|
||||
var textprotoReaderPool sync.Pool
|
||||
|
||||
|
|
@ -180,34 +180,34 @@ func (cw *chunkWriter) close() {
|
|||
type expectContinueReader struct {
|
||||
resp *response
|
||||
readCloser io.ReadCloser
|
||||
closed atomicBool
|
||||
sawEOF atomicBool
|
||||
closed AtomicBool
|
||||
sawEOF AtomicBool
|
||||
}
|
||||
|
||||
func (ecr *expectContinueReader) Read(p []byte) (n int, err error) {
|
||||
if ecr.closed.isSet() {
|
||||
if ecr.closed.IsSet() {
|
||||
return 0, errors.New("nhttp: invalid Read on closed Body")
|
||||
}
|
||||
w := ecr.resp
|
||||
if !w.wroteContinue && w.canWriteContinue.isSet() {
|
||||
if !w.wroteContinue && w.canWriteContinue.IsSet() {
|
||||
w.wroteContinue = true
|
||||
w.writeContinueMu.Lock()
|
||||
if w.canWriteContinue.isSet() {
|
||||
if w.canWriteContinue.IsSet() {
|
||||
w.conn.bufw.WriteString("HTTP/1.1 100 Continue\r\n\r\n")
|
||||
w.conn.bufw.Flush()
|
||||
w.canWriteContinue.setFalse()
|
||||
w.canWriteContinue.SetFalse()
|
||||
}
|
||||
w.writeContinueMu.Unlock()
|
||||
}
|
||||
n, err = ecr.readCloser.Read(p)
|
||||
if err == io.EOF {
|
||||
ecr.sawEOF.setTrue()
|
||||
ecr.sawEOF.SetTrue()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ecr *expectContinueReader) Close() error {
|
||||
ecr.closed.setTrue()
|
||||
ecr.closed.SetTrue()
|
||||
return ecr.readCloser.Close()
|
||||
}
|
||||
|
||||
|
|
@ -288,7 +288,7 @@ func (cw *chunkWriter) writeHeader(p []byte) {
|
|||
// send a Content-Length header.
|
||||
// Further, we don't send an automatic Content-Length if they
|
||||
// set a Transfer-Encoding, because they're generally incompatible.
|
||||
if w.handlerDone.isSet() && !trailers && !hasTE && bodyAllowedForStatus(w.status) && header.Get("Content-Length") == "" && (!isHEAD || len(p) > 0) {
|
||||
if w.handlerDone.IsSet() && !trailers && !hasTE && bodyAllowedForStatus(w.status) && header.Get("Content-Length") == "" && (!isHEAD || len(p) > 0) {
|
||||
w.contentLength = int64(len(p))
|
||||
setHeader.contentLength = strconv.AppendInt(cw.res.clenBuf[:0], int64(len(p)), 10)
|
||||
}
|
||||
|
|
@ -330,7 +330,7 @@ func (cw *chunkWriter) writeHeader(p []byte) {
|
|||
// because we don't know if the next bytes on the wire will be
|
||||
// the body-following-the-timer or the subsequent request.
|
||||
// See Issue 11549.
|
||||
if ecr, ok := w.req.Body.(*expectContinueReader); ok && !ecr.sawEOF.isSet() {
|
||||
if ecr, ok := w.req.Body.(*expectContinueReader); ok && !ecr.sawEOF.IsSet() {
|
||||
w.closeAfterReply = true
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -218,7 +218,7 @@ type response struct {
|
|||
// These two fields together synchronize the body reader
|
||||
// (the expectContinueReader, which wants to write 100 Continue)
|
||||
// against the main writer.
|
||||
canWriteContinue atomicBool
|
||||
canWriteContinue AtomicBool
|
||||
writeContinueMu sync.Mutex
|
||||
|
||||
w *bufio.Writer // buffers output in chunks to chunkWriter
|
||||
|
|
@ -256,7 +256,7 @@ type response struct {
|
|||
// written.
|
||||
trailers []string
|
||||
|
||||
handlerDone atomicBool // set true when the handler exits
|
||||
handlerDone AtomicBool // set true when the handler exits
|
||||
|
||||
// Buffers for Date, Content-Length, and status code
|
||||
dateBuf [len(TimeFormat)]byte
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ func (w *response) sendExpectationFailed() {
|
|||
}
|
||||
|
||||
func (w *response) finishRequest() {
|
||||
w.handlerDone.setTrue()
|
||||
w.handlerDone.SetTrue()
|
||||
|
||||
if !w.wroteHeader {
|
||||
w.WriteHeader(_http.StatusOK)
|
||||
|
|
@ -107,9 +107,9 @@ func (w *response) WriteHeader(code int) {
|
|||
// Handle informational headers
|
||||
if code >= 100 && code <= 199 {
|
||||
// Prevent a potential race with an automatically-sent 100 Continue triggered by Request.Body.Read()
|
||||
if code == 100 && w.canWriteContinue.isSet() {
|
||||
if code == 100 && w.canWriteContinue.IsSet() {
|
||||
w.writeContinueMu.Lock()
|
||||
w.canWriteContinue.setFalse()
|
||||
w.canWriteContinue.SetFalse()
|
||||
w.writeContinueMu.Unlock()
|
||||
}
|
||||
|
||||
|
|
@ -207,13 +207,13 @@ func (w *response) WriteString(data string) (n int, err error) {
|
|||
|
||||
// either dataB or dataS is non-zero.
|
||||
func (w *response) write(lenData int, dataB []byte, dataS string) (n int, err error) {
|
||||
if w.canWriteContinue.isSet() {
|
||||
if w.canWriteContinue.IsSet() {
|
||||
// Body reader wants to write 100 Continue but hasn't yet.
|
||||
// Tell it not to. The store must be done while holding the lock
|
||||
// because the lock makes sure that there is not an active write
|
||||
// this very moment.
|
||||
w.writeContinueMu.Lock()
|
||||
w.canWriteContinue.setFalse()
|
||||
w.canWriteContinue.SetFalse()
|
||||
w.writeContinueMu.Unlock()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -480,7 +480,7 @@ func (c *conn) serve(ctx context.Context) {
|
|||
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
|
||||
// Wrap the Body reader with one that replies on the connection
|
||||
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
|
||||
w.canWriteContinue.setTrue()
|
||||
w.canWriteContinue.SetTrue()
|
||||
}
|
||||
} else if req.Header.Get("Expect") != "" {
|
||||
w.sendExpectationFailed()
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
"wwfc/common"
|
||||
"wwfc/logging"
|
||||
"wwfc/nhttp"
|
||||
|
||||
"github.com/logrusorgru/aurora/v3"
|
||||
)
|
||||
|
|
@ -29,7 +30,7 @@ const (
|
|||
|
||||
var (
|
||||
masterConn net.PacketConn
|
||||
inShutdown = false
|
||||
inShutdown nhttp.AtomicBool
|
||||
waitGroup = sync.WaitGroup{}
|
||||
)
|
||||
|
||||
|
|
@ -44,7 +45,7 @@ func StartServer(reload bool) {
|
|||
}
|
||||
|
||||
masterConn = conn
|
||||
inShutdown = false
|
||||
inShutdown.SetFalse()
|
||||
|
||||
if reload {
|
||||
err := loadSessions()
|
||||
|
|
@ -79,7 +80,7 @@ func StartServer(reload bool) {
|
|||
logging.Notice("QR2", "Listening on", aurora.BrightCyan(address))
|
||||
|
||||
for {
|
||||
if inShutdown {
|
||||
if inShutdown.IsSet() {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -97,7 +98,7 @@ func StartServer(reload bool) {
|
|||
}
|
||||
|
||||
func Shutdown() {
|
||||
inShutdown = true
|
||||
inShutdown.SetTrue()
|
||||
masterConn.Close()
|
||||
waitGroup.Wait()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user