mirror of
https://github.com/coredns/coredns.git
synced 2026-05-25 11:20:23 -04:00
plugin/dnstap: feature: added incoming connection support (#8086)
* plugin/dnstap: added incoming connection support feature to dnstap plugin Signed-off-by: Endre Szabo <git@end.re> * fixed problems pointed out by ci linter Signed-off-by: Endre Szabo <git@end.re> --------- Signed-off-by: Endre Szabo <git@end.re>
This commit is contained in:
@@ -14,6 +14,8 @@ Every message is sent to the socket as soon as it comes in, the *dnstap* plugin
|
||||
|
||||
## Syntax
|
||||
|
||||
### Outgoing Connections (Connect to Sink)
|
||||
|
||||
~~~ txt
|
||||
dnstap SOCKET [full] [writebuffer] [queue] {
|
||||
[identity IDENTITY]
|
||||
@@ -32,6 +34,28 @@ dnstap SOCKET [full] [writebuffer] [queue] {
|
||||
* **EXTRA** to define "extra" field in dnstap payload, [metadata](../metadata/) replacement available here.
|
||||
* `skipverify` to skip tls verification during connection. Default to be secure
|
||||
|
||||
### Incoming Connections (Accept from Sinks)
|
||||
|
||||
~~~ txt
|
||||
dnstap listen SOCKET [full] {
|
||||
[identity IDENTITY]
|
||||
[version VERSION]
|
||||
[extra EXTRA]
|
||||
[tls CERT KEY [CA]]
|
||||
[skipverify]
|
||||
}
|
||||
~~~
|
||||
|
||||
* `listen` indicates this is a listening socket that accepts incoming connections from dnstap sinks.
|
||||
* **SOCKET** is the socket address to listen on (e.g., `tcp://127.0.0.1:6000`, `unix:///tmp/dnstap.sock`).
|
||||
* `full` to include the wire-format DNS message.
|
||||
* **IDENTITY** to override the identity of the server. Defaults to the hostname.
|
||||
* **VERSION** to override the version field. Defaults to the CoreDNS version.
|
||||
* **EXTRA** to define "extra" field in dnstap payload, [metadata](../metadata/) replacement available here.
|
||||
* `tls CERT KEY [CA]` to enable TLS for the listener. **CERT** and **KEY** are paths to the server certificate and key files. Optional **CA** is the path to the CA certificate for client verification.
|
||||
* `skipverify` to skip client certificate verification. Default is to verify client certificates. Equivalent to the **CA** option above being unspecified.
|
||||
|
||||
**Note:** Incoming connections use unbuffered channels to broadcast events. If a connected sink becomes slow or disconnected, messages are dropped for that sink only, and the connection is closed.
|
||||
|
||||
## Examples
|
||||
|
||||
@@ -92,6 +116,35 @@ dnstap tls://127.0.0.1:6000 full {
|
||||
}
|
||||
~~~
|
||||
|
||||
Listen for incoming dnstap sink connections on a Unix socket.
|
||||
|
||||
~~~ txt
|
||||
dnstap listen /tmp/dnstap.sock full
|
||||
~~~
|
||||
|
||||
Listen for incoming dnstap sink connections on TCP.
|
||||
|
||||
~~~ txt
|
||||
dnstap listen tcp://127.0.0.1:6000 full
|
||||
~~~
|
||||
|
||||
Listen for incoming dnstap sink connections on TLS with mTLS client authentication.
|
||||
|
||||
~~~ txt
|
||||
dnstap listen tls://127.0.0.1:6000 full {
|
||||
tls /path/to/server-cert.pem /path/to/server-key.pem /path/to/ca.pem
|
||||
}
|
||||
~~~
|
||||
|
||||
Listen for incoming dnstap sink connections on TLS without client certificate verification.
|
||||
|
||||
~~~ txt
|
||||
dnstap listen tls://127.0.0.1:6000 full {
|
||||
tls /path/to/server-cert.pem /path/to/server-key.pem
|
||||
skipverify
|
||||
}
|
||||
~~~
|
||||
|
||||
You can use _dnstap_ more than once to define multiple taps. The following logs information including the
|
||||
wire-format DNS message about client requests and responses to */tmp/dnstap.sock*,
|
||||
and also sends client requests and responses without wire-format DNS messages to a remote FQDN.
|
||||
@@ -101,6 +154,13 @@ dnstap /tmp/dnstap.sock full
|
||||
dnstap tcp://example.com:6000
|
||||
~~~
|
||||
|
||||
You can also combine outgoing connections with incoming listeners:
|
||||
|
||||
~~~ txt
|
||||
dnstap tcp://remote-collector.example.com:6000 full
|
||||
dnstap listen tcp://127.0.0.1:6001 full
|
||||
~~~
|
||||
|
||||
## Command Line Tool
|
||||
|
||||
Dnstap has a command line tool that can be used to inspect the logging. The tool can be found
|
||||
|
||||
@@ -15,9 +15,10 @@ import (
|
||||
|
||||
// Dnstap is the dnstap handler.
|
||||
type Dnstap struct {
|
||||
Next plugin.Handler
|
||||
io tapper
|
||||
repl replacer.Replacer
|
||||
Next plugin.Handler
|
||||
io tapper
|
||||
listener *listener
|
||||
repl replacer.Replacer
|
||||
|
||||
// IncludeRawMessage will include the raw DNS message into the dnstap messages if true.
|
||||
IncludeRawMessage bool
|
||||
@@ -49,7 +50,17 @@ func (h *Dnstap) TapMessageWithMetadata(ctx context.Context, m *tap.Message, sta
|
||||
|
||||
func (h *Dnstap) tapWithExtra(m *tap.Message, extra []byte) {
|
||||
t := tap.Dnstap_MESSAGE
|
||||
h.io.Dnstap(&tap.Dnstap{Type: &t, Message: m, Identity: h.Identity, Version: h.Version, Extra: extra})
|
||||
payload := &tap.Dnstap{Type: &t, Message: m, Identity: h.Identity, Version: h.Version, Extra: extra}
|
||||
|
||||
// Send to outgoing connection if configured
|
||||
if h.io != nil {
|
||||
h.io.Dnstap(payload)
|
||||
}
|
||||
|
||||
// Broadcast to incoming listeners if configured
|
||||
if h.listener != nil {
|
||||
h.listener.Dnstap(payload)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Dnstap) tapQuery(ctx context.Context, w dns.ResponseWriter, query *dns.Msg, queryTime time.Time) {
|
||||
|
||||
@@ -149,3 +149,30 @@ func TestTapMessage(t *testing.T) {
|
||||
}
|
||||
h.TapMessage(tapq.GetMessage())
|
||||
}
|
||||
|
||||
// TestNilIoAndListener tests that the handler works correctly when io or listener is nil
|
||||
func TestNilIoAndListener(t *testing.T) {
|
||||
testMsg := testMessage()
|
||||
msg.SetType(testMsg, tap.Message_CLIENT_QUERY)
|
||||
|
||||
// Test with nil io (listener-only mode)
|
||||
h1 := Dnstap{
|
||||
io: nil,
|
||||
listener: nil,
|
||||
}
|
||||
// Should not panic
|
||||
h1.TapMessage(testMsg)
|
||||
|
||||
// Test with only io set
|
||||
w := &writer{t: t}
|
||||
tapq := &tap.Dnstap{Message: testMsg}
|
||||
w.queue = append(w.queue, tapq)
|
||||
h2 := Dnstap{
|
||||
io: w,
|
||||
listener: nil,
|
||||
}
|
||||
h2.TapMessage(testMsg)
|
||||
if len(w.queue) != 0 {
|
||||
t.Errorf("Expected io to receive message")
|
||||
}
|
||||
}
|
||||
|
||||
254
plugin/dnstap/listener.go
Normal file
254
plugin/dnstap/listener.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package dnstap
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
tap "github.com/dnstap/golang-dnstap"
|
||||
)
|
||||
|
||||
// listener manages incoming dnstap sink connections.
|
||||
// Unlike outgoing connections (dio), incoming connections have no buffering -
|
||||
// if a client is slow or disconnected, messages are dropped for that client only.
|
||||
type listener struct {
|
||||
endpoint string
|
||||
proto string // "unix", "tcp", or "tls"
|
||||
ln net.Listener
|
||||
clients map[*client]struct{}
|
||||
clientsMu sync.RWMutex
|
||||
quit chan struct{}
|
||||
tcpTimeout time.Duration
|
||||
skipVerify bool
|
||||
certFile string
|
||||
keyFile string
|
||||
caFile string
|
||||
logger WarnLogger
|
||||
}
|
||||
|
||||
// client represents a single connected dnstap sink.
|
||||
type client struct {
|
||||
conn net.Conn
|
||||
enc *encoder
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// newListener creates a new listener for incoming dnstap connections.
|
||||
func newListener(proto, endpoint string) *listener {
|
||||
return &listener{
|
||||
endpoint: endpoint,
|
||||
proto: proto,
|
||||
clients: make(map[*client]struct{}),
|
||||
quit: make(chan struct{}),
|
||||
tcpTimeout: tcpTimeout,
|
||||
skipVerify: skipVerify,
|
||||
logger: log,
|
||||
}
|
||||
}
|
||||
|
||||
// loadCAPool loads a CA certificate pool from a PEM file.
|
||||
func loadCAPool(caFile string) (*x509.CertPool, error) {
|
||||
caCert, err := os.ReadFile(caFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pool := x509.NewCertPool()
|
||||
if !pool.AppendCertsFromPEM(caCert) {
|
||||
return nil, err
|
||||
}
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
// listen starts accepting incoming connections.
|
||||
func (l *listener) listen() error {
|
||||
var ln net.Listener
|
||||
var err error
|
||||
|
||||
switch l.proto {
|
||||
case "tls":
|
||||
if l.certFile == "" || l.keyFile == "" {
|
||||
l.logger.Warningf("TLS listener requires cert and key files")
|
||||
return nil
|
||||
}
|
||||
|
||||
cert, err := tls.LoadX509KeyPair(l.certFile, l.keyFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
config := &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
ClientAuth: tls.NoClientCert,
|
||||
// #nosec G402 -- optional, user-configurable escape hatch for environments that cannot validate certs.
|
||||
InsecureSkipVerify: l.skipVerify,
|
||||
}
|
||||
|
||||
// If CA file is provided, enable and require client certificate verification
|
||||
if l.caFile != "" {
|
||||
pool, err := loadCAPool(l.caFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
config.ClientCAs = pool
|
||||
if l.skipVerify {
|
||||
config.ClientAuth = tls.VerifyClientCertIfGiven
|
||||
} else {
|
||||
config.ClientAuth = tls.RequireAndVerifyClientCert
|
||||
}
|
||||
}
|
||||
|
||||
ln, err = tls.Listen("tcp", l.endpoint, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "tcp":
|
||||
ln, err = net.Listen("tcp", l.endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
// unix socket
|
||||
ln, err = net.Listen("unix", l.endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
l.ln = ln
|
||||
go l.acceptLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// acceptLoop accepts new incoming connections and spawns a goroutine for each.
|
||||
func (l *listener) acceptLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-l.quit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
conn, err := l.ln.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-l.quit:
|
||||
return
|
||||
default:
|
||||
l.logger.Warningf("Error accepting dnstap connection: %v", err)
|
||||
time.Sleep(100 * time.Millisecond) // brief pause on error
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Set TCP parameters for TCP connections
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
tcpConn.SetNoDelay(false)
|
||||
}
|
||||
|
||||
// Create encoder for this client
|
||||
enc, err := newEncoder(conn, l.tcpTimeout)
|
||||
if err != nil {
|
||||
l.logger.Warningf("Error creating encoder for dnstap client: %v", err)
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
c := &client{
|
||||
conn: conn,
|
||||
enc: enc,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
l.clientsMu.Lock()
|
||||
l.clients[c] = struct{}{}
|
||||
l.clientsMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Dnstap broadcasts the payload to all connected clients.
|
||||
// Unlike dio.Dnstap(), this does not buffer; if a write fails, we just drop
|
||||
// the message for that client and close its connection.
|
||||
func (l *listener) Dnstap(payload *tap.Dnstap) {
|
||||
l.clientsMu.RLock()
|
||||
defer l.clientsMu.RUnlock()
|
||||
|
||||
for c := range l.clients {
|
||||
select {
|
||||
case <-c.quit:
|
||||
// Client already closed
|
||||
continue
|
||||
default:
|
||||
if err := c.enc.writeMsg(payload); err != nil {
|
||||
go l.removeClient(c)
|
||||
} else {
|
||||
if err := c.enc.flush(); err != nil {
|
||||
l.removeClient(c)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// removeClient removes a client from the active set and closes its connection.
|
||||
func (l *listener) removeClient(c *client) {
|
||||
l.clientsMu.Lock()
|
||||
defer l.clientsMu.Unlock()
|
||||
|
||||
if _, exists := l.clients[c]; !exists {
|
||||
return // already removed
|
||||
}
|
||||
|
||||
delete(l.clients, c)
|
||||
|
||||
// Close quit channel to signal goroutines to stop (this also stops the flush ticker)
|
||||
select {
|
||||
case <-c.quit:
|
||||
// Already closed
|
||||
default:
|
||||
close(c.quit)
|
||||
}
|
||||
|
||||
if c.enc != nil {
|
||||
c.enc.flush()
|
||||
c.enc.close()
|
||||
}
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// close stops accepting new connections and closes all active clients.
|
||||
func (l *listener) close() {
|
||||
close(l.quit)
|
||||
|
||||
if l.ln != nil {
|
||||
l.ln.Close()
|
||||
}
|
||||
|
||||
l.clientsMu.Lock()
|
||||
defer l.clientsMu.Unlock()
|
||||
|
||||
for c := range l.clients {
|
||||
delete(l.clients, c)
|
||||
|
||||
// Close quit channel to signal goroutines to stop
|
||||
select {
|
||||
case <-c.quit:
|
||||
// Already closed
|
||||
default:
|
||||
close(c.quit)
|
||||
}
|
||||
|
||||
if c.enc != nil {
|
||||
c.enc.flush()
|
||||
c.enc.close()
|
||||
}
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
175
plugin/dnstap/listener_test.go
Normal file
175
plugin/dnstap/listener_test.go
Normal file
@@ -0,0 +1,175 @@
|
||||
package dnstap
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
tap "github.com/dnstap/golang-dnstap"
|
||||
)
|
||||
|
||||
func TestListenerCreation(t *testing.T) {
|
||||
tests := []struct {
|
||||
proto string
|
||||
endpoint string
|
||||
}{
|
||||
{"tcp", "127.0.0.1:16000"},
|
||||
{"unix", "/tmp/dnstap-test.sock"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
l := newListener(tc.proto, tc.endpoint)
|
||||
if l.proto != tc.proto {
|
||||
t.Errorf("Expected proto %s, got %s", tc.proto, l.proto)
|
||||
}
|
||||
if l.endpoint != tc.endpoint {
|
||||
t.Errorf("Expected endpoint %s, got %s", tc.endpoint, l.endpoint)
|
||||
}
|
||||
if l.skipVerify != false {
|
||||
t.Errorf("Expected skipVerify to be false by default")
|
||||
}
|
||||
if len(l.clients) != 0 {
|
||||
t.Errorf("Expected clients map to be empty")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenerBroadcast(_ *testing.T) {
|
||||
l := newListener("tcp", "127.0.0.1:16001")
|
||||
|
||||
// Verify that calling Dnstap with no clients doesn't panic
|
||||
msgType := tap.Dnstap_MESSAGE
|
||||
testPayload := &tap.Dnstap{
|
||||
Type: &msgType,
|
||||
Message: &tap.Message{
|
||||
QueryAddress: net.ParseIP("10.0.0.1").To4(),
|
||||
},
|
||||
}
|
||||
|
||||
// Should not panic with no clients
|
||||
l.Dnstap(testPayload)
|
||||
|
||||
// Add a mock client (without encoder to avoid framestream handshake complexity)
|
||||
mockConn := &mockConn{writes: [][]byte{}}
|
||||
c := &client{
|
||||
conn: mockConn,
|
||||
enc: nil, // Set to nil to avoid framestream issues in test
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
l.clients[c] = struct{}{}
|
||||
|
||||
// Broadcast should handle nil encoder gracefully (will call removeClient on error)
|
||||
l.Dnstap(testPayload)
|
||||
}
|
||||
|
||||
func TestListenerRemoveClient(t *testing.T) {
|
||||
l := newListener("tcp", "127.0.0.1:16002")
|
||||
|
||||
mockConn := &mockConn{writes: [][]byte{}}
|
||||
|
||||
c := &client{
|
||||
conn: mockConn,
|
||||
enc: nil, // Skip encoder for simplicity
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
l.clients[c] = struct{}{}
|
||||
|
||||
if len(l.clients) != 1 {
|
||||
t.Error("Expected 1 client")
|
||||
}
|
||||
|
||||
l.removeClient(c)
|
||||
|
||||
if len(l.clients) != 0 {
|
||||
t.Error("Expected 0 clients after removal")
|
||||
}
|
||||
|
||||
// Verify quit channel is closed
|
||||
select {
|
||||
case <-c.quit:
|
||||
// Good, channel is closed
|
||||
default:
|
||||
t.Error("Expected quit channel to be closed")
|
||||
}
|
||||
}
|
||||
|
||||
// mockConn implements net.Conn for testing
|
||||
type mockConn struct {
|
||||
writes [][]byte
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (m *mockConn) Read(_ []byte) (n int, err error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (m *mockConn) Write(b []byte) (n int, err error) {
|
||||
if m.closed {
|
||||
return 0, net.ErrClosed
|
||||
}
|
||||
// Copy the data to avoid issues with buffer reuse
|
||||
data := make([]byte, len(b))
|
||||
copy(data, b)
|
||||
m.writes = append(m.writes, data)
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (m *mockConn) Close() error {
|
||||
m.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConn) LocalAddr() net.Addr {
|
||||
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 16000}
|
||||
}
|
||||
|
||||
func (m *mockConn) RemoteAddr() net.Addr {
|
||||
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 50000}
|
||||
}
|
||||
|
||||
func (m *mockConn) SetDeadline(_ time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConn) SetReadDeadline(_ time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockConn) SetWriteDeadline(_ time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestListenerClose(t *testing.T) {
|
||||
l := newListener("tcp", "127.0.0.1:16003")
|
||||
|
||||
// Add some mock clients
|
||||
for range 3 {
|
||||
mockConn := &mockConn{writes: [][]byte{}}
|
||||
c := &client{
|
||||
conn: mockConn,
|
||||
enc: nil, // Skip encoder for simplicity
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
l.clients[c] = struct{}{}
|
||||
}
|
||||
|
||||
if len(l.clients) != 3 {
|
||||
t.Errorf("Expected 3 clients, got %d", len(l.clients))
|
||||
}
|
||||
|
||||
l.close()
|
||||
|
||||
if len(l.clients) != 0 {
|
||||
t.Errorf("Expected 0 clients after close, got %d", len(l.clients))
|
||||
}
|
||||
|
||||
// Verify quit channel is closed
|
||||
select {
|
||||
case <-l.quit:
|
||||
// Good, channel is closed
|
||||
default:
|
||||
t.Error("Expected listener quit channel to be closed")
|
||||
}
|
||||
}
|
||||
@@ -44,6 +44,17 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
|
||||
|
||||
endpoint := args[0]
|
||||
|
||||
// Check if this is a 'listen' directive for incoming connections
|
||||
isListener := endpoint == "listen"
|
||||
if isListener {
|
||||
if len(args) < 2 {
|
||||
return nil, c.Errf("dnstap listen requires an endpoint argument")
|
||||
}
|
||||
endpoint = args[1]
|
||||
// Shift args for listener mode
|
||||
args = args[1:]
|
||||
}
|
||||
|
||||
if len(args) >= 3 {
|
||||
tcpWriteBuf := args[2]
|
||||
v, err := strconv.Atoi(tcpWriteBuf)
|
||||
@@ -68,26 +79,52 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
|
||||
}
|
||||
|
||||
var dio *dio
|
||||
if strings.HasPrefix(endpoint, "tls://") {
|
||||
// remote network endpoint
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
var lstnr *listener
|
||||
|
||||
if isListener {
|
||||
// Incoming connection listener
|
||||
if strings.HasPrefix(endpoint, "tls://") {
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
}
|
||||
lstnr = newListener("tls", endpointURL.Host)
|
||||
d.listener = lstnr
|
||||
} else if strings.HasPrefix(endpoint, "tcp://") {
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
}
|
||||
lstnr = newListener("tcp", endpointURL.Host)
|
||||
d.listener = lstnr
|
||||
} else {
|
||||
endpoint = strings.TrimPrefix(endpoint, "unix://")
|
||||
lstnr = newListener("unix", endpoint)
|
||||
d.listener = lstnr
|
||||
}
|
||||
dio = newIO("tls", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
} else if strings.HasPrefix(endpoint, "tcp://") {
|
||||
// remote network endpoint
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
}
|
||||
dio = newIO("tcp", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
} else {
|
||||
endpoint = strings.TrimPrefix(endpoint, "unix://")
|
||||
dio = newIO("unix", endpoint, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
// Outgoing connection
|
||||
if strings.HasPrefix(endpoint, "tls://") {
|
||||
// remote network endpoint
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
}
|
||||
dio = newIO("tls", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
} else if strings.HasPrefix(endpoint, "tcp://") {
|
||||
// remote network endpoint
|
||||
endpointURL, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return nil, c.ArgErr()
|
||||
}
|
||||
dio = newIO("tcp", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
} else {
|
||||
endpoint = strings.TrimPrefix(endpoint, "unix://")
|
||||
dio = newIO("unix", endpoint, d.MultipleQueue, d.MultipleTcpWriteBuf)
|
||||
d.io = dio
|
||||
}
|
||||
}
|
||||
|
||||
d.IncludeRawMessage = len(args) >= 2 && args[1] == "full"
|
||||
@@ -100,7 +137,27 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
|
||||
switch c.Val() {
|
||||
case "skipverify":
|
||||
{
|
||||
dio.skipVerify = true
|
||||
if isListener && lstnr != nil {
|
||||
lstnr.skipVerify = true
|
||||
} else if dio != nil {
|
||||
dio.skipVerify = true
|
||||
}
|
||||
}
|
||||
case "tls":
|
||||
{
|
||||
// TLS configuration for listeners: tls <cert> <key> [ca]
|
||||
if !isListener || lstnr == nil {
|
||||
return nil, c.Errf("tls directive only valid for listeners")
|
||||
}
|
||||
args := c.RemainingArgs()
|
||||
if len(args) < 2 {
|
||||
return nil, c.Errf("tls requires cert and key file paths")
|
||||
}
|
||||
lstnr.certFile = args[0]
|
||||
lstnr.keyFile = args[1]
|
||||
if len(args) >= 3 {
|
||||
lstnr.caFile = args[2]
|
||||
}
|
||||
}
|
||||
case "identity":
|
||||
{
|
||||
@@ -139,19 +196,38 @@ func setup(c *caddy.Controller) error {
|
||||
for i := range dnstaps {
|
||||
dnstap := dnstaps[i]
|
||||
c.OnStartup(func() error {
|
||||
if err := dnstap.io.(*dio).connect(); err != nil {
|
||||
log.Errorf("No connection to dnstap endpoint: %s", err)
|
||||
// Start outgoing connection if configured
|
||||
if dnstap.io != nil {
|
||||
if err := dnstap.io.(*dio).connect(); err != nil {
|
||||
log.Errorf("No connection to dnstap endpoint: %s", err)
|
||||
}
|
||||
}
|
||||
// Start listener if configured
|
||||
if dnstap.listener != nil {
|
||||
if err := dnstap.listener.listen(); err != nil {
|
||||
log.Errorf("Failed to start dnstap listener: %s", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
c.OnRestart(func() error {
|
||||
dnstap.io.(*dio).close()
|
||||
if dnstap.io != nil {
|
||||
dnstap.io.(*dio).close()
|
||||
}
|
||||
if dnstap.listener != nil {
|
||||
dnstap.listener.close()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
c.OnFinalShutdown(func() error {
|
||||
dnstap.io.(*dio).close()
|
||||
if dnstap.io != nil {
|
||||
dnstap.io.(*dio).close()
|
||||
}
|
||||
if dnstap.listener != nil {
|
||||
dnstap.listener.close()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
|
||||
@@ -18,6 +18,11 @@ type results struct {
|
||||
extraFormat string
|
||||
multipleTcpWriteBuf int
|
||||
multipleQueue int
|
||||
isListener bool
|
||||
certFile string
|
||||
keyFile string
|
||||
caFile string
|
||||
skipVerify bool
|
||||
}
|
||||
|
||||
func TestConfig(t *testing.T) {
|
||||
@@ -27,16 +32,16 @@ func TestConfig(t *testing.T) {
|
||||
fail bool
|
||||
expect []results
|
||||
}{
|
||||
{"dnstap dnstap.sock full", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap unix://dnstap.sock", false, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap tcp://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap tcp://[::1]:6000", false, []results{{"[::1]:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap tcp://example.com:6000", false, []results{{"example.com:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap", true, []results{{"fail", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
|
||||
{"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", false, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
|
||||
{"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{"fail", false, "tcp", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
|
||||
{"dnstap dnstap.sock full", false, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap unix://dnstap.sock", false, []results{{endpoint: "dnstap.sock", full: false, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap tcp://127.0.0.1:6000", false, []results{{endpoint: "127.0.0.1:6000", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap tcp://[::1]:6000", false, []results{{endpoint: "[::1]:6000", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap tcp://example.com:6000", false, []results{{endpoint: "example.com:6000", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap", true, []results{{endpoint: "fail", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte("NAME"), version: []byte("VER"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte("NAME"), version: []byte("VER"), extraFormat: "EXTRA", multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{endpoint: "dnstap.sock", full: false, proto: "unix", identity: []byte("NAME"), version: []byte("VER"), extraFormat: "EXTRA", multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{endpoint: "fail", full: false, proto: "tcp", identity: []byte("NAME"), version: []byte("VER"), extraFormat: "EXTRA", multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{`dnstap dnstap.sock full {
|
||||
identity NAME
|
||||
version VER
|
||||
@@ -47,21 +52,39 @@ func TestConfig(t *testing.T) {
|
||||
version VER2
|
||||
extra EXTRA2
|
||||
}`, false, []results{
|
||||
{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1},
|
||||
{"127.0.0.1:6000", false, "tcp", []byte("NAME2"), []byte("VER2"), "EXTRA2", 1, 1},
|
||||
{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte("NAME"), version: []byte("VER"), extraFormat: "EXTRA", multipleTcpWriteBuf: 1, multipleQueue: 1},
|
||||
{endpoint: "127.0.0.1:6000", full: false, proto: "tcp", identity: []byte("NAME2"), version: []byte("VER2"), extraFormat: "EXTRA2", multipleTcpWriteBuf: 1, multipleQueue: 1},
|
||||
}},
|
||||
{"dnstap tls://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tls", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock {\nversion\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock {\nextra\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap tls://127.0.0.1:6000", false, []results{{endpoint: "127.0.0.1:6000", full: false, proto: "tls", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{endpoint: "dnstap.sock", full: false, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock {\nversion\n}\n", true, []results{{endpoint: "dnstap.sock", full: false, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock {\nextra\n}\n", true, []results{{endpoint: "dnstap.sock", full: false, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
// Limits and parsing for writebuffer (MiB) and queue (x10k)
|
||||
{"dnstap dnstap.sock full 1024 2048", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1024, 2048}}},
|
||||
{"dnstap dnstap.sock full 1025 1", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full 1 4097", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full 0 10", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full 10 0", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full x 10", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full 10 y", true, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
|
||||
{"dnstap dnstap.sock full 1024 2048", false, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1024, multipleQueue: 2048}}},
|
||||
{"dnstap dnstap.sock full 1025 1", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full 1 4097", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full 0 10", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full 10 0", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full x 10", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
{"dnstap dnstap.sock full 10 y", true, []results{{endpoint: "dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), multipleTcpWriteBuf: 1, multipleQueue: 1}}},
|
||||
|
||||
// Listener tests
|
||||
{"dnstap listen tcp://127.0.0.1:6000", false, []results{{endpoint: "127.0.0.1:6000", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), isListener: true}}},
|
||||
{"dnstap listen tcp://127.0.0.1:6000 full", false, []results{{endpoint: "127.0.0.1:6000", full: true, proto: "tcp", identity: []byte(hostname), version: []byte("-"), isListener: true}}},
|
||||
{"dnstap listen unix:///tmp/dnstap.sock", false, []results{{endpoint: "/tmp/dnstap.sock", full: false, proto: "unix", identity: []byte(hostname), version: []byte("-"), isListener: true}}},
|
||||
{"dnstap listen /tmp/dnstap.sock full", false, []results{{endpoint: "/tmp/dnstap.sock", full: true, proto: "unix", identity: []byte(hostname), version: []byte("-"), isListener: true}}},
|
||||
{"dnstap listen tls://127.0.0.1:6000 full {\ntls /path/to/cert.pem /path/to/key.pem\n}\n", false, []results{{endpoint: "127.0.0.1:6000", full: true, proto: "tls", identity: []byte(hostname), version: []byte("-"), isListener: true, certFile: "/path/to/cert.pem", keyFile: "/path/to/key.pem"}}},
|
||||
{"dnstap listen tls://127.0.0.1:6000 {\ntls /path/to/cert.pem /path/to/key.pem /path/to/ca.pem\n}\n", false, []results{{endpoint: "127.0.0.1:6000", full: false, proto: "tls", identity: []byte(hostname), version: []byte("-"), isListener: true, certFile: "/path/to/cert.pem", keyFile: "/path/to/key.pem", caFile: "/path/to/ca.pem"}}},
|
||||
{"dnstap listen tls://127.0.0.1:6000 {\ntls /path/to/cert.pem /path/to/key.pem\nskipverify\n}\n", false, []results{{endpoint: "127.0.0.1:6000", full: false, proto: "tls", identity: []byte(hostname), version: []byte("-"), isListener: true, certFile: "/path/to/cert.pem", keyFile: "/path/to/key.pem", skipVerify: true}}},
|
||||
{"dnstap listen", true, nil}, // Missing endpoint
|
||||
{"dnstap listen tcp://127.0.0.1:6000 {\ntls /path/to/cert.pem\n}\n", true, nil}, // Missing key file for TLS
|
||||
|
||||
// Mixed outgoing and listener
|
||||
{`dnstap tcp://remote.example.com:6000 full
|
||||
dnstap listen tcp://127.0.0.1:6001`, false, []results{
|
||||
{endpoint: "remote.example.com:6000", full: true, proto: "tcp", identity: []byte(hostname), version: []byte("-"), isListener: false, multipleTcpWriteBuf: 1, multipleQueue: 1},
|
||||
{endpoint: "127.0.0.1:6001", full: false, proto: "tcp", identity: []byte(hostname), version: []byte("-"), isListener: true},
|
||||
}},
|
||||
}
|
||||
for i, tc := range tests {
|
||||
c := caddy.NewTestController("dns", tc.in)
|
||||
@@ -76,30 +99,62 @@ func TestConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: expected no error, got %s", i, err)
|
||||
}
|
||||
for i, tap := range taps {
|
||||
if x := tap.io.(*dio).endpoint; x != tc.expect[i].endpoint {
|
||||
t.Errorf("Test %d: expected endpoint %s, got %s", i, tc.expect[i].endpoint, x)
|
||||
for j, tap := range taps {
|
||||
if tc.expect[j].isListener {
|
||||
// Verify listener configuration
|
||||
if tap.listener == nil {
|
||||
t.Errorf("Test %d: expected listener to be set", i)
|
||||
continue
|
||||
}
|
||||
if x := tap.listener.endpoint; x != tc.expect[j].endpoint {
|
||||
t.Errorf("Test %d: expected listener endpoint %s, got %s", i, tc.expect[j].endpoint, x)
|
||||
}
|
||||
if x := tap.listener.proto; x != tc.expect[j].proto {
|
||||
t.Errorf("Test %d: expected listener proto %s, got %s", i, tc.expect[j].proto, x)
|
||||
}
|
||||
if x := tap.listener.certFile; x != tc.expect[j].certFile {
|
||||
t.Errorf("Test %d: expected listener certFile %s, got %s", i, tc.expect[j].certFile, x)
|
||||
}
|
||||
if x := tap.listener.keyFile; x != tc.expect[j].keyFile {
|
||||
t.Errorf("Test %d: expected listener keyFile %s, got %s", i, tc.expect[j].keyFile, x)
|
||||
}
|
||||
if x := tap.listener.caFile; x != tc.expect[j].caFile {
|
||||
t.Errorf("Test %d: expected listener caFile %s, got %s", i, tc.expect[j].caFile, x)
|
||||
}
|
||||
if x := tap.listener.skipVerify; x != tc.expect[j].skipVerify {
|
||||
t.Errorf("Test %d: expected listener skipVerify %t, got %t", i, tc.expect[j].skipVerify, x)
|
||||
}
|
||||
} else {
|
||||
// Verify outgoing connection configuration
|
||||
if tap.io == nil {
|
||||
t.Errorf("Test %d: expected io to be set", i)
|
||||
continue
|
||||
}
|
||||
if x := tap.io.(*dio).endpoint; x != tc.expect[j].endpoint {
|
||||
t.Errorf("Test %d: expected endpoint %s, got %s", i, tc.expect[j].endpoint, x)
|
||||
}
|
||||
if x := tap.io.(*dio).proto; x != tc.expect[j].proto {
|
||||
t.Errorf("Test %d: expected proto %s, got %s", i, tc.expect[j].proto, x)
|
||||
}
|
||||
if x := tap.MultipleTcpWriteBuf; x != tc.expect[j].multipleTcpWriteBuf {
|
||||
t.Errorf("Test %d: expected MultipleTcpWriteBuf %d, got %d", i, tc.expect[j].multipleTcpWriteBuf, x)
|
||||
}
|
||||
if x := tap.MultipleQueue; x != tc.expect[j].multipleQueue {
|
||||
t.Errorf("Test %d: expected MultipleQueue %d, got %d", i, tc.expect[j].multipleQueue, x)
|
||||
}
|
||||
}
|
||||
if x := tap.io.(*dio).proto; x != tc.expect[i].proto {
|
||||
t.Errorf("Test %d: expected proto %s, got %s", i, tc.expect[i].proto, x)
|
||||
// Common properties
|
||||
if x := tap.IncludeRawMessage; x != tc.expect[j].full {
|
||||
t.Errorf("Test %d: expected IncludeRawMessage %t, got %t", i, tc.expect[j].full, x)
|
||||
}
|
||||
if x := tap.IncludeRawMessage; x != tc.expect[i].full {
|
||||
t.Errorf("Test %d: expected IncludeRawMessage %t, got %t", i, tc.expect[i].full, x)
|
||||
if x := string(tap.Identity); x != string(tc.expect[j].identity) {
|
||||
t.Errorf("Test %d: expected identity %s, got %s", i, tc.expect[j].identity, x)
|
||||
}
|
||||
if x := string(tap.Identity); x != string(tc.expect[i].identity) {
|
||||
t.Errorf("Test %d: expected identity %s, got %s", i, tc.expect[i].identity, x)
|
||||
if x := string(tap.Version); x != string(tc.expect[j].version) {
|
||||
t.Errorf("Test %d: expected version %s, got %s", i, tc.expect[j].version, x)
|
||||
}
|
||||
if x := string(tap.Version); x != string(tc.expect[i].version) {
|
||||
t.Errorf("Test %d: expected version %s, got %s", i, tc.expect[i].version, x)
|
||||
}
|
||||
if x := tap.MultipleTcpWriteBuf; x != tc.expect[i].multipleTcpWriteBuf {
|
||||
t.Errorf("Test %d: expected MultipleTcpWriteBuf %d, got %d", i, tc.expect[i].multipleTcpWriteBuf, x)
|
||||
}
|
||||
if x := tap.MultipleQueue; x != tc.expect[i].multipleQueue {
|
||||
t.Errorf("Test %d: expected MultipleQueue %d, got %d", i, tc.expect[i].multipleQueue, x)
|
||||
}
|
||||
if x := tap.ExtraFormat; x != tc.expect[i].extraFormat {
|
||||
t.Errorf("Test %d: expected extra format %s, got %s", i, tc.expect[i].extraFormat, x)
|
||||
if x := tap.ExtraFormat; x != tc.expect[j].extraFormat {
|
||||
t.Errorf("Test %d: expected extra format %s, got %s", i, tc.expect[j].extraFormat, x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user