syncthing/lib/ur/failurereporting.go

240 lines
5.8 KiB
Go

// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package ur
import (
"bytes"
"context"
"encoding/json"
"net/http"
"runtime/pprof"
"strings"
"time"
"github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/thejerf/suture/v4"
)
var (
// When a specific failure first occurs, it is delayed by minDelay. If
// more of the same failures occurs those are further delayed and
// aggregated for maxDelay.
minDelay = 10 * time.Second
maxDelay = time.Minute
sendTimeout = time.Minute
finalSendTimeout = svcutil.ServiceTimeout / 2
evChanClosed = "failure event channel closed"
invalidEventDataType = "failure event data is not a string"
)
type FailureReport struct {
FailureData
Count int
Version string
}
type FailureData struct {
Description string
Goroutines string
Extra map[string]string
}
func FailureDataWithGoroutines(description string) FailureData {
var buf strings.Builder
pprof.Lookup("goroutine").WriteTo(&buf, 1)
return FailureData{
Description: description,
Goroutines: buf.String(),
Extra: make(map[string]string),
}
}
type FailureHandler interface {
suture.Service
config.Committer
}
func NewFailureHandler(cfg config.Wrapper, evLogger events.Logger) FailureHandler {
return &failureHandler{
cfg: cfg,
evLogger: evLogger,
optsChan: make(chan config.OptionsConfiguration),
buf: make(map[string]*failureStat),
}
}
type failureHandler struct {
cfg config.Wrapper
evLogger events.Logger
optsChan chan config.OptionsConfiguration
buf map[string]*failureStat
}
type failureStat struct {
first, last time.Time
count int
data FailureData
}
func (h *failureHandler) Serve(ctx context.Context) error {
cfg := h.cfg.Subscribe(h)
defer h.cfg.Unsubscribe(h)
url, sub, evChan := h.applyOpts(cfg.Options, nil)
var err error
timer := time.NewTimer(minDelay)
resetTimer := make(chan struct{})
for err == nil {
select {
case opts := <-h.optsChan:
url, sub, evChan = h.applyOpts(opts, sub)
case e, ok := <-evChan:
if !ok {
// Just to be safe - shouldn't ever happen, as
// evChan is set to nil when unsubscribing.
h.addReport(FailureData{Description: evChanClosed}, time.Now())
evChan = nil
continue
}
var data FailureData
switch d := e.Data.(type) {
case string:
data.Description = d
case FailureData:
data = d
default:
// Same here, shouldn't ever happen.
h.addReport(FailureData{Description: invalidEventDataType}, time.Now())
continue
}
h.addReport(data, e.Time)
case <-timer.C:
reports := make([]FailureReport, 0, len(h.buf))
now := time.Now()
for descr, stat := range h.buf {
if now.Sub(stat.last) > minDelay || now.Sub(stat.first) > maxDelay {
reports = append(reports, newFailureReport(stat))
delete(h.buf, descr)
}
}
if len(reports) > 0 {
// Lets keep process events/configs while it might be timing out for a while
go func() {
sendFailureReports(ctx, reports, url)
select {
case resetTimer <- struct{}{}:
case <-ctx.Done():
}
}()
} else {
timer.Reset(minDelay)
}
case <-resetTimer:
timer.Reset(minDelay)
case <-ctx.Done():
err = ctx.Err()
}
}
if sub != nil {
sub.Unsubscribe()
if len(h.buf) > 0 {
reports := make([]FailureReport, 0, len(h.buf))
for _, stat := range h.buf {
reports = append(reports, newFailureReport(stat))
}
timeout, cancel := context.WithTimeout(context.Background(), finalSendTimeout)
defer cancel()
sendFailureReports(timeout, reports, url)
}
}
return err
}
func (h *failureHandler) applyOpts(opts config.OptionsConfiguration, sub events.Subscription) (string, events.Subscription, <-chan events.Event) {
// Sub nil checks just for safety - config updates can be racy.
url := opts.CRURL + "/failure"
if opts.URAccepted > 0 {
if sub == nil {
sub = h.evLogger.Subscribe(events.Failure)
}
return url, sub, sub.C()
}
if sub != nil {
sub.Unsubscribe()
}
return url, nil, nil
}
func (h *failureHandler) addReport(data FailureData, evTime time.Time) {
if stat, ok := h.buf[data.Description]; ok {
stat.last = evTime
stat.count++
return
}
h.buf[data.Description] = &failureStat{
first: evTime,
last: evTime,
count: 1,
data: data,
}
}
func (h *failureHandler) CommitConfiguration(from, to config.Configuration) bool {
if from.Options.CREnabled != to.Options.CREnabled || from.Options.CRURL != to.Options.CRURL {
h.optsChan <- to.Options
}
return true
}
func (*failureHandler) String() string {
return "FailureHandler"
}
func sendFailureReports(ctx context.Context, reports []FailureReport, url string) {
var b bytes.Buffer
if err := json.NewEncoder(&b).Encode(reports); err != nil {
panic(err)
}
client := &http.Client{
Transport: &http.Transport{
DialContext: dialer.DialContext,
Proxy: http.ProxyFromEnvironment,
},
}
reqCtx, reqCancel := context.WithTimeout(ctx, sendTimeout)
defer reqCancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, url, &b)
if err != nil {
l.Infoln("Failed to send failure report:", err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
l.Infoln("Failed to send failure report:", err)
return
}
resp.Body.Close()
}
func newFailureReport(stat *failureStat) FailureReport {
return FailureReport{
FailureData: stat.data,
Count: stat.count,
Version: build.LongVersion,
}
}