syncthing/lib/events/events.go

301 lines
5.6 KiB
Go
Raw Normal View History

2014-11-16 21:13:20 +01:00
// Copyright (C) 2014 The Syncthing Authors.
2014-09-29 21:43:32 +02:00
//
2015-03-07 21:36:35 +01:00
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
2014-07-25 14:50:14 +02:00
2014-07-13 21:07:24 +02:00
// Package events provides event subscription and polling functionality.
package events
import (
"errors"
2015-04-23 00:54:31 +02:00
stdsync "sync"
2014-07-13 21:07:24 +02:00
"time"
2015-04-23 00:54:31 +02:00
2015-08-06 11:29:25 +02:00
"github.com/syncthing/syncthing/lib/sync"
2014-07-13 21:07:24 +02:00
)
type EventType int
2014-07-13 21:07:24 +02:00
const (
Ping EventType = 1 << iota
2014-07-17 13:38:36 +02:00
Starting
2014-07-13 21:07:24 +02:00
StartupComplete
DeviceDiscovered
DeviceConnected
DeviceDisconnected
DeviceRejected
2015-08-23 21:56:10 +02:00
DevicePaused
DeviceResumed
2014-07-13 21:07:24 +02:00
LocalIndexUpdated
RemoteIndexUpdated
ItemStarted
2015-02-01 18:31:19 +01:00
ItemFinished
2014-07-17 13:38:36 +02:00
StateChanged
FolderRejected
2014-09-06 17:31:23 +02:00
ConfigSaved
DownloadProgress
FolderSummary
FolderCompletion
FolderErrors
2015-08-27 00:49:06 +02:00
FolderScanProgress
2015-09-12 21:59:15 +02:00
ExternalPortMappingChanged
RelayStateChanged
2014-07-13 21:07:24 +02:00
AllEvents = (1 << iota) - 1
2014-07-13 21:07:24 +02:00
)
func (t EventType) String() string {
switch t {
case Ping:
return "Ping"
2014-07-17 13:38:36 +02:00
case Starting:
return "Starting"
2014-07-13 21:07:24 +02:00
case StartupComplete:
return "StartupComplete"
case DeviceDiscovered:
return "DeviceDiscovered"
case DeviceConnected:
return "DeviceConnected"
case DeviceDisconnected:
return "DeviceDisconnected"
case DeviceRejected:
return "DeviceRejected"
2014-07-13 21:07:24 +02:00
case LocalIndexUpdated:
return "LocalIndexUpdated"
case RemoteIndexUpdated:
return "RemoteIndexUpdated"
case ItemStarted:
return "ItemStarted"
2015-02-01 18:31:19 +01:00
case ItemFinished:
return "ItemFinished"
2014-07-17 13:38:36 +02:00
case StateChanged:
return "StateChanged"
case FolderRejected:
return "FolderRejected"
2014-09-06 17:31:23 +02:00
case ConfigSaved:
return "ConfigSaved"
case DownloadProgress:
return "DownloadProgress"
case FolderSummary:
return "FolderSummary"
case FolderCompletion:
return "FolderCompletion"
case FolderErrors:
return "FolderErrors"
2015-08-23 21:56:10 +02:00
case DevicePaused:
return "DevicePaused"
case DeviceResumed:
return "DeviceResumed"
2015-08-27 00:49:06 +02:00
case FolderScanProgress:
return "FolderScanProgress"
2015-09-12 21:59:15 +02:00
case ExternalPortMappingChanged:
return "ExternalPortMappingChanged"
case RelayStateChanged:
return "RelayStateChanged"
2014-07-13 21:07:24 +02:00
default:
return "Unknown"
}
}
func (t EventType) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}
const BufferSize = 64
type Logger struct {
subs []*Subscription
2014-12-08 16:36:15 +01:00
nextID int
2014-07-13 21:07:24 +02:00
mutex sync.Mutex
}
type Event struct {
ID int `json:"id"`
Time time.Time `json:"time"`
Type EventType `json:"type"`
Data interface{} `json:"data"`
}
type Subscription struct {
mask EventType
events chan Event
timeout *time.Timer
2014-07-13 21:07:24 +02:00
}
var Default = NewLogger()
var (
ErrTimeout = errors.New("timeout")
ErrClosed = errors.New("closed")
)
func NewLogger() *Logger {
return &Logger{
2015-04-23 00:54:31 +02:00
mutex: sync.NewMutex(),
2014-07-13 21:07:24 +02:00
}
}
func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
2014-07-25 14:50:14 +02:00
if debug {
2014-12-08 16:36:15 +01:00
dl.Debugln("log", l.nextID, t.String(), data)
2014-07-25 14:50:14 +02:00
}
l.nextID++
2014-07-13 21:07:24 +02:00
e := Event{
2014-12-08 16:36:15 +01:00
ID: l.nextID,
2014-07-13 21:07:24 +02:00
Time: time.Now(),
Type: t,
Data: data,
}
for _, s := range l.subs {
if s.mask&t != 0 {
select {
case s.events <- e:
default:
// if s.events is not ready, drop the event
2014-07-13 21:07:24 +02:00
}
}
}
l.mutex.Unlock()
}
func (l *Logger) Subscribe(mask EventType) *Subscription {
l.mutex.Lock()
2014-07-25 14:50:14 +02:00
if debug {
dl.Debugln("subscribe", mask)
}
2014-07-13 21:07:24 +02:00
s := &Subscription{
mask: mask,
events: make(chan Event, BufferSize),
timeout: time.NewTimer(0),
2014-07-13 21:07:24 +02:00
}
l.subs = append(l.subs, s)
2014-07-13 21:07:24 +02:00
l.mutex.Unlock()
return s
}
func (l *Logger) Unsubscribe(s *Subscription) {
l.mutex.Lock()
2014-07-25 14:50:14 +02:00
if debug {
dl.Debugln("unsubscribe")
2014-07-25 14:50:14 +02:00
}
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
break
}
}
2014-07-13 21:07:24 +02:00
close(s.events)
l.mutex.Unlock()
}
// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
2014-07-13 21:07:24 +02:00
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
2014-07-25 14:50:14 +02:00
if debug {
dl.Debugln("poll", timeout)
}
if !s.timeout.Reset(timeout) {
select {
case <-s.timeout.C:
default:
}
}
2014-07-13 21:07:24 +02:00
select {
case e, ok := <-s.events:
if !ok {
return e, ErrClosed
}
s.timeout.Stop()
2014-07-13 21:07:24 +02:00
return e, nil
case <-s.timeout.C:
2014-07-13 21:07:24 +02:00
return Event{}, ErrTimeout
}
}
func (s *Subscription) C() <-chan Event {
return s.events
}
2014-07-13 21:07:24 +02:00
type BufferedSubscription struct {
sub *Subscription
buf []Event
next int
cur int
mut sync.Mutex
2015-04-23 00:54:31 +02:00
cond *stdsync.Cond
2014-07-13 21:07:24 +02:00
}
func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
bs := &BufferedSubscription{
sub: s,
buf: make([]Event, size),
2015-04-23 00:54:31 +02:00
mut: sync.NewMutex(),
2014-07-13 21:07:24 +02:00
}
2015-04-23 00:54:31 +02:00
bs.cond = stdsync.NewCond(bs.mut)
2014-07-13 21:07:24 +02:00
go bs.pollingLoop()
return bs
}
func (s *BufferedSubscription) pollingLoop() {
for {
ev, err := s.sub.Poll(60 * time.Second)
if err == ErrTimeout {
continue
}
if err == ErrClosed {
return
}
if err != nil {
panic("unexpected error: " + err.Error())
}
s.mut.Lock()
s.buf[s.next] = ev
s.next = (s.next + 1) % len(s.buf)
s.cur = ev.ID
s.cond.Broadcast()
s.mut.Unlock()
}
}
func (s *BufferedSubscription) Since(id int, into []Event) []Event {
s.mut.Lock()
defer s.mut.Unlock()
for id >= s.cur {
s.cond.Wait()
}
for i := s.next; i < len(s.buf); i++ {
if s.buf[i].ID > id {
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
if s.buf[i].ID > id {
into = append(into, s.buf[i])
}
}
return into
}
// Error returns a string pointer suitable for JSON marshalling errors. It
// retains the "null on sucess" semantics, but ensures the error result is a
// string regardless of the underlying concrete error type.
func Error(err error) *string {
if err == nil {
return nil
}
str := err.Error()
return &str
}