syncthing/lib/model/sharedpullerstate.go

459 lines
15 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"encoding/binary"
"fmt"
"io"
"time"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
// A sharedPullerState is kept for each file that is being synced and is kept
// updated along the way.
type sharedPullerState struct {
// Immutable, does not require locking
file protocol.FileInfo // The new file (desired end state)
fs fs.Filesystem
folder string
tempName string
realName string
reused int // Number of blocks reused from temporary file
ignorePerms bool
hasCurFile bool // Whether curFile is set
curFile protocol.FileInfo // The file as it exists now in our database
sparse bool
created time.Time
fsync bool
// Mutable, must be locked for access
err error // The first error we hit
writer *lockedWriterAt // Wraps fd to prevent fd closing at the same time as writing
copyTotal int // Total number of copy actions for the whole job
pullTotal int // Total number of pull actions for the whole job
copyOrigin int // Number of blocks copied from the original file
copyOriginShifted int // Number of blocks copied from the original file but shifted
copyNeeded int // Number of copy actions still pending
pullNeeded int // Number of block pulls still pending
updated time.Time // Time when any of the counters above were last updated
closed bool // True if the file has been finalClosed.
available []int // Indexes of the blocks that are available in the temporary file
availableUpdated time.Time // Time when list of available blocks was last updated
mut sync.RWMutex // Protects the above
}
func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool, fsync bool) *sharedPullerState {
return &sharedPullerState{
file: file,
fs: fs,
folder: folderID,
tempName: tempName,
realName: file.Name,
copyTotal: len(blocks),
copyNeeded: len(blocks),
reused: len(reused),
updated: time.Now(),
available: reused,
availableUpdated: time.Now(),
ignorePerms: ignorePerms,
hasCurFile: hasCurFile,
curFile: curFile,
mut: sync.NewRWMutex(),
sparse: sparse,
fsync: fsync,
created: time.Now(),
}
}
// A momentary state representing the progress of the puller
type pullerProgress struct {
Total int `json:"total"`
Reused int `json:"reused"`
CopiedFromOrigin int `json:"copiedFromOrigin"`
CopiedFromOriginShifted int `json:"copiedFromOriginShifted"`
CopiedFromElsewhere int `json:"copiedFromElsewhere"`
Pulled int `json:"pulled"`
Pulling int `json:"pulling"`
BytesDone int64 `json:"bytesDone"`
BytesTotal int64 `json:"bytesTotal"`
}
// lockedWriterAt adds a lock to protect from closing the fd at the same time as writing.
// WriteAt() is goroutine safe by itself, but not against for example Close().
type lockedWriterAt struct {
mut sync.RWMutex
fd fs.File
}
// WriteAt itself is goroutine safe, thus just needs to acquire a read-lock to
// prevent closing concurrently (see SyncClose).
func (w *lockedWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
w.mut.RLock()
defer w.mut.RUnlock()
return w.fd.WriteAt(p, off)
}
// SyncClose ensures that no more writes are happening before going ahead and
// syncing and closing the fd, thus needs to acquire a write-lock.
func (w *lockedWriterAt) SyncClose(fsync bool) error {
w.mut.Lock()
defer w.mut.Unlock()
if fsync {
if err := w.fd.Sync(); err != nil {
// Sync() is nice if it works but not worth failing the
// operation over if it fails.
l.Debugf("fsync failed: %v", err)
}
}
return w.fd.Close()
}
// tempFile returns the fd for the temporary file, reusing an open fd
// or creating the file as necessary.
func (s *sharedPullerState) tempFile() (*lockedWriterAt, error) {
s.mut.Lock()
defer s.mut.Unlock()
// If we've already hit an error, return early
if s.err != nil {
return nil, s.err
}
// If the temp file is already open, return the file descriptor
if s.writer != nil {
return s.writer, nil
}
if err := s.addWriterLocked(); err != nil {
s.failLocked(err)
return nil, err
}
return s.writer, nil
}
func (s *sharedPullerState) addWriterLocked() error {
return inWritableDir(s.tempFileInWritableDir, s.fs, s.tempName, s.ignorePerms)
}
// tempFileInWritableDir should only be called from tempFile.
func (s *sharedPullerState) tempFileInWritableDir(_ string) error {
// The permissions to use for the temporary file should be those of the
// final file, except we need user read & write at minimum. The
// permissions will be set to the final value later, but in the meantime
// we don't want to have a temporary file with looser permissions than
// the final outcome.
mode := fs.FileMode(s.file.Permissions) | 0o600
if s.ignorePerms {
// When ignorePerms is set we use a very permissive mode and let the
// system umask filter it.
mode = 0o666
}
// Attempt to create the temp file
// RDWR because of issue #2994.
flags := fs.OptReadWrite
if s.reused == 0 {
flags |= fs.OptCreate | fs.OptExclusive
} else if !s.ignorePerms {
// With sufficiently bad luck when exiting or crashing, we may have
// had time to chmod the temp file to read only state but not yet
// moved it to its final name. This leaves us with a read only temp
// file that we're going to try to reuse. To handle that, we need to
// make sure we have write permissions on the file before opening it.
//
// When ignorePerms is set we trust that the permissions are fine
// already and make no modification, as we would otherwise override
// what the umask dictates.
if err := s.fs.Chmod(s.tempName, mode); err != nil {
return fmt.Errorf("setting perms on temp file: %w", err)
}
}
fd, err := s.fs.OpenFile(s.tempName, flags, mode)
if err != nil {
return fmt.Errorf("opening temp file: %w", err)
}
// Hide the temporary file
s.fs.Hide(s.tempName)
// Don't truncate symlink files, as that will mean that the path will
// contain a bunch of nulls.
if s.sparse && !s.file.IsSymlink() {
size := s.file.Size
// Trailer added to encrypted files
if len(s.file.Encrypted) > 0 {
size += encryptionTrailerSize(s.file)
}
// Truncate sets the size of the file. This creates a sparse file or a
// space reservation, depending on the underlying filesystem.
if err := fd.Truncate(size); err != nil {
// The truncate call failed. That can happen in some cases when
// space reservation isn't possible or over some network
// filesystems... This generally doesn't matter.
if s.reused > 0 {
// ... but if we are attempting to reuse a file we have a
// corner case when the old file is larger than the new one
// and we can't just overwrite blocks and let the old data
// linger at the end. In this case we attempt a delete of
// the file and hope for better luck next time, when we
// should come around with s.reused == 0.
fd.Close()
if remErr := s.fs.Remove(s.tempName); remErr != nil {
l.Debugln("failed to remove temporary file:", remErr)
}
return err
}
}
}
// Same fd will be used by all writers
s.writer = &lockedWriterAt{sync.NewRWMutex(), fd}
return nil
}
// fail sets the error on the puller state compose of error, and marks the
// sharedPullerState as failed. Is a no-op when called on an already failed state.
func (s *sharedPullerState) fail(err error) {
s.mut.Lock()
defer s.mut.Unlock()
s.failLocked(err)
}
func (s *sharedPullerState) failLocked(err error) {
if s.err != nil || err == nil {
return
}
s.err = err
}
func (s *sharedPullerState) failed() error {
s.mut.RLock()
err := s.err
s.mut.RUnlock()
return err
}
func (s *sharedPullerState) copyDone(block protocol.BlockInfo) {
s.mut.Lock()
s.copyNeeded--
s.updated = time.Now()
s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "copyNeeded ->", s.copyNeeded)
s.mut.Unlock()
}
func (s *sharedPullerState) copiedFromOrigin(bytes int) {
s.mut.Lock()
s.copyOrigin++
s.updated = time.Now()
s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOrigin).Add(float64(bytes))
}
func (s *sharedPullerState) copiedFromElsewhere(bytes int) {
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalOther).Add(float64(bytes))
}
func (s *sharedPullerState) skippedSparseBlock(bytes int) {
// pretend we copied it, historical
s.mut.Lock()
s.copyOrigin++
s.updated = time.Now()
s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceSkipped).Add(float64(bytes))
}
func (s *sharedPullerState) copiedFromOriginShifted(bytes int) {
s.mut.Lock()
s.copyOrigin++
s.copyOriginShifted++
s.updated = time.Now()
s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceLocalShifted).Add(float64(bytes))
}
func (s *sharedPullerState) pullStarted() {
s.mut.Lock()
s.copyTotal--
s.copyNeeded--
s.pullTotal++
s.pullNeeded++
s.updated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded start ->", s.pullNeeded)
s.mut.Unlock()
}
func (s *sharedPullerState) pullDone(block protocol.BlockInfo) {
s.mut.Lock()
s.pullNeeded--
s.updated = time.Now()
s.available = append(s.available, int(block.Offset/int64(s.file.BlockSize())))
s.availableUpdated = time.Now()
l.Debugln("sharedPullerState", s.folder, s.file.Name, "pullNeeded done ->", s.pullNeeded)
s.mut.Unlock()
metricFolderProcessedBytesTotal.WithLabelValues(s.folder, metricSourceNetwork).Add(float64(block.Size))
}
// finalClose atomically closes and returns closed status of a file. A true
// first return value means the file was closed and should be finished, with
// the error indicating the success or failure of the close. A false first
// return value indicates the file is not ready to be closed, or is already
// closed and should in either case not be finished off now.
func (s *sharedPullerState) finalClose() (bool, error) {
s.mut.Lock()
defer s.mut.Unlock()
if s.closed {
// Already closed
return false, nil
}
if s.pullNeeded+s.copyNeeded != 0 && s.err == nil {
// Not done yet, and not errored
return false, nil
}
if len(s.file.Encrypted) > 0 {
if err := s.finalizeEncrypted(); err != nil && s.err == nil {
// This is our error as we weren't errored before.
s.err = err
}
}
if s.writer != nil {
if err := s.writer.SyncClose(s.fsync); err != nil && s.err == nil {
// This is our error as we weren't errored before.
s.err = err
}
s.writer = nil
}
s.closed = true
// Unhide the temporary file when we close it, as it's likely to
// immediately be renamed to the final name. If this is a failed temp
// file we will also unhide it, but I'm fine with that as we're now
// leaving it around for potentially quite a while.
s.fs.Unhide(s.tempName)
return true, s.err
}
// finalizeEncrypted adds a trailer to the encrypted file containing the
// serialized FileInfo and the length of that FileInfo. When initializing a
// folder from encrypted data we can extract this FileInfo from the end of
// the file and regain the original metadata.
func (s *sharedPullerState) finalizeEncrypted() error {
if s.writer == nil {
if err := s.addWriterLocked(); err != nil {
return err
}
}
trailerSize, err := writeEncryptionTrailer(s.file, s.writer)
if err != nil {
return err
}
s.file.Size += trailerSize
s.file.EncryptionTrailerSize = int(trailerSize)
return nil
}
// Returns the size of the written trailer.
func writeEncryptionTrailer(file protocol.FileInfo, writer io.WriterAt) (int64, error) {
// Here the file is in native format, while encryption happens in
// wire format (always slashes).
wireFile := file
wireFile.Name = osutil.NormalizedFilename(wireFile.Name)
trailerSize := encryptionTrailerSize(wireFile)
bs := make([]byte, trailerSize)
n, err := wireFile.MarshalTo(bs)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint32(bs[n:], uint32(n))
bs = bs[:n+4]
if _, err := writer.WriteAt(bs, wireFile.Size); err != nil {
return 0, err
}
return trailerSize, nil
}
func encryptionTrailerSize(file protocol.FileInfo) int64 {
return int64(file.ProtoSize()) + 4
}
// Progress returns the momentarily progress for the puller
func (s *sharedPullerState) Progress() *pullerProgress {
s.mut.RLock()
defer s.mut.RUnlock()
total := s.reused + s.copyTotal + s.pullTotal
done := total - s.copyNeeded - s.pullNeeded
file := len(s.file.Blocks)
return &pullerProgress{
Total: total,
Reused: s.reused,
CopiedFromOrigin: s.copyOrigin,
CopiedFromElsewhere: s.copyTotal - s.copyNeeded - s.copyOrigin,
Pulled: s.pullTotal - s.pullNeeded,
Pulling: s.pullNeeded,
BytesTotal: blocksToSize(total, file, s.file.BlockSize(), s.file.Size),
BytesDone: blocksToSize(done, file, s.file.BlockSize(), s.file.Size),
}
}
// Updated returns the time when any of the progress related counters was last updated.
func (s *sharedPullerState) Updated() time.Time {
s.mut.RLock()
t := s.updated
s.mut.RUnlock()
return t
}
// AvailableUpdated returns the time last time list of available blocks was updated
func (s *sharedPullerState) AvailableUpdated() time.Time {
s.mut.RLock()
t := s.availableUpdated
s.mut.RUnlock()
return t
}
// Available returns blocks available in the current temporary file
func (s *sharedPullerState) Available() []int {
s.mut.RLock()
blocks := s.available
s.mut.RUnlock()
return blocks
}
func blocksToSize(blocks, blocksInFile, blockSize int, fileSize int64) int64 {
// The last/only block has somewhere between 1 and blockSize bytes. We do
// not know whether the smaller block is part of the blocks and use an
// estimate assuming a random chance that the small block is contained.
if blocksInFile == 0 {
return 0
}
return int64(blocks)*int64(blockSize) - (int64(blockSize)-fileSize%int64(blockSize))*int64(blocks)/int64(blocksInFile)
}