Compare commits

...

3 commits

Author SHA1 Message Date
d00969886a
Merge pull request #381 from wneessen/bug/380_use-of-closed-network-connection-error-when-using-concurrently
Add mutex for concurrent send protection
2024-11-21 10:38:09 +01:00
6e9df0b724
Increase timeout for DialAndSend context
Updated the timeout for the DialAndSend context in 'client_test.go' from 5 seconds to 1 minute to ensure sufficient time for the operation to complete. This change helps prevent premature timeouts that can cause test failures.
2024-11-21 10:33:24 +01:00
1a579c2149
Add mutex for concurrent send protection
Introduced a sendMutex to synchronize access to shared resources in the DialAndSendWithContext method. This ensures thread safety when sending multiple messages concurrently. Added a corresponding test to verify the concurrent sending functionality.
2024-11-21 10:26:48 +01:00
2 changed files with 44 additions and 0 deletions

View file

@ -170,6 +170,9 @@ type (
// requestDSN indicates wether we want to request DSN (Delivery Status Notifications). // requestDSN indicates wether we want to request DSN (Delivery Status Notifications).
requestDSN bool requestDSN bool
// sendMutex is used to synchronize access to shared resources during the dial and send methods.
sendMutex sync.Mutex
// smtpAuth is the authentication type that is used to authenticate the user with SMTP server. It // smtpAuth is the authentication type that is used to authenticate the user with SMTP server. It
// satisfies the smtp.Auth interface. // satisfies the smtp.Auth interface.
// //
@ -1058,6 +1061,8 @@ func (c *Client) DialAndSend(messages ...*Msg) error {
// - An error if the connection fails, if sending the messages fails, or if closing the // - An error if the connection fails, if sending the messages fails, or if closing the
// connection fails; otherwise, returns nil. // connection fails; otherwise, returns nil.
func (c *Client) DialAndSendWithContext(ctx context.Context, messages ...*Msg) error { func (c *Client) DialAndSendWithContext(ctx context.Context, messages ...*Msg) error {
c.sendMutex.Lock()
defer c.sendMutex.Unlock()
if err := c.DialWithContext(ctx); err != nil { if err := c.DialWithContext(ctx); err != nil {
return fmt.Errorf("dial failed: %w", err) return fmt.Errorf("dial failed: %w", err)
} }

View file

@ -2297,6 +2297,45 @@ func TestClient_DialAndSendWithContext(t *testing.T) {
t.Errorf("client was supposed to fail on dial") t.Errorf("client was supposed to fail on dial")
} }
}) })
t.Run("concurrent sending via DialAndSendWithContext", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
PortAdder.Add(1)
serverPort := int(TestServerPortBase + PortAdder.Load())
featureSet := "250-8BITMIME\r\n250-DSN\r\n250 SMTPUTF8"
go func() {
if err := simpleSMTPServer(ctx, t, &serverProps{
FeatureSet: featureSet,
ListenPort: serverPort,
}); err != nil {
t.Errorf("failed to start test server: %s", err)
return
}
}()
time.Sleep(time.Millisecond * 30)
client, err := NewClient(DefaultHost, WithPort(serverPort), WithTLSPolicy(NoTLS))
if err != nil {
t.Fatalf("failed to create new client: %s", err)
}
wg := sync.WaitGroup{}
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
msg := testMessage(t)
msg.SetMessageIDWithValue("this.is.a.message.id")
ctxDial, cancelDial := context.WithTimeout(ctx, time.Minute)
defer cancelDial()
if goroutineErr := client.DialAndSendWithContext(ctxDial, msg); goroutineErr != nil {
t.Errorf("failed to dial and send message: %s", goroutineErr)
}
}()
}
wg.Wait()
})
} }
func TestClient_auth(t *testing.T) { func TestClient_auth(t *testing.T) {