cmd/stdiscosrv: Add replication heartbeats (fixes #5117) (#5120)

This commit is contained in:
Paweł Rozlach 2018-08-15 16:52:20 +02:00 committed by Jakob Borg
parent 166a33037f
commit f6da436f4b
1 changed files with 23 additions and 1 deletions

View File

@ -18,6 +18,9 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
)
const replicationReadTimeout = time.Minute
const replicationHeartbeatInterval = time.Second * 30
type replicator interface {
send(key string, addrs []DatabaseAddress, seen int64)
}
@ -79,10 +82,22 @@ func (s *replicationSender) Serve() {
return
}
heartBeatTicker := time.NewTicker(replicationHeartbeatInterval)
defer heartBeatTicker.Stop()
// Send records.
buf := make([]byte, 1024)
for {
select {
case <-heartBeatTicker.C:
if len(s.outbox) > 0 {
// No need to send heartbeats if there are events/prevrious
// heartbeats to send, they will keep the connection alive.
continue
}
// Empty replication message is the heartbeat:
s.outbox <- ReplicationRecord{}
case rec := <-s.outbox:
// Buffer must hold record plus four bytes for size
size := rec.Size()
@ -106,6 +121,7 @@ func (s *replicationSender) Serve() {
if _, err := conn.Write(buf[:4+n]); err != nil {
replicationSendsTotal.WithLabelValues("error").Inc()
log.Println("Replication write:", err)
// Yes, we are loosing the replication event here.
return
}
replicationSendsTotal.WithLabelValues("success").Inc()
@ -242,7 +258,7 @@ func (l *replicationListener) handle(conn net.Conn) {
default:
}
conn.SetReadDeadline(time.Now().Add(time.Minute))
conn.SetReadDeadline(time.Now().Add(replicationReadTimeout))
// First four bytes are the size
if _, err := io.ReadFull(conn, buf[:4]); err != nil {
@ -256,6 +272,12 @@ func (l *replicationListener) handle(conn net.Conn) {
if len(buf) < size {
buf = make([]byte, size)
}
if size == 0 {
// Heartbeat, ignore
continue
}
if _, err := io.ReadFull(conn, buf[:size]); err != nil {
log.Println("Replication read record:", err)
replicationRecvsTotal.WithLabelValues("error").Inc()