mirror of
https://github.com/coredns/coredns.git
synced 2025-12-07 10:55:17 -05:00
plugin/dnstap: remove config struct (#4258)
* plugin/dnstap: remove config struct this struct is an uneeded intermidiate to get a dnstap it can be removed. Remove the dnstapio subpkg: it's also not needed. Make *many* functions and structs private now that we can. Signed-off-by: Miek Gieben <miek@miek.nl> * correct logging Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
121
plugin/dnstap/io.go
Normal file
121
plugin/dnstap/io.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package dnstap
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
tap "github.com/dnstap/golang-dnstap"
|
||||
)
|
||||
|
||||
const (
|
||||
tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number has this value.
|
||||
queueSize = 10000 // idem.
|
||||
|
||||
tcpTimeout = 4 * time.Second
|
||||
flushTimeout = 1 * time.Second
|
||||
)
|
||||
|
||||
// tapper interface is used in testing to mock the Dnstap method.
|
||||
type tapper interface {
|
||||
Dnstap(tap.Dnstap)
|
||||
}
|
||||
|
||||
// dio implements the Tapper interface.
|
||||
type dio struct {
|
||||
endpoint string
|
||||
proto string
|
||||
conn net.Conn
|
||||
enc *encoder
|
||||
queue chan tap.Dnstap
|
||||
dropped uint32
|
||||
quit chan struct{}
|
||||
flushTimeout time.Duration
|
||||
tcpTimeout time.Duration
|
||||
}
|
||||
|
||||
// newIO returns a new and initialized pointer to a dio.
|
||||
func newIO(proto, endpoint string) *dio {
|
||||
return &dio{
|
||||
endpoint: endpoint,
|
||||
proto: proto,
|
||||
queue: make(chan tap.Dnstap, queueSize),
|
||||
quit: make(chan struct{}),
|
||||
flushTimeout: flushTimeout,
|
||||
tcpTimeout: tcpTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dio) dial() error {
|
||||
conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
tcpConn.SetWriteBuffer(tcpWriteBufSize)
|
||||
tcpConn.SetNoDelay(false)
|
||||
}
|
||||
|
||||
d.enc, err = newEncoder(conn, d.tcpTimeout)
|
||||
return err
|
||||
}
|
||||
|
||||
// Connect connects to the dnstap endpoint.
|
||||
func (d *dio) connect() error {
|
||||
err := d.dial()
|
||||
go d.serve()
|
||||
return err
|
||||
}
|
||||
|
||||
// Dnstap enqueues the payload for log.
|
||||
func (d *dio) Dnstap(payload tap.Dnstap) {
|
||||
select {
|
||||
case d.queue <- payload:
|
||||
default:
|
||||
atomic.AddUint32(&d.dropped, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// close waits until the I/O routine is finished to return.
|
||||
func (d *dio) close() { close(d.quit) }
|
||||
|
||||
func (d *dio) write(payload *tap.Dnstap) error {
|
||||
if d.enc == nil {
|
||||
atomic.AddUint32(&d.dropped, 1)
|
||||
return nil
|
||||
}
|
||||
if err := d.enc.writeMsg(payload); err != nil {
|
||||
atomic.AddUint32(&d.dropped, 1)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *dio) serve() {
|
||||
timeout := time.After(d.flushTimeout)
|
||||
for {
|
||||
select {
|
||||
case <-d.quit:
|
||||
if d.enc == nil {
|
||||
return
|
||||
}
|
||||
d.enc.flush()
|
||||
d.enc.close()
|
||||
return
|
||||
case payload := <-d.queue:
|
||||
if err := d.write(&payload); err != nil {
|
||||
d.dial()
|
||||
}
|
||||
case <-timeout:
|
||||
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
|
||||
log.Warningf("Dropped dnstap messages: %d", dropped)
|
||||
}
|
||||
if d.enc == nil {
|
||||
d.dial()
|
||||
} else {
|
||||
d.enc.flush()
|
||||
}
|
||||
timeout = time.After(d.flushTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user