Refactor processing deadline logics in server.go

The processing deadline handling was moved from the connection-level anonymous go-routine to the HandleConnection method. This refactor allows the reading from the connection to be looped, improving the handling of timeouts and parsing failure cases.
This commit is contained in:
Winni Neessen 2023-12-22 16:08:16 +01:00
parent 8d75b19e27
commit 5d72bcac91
Signed by: wneessen
GPG key ID: 5F3AF39B820C119D

View file

@ -110,14 +110,7 @@ func (s *Server) Listen() {
conn := NewConnection(c) conn := NewConnection(c)
s.wg.Add(1) s.wg.Add(1)
go func(co *Connection) { go func(co *Connection) {
err = co.conn.SetDeadline(time.Now().Add(s.conf.Parser.Timeout)) s.HandleConnection(co)
switch {
case err == nil:
s.HandleConnection(co)
default:
s.log.Error("failed to set processing deadline", LogErrKey, err,
slog.Duration("timeout", s.conf.Parser.Timeout))
}
s.wg.Done() s.wg.Done()
}(conn) }(conn)
} }
@ -127,41 +120,47 @@ func (s *Server) Listen() {
// It logs debug information about the connection and measures the processing time. // It logs debug information about the connection and measures the processing time.
// It closes the connection when done, and logs any error encountered during the process. // It closes the connection when done, and logs any error encountered during the process.
func (s *Server) HandleConnection(c *Connection) { func (s *Server) HandleConnection(c *Connection) {
b := time.Now()
s.log.Debug("handling connection")
defer func() { defer func() {
if err := c.conn.Close(); err != nil { if err := c.conn.Close(); err != nil {
s.log.Error("failed to close connection", LogErrKey, err) s.log.Error("failed to close connection", LogErrKey, err)
} }
}() }()
lm, err := s.parser.ParseReader(c.rb) ReadLoop:
if err != nil { for {
var ne *net.OpError if err := c.conn.SetDeadline(time.Now().Add(s.conf.Parser.Timeout)); err != nil {
switch { s.log.Error("failed to set processing deadline", LogErrKey, err,
case errors.As(err, &ne): slog.Duration("timeout", s.conf.Parser.Timeout))
if s.conf.Log.Extended {
s.log.Error("network error while processing message", LogErrKey,
ne.Error())
}
return
case errors.Is(err, io.EOF):
if s.conf.Log.Extended {
s.log.Error("message could not be processed", LogErrKey,
"EOF received")
}
return
default:
s.log.Error("failed to parse message", LogErrKey, err,
slog.String("parser_type", s.conf.Parser.Type))
return return
} }
lm, err := s.parser.ParseReader(c.rb)
if err != nil {
var ne *net.OpError
switch {
case errors.As(err, &ne):
if s.conf.Log.Extended {
s.log.Error("network error while processing message", LogErrKey,
ne.Error())
}
return
case errors.Is(err, io.EOF):
if s.conf.Log.Extended {
s.log.Error("message could not be processed", LogErrKey,
"EOF received")
}
return
default:
s.log.Error("failed to parse message", LogErrKey, err,
slog.String("parser_type", s.conf.Parser.Type))
continue ReadLoop
}
}
s.log.Debug("log message successfully received",
slog.String("message", lm.Message.String()),
slog.String("facility", lm.Facility.String()),
slog.String("severity", lm.Severity.String()))
} }
s.log.Debug("log message successfully received",
slog.String("message", lm.Message.String()),
slog.String("facility", lm.Facility.String()),
slog.String("severity", lm.Severity.String()),
slog.String("processing_time", time.Since(b).String()))
} }
// setLogLevel sets the log level based on the value of `s.conf.Log.Level`. // setLogLevel sets the log level based on the value of `s.conf.Log.Level`.