all: Use new Go 1.19 atomic types (#8772)

This commit is contained in:
greatroar 2023-02-07 12:07:34 +01:00 committed by GitHub
parent 882b711958
commit 38f2b34d29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 94 additions and 121 deletions

View File

@ -20,7 +20,7 @@ import (
var ( var (
outboxesMut = sync.RWMutex{} outboxesMut = sync.RWMutex{}
outboxes = make(map[syncthingprotocol.DeviceID]chan interface{}) outboxes = make(map[syncthingprotocol.DeviceID]chan interface{})
numConnections int64 numConnections atomic.Int64
) )
func listener(_, addr string, config *tls.Config, token string) { func listener(_, addr string, config *tls.Config, token string) {
@ -128,7 +128,7 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config, token strin
continue continue
} }
if atomic.LoadInt32(&overLimit) > 0 { if overLimit.Load() {
protocol.WriteMessage(conn, protocol.RelayFull{}) protocol.WriteMessage(conn, protocol.RelayFull{})
if debug { if debug {
log.Println("Refusing join request from", id, "due to being over limits") log.Println("Refusing join request from", id, "due to being over limits")
@ -267,7 +267,7 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config, token strin
conn.Close() conn.Close()
} }
if atomic.LoadInt32(&overLimit) > 0 && !hasSessions(id) { if overLimit.Load() && !hasSessions(id) {
if debug { if debug {
log.Println("Dropping", id, "as it has no sessions and we are over our limits") log.Println("Dropping", id, "as it has no sessions and we are over our limits")
} }
@ -360,8 +360,8 @@ func sessionConnectionHandler(conn net.Conn) {
} }
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
atomic.AddInt64(&numConnections, 1) numConnections.Add(1)
defer atomic.AddInt64(&numConnections, -1) defer numConnections.Add(-1)
for { for {
msg, err := protocol.ReadMessage(conn) msg, err := protocol.ReadMessage(conn)

View File

@ -49,7 +49,7 @@ var (
sessionLimitBps int sessionLimitBps int
globalLimitBps int globalLimitBps int
overLimit int32 overLimit atomic.Bool
descriptorLimit int64 descriptorLimit int64
sessionLimiter *rate.Limiter sessionLimiter *rate.Limiter
globalLimiter *rate.Limiter globalLimiter *rate.Limiter
@ -308,10 +308,10 @@ func main() {
func monitorLimits() { func monitorLimits() {
limitCheckTimer = time.NewTimer(time.Minute) limitCheckTimer = time.NewTimer(time.Minute)
for range limitCheckTimer.C { for range limitCheckTimer.C {
if atomic.LoadInt64(&numConnections)+atomic.LoadInt64(&numProxies) > descriptorLimit { if numConnections.Load()+numProxies.Load() > descriptorLimit {
atomic.StoreInt32(&overLimit, 1) overLimit.Store(true)
log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.") log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.")
} else if atomic.CompareAndSwapInt32(&overLimit, 1, 0) { } else if overLimit.CompareAndSwap(true, false) {
log.Println("Dropped below our connection limits. Accepting new connections.") log.Println("Dropped below our connection limits. Accepting new connections.")
} }
limitCheckTimer.Reset(time.Minute) limitCheckTimer.Reset(time.Minute)

View File

@ -23,8 +23,8 @@ var (
sessionMut = sync.RWMutex{} sessionMut = sync.RWMutex{}
activeSessions = make([]*session, 0) activeSessions = make([]*session, 0)
pendingSessions = make(map[string]*session) pendingSessions = make(map[string]*session)
numProxies int64 numProxies atomic.Int64
bytesProxied int64 bytesProxied atomic.Int64
) )
func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *rate.Limiter) *session { func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *rate.Limiter) *session {
@ -251,8 +251,8 @@ func (s *session) proxy(c1, c2 net.Conn) error {
log.Println("Proxy", c1.RemoteAddr(), "->", c2.RemoteAddr()) log.Println("Proxy", c1.RemoteAddr(), "->", c2.RemoteAddr())
} }
atomic.AddInt64(&numProxies, 1) numProxies.Add(1)
defer atomic.AddInt64(&numProxies, -1) defer numProxies.Add(-1)
buf := make([]byte, networkBufferSize) buf := make([]byte, networkBufferSize)
for { for {
@ -262,7 +262,7 @@ func (s *session) proxy(c1, c2 net.Conn) error {
return err return err
} }
atomic.AddInt64(&bytesProxied, int64(n)) bytesProxied.Add(int64(n))
if debug { if debug {
log.Printf("%d bytes from %s to %s", n, c1.RemoteAddr(), c2.RemoteAddr()) log.Printf("%d bytes from %s to %s", n, c1.RemoteAddr(), c2.RemoteAddr())

View File

@ -51,9 +51,9 @@ func getStatus(w http.ResponseWriter, _ *http.Request) {
status["numPendingSessionKeys"] = len(pendingSessions) status["numPendingSessionKeys"] = len(pendingSessions)
status["numActiveSessions"] = len(activeSessions) status["numActiveSessions"] = len(activeSessions)
sessionMut.Unlock() sessionMut.Unlock()
status["numConnections"] = atomic.LoadInt64(&numConnections) status["numConnections"] = numConnections.Load()
status["numProxies"] = atomic.LoadInt64(&numProxies) status["numProxies"] = numProxies.Load()
status["bytesProxied"] = atomic.LoadInt64(&bytesProxied) status["bytesProxied"] = bytesProxied.Load()
status["goVersion"] = runtime.Version() status["goVersion"] = runtime.Version()
status["goOS"] = runtime.GOOS status["goOS"] = runtime.GOOS
status["goArch"] = runtime.GOARCH status["goArch"] = runtime.GOARCH
@ -88,13 +88,13 @@ func getStatus(w http.ResponseWriter, _ *http.Request) {
} }
type rateCalculator struct { type rateCalculator struct {
counter *int64 // atomic, must remain 64-bit aligned counter *atomic.Int64
rates []int64 rates []int64
prev int64 prev int64
startTime time.Time startTime time.Time
} }
func newRateCalculator(keepIntervals int, interval time.Duration, counter *int64) *rateCalculator { func newRateCalculator(keepIntervals int, interval time.Duration, counter *atomic.Int64) *rateCalculator {
r := &rateCalculator{ r := &rateCalculator{
rates: make([]int64, keepIntervals), rates: make([]int64, keepIntervals),
counter: counter, counter: counter,
@ -112,7 +112,7 @@ func (r *rateCalculator) updateRates(interval time.Duration) {
next := now.Truncate(interval).Add(interval) next := now.Truncate(interval).Add(interval)
time.Sleep(next.Sub(now)) time.Sleep(next.Sub(now))
cur := atomic.LoadInt64(r.counter) cur := r.counter.Load()
rate := int64(float64(cur-r.prev) / interval.Seconds()) rate := int64(float64(cur-r.prev) / interval.Seconds())
copy(r.rates[1:], r.rates) copy(r.rates[1:], r.rates)
r.rates[0] = rate r.rates[0] = rate

View File

@ -44,7 +44,7 @@ func main() {
found := make(chan result) found := make(chan result)
stop := make(chan struct{}) stop := make(chan struct{})
var count int64 var count atomic.Int64
// Print periodic progress reports. // Print periodic progress reports.
go printProgress(prefix, &count) go printProgress(prefix, &count)
@ -72,7 +72,7 @@ func main() {
// Try certificates until one is found that has the prefix at the start of // Try certificates until one is found that has the prefix at the start of
// the resulting device ID. Increments count atomically, sends the result to // the resulting device ID. Increments count atomically, sends the result to
// found, returns when stop is closed. // found, returns when stop is closed.
func generatePrefixed(prefix string, count *int64, found chan<- result, stop <-chan struct{}) { func generatePrefixed(prefix string, count *atomic.Int64, found chan<- result, stop <-chan struct{}) {
notBefore := time.Now() notBefore := time.Now()
notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC) notAfter := time.Date(2049, 12, 31, 23, 59, 59, 0, time.UTC)
@ -109,7 +109,7 @@ func generatePrefixed(prefix string, count *int64, found chan<- result, stop <-c
} }
id := protocol.NewDeviceID(derBytes) id := protocol.NewDeviceID(derBytes)
atomic.AddInt64(count, 1) count.Add(1)
if strings.HasPrefix(id.String(), prefix) { if strings.HasPrefix(id.String(), prefix) {
select { select {
@ -121,7 +121,7 @@ func generatePrefixed(prefix string, count *int64, found chan<- result, stop <-c
} }
} }
func printProgress(prefix string, count *int64) { func printProgress(prefix string, count *atomic.Int64) {
started := time.Now() started := time.Now()
wantBits := 5 * len(prefix) wantBits := 5 * len(prefix)
if wantBits > 63 { if wantBits > 63 {
@ -132,7 +132,7 @@ func printProgress(prefix string, count *int64) {
fmt.Printf("Want %d bits for prefix %q, about %.2g certs to test (statistically speaking)\n", wantBits, prefix, expectedIterations) fmt.Printf("Want %d bits for prefix %q, about %.2g certs to test (statistically speaking)\n", wantBits, prefix, expectedIterations)
for range time.NewTicker(15 * time.Second).C { for range time.NewTicker(15 * time.Second).C {
tried := atomic.LoadInt64(count) tried := count.Load()
elapsed := time.Since(started) elapsed := time.Since(started)
rate := float64(tried) / elapsed.Seconds() rate := float64(tried) / elapsed.Seconds()
expected := timeStr(expectedIterations / rate) expected := timeStr(expectedIterations / rate)

View File

@ -134,7 +134,7 @@ type wrapper struct {
subs []Committer subs []Committer
mut sync.Mutex mut sync.Mutex
requiresRestart uint32 // an atomic bool requiresRestart atomic.Bool
} }
// Wrap wraps an existing Configuration structure and ties it to a file on // Wrap wraps an existing Configuration structure and ties it to a file on
@ -340,7 +340,7 @@ func (w *wrapper) notifyListener(sub Committer, from, to Configuration) {
l.Debugln(sub, "committing configuration") l.Debugln(sub, "committing configuration")
if !sub.CommitConfiguration(from, to) { if !sub.CommitConfiguration(from, to) {
l.Debugln(sub, "requires restart") l.Debugln(sub, "requires restart")
w.setRequiresRestart() w.requiresRestart.Store(true)
} }
} }
@ -525,13 +525,7 @@ func (w *wrapper) Save() error {
return nil return nil
} }
func (w *wrapper) RequiresRestart() bool { func (w *wrapper) RequiresRestart() bool { return w.requiresRestart.Load() }
return atomic.LoadUint32(&w.requiresRestart) != 0
}
func (w *wrapper) setRequiresRestart() {
atomic.StoreUint32(&w.requiresRestart, 1)
}
type modifyEntry struct { type modifyEntry struct {
modifyFunc ModifyFunction modifyFunc ModifyFunction

View File

@ -25,7 +25,7 @@ type limiter struct {
mu sync.Mutex mu sync.Mutex
write *rate.Limiter write *rate.Limiter
read *rate.Limiter read *rate.Limiter
limitsLAN atomicBool limitsLAN atomic.Bool
deviceReadLimiters map[protocol.DeviceID]*rate.Limiter deviceReadLimiters map[protocol.DeviceID]*rate.Limiter
deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter deviceWriteLimiters map[protocol.DeviceID]*rate.Limiter
} }
@ -157,7 +157,7 @@ func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
limited = true limited = true
} }
lim.limitsLAN.set(to.Options.LimitBandwidthInLan) lim.limitsLAN.Store(to.Options.LimitBandwidthInLan)
l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr) l.Infof("Overall send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
@ -282,13 +282,13 @@ func (w *limitedWriter) Write(buf []byte) (int, error) {
// waiter, valid for both writers and readers // waiter, valid for both writers and readers
type waiterHolder struct { type waiterHolder struct {
waiter waiter waiter waiter
limitsLAN *atomicBool limitsLAN *atomic.Bool
isLAN bool isLAN bool
} }
// unlimited returns true if the waiter is not limiting the rate // unlimited returns true if the waiter is not limiting the rate
func (w waiterHolder) unlimited() bool { func (w waiterHolder) unlimited() bool {
if w.isLAN && !w.limitsLAN.get() { if w.isLAN && !w.limitsLAN.Load() {
return true return true
} }
return w.waiter.Limit() == rate.Inf return w.waiter.Limit() == rate.Inf
@ -322,20 +322,6 @@ func (w waiterHolder) take(tokens int) {
} }
} }
type atomicBool int32
func (b *atomicBool) set(v bool) {
if v {
atomic.StoreInt32((*int32)(b), 1)
} else {
atomic.StoreInt32((*int32)(b), 0)
}
}
func (b *atomicBool) get() bool {
return atomic.LoadInt32((*int32)(b)) != 0
}
// totalWaiter waits for all of the waiters // totalWaiter waits for all of the waiters
type totalWaiter []waiter type totalWaiter []waiter

View File

@ -12,6 +12,7 @@ import (
crand "crypto/rand" crand "crypto/rand"
"io" "io"
"math/rand" "math/rand"
"sync/atomic"
"testing" "testing"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
@ -234,7 +235,7 @@ func TestLimitedWriterWrite(t *testing.T) {
writer: cw, writer: cw,
waiterHolder: waiterHolder{ waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize), waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomicBool), limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting isLAN: false, // enables limiting
}, },
} }
@ -263,7 +264,7 @@ func TestLimitedWriterWrite(t *testing.T) {
writer: cw, writer: cw,
waiterHolder: waiterHolder{ waiterHolder: waiterHolder{
waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize), waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
limitsLAN: new(atomicBool), limitsLAN: new(atomic.Bool),
isLAN: true, // disables limiting isLAN: true, // disables limiting
}, },
} }
@ -287,7 +288,7 @@ func TestLimitedWriterWrite(t *testing.T) {
writer: cw, writer: cw,
waiterHolder: waiterHolder{ waiterHolder: waiterHolder{
waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)}, waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
limitsLAN: new(atomicBool), limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting isLAN: false, // enables limiting
}, },
} }
@ -315,7 +316,7 @@ func TestLimitedWriterWrite(t *testing.T) {
rate.NewLimiter(rate.Limit(42), limiterBurstSize), rate.NewLimiter(rate.Limit(42), limiterBurstSize),
rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize),
}, },
limitsLAN: new(atomicBool), limitsLAN: new(atomic.Bool),
isLAN: false, // enables limiting isLAN: false, // enables limiting
}, },
} }

View File

@ -36,7 +36,7 @@ func init() {
type quicListener struct { type quicListener struct {
svcutil.ServiceWithError svcutil.ServiceWithError
nat atomic.Value nat atomic.Uint64 // Holds a stun.NATType.
onAddressesChangedNotifier onAddressesChangedNotifier
@ -56,7 +56,7 @@ func (t *quicListener) OnNATTypeChanged(natType stun.NATType) {
if natType != stun.NATUnknown { if natType != stun.NATUnknown {
l.Infof("%s detected NAT type: %s", t.uri, natType) l.Infof("%s detected NAT type: %s", t.uri, natType)
} }
t.nat.Store(natType) t.nat.Store(uint64(natType))
} }
func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) { func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) {
@ -205,7 +205,7 @@ func (t *quicListener) Factory() listenerFactory {
} }
func (t *quicListener) NATType() string { func (t *quicListener) NATType() string {
v := t.nat.Load().(stun.NATType) v := stun.NATType(t.nat.Load())
if v == stun.NATUnknown || v == stun.NATError { if v == stun.NATUnknown || v == stun.NATError {
return "unknown" return "unknown"
} }
@ -228,7 +228,7 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.
registry: registry, registry: registry,
} }
l.ServiceWithError = svcutil.AsService(l.serve, l.String()) l.ServiceWithError = svcutil.AsService(l.serve, l.String())
l.nat.Store(stun.NATUnknown) l.nat.Store(uint64(stun.NATUnknown))
return l return l
} }

View File

@ -13,7 +13,6 @@ import (
"math/rand" "math/rand"
"path/filepath" "path/filepath"
"sort" "sort"
"sync/atomic"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
@ -142,8 +141,8 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
} }
func (f *folder) Serve(ctx context.Context) error { func (f *folder) Serve(ctx context.Context) error {
atomic.AddInt32(&f.model.foldersRunning, 1) f.model.foldersRunning.Add(1)
defer atomic.AddInt32(&f.model.foldersRunning, -1) defer f.model.foldersRunning.Add(-1)
f.ctx = ctx f.ctx = ctx

View File

@ -23,6 +23,7 @@ import (
"runtime" "runtime"
"strings" "strings"
stdsync "sync" stdsync "sync"
"sync/atomic"
"time" "time"
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
@ -166,7 +167,7 @@ type model struct {
indexHandlers map[protocol.DeviceID]*indexHandlerRegistry indexHandlers map[protocol.DeviceID]*indexHandlerRegistry
// for testing only // for testing only
foldersRunning int32 foldersRunning atomic.Int32
} }
var _ config.Verifier = &model{} var _ config.Verifier = &model{}

View File

@ -21,7 +21,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -3097,7 +3096,7 @@ func TestFolderRestartZombies(t *testing.T) {
m.ScanFolder("default") m.ScanFolder("default")
// Check how many running folders we have running before the test. // Check how many running folders we have running before the test.
if r := atomic.LoadInt32(&m.foldersRunning); r != 1 { if r := m.foldersRunning.Load(); r != 1 {
t.Error("Expected one running folder, not", r) t.Error("Expected one running folder, not", r)
} }
@ -3122,7 +3121,7 @@ func TestFolderRestartZombies(t *testing.T) {
wg.Wait() wg.Wait()
// Make sure the folder is up and running, because we want to count it. // Make sure the folder is up and running, because we want to count it.
m.ScanFolder("default") m.ScanFolder("default")
if r := atomic.LoadInt32(&m.foldersRunning); r != 1 { if r := m.foldersRunning.Load(); r != 1 {
t.Error("Expected one running folder, not", r) t.Error("Expected one running folder, not", r)
} }
} }

View File

@ -13,24 +13,24 @@ import (
var BufferPool bufferPool var BufferPool bufferPool
type bufferPool struct { type bufferPool struct {
puts int64 puts atomic.Int64
skips int64 skips atomic.Int64
misses int64 misses atomic.Int64
pools []sync.Pool pools []sync.Pool
hits []int64 // start of slice allocation is always aligned hits []atomic.Int64
} }
func newBufferPool() bufferPool { func newBufferPool() bufferPool {
return bufferPool{ return bufferPool{
pools: make([]sync.Pool, len(BlockSizes)), pools: make([]sync.Pool, len(BlockSizes)),
hits: make([]int64, len(BlockSizes)), hits: make([]atomic.Int64, len(BlockSizes)),
} }
} }
func (p *bufferPool) Get(size int) []byte { func (p *bufferPool) Get(size int) []byte {
// Too big, isn't pooled // Too big, isn't pooled
if size > MaxBlockSize { if size > MaxBlockSize {
atomic.AddInt64(&p.skips, 1) p.skips.Add(1)
return make([]byte, size) return make([]byte, size)
} }
@ -38,13 +38,13 @@ func (p *bufferPool) Get(size int) []byte {
bkt := getBucketForLen(size) bkt := getBucketForLen(size)
for j := bkt; j < len(BlockSizes); j++ { for j := bkt; j < len(BlockSizes); j++ {
if intf := p.pools[j].Get(); intf != nil { if intf := p.pools[j].Get(); intf != nil {
atomic.AddInt64(&p.hits[j], 1) p.hits[j].Add(1)
bs := *intf.(*[]byte) bs := *intf.(*[]byte)
return bs[:size] return bs[:size]
} }
} }
atomic.AddInt64(&p.misses, 1) p.misses.Add(1)
// All pools are empty, must allocate. For very small slices where we // All pools are empty, must allocate. For very small slices where we
// didn't have a block to reuse, just allocate a small slice instead of // didn't have a block to reuse, just allocate a small slice instead of
@ -60,11 +60,11 @@ func (p *bufferPool) Get(size int) []byte {
func (p *bufferPool) Put(bs []byte) { func (p *bufferPool) Put(bs []byte) {
// Don't buffer slices outside of our pool range // Don't buffer slices outside of our pool range
if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize { if cap(bs) > MaxBlockSize || cap(bs) < MinBlockSize {
atomic.AddInt64(&p.skips, 1) p.skips.Add(1)
return return
} }
atomic.AddInt64(&p.puts, 1) p.puts.Add(1)
bkt := putBucketForCap(cap(bs)) bkt := putBucketForCap(cap(bs))
p.pools[bkt].Put(&bs) p.pools[bkt].Put(&bs)
} }

View File

@ -108,13 +108,13 @@ func TestStressBufferPool(t *testing.T) {
default: default:
} }
t.Log(bp.puts, bp.skips, bp.misses, bp.hits) t.Log(bp.puts.Load(), bp.skips.Load(), bp.misses.Load(), bp.hits)
if bp.puts == 0 || bp.skips == 0 || bp.misses == 0 { if bp.puts.Load() == 0 || bp.skips.Load() == 0 || bp.misses.Load() == 0 {
t.Error("didn't exercise some paths") t.Error("didn't exercise some paths")
} }
var hits int64 var hits int64
for _, h := range bp.hits { for i := range bp.hits {
hits += h hits += bp.hits[i].Load()
} }
if hits == 0 { if hits == 0 {
t.Error("didn't exercise some paths") t.Error("didn't exercise some paths")

View File

@ -10,53 +10,49 @@ import (
type countingReader struct { type countingReader struct {
io.Reader io.Reader
tot int64 // bytes (atomic, must remain 64-bit aligned) tot atomic.Int64 // bytes
last int64 // unix nanos (atomic, must remain 64-bit aligned) last atomic.Int64 // unix nanos
} }
var ( var (
totalIncoming int64 totalIncoming atomic.Int64
totalOutgoing int64 totalOutgoing atomic.Int64
) )
func (c *countingReader) Read(bs []byte) (int, error) { func (c *countingReader) Read(bs []byte) (int, error) {
n, err := c.Reader.Read(bs) n, err := c.Reader.Read(bs)
atomic.AddInt64(&c.tot, int64(n)) c.tot.Add(int64(n))
atomic.AddInt64(&totalIncoming, int64(n)) totalIncoming.Add(int64(n))
atomic.StoreInt64(&c.last, time.Now().UnixNano()) c.last.Store(time.Now().UnixNano())
return n, err return n, err
} }
func (c *countingReader) Tot() int64 { func (c *countingReader) Tot() int64 { return c.tot.Load() }
return atomic.LoadInt64(&c.tot)
}
func (c *countingReader) Last() time.Time { func (c *countingReader) Last() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.last)) return time.Unix(0, c.last.Load())
} }
type countingWriter struct { type countingWriter struct {
io.Writer io.Writer
tot int64 // bytes (atomic, must remain 64-bit aligned) tot atomic.Int64 // bytes
last int64 // unix nanos (atomic, must remain 64-bit aligned) last atomic.Int64 // unix nanos
} }
func (c *countingWriter) Write(bs []byte) (int, error) { func (c *countingWriter) Write(bs []byte) (int, error) {
n, err := c.Writer.Write(bs) n, err := c.Writer.Write(bs)
atomic.AddInt64(&c.tot, int64(n)) c.tot.Add(int64(n))
atomic.AddInt64(&totalOutgoing, int64(n)) totalOutgoing.Add(int64(n))
atomic.StoreInt64(&c.last, time.Now().UnixNano()) c.last.Store(time.Now().UnixNano())
return n, err return n, err
} }
func (c *countingWriter) Tot() int64 { func (c *countingWriter) Tot() int64 { return c.tot.Load() }
return atomic.LoadInt64(&c.tot)
}
func (c *countingWriter) Last() time.Time { func (c *countingWriter) Last() time.Time {
return time.Unix(0, atomic.LoadInt64(&c.last)) return time.Unix(0, c.last.Load())
} }
func TotalInOut() (int64, int64) { func TotalInOut() (int64, int64) {
return atomic.LoadInt64(&totalIncoming), atomic.LoadInt64(&totalOutgoing) return totalIncoming.Load(), totalOutgoing.Load()
} }

View File

@ -468,9 +468,9 @@ func TestWriteCompressed(t *testing.T) {
hdr := Header{Type: typeOf(msg)} hdr := Header{Type: typeOf(msg)}
size := int64(2 + hdr.ProtoSize() + 4 + msg.ProtoSize()) size := int64(2 + hdr.ProtoSize() + 4 + msg.ProtoSize())
if c.cr.tot > size { if c.cr.Tot() > size {
t.Errorf("compression enlarged message from %d to %d", t.Errorf("compression enlarged message from %d to %d",
size, c.cr.tot) size, c.cr.Tot())
} }
} }
} }

View File

@ -628,7 +628,7 @@ func (w *walker) String() string {
// A byteCounter gets bytes added to it via Update() and then provides the // A byteCounter gets bytes added to it via Update() and then provides the
// Total() and one minute moving average Rate() in bytes per second. // Total() and one minute moving average Rate() in bytes per second.
type byteCounter struct { type byteCounter struct {
total int64 // atomic, must remain 64-bit aligned total atomic.Int64
metrics.EWMA metrics.EWMA
stop chan struct{} stop chan struct{}
} }
@ -658,13 +658,11 @@ func (c *byteCounter) ticker() {
} }
func (c *byteCounter) Update(bytes int64) { func (c *byteCounter) Update(bytes int64) {
atomic.AddInt64(&c.total, bytes) c.total.Add(bytes)
c.EWMA.Update(bytes) c.EWMA.Update(bytes)
} }
func (c *byteCounter) Total() int64 { func (c *byteCounter) Total() int64 { return c.total.Load() }
return atomic.LoadInt64(&c.total)
}
func (c *byteCounter) Close() { func (c *byteCounter) Close() {
close(c.stop) close(c.stop)

View File

@ -39,36 +39,35 @@ const (
) )
type writeTrackingUdpConn struct { type writeTrackingUdpConn struct {
lastWrite int64 // atomic, must remain 64-bit aligned
// Needs to be UDPConn not PacketConn, as pfilter checks for WriteMsgUDP/ReadMsgUDP // Needs to be UDPConn not PacketConn, as pfilter checks for WriteMsgUDP/ReadMsgUDP
// and even if we embed UDPConn here, in place of a PacketConn, seems the interface // and even if we embed UDPConn here, in place of a PacketConn, seems the interface
// check fails. // check fails.
*net.UDPConn *net.UDPConn
lastWrite atomic.Int64
} }
func (c *writeTrackingUdpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { func (c *writeTrackingUdpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
atomic.StoreInt64(&c.lastWrite, time.Now().Unix()) c.lastWrite.Store(time.Now().Unix())
return c.UDPConn.WriteTo(p, addr) return c.UDPConn.WriteTo(p, addr)
} }
func (c *writeTrackingUdpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) { func (c *writeTrackingUdpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) {
atomic.StoreInt64(&c.lastWrite, time.Now().Unix()) c.lastWrite.Store(time.Now().Unix())
return c.UDPConn.WriteMsgUDP(b, oob, addr) return c.UDPConn.WriteMsgUDP(b, oob, addr)
} }
func (c *writeTrackingUdpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) { func (c *writeTrackingUdpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) {
atomic.StoreInt64(&c.lastWrite, time.Now().Unix()) c.lastWrite.Store(time.Now().Unix())
return c.UDPConn.WriteToUDP(b, addr) return c.UDPConn.WriteToUDP(b, addr)
} }
func (c *writeTrackingUdpConn) Write(b []byte) (int, error) { func (c *writeTrackingUdpConn) Write(b []byte) (int, error) {
atomic.StoreInt64(&c.lastWrite, time.Now().Unix()) c.lastWrite.Store(time.Now().Unix())
return c.UDPConn.Write(b) return c.UDPConn.Write(b)
} }
func (c *writeTrackingUdpConn) getLastWrite() time.Time { func (c *writeTrackingUdpConn) getLastWrite() time.Time {
unix := atomic.LoadInt64(&c.lastWrite) return time.Unix(c.lastWrite.Load(), 0)
return time.Unix(unix, 0)
} }
type Subscriber interface { type Subscriber interface {
@ -91,7 +90,7 @@ type Service struct {
func New(cfg config.Wrapper, subscriber Subscriber, conn *net.UDPConn) (*Service, net.PacketConn) { func New(cfg config.Wrapper, subscriber Subscriber, conn *net.UDPConn) (*Service, net.PacketConn) {
// Wrap the original connection to track writes on it // Wrap the original connection to track writes on it
writeTrackingUdpConn := &writeTrackingUdpConn{lastWrite: 0, UDPConn: conn} writeTrackingUdpConn := &writeTrackingUdpConn{UDPConn: conn}
// Wrap it in a filter and split it up, so that stun packets arrive on stun conn, others arrive on the data conn // Wrap it in a filter and split it up, so that stun packets arrive on stun conn, others arrive on the data conn
filterConn := pfilter.NewPacketFilter(writeTrackingUdpConn) filterConn := pfilter.NewPacketFilter(writeTrackingUdpConn)

View File

@ -116,16 +116,16 @@ type loggedRWMutex struct {
readHolders map[int][]holder readHolders map[int][]holder
readHoldersMut sync.Mutex readHoldersMut sync.Mutex
logUnlockers int32 logUnlockers atomic.Bool
unlockers chan holder unlockers chan holder
} }
func (m *loggedRWMutex) Lock() { func (m *loggedRWMutex) Lock() {
start := timeNow() start := timeNow()
atomic.StoreInt32(&m.logUnlockers, 1) m.logUnlockers.Store(true)
m.RWMutex.Lock() m.RWMutex.Lock()
atomic.StoreInt32(&m.logUnlockers, 0) m.logUnlockers.Store(false)
holder := getHolder() holder := getHolder()
m.holder.Store(holder) m.holder.Store(holder)
@ -173,7 +173,7 @@ func (m *loggedRWMutex) RUnlock() {
m.readHolders[id] = current[:len(current)-1] m.readHolders[id] = current[:len(current)-1]
} }
m.readHoldersMut.Unlock() m.readHoldersMut.Unlock()
if atomic.LoadInt32(&m.logUnlockers) == 1 { if m.logUnlockers.Load() {
holder := getHolder() holder := getHolder()
select { select {
case m.unlockers <- holder: case m.unlockers <- holder: