diff --git a/config.go b/config.go index 1f648c7..f7d3b1a 100644 --- a/config.go +++ b/config.go @@ -7,8 +7,13 @@ package logranger import ( "fmt" "os" + "strings" + "time" "github.com/kkyr/fig" + "github.com/wneessen/go-parsesyslog" + "github.com/wneessen/go-parsesyslog/rfc3164" + "github.com/wneessen/go-parsesyslog/rfc5424" ) // Config holds all the global configuration settings that are parsed by fig @@ -34,8 +39,16 @@ type Config struct { Type ListenerType `fig:"type" default:"unix"` } `fig:"listener"` Log struct { - Level string `fig:"level" default:"info"` + Level string `fig:"level" default:"info"` + Extended bool `fig:"extended"` } `fig:"log"` + Parser struct { + Type string `fig:"type" validate:"required"` + Timeout time.Duration `fig:"timeout" default:"500ms"` + } + internal struct { + ParserType parsesyslog.ParserType + } } // NewConfig creates a new instance of the Config object by reading and loading @@ -53,5 +66,14 @@ func NewConfig(p, f string) (*Config, error) { return &co, fmt.Errorf("failed to load config: %w", err) } + switch { + case strings.EqualFold(co.Parser.Type, "rfc3164"): + co.internal.ParserType = rfc3164.Type + case strings.EqualFold(co.Parser.Type, "rfc5424"): + co.internal.ParserType = rfc5424.Type + default: + return nil, fmt.Errorf("unknown parser type: %s", co.Parser.Type) + } + return &co, nil } diff --git a/go.mod b/go.mod index cd342f1..f8305ad 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ go 1.21 require ( github.com/kkyr/fig v0.4.0 - github.com/wneessen/go-parsesyslog v0.2.1 + github.com/wneessen/go-parsesyslog v0.2.2 ) require ( diff --git a/go.sum b/go.sum index 7d45465..3091500 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/wneessen/go-parsesyslog v0.2.1 h1:T6a6BsLb4plF1wG6MJ0aJDNtXE2Y28PMdCN97HmJi8A= -github.com/wneessen/go-parsesyslog v0.2.1/go.mod h1:xk+PXAOW/9Vc6AfrXd1tZKFyndq1jwxK+rIdlKuHpcc= +github.com/wneessen/go-parsesyslog v0.2.2 h1:RhY7KJ5smi/f+KbHF7pEVqXF1fgvP3SVOc2nstixTNo= +github.com/wneessen/go-parsesyslog v0.2.2/go.mod h1:xk+PXAOW/9Vc6AfrXd1tZKFyndq1jwxK+rIdlKuHpcc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server.go b/server.go index 3b48788..d03c4ca 100644 --- a/server.go +++ b/server.go @@ -5,16 +5,18 @@ package logranger import ( + "errors" "fmt" + "io" "log/slog" "net" "os" "strings" "sync" + "time" "github.com/wneessen/go-parsesyslog" _ "github.com/wneessen/go-parsesyslog/rfc3164" - "github.com/wneessen/go-parsesyslog/rfc5424" _ "github.com/wneessen/go-parsesyslog/rfc5424" ) @@ -31,7 +33,8 @@ type Server struct { listener net.Listener // log is a pointer to the slog.Logger log *slog.Logger - + // parser is a parsesyslog.Parser + parser parsesyslog.Parser // wg is a sync.WaitGroup wg sync.WaitGroup } @@ -52,6 +55,11 @@ func (s *Server) Run() error { if err != nil { return err } + p, err := parsesyslog.New(s.conf.internal.ParserType) + if err != nil { + return fmt.Errorf("failed to initalize syslog parser: %w", err) + } + s.parser = p return s.RunWithListener(l) } @@ -87,6 +95,7 @@ func (s *Server) RunWithListener(l net.Listener) error { return nil } +// Listen handles incoming connections and processes log messages. func (s *Server) Listen() { defer s.wg.Done() s.log.Info("listening for new connections", slog.String("listen_addr", s.listener.Addr().String())) @@ -96,31 +105,63 @@ func (s *Server) Listen() { s.log.Error("failed to accept new connection", LogErrKey, err) continue } - s.log.Debug("accepted new connection", slog.String("remote_addr", c.RemoteAddr().String())) + s.log.Debug("accepted new connection", + slog.String("remote_addr", c.RemoteAddr().String())) conn := NewConnection(c) s.wg.Add(1) go func(co *Connection) { - s.HandleConnection(co) + err = co.conn.SetDeadline(time.Now().Add(s.conf.Parser.Timeout)) + switch { + case err == nil: + s.HandleConnection(co) + default: + s.log.Error("failed to set processing deadline", LogErrKey, err, + slog.Duration("timeout", s.conf.Parser.Timeout)) + } s.wg.Done() }(conn) } } +// HandleConnection handles a single connection by parsing and processing log messages. +// It logs debug information about the connection and measures the processing time. +// It closes the connection when done, and logs any error encountered during the process. func (s *Server) HandleConnection(c *Connection) { + b := time.Now() s.log.Debug("handling connection") - defer c.conn.Close() - pa, err := parsesyslog.New(rfc5424.Type) - if err != nil { - s.log.Error("failed to initialize logger", LogErrKey, err) - return - } - lm, err := pa.ParseReader(c.rb) - if err != nil { - s.log.Error("failed to parse message", LogErrKey, err) - return - } - s.log.Info("log message received", slog.String("message", lm.Message.String())) + defer func() { + if err := c.conn.Close(); err != nil { + s.log.Error("failed to close connection", LogErrKey, err) + } + }() + lm, err := s.parser.ParseReader(c.rb) + if err != nil { + var ne *net.OpError + switch { + case errors.As(err, &ne): + if s.conf.Log.Extended { + s.log.Error("network error while processing message", LogErrKey, + ne.Error()) + } + return + case errors.Is(err, io.EOF): + if s.conf.Log.Extended { + s.log.Error("message could not be processed", LogErrKey, + "EOF received") + } + return + default: + s.log.Error("failed to parse message", LogErrKey, err, + slog.String("parser_type", s.conf.Parser.Type)) + return + } + } + s.log.Debug("log message successfully received", + slog.String("message", lm.Message.String()), + slog.String("facility", lm.Facility.String()), + slog.String("severity", lm.Severity.String()), + slog.String("processing_time", time.Since(b).String())) } // setLogLevel sets the log level based on the value of `s.conf.Log.Level`.