mirror of
https://github.com/wneessen/logranger.git
synced 2024-12-22 10:00:38 +01:00
Update go-parsesyslog dependency and refine server connection handling
This update upgrades the go-parsesyslog dependency version to v0.2.2. It also enhances the server's connection handling. This is done by expanding the `HandleConnection` method to handle network errors and EOF scenarios more thoroughly, adjusting processing time measurements, and handling deadlines for connection processing. Furthermore, the initialization of the syslog parser has now been moved to the `NewServer` function for efficiency.
This commit is contained in:
parent
43b1147106
commit
830de14890
4 changed files with 83 additions and 20 deletions
24
config.go
24
config.go
|
@ -7,8 +7,13 @@ package logranger
|
|||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kkyr/fig"
|
||||
"github.com/wneessen/go-parsesyslog"
|
||||
"github.com/wneessen/go-parsesyslog/rfc3164"
|
||||
"github.com/wneessen/go-parsesyslog/rfc5424"
|
||||
)
|
||||
|
||||
// Config holds all the global configuration settings that are parsed by fig
|
||||
|
@ -34,8 +39,16 @@ type Config struct {
|
|||
Type ListenerType `fig:"type" default:"unix"`
|
||||
} `fig:"listener"`
|
||||
Log struct {
|
||||
Level string `fig:"level" default:"info"`
|
||||
Level string `fig:"level" default:"info"`
|
||||
Extended bool `fig:"extended"`
|
||||
} `fig:"log"`
|
||||
Parser struct {
|
||||
Type string `fig:"type" validate:"required"`
|
||||
Timeout time.Duration `fig:"timeout" default:"500ms"`
|
||||
}
|
||||
internal struct {
|
||||
ParserType parsesyslog.ParserType
|
||||
}
|
||||
}
|
||||
|
||||
// NewConfig creates a new instance of the Config object by reading and loading
|
||||
|
@ -53,5 +66,14 @@ func NewConfig(p, f string) (*Config, error) {
|
|||
return &co, fmt.Errorf("failed to load config: %w", err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.EqualFold(co.Parser.Type, "rfc3164"):
|
||||
co.internal.ParserType = rfc3164.Type
|
||||
case strings.EqualFold(co.Parser.Type, "rfc5424"):
|
||||
co.internal.ParserType = rfc5424.Type
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown parser type: %s", co.Parser.Type)
|
||||
}
|
||||
|
||||
return &co, nil
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ go 1.21
|
|||
|
||||
require (
|
||||
github.com/kkyr/fig v0.4.0
|
||||
github.com/wneessen/go-parsesyslog v0.2.1
|
||||
github.com/wneessen/go-parsesyslog v0.2.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
4
go.sum
4
go.sum
|
@ -16,8 +16,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/wneessen/go-parsesyslog v0.2.1 h1:T6a6BsLb4plF1wG6MJ0aJDNtXE2Y28PMdCN97HmJi8A=
|
||||
github.com/wneessen/go-parsesyslog v0.2.1/go.mod h1:xk+PXAOW/9Vc6AfrXd1tZKFyndq1jwxK+rIdlKuHpcc=
|
||||
github.com/wneessen/go-parsesyslog v0.2.2 h1:RhY7KJ5smi/f+KbHF7pEVqXF1fgvP3SVOc2nstixTNo=
|
||||
github.com/wneessen/go-parsesyslog v0.2.2/go.mod h1:xk+PXAOW/9Vc6AfrXd1tZKFyndq1jwxK+rIdlKuHpcc=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
73
server.go
73
server.go
|
@ -5,16 +5,18 @@
|
|||
package logranger
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/wneessen/go-parsesyslog"
|
||||
_ "github.com/wneessen/go-parsesyslog/rfc3164"
|
||||
"github.com/wneessen/go-parsesyslog/rfc5424"
|
||||
_ "github.com/wneessen/go-parsesyslog/rfc5424"
|
||||
)
|
||||
|
||||
|
@ -31,7 +33,8 @@ type Server struct {
|
|||
listener net.Listener
|
||||
// log is a pointer to the slog.Logger
|
||||
log *slog.Logger
|
||||
|
||||
// parser is a parsesyslog.Parser
|
||||
parser parsesyslog.Parser
|
||||
// wg is a sync.WaitGroup
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
@ -52,6 +55,11 @@ func (s *Server) Run() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p, err := parsesyslog.New(s.conf.internal.ParserType)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initalize syslog parser: %w", err)
|
||||
}
|
||||
s.parser = p
|
||||
return s.RunWithListener(l)
|
||||
}
|
||||
|
||||
|
@ -87,6 +95,7 @@ func (s *Server) RunWithListener(l net.Listener) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Listen handles incoming connections and processes log messages.
|
||||
func (s *Server) Listen() {
|
||||
defer s.wg.Done()
|
||||
s.log.Info("listening for new connections", slog.String("listen_addr", s.listener.Addr().String()))
|
||||
|
@ -96,31 +105,63 @@ func (s *Server) Listen() {
|
|||
s.log.Error("failed to accept new connection", LogErrKey, err)
|
||||
continue
|
||||
}
|
||||
s.log.Debug("accepted new connection", slog.String("remote_addr", c.RemoteAddr().String()))
|
||||
s.log.Debug("accepted new connection",
|
||||
slog.String("remote_addr", c.RemoteAddr().String()))
|
||||
conn := NewConnection(c)
|
||||
s.wg.Add(1)
|
||||
go func(co *Connection) {
|
||||
s.HandleConnection(co)
|
||||
err = co.conn.SetDeadline(time.Now().Add(s.conf.Parser.Timeout))
|
||||
switch {
|
||||
case err == nil:
|
||||
s.HandleConnection(co)
|
||||
default:
|
||||
s.log.Error("failed to set processing deadline", LogErrKey, err,
|
||||
slog.Duration("timeout", s.conf.Parser.Timeout))
|
||||
}
|
||||
s.wg.Done()
|
||||
}(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// HandleConnection handles a single connection by parsing and processing log messages.
|
||||
// It logs debug information about the connection and measures the processing time.
|
||||
// It closes the connection when done, and logs any error encountered during the process.
|
||||
func (s *Server) HandleConnection(c *Connection) {
|
||||
b := time.Now()
|
||||
s.log.Debug("handling connection")
|
||||
defer c.conn.Close()
|
||||
pa, err := parsesyslog.New(rfc5424.Type)
|
||||
if err != nil {
|
||||
s.log.Error("failed to initialize logger", LogErrKey, err)
|
||||
return
|
||||
}
|
||||
lm, err := pa.ParseReader(c.rb)
|
||||
if err != nil {
|
||||
s.log.Error("failed to parse message", LogErrKey, err)
|
||||
return
|
||||
}
|
||||
s.log.Info("log message received", slog.String("message", lm.Message.String()))
|
||||
defer func() {
|
||||
if err := c.conn.Close(); err != nil {
|
||||
s.log.Error("failed to close connection", LogErrKey, err)
|
||||
}
|
||||
}()
|
||||
|
||||
lm, err := s.parser.ParseReader(c.rb)
|
||||
if err != nil {
|
||||
var ne *net.OpError
|
||||
switch {
|
||||
case errors.As(err, &ne):
|
||||
if s.conf.Log.Extended {
|
||||
s.log.Error("network error while processing message", LogErrKey,
|
||||
ne.Error())
|
||||
}
|
||||
return
|
||||
case errors.Is(err, io.EOF):
|
||||
if s.conf.Log.Extended {
|
||||
s.log.Error("message could not be processed", LogErrKey,
|
||||
"EOF received")
|
||||
}
|
||||
return
|
||||
default:
|
||||
s.log.Error("failed to parse message", LogErrKey, err,
|
||||
slog.String("parser_type", s.conf.Parser.Type))
|
||||
return
|
||||
}
|
||||
}
|
||||
s.log.Debug("log message successfully received",
|
||||
slog.String("message", lm.Message.String()),
|
||||
slog.String("facility", lm.Facility.String()),
|
||||
slog.String("severity", lm.Severity.String()),
|
||||
slog.String("processing_time", time.Since(b).String()))
|
||||
}
|
||||
|
||||
// setLogLevel sets the log level based on the value of `s.conf.Log.Level`.
|
||||
|
|
Loading…
Reference in a new issue