all: Add copy-on-write filesystem support (fixes #4271) (#6746)

This commit is contained in:
Audrius Butkevicius 2020-06-18 07:15:47 +01:00 committed by GitHub
parent 273cc9cef8
commit 4812fd3ec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 858 additions and 84 deletions

View File

@ -60,6 +60,7 @@ type FolderConfiguration struct {
MaxConcurrentWrites int `xml:"maxConcurrentWrites" json:"maxConcurrentWrites" default:"2"`
DisableFsync bool `xml:"disableFsync" json:"disableFsync"`
BlockPullOrder BlockPullOrder `xml:"blockPullOrder" json:"blockPullOrder"`
CopyRangeMethod fs.CopyRangeMethod `xml:"copyRangeMethod" json:"copyRangeMethod" default:"standard"`
cachedFilesystem fs.Filesystem
cachedModTimeWindow time.Duration

View File

@ -0,0 +1,24 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
import (
"syscall"
)
type copyRangeImplementationBasicFile func(src, dst basicFile, srcOffset, dstOffset, size int64) error
func copyRangeImplementationForBasicFile(impl copyRangeImplementationBasicFile) copyRangeImplementation {
return func(src, dst File, srcOffset, dstOffset, size int64) error {
srcFile, srcOk := src.(basicFile)
dstFile, dstOk := dst.(basicFile)
if !srcOk || !dstOk {
return syscall.ENOTSUP
}
return impl(srcFile, dstFile, srcOffset, dstOffset, size)
}
}

View File

@ -0,0 +1,45 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
// +build linux
package fs
import (
"io"
"syscall"
"golang.org/x/sys/unix"
)
func init() {
registerCopyRangeImplementation(CopyRangeMethodCopyFileRange, copyRangeImplementationForBasicFile(copyRangeCopyFileRange))
}
func copyRangeCopyFileRange(src, dst basicFile, srcOffset, dstOffset, size int64) error {
for size > 0 {
// From MAN page:
//
// If off_in is not NULL, then off_in must point to a buffer that
// specifies the starting offset where bytes from fd_in will be read.
// The file offset of fd_in is not changed, but off_in is adjusted
// appropriately.
//
// Also, even if explicitly not stated, the same is true for dstOffset
n, err := unix.CopyFileRange(int(src.Fd()), &srcOffset, int(dst.Fd()), &dstOffset, int(size), 0)
if n == 0 && err == nil {
return io.ErrUnexpectedEOF
}
if err != nil && err != syscall.EAGAIN {
return err
}
// Handle case where err == EAGAIN and n == -1 (it's not clear if that can happen)
if n > 0 {
size -= int64(n)
}
}
return nil
}

View File

@ -0,0 +1,72 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
// +build !windows,!solaris,!darwin
package fs
import (
"syscall"
"unsafe"
)
func init() {
registerCopyRangeImplementation(CopyRangeMethodIoctl, copyRangeImplementationForBasicFile(copyRangeIoctl))
}
const FICLONE = 0x40049409
const FICLONERANGE = 0x4020940d
/*
http://man7.org/linux/man-pages/man2/ioctl_ficlonerange.2.html
struct file_clone_range {
__s64 src_fd;
__u64 src_offset;
__u64 src_length;
__u64 dest_offset;
};
*/
type fileCloneRange struct {
srcFd int64
srcOffset uint64
srcLength uint64
dstOffset uint64
}
func copyRangeIoctl(src, dst basicFile, srcOffset, dstOffset, size int64) error {
fi, err := src.Stat()
if err != nil {
return err
}
// https://www.man7.org/linux/man-pages/man2/ioctl_ficlonerange.2.html
// If src_length is zero, the ioctl reflinks to the end of the source file.
if srcOffset+size == fi.Size() {
size = 0
}
if srcOffset == 0 && dstOffset == 0 && size == 0 {
// Optimization for whole file copies.
_, _, errNo := syscall.Syscall(syscall.SYS_IOCTL, dst.Fd(), FICLONE, src.Fd())
if errNo != 0 {
return errNo
}
return nil
}
params := fileCloneRange{
srcFd: int64(src.Fd()),
srcOffset: uint64(srcOffset),
srcLength: uint64(size),
dstOffset: uint64(dstOffset),
}
_, _, errNo := syscall.Syscall(syscall.SYS_IOCTL, dst.Fd(), FICLONERANGE, uintptr(unsafe.Pointer(&params)))
if errNo != 0 {
return errNo
}
return nil
}

View File

@ -0,0 +1,69 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
// +build !windows,!darwin
package fs
import (
"io"
"syscall"
)
func init() {
registerCopyRangeImplementation(CopyRangeMethodSendFile, copyRangeImplementationForBasicFile(copyRangeSendFile))
}
func copyRangeSendFile(src, dst basicFile, srcOffset, dstOffset, size int64) error {
// Check that the destination file has sufficient space
if fi, err := dst.Stat(); err != nil {
return err
} else if fi.Size() < dstOffset+size {
if err := dst.Truncate(dstOffset + size); err != nil {
return err
}
}
// Record old dst offset.
oldDstOffset, err := dst.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
defer func() { _, _ = dst.Seek(oldDstOffset, io.SeekStart) }()
// Seek to the offset we expect to write
if oldDstOffset != dstOffset {
if n, err := dst.Seek(dstOffset, io.SeekStart); err != nil {
return err
} else if n != dstOffset {
return io.ErrUnexpectedEOF
}
}
for size > 0 {
// From the MAN page:
//
// If offset is not NULL, then it points to a variable holding the file offset from which sendfile() will start
// reading data from in_fd. When sendfile() returns, this variable will be set to the offset of the byte
// following the last byte that was read. If offset is not NULL, then sendfile() does not modify the current
// file offset of in_fd; otherwise the current file offset is adjusted to reflect the number of bytes read from
// in_fd.
n, err := syscall.Sendfile(int(dst.Fd()), int(src.Fd()), &srcOffset, int(size))
if n == 0 && err == nil {
err = io.ErrUnexpectedEOF
}
if err != nil && err != syscall.EAGAIN {
return err
}
// Handle case where err == EAGAIN and n == -1 (it's not clear if that can happen)
if n > 0 {
size -= int64(n)
}
}
_, err = dst.Seek(oldDstOffset, io.SeekStart)
return err
}

View File

@ -0,0 +1,45 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
import (
"syscall"
"github.com/syncthing/syncthing/lib/sync"
)
var (
copyRangeMethods = make(map[CopyRangeMethod]copyRangeImplementation)
mut = sync.NewMutex()
)
type copyRangeImplementation func(src, dst File, srcOffset, dstOffset, size int64) error
func registerCopyRangeImplementation(copyMethod CopyRangeMethod, impl copyRangeImplementation) {
mut.Lock()
defer mut.Unlock()
l.Debugln("Registering " + copyMethod.String() + " copyRange method")
copyRangeMethods[copyMethod] = impl
}
// CopyRange tries to use the specified method to copy data between two files.
// Takes size bytes at offset srcOffset from the source file, and copies the data to destination file at offset
// dstOffset. If required, adjusts the size of the destination file to fit that much data.
//
// On Linux/BSD you can ask it to use ioctl and copy_file_range system calls, which if the underlying filesystem supports
// it tries referencing existing data in the source file, instead of making a copy and taking up additional space.
//
// CopyRange does its best to have no effect on src and dst file offsets (copy operation should not affect it).
func CopyRange(copyMethod CopyRangeMethod, src, dst File, srcOffset, dstOffset, size int64) error {
if impl, ok := copyRangeMethods[copyMethod]; ok {
return impl(src, dst, srcOffset, dstOffset, size)
}
return syscall.ENOTSUP
}

View File

@ -0,0 +1,21 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
func init() {
registerCopyRangeImplementation(CopyRangeMethodAllWithFallback, copyRangeAllWithFallback)
}
func copyRangeAllWithFallback(src, dst File, srcOffset, dstOffset, size int64) error {
var err error
for _, method := range []CopyRangeMethod{CopyRangeMethodIoctl, CopyRangeMethodCopyFileRange, CopyRangeMethodSendFile, CopyRangeMethodStandard} {
if err = CopyRange(method, src, dst, srcOffset, dstOffset, size); err == nil {
return nil
}
}
return err
}

View File

@ -0,0 +1,60 @@
// Copyright (C) 2020 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
type CopyRangeMethod int
const (
CopyRangeMethodStandard CopyRangeMethod = iota
CopyRangeMethodIoctl
CopyRangeMethodCopyFileRange
CopyRangeMethodSendFile
CopyRangeMethodAllWithFallback
)
func (o CopyRangeMethod) String() string {
switch o {
case CopyRangeMethodStandard:
return "standard"
case CopyRangeMethodIoctl:
return "ioctl"
case CopyRangeMethodCopyFileRange:
return "copy_file_range"
case CopyRangeMethodSendFile:
return "sendfile"
case CopyRangeMethodAllWithFallback:
return "all"
default:
return "unknown"
}
}
func (o CopyRangeMethod) MarshalText() ([]byte, error) {
return []byte(o.String()), nil
}
func (o *CopyRangeMethod) UnmarshalText(bs []byte) error {
switch string(bs) {
case "standard":
*o = CopyRangeMethodStandard
case "ioctl":
*o = CopyRangeMethodIoctl
case "copy_file_range":
*o = CopyRangeMethodCopyFileRange
case "sendfile":
*o = CopyRangeMethodSendFile
case "all":
*o = CopyRangeMethodAllWithFallback
default:
*o = CopyRangeMethodStandard
}
return nil
}
func (o *CopyRangeMethod) ParseDefault(str string) error {
return o.UnmarshalText([]byte(str))
}

View File

@ -0,0 +1,45 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
import (
"io"
)
func init() {
registerCopyRangeImplementation(CopyRangeMethodStandard, copyRangeStandard)
}
func copyRangeStandard(src, dst File, srcOffset, dstOffset, size int64) error {
const bufSize = 4 << 20
buf := make([]byte, bufSize)
// TODO: In go 1.15, we should use file.ReadFrom that uses copy_file_range underneath.
// ReadAt and WriteAt does not modify the position of the file.
for size > 0 {
if size < bufSize {
buf = buf[:size]
}
n, err := src.ReadAt(buf, srcOffset)
if err != nil {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
}
if _, err = dst.WriteAt(buf[:n], dstOffset); err != nil {
return err
}
srcOffset += int64(n)
dstOffset += int64(n)
size -= int64(n)
}
return nil
}

View File

@ -0,0 +1,322 @@
// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package fs
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"os"
"syscall"
"testing"
)
var (
generationSize int64 = 4 << 20
defaultCopySize int64 = 1 << 20
testCases = []struct {
name string
// Starting size of files
srcSize int64
dstSize int64
// Offset from which to read
srcOffset int64
dstOffset int64
// Cursor position before the copy
srcStartingPos int64
dstStartingPos int64
// Expected destination size
expectedDstSizeAfterCopy int64
// Custom copy size
copySize int64
// Expected failure
expectedErrors map[CopyRangeMethod]error
}{
{
name: "append to end",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: 0,
dstOffset: generationSize,
srcStartingPos: generationSize,
dstStartingPos: generationSize,
expectedDstSizeAfterCopy: generationSize + defaultCopySize,
copySize: defaultCopySize,
expectedErrors: nil,
},
{
name: "append to end, offsets at start",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: 0,
dstOffset: generationSize,
srcStartingPos: 0, // We seek back to start, and expect src not to move after copy
dstStartingPos: 0, // Seek back, but expect dst pos to not change
expectedDstSizeAfterCopy: generationSize + defaultCopySize,
copySize: defaultCopySize,
expectedErrors: nil,
},
{
name: "overwrite part of destination region",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: defaultCopySize,
dstOffset: generationSize,
srcStartingPos: generationSize,
dstStartingPos: generationSize,
expectedDstSizeAfterCopy: generationSize + defaultCopySize,
copySize: defaultCopySize,
expectedErrors: nil,
},
{
name: "overwrite all of destination",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: 0,
dstOffset: 0,
srcStartingPos: generationSize,
dstStartingPos: generationSize,
expectedDstSizeAfterCopy: generationSize,
copySize: defaultCopySize,
expectedErrors: nil,
},
{
name: "overwrite part of destination",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: defaultCopySize,
dstOffset: 0,
srcStartingPos: generationSize,
dstStartingPos: generationSize,
expectedDstSizeAfterCopy: generationSize,
copySize: defaultCopySize,
expectedErrors: nil,
},
// Write way past the end of the file
{
name: "destination gets expanded as it is being written to",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: 0,
dstOffset: generationSize * 2,
srcStartingPos: generationSize,
dstStartingPos: generationSize,
expectedDstSizeAfterCopy: generationSize*2 + defaultCopySize,
copySize: defaultCopySize,
expectedErrors: nil,
},
// Source file does not have enough bytes to copy in that range, should result in an unexpected eof.
{
name: "source file too small",
srcSize: generationSize,
dstSize: generationSize,
srcOffset: 0,
dstOffset: 0,
srcStartingPos: 0,
dstStartingPos: 0,
expectedDstSizeAfterCopy: -11, // Does not matter, should fail.
copySize: defaultCopySize * 10,
// ioctl returns syscall.EINVAL, rest are wrapped
expectedErrors: map[CopyRangeMethod]error{
CopyRangeMethodIoctl: syscall.EINVAL,
CopyRangeMethodStandard: io.ErrUnexpectedEOF,
CopyRangeMethodCopyFileRange: io.ErrUnexpectedEOF,
CopyRangeMethodSendFile: io.ErrUnexpectedEOF,
CopyRangeMethodAllWithFallback: io.ErrUnexpectedEOF,
},
},
// Non block sized file
{
name: "not block aligned write",
srcSize: generationSize + 2,
dstSize: 0,
srcOffset: 1,
dstOffset: 0,
srcStartingPos: 0,
dstStartingPos: 0,
expectedDstSizeAfterCopy: generationSize + 1,
copySize: generationSize + 1,
// Only fails for ioctl
expectedErrors: map[CopyRangeMethod]error{
CopyRangeMethodIoctl: syscall.EINVAL,
},
},
// Last block that starts on a nice boundary
{
name: "last block",
srcSize: generationSize + 2,
dstSize: 0,
srcOffset: generationSize,
dstOffset: 0,
srcStartingPos: 0,
dstStartingPos: 0,
expectedDstSizeAfterCopy: 2,
copySize: 2,
// Succeeds on all, as long as the offset is file-system block aligned.
expectedErrors: nil,
},
// Copy whole file
{
name: "whole file copy block aligned",
srcSize: generationSize,
dstSize: 0,
srcOffset: 0,
dstOffset: 0,
srcStartingPos: 0,
dstStartingPos: 0,
expectedDstSizeAfterCopy: generationSize,
copySize: generationSize,
expectedErrors: nil,
},
{
name: "whole file copy not block aligned",
srcSize: generationSize + 1,
dstSize: 0,
srcOffset: 0,
dstOffset: 0,
srcStartingPos: 0,
dstStartingPos: 0,
expectedDstSizeAfterCopy: generationSize + 1,
copySize: generationSize + 1,
expectedErrors: nil,
},
}
)
func TestCopyRange(ttt *testing.T) {
randSrc := rand.New(rand.NewSource(rand.Int63()))
for copyMethod, impl := range copyRangeMethods {
ttt.Run(copyMethod.String(), func(tt *testing.T) {
for _, testCase := range testCases {
tt.Run(testCase.name, func(t *testing.T) {
srcBuf := make([]byte, testCase.srcSize)
dstBuf := make([]byte, testCase.dstSize)
td, err := ioutil.TempDir(os.Getenv("STFSTESTPATH"), "")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(td) }()
fs := NewFilesystem(FilesystemTypeBasic, td)
if _, err := io.ReadFull(randSrc, srcBuf); err != nil {
t.Fatal(err)
}
if _, err := io.ReadFull(randSrc, dstBuf); err != nil {
t.Fatal(err)
}
src, err := fs.Create("src")
if err != nil {
t.Fatal(err)
}
defer func() { _ = src.Close() }()
dst, err := fs.Create("dst")
if err != nil {
t.Fatal(err)
}
defer func() { _ = dst.Close() }()
// Write some data
if _, err := src.Write(srcBuf); err != nil {
t.Fatal(err)
}
if _, err := dst.Write(dstBuf); err != nil {
t.Fatal(err)
}
// Set the offsets
if n, err := src.Seek(testCase.srcStartingPos, io.SeekStart); err != nil || n != testCase.srcStartingPos {
t.Fatal(err)
}
if n, err := dst.Seek(testCase.dstStartingPos, io.SeekStart); err != nil || n != testCase.dstStartingPos {
t.Fatal(err)
}
if err := impl(src.(basicFile), dst.(basicFile), testCase.srcOffset, testCase.dstOffset, testCase.copySize); err != nil {
if err == syscall.ENOTSUP {
// Test runner can adjust directory in which to run the tests, that allow broader tests.
t.Skip("Not supported on the current filesystem, set STFSTESTPATH env var.")
}
if testCase.expectedErrors[copyMethod] == err {
return
}
t.Fatal(err)
} else if testCase.expectedErrors[copyMethod] != nil {
t.Fatal("did not get expected error")
}
// Check offsets where we expect them
if srcCurPos, err := src.Seek(0, io.SeekCurrent); err != nil {
t.Fatal(err)
} else if srcCurPos != testCase.srcStartingPos {
t.Errorf("src pos expected %d got %d", testCase.srcStartingPos, srcCurPos)
}
if dstCurPos, err := dst.Seek(0, io.SeekCurrent); err != nil {
t.Fatal(err)
} else if dstCurPos != testCase.dstStartingPos {
t.Errorf("dst pos expected %d got %d", testCase.dstStartingPos, dstCurPos)
}
// Check dst size
if fi, err := dst.Stat(); err != nil {
t.Fatal(err)
} else if fi.Size() != testCase.expectedDstSizeAfterCopy {
t.Errorf("expected %d size, got %d", testCase.expectedDstSizeAfterCopy, fi.Size())
}
// Check the data is as expected
if _, err := dst.Seek(0, io.SeekStart); err != nil {
t.Fatal(err)
}
resultBuf := make([]byte, testCase.expectedDstSizeAfterCopy)
if _, err := io.ReadFull(dst, resultBuf); err != nil {
t.Fatal(err)
}
if !bytes.Equal(srcBuf[testCase.srcOffset:testCase.srcOffset+testCase.copySize], resultBuf[testCase.dstOffset:testCase.dstOffset+testCase.copySize]) {
t.Errorf("Not equal")
}
// Check not copied content does not get corrupted
if testCase.dstOffset > testCase.dstSize {
if !bytes.Equal(dstBuf[:testCase.dstSize], resultBuf[:testCase.dstSize]) {
t.Error("region before copy region not equals")
}
if !bytes.Equal(resultBuf[testCase.dstSize:testCase.dstOffset], make([]byte, testCase.dstOffset-testCase.dstSize)) {
t.Error("found non zeroes in expected zero region")
}
} else {
if !bytes.Equal(dstBuf[:testCase.dstOffset], resultBuf[:testCase.dstOffset]) {
t.Error("region before copy region not equals")
}
afterCopyStart := testCase.dstOffset + testCase.copySize
if afterCopyStart < testCase.dstSize {
if !bytes.Equal(dstBuf[afterCopyStart:], resultBuf[afterCopyStart:len(dstBuf)]) {
t.Error("region after copy region not equals")
}
}
}
})
}
})
}
}

View File

@ -986,13 +986,13 @@ func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, sn
if f.versioner != nil {
err = f.CheckAvailableSpace(source.Size)
if err == nil {
err = osutil.Copy(f.fs, f.fs, source.Name, tempName)
err = osutil.Copy(f.CopyRangeMethod, f.fs, f.fs, source.Name, tempName)
if err == nil {
err = f.inWritableDir(f.versioner.Archive, source.Name)
}
}
} else {
err = osutil.RenameOrCopy(f.fs, f.fs, source.Name, tempName)
err = osutil.RenameOrCopy(f.CopyRangeMethod, f.fs, f.fs, source.Name, tempName)
}
if err != nil {
return err
@ -1296,7 +1296,7 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
return true
}
_, err = f.limitedWriteAt(dstFd, buf, block.Offset)
err = f.limitedWriteAt(dstFd, buf, block.Offset)
if err != nil {
state.fail(errors.Wrap(err, "dst write"))
}
@ -1314,13 +1314,14 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
if !found {
found = f.model.finder.Iterate(folders, block.Hash, func(folder, path string, index int32) bool {
fs := folderFilesystems[folder]
fd, err := fs.Open(path)
ffs := folderFilesystems[folder]
fd, err := ffs.Open(path)
if err != nil {
return false
}
_, err = fd.ReadAt(buf, int64(state.file.BlockSize())*int64(index))
srcOffset := int64(state.file.BlockSize()) * int64(index)
_, err = fd.ReadAt(buf, srcOffset)
fd.Close()
if err != nil {
return false
@ -1331,7 +1332,15 @@ func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan ch
return false
}
_, err = f.limitedWriteAt(dstFd, buf, block.Offset)
if f.CopyRangeMethod != fs.CopyRangeMethodStandard {
err = f.withLimiter(func() error {
dstFd.mut.Lock()
defer dstFd.mut.Unlock()
return fs.CopyRange(f.CopyRangeMethod, fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
})
} else {
err = f.limitedWriteAt(dstFd, buf, block.Offset)
}
if err != nil {
state.fail(errors.Wrap(err, "dst write"))
}
@ -1480,7 +1489,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu
}
// Save the block data we got from the cluster
_, err = f.limitedWriteAt(fd, buf, state.block.Offset)
err = f.limitedWriteAt(fd, buf, state.block.Offset)
if err != nil {
state.fail(errors.Wrap(err, "save"))
} else {
@ -1535,7 +1544,7 @@ func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCu
// Replace the original content with the new one. If it didn't work,
// leave the temp file in place for reuse.
if err := osutil.RenameOrCopy(f.fs, f.fs, tempName, file.Name); err != nil {
if err := osutil.RenameOrCopy(f.CopyRangeMethod, f.fs, f.fs, tempName, file.Name); err != nil {
return err
}
@ -2006,12 +2015,19 @@ func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) er
return inWritableDir(fn, f.fs, path, f.IgnorePerms)
}
func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) (int, error) {
func (f *sendReceiveFolder) limitedWriteAt(fd io.WriterAt, data []byte, offset int64) error {
return f.withLimiter(func() error {
_, err := fd.WriteAt(data, offset)
return err
})
}
func (f *sendReceiveFolder) withLimiter(fn func() error) error {
if err := f.writeLimiter.takeWithContext(f.ctx, 1); err != nil {
return 0, err
return err
}
defer f.writeLimiter.give(1)
return fd.WriteAt(data, offset)
return fn()
}
// A []FileError is sent as part of an event and will be JSON serialized.

View File

@ -980,7 +980,7 @@ func TestDeleteBehindSymlink(t *testing.T) {
must(t, ffs.MkdirAll(link, 0755))
fi := createFile(t, file, ffs)
f.updateLocalsFromScanning([]protocol.FileInfo{fi})
must(t, osutil.RenameOrCopy(ffs, destFs, file, "file"))
must(t, osutil.RenameOrCopy(fs.CopyRangeMethodStandard, ffs, destFs, file, "file"))
must(t, ffs.RemoveAll(link))
if err := osutil.DebugSymlinkForTestsOnly(destFs.URI(), filepath.Join(ffs.URI(), link)); err != nil {

View File

@ -341,7 +341,7 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio
var ver versioner.Versioner
if cfg.Versioning.Type != "" {
var err error
ver, err = versioner.New(ffs, cfg.Versioning)
ver, err = versioner.New(cfg)
if err != nil {
panic(fmt.Errorf("creating versioner: %w", err))
}

View File

@ -3636,10 +3636,10 @@ func TestRenameSameFile(t *testing.T) {
}
must(t, ffs.Rename("file", "file1"))
must(t, osutil.Copy(ffs, ffs, "file1", "file0"))
must(t, osutil.Copy(ffs, ffs, "file1", "file2"))
must(t, osutil.Copy(ffs, ffs, "file1", "file3"))
must(t, osutil.Copy(ffs, ffs, "file1", "file4"))
must(t, osutil.Copy(fs.CopyRangeMethodStandard, ffs, ffs, "file1", "file0"))
must(t, osutil.Copy(fs.CopyRangeMethodStandard, ffs, ffs, "file1", "file2"))
must(t, osutil.Copy(fs.CopyRangeMethodStandard, ffs, ffs, "file1", "file3"))
must(t, osutil.Copy(fs.CopyRangeMethodStandard, ffs, ffs, "file1", "file4"))
m.ScanFolders()

View File

@ -7,7 +7,6 @@
package model
import (
"io"
"time"
"github.com/pkg/errors"
@ -118,7 +117,7 @@ func (w *lockedWriterAt) SyncClose(fsync bool) error {
// tempFile returns the fd for the temporary file, reusing an open fd
// or creating the file as necessary.
func (s *sharedPullerState) tempFile() (io.WriterAt, error) {
func (s *sharedPullerState) tempFile() (*lockedWriterAt, error) {
s.mut.Lock()
defer s.mut.Unlock()

View File

@ -8,7 +8,6 @@
package osutil
import (
"io"
"path/filepath"
"runtime"
"strings"
@ -24,7 +23,7 @@ var renameLock = sync.NewMutex()
// RenameOrCopy renames a file, leaving source file intact in case of failure.
// Tries hard to succeed on various systems by temporarily tweaking directory
// permissions and removing the destination file when necessary.
func RenameOrCopy(src, dst fs.Filesystem, from, to string) error {
func RenameOrCopy(method fs.CopyRangeMethod, src, dst fs.Filesystem, from, to string) error {
renameLock.Lock()
defer renameLock.Unlock()
@ -59,7 +58,7 @@ func RenameOrCopy(src, dst fs.Filesystem, from, to string) error {
}
}
err := copyFileContents(src, dst, from, to)
err := copyFileContents(method, src, dst, from, to)
if err != nil {
_ = dst.Remove(to)
return err
@ -74,9 +73,9 @@ func RenameOrCopy(src, dst fs.Filesystem, from, to string) error {
// Copy copies the file content from source to destination.
// Tries hard to succeed on various systems by temporarily tweaking directory
// permissions and removing the destination file when necessary.
func Copy(src, dst fs.Filesystem, from, to string) (err error) {
func Copy(method fs.CopyRangeMethod, src, dst fs.Filesystem, from, to string) (err error) {
return withPreparedTarget(dst, from, to, func() error {
return copyFileContents(src, dst, from, to)
return copyFileContents(method, src, dst, from, to)
})
}
@ -107,7 +106,7 @@ func withPreparedTarget(filesystem fs.Filesystem, from, to string, f func() erro
// by dst. The file will be created if it does not already exist. If the
// destination file exists, all its contents will be replaced by the contents
// of the source file.
func copyFileContents(srcFs, dstFs fs.Filesystem, src, dst string) (err error) {
func copyFileContents(method fs.CopyRangeMethod, srcFs, dstFs fs.Filesystem, src, dst string) (err error) {
in, err := srcFs.Open(src)
if err != nil {
return
@ -123,7 +122,11 @@ func copyFileContents(srcFs, dstFs fs.Filesystem, src, dst string) (err error) {
err = cerr
}
}()
_, err = io.Copy(out, in)
inFi, err := in.Stat()
if err != nil {
return
}
err = fs.CopyRange(method, in, out, 0, 0, inFi.Size())
return
}

View File

@ -139,7 +139,7 @@ func TestRenameOrCopy(t *testing.T) {
content = string(buf)
}
err := osutil.RenameOrCopy(test.src, test.dst, test.file, "new")
err := osutil.RenameOrCopy(fs.CopyRangeMethodStandard, test.src, test.dst, test.file, "new")
if err != nil {
t.Fatal(err)
}

View File

@ -14,6 +14,7 @@ import (
"strings"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
"github.com/kballard/go-shellquote"
@ -29,8 +30,8 @@ type external struct {
filesystem fs.Filesystem
}
func newExternal(filesystem fs.Filesystem, params map[string]string) Versioner {
command := params["command"]
func newExternal(cfg config.FolderConfiguration) Versioner {
command := cfg.Versioning.Params["command"]
if runtime.GOOS == "windows" {
command = strings.Replace(command, `\`, `\\`, -1)
@ -38,7 +39,7 @@ func newExternal(filesystem fs.Filesystem, params map[string]string) Versioner {
s := external{
command: command,
filesystem: filesystem,
filesystem: cfg.Filesystem(),
}
l.Debugf("instantiated %#v", s)

View File

@ -10,6 +10,7 @@ import (
"strconv"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
)
@ -19,21 +20,23 @@ func init() {
}
type simple struct {
keep int
folderFs fs.Filesystem
versionsFs fs.Filesystem
keep int
folderFs fs.Filesystem
versionsFs fs.Filesystem
copyRangeMethod fs.CopyRangeMethod
}
func newSimple(folderFs fs.Filesystem, params map[string]string) Versioner {
keep, err := strconv.Atoi(params["keep"])
func newSimple(cfg config.FolderConfiguration) Versioner {
var keep, err = strconv.Atoi(cfg.Versioning.Params["keep"])
if err != nil {
keep = 5 // A reasonable default
}
s := simple{
keep: keep,
folderFs: folderFs,
versionsFs: fsFromParams(folderFs, params),
keep: keep,
folderFs: cfg.Filesystem(),
versionsFs: versionerFsFromFolderCfg(cfg),
copyRangeMethod: cfg.CopyRangeMethod,
}
l.Debugf("instantiated %#v", s)
@ -43,7 +46,7 @@ func newSimple(folderFs fs.Filesystem, params map[string]string) Versioner {
// Archive moves the named file away to a version archive. If this function
// returns nil, the named file does not exist any more (has been archived).
func (v simple) Archive(filePath string) error {
err := archiveFile(v.folderFs, v.versionsFs, filePath, TagFilename)
err := archiveFile(v.copyRangeMethod, v.folderFs, v.versionsFs, filePath, TagFilename)
if err != nil {
return err
}
@ -68,5 +71,5 @@ func (v simple) GetVersions() (map[string][]FileVersion, error) {
}
func (v simple) Restore(filepath string, versionTime time.Time) error {
return restoreFile(v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
return restoreFile(v.copyRangeMethod, v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
}

View File

@ -7,6 +7,7 @@
package versioner
import (
"github.com/syncthing/syncthing/lib/config"
"io/ioutil"
"math"
"path/filepath"
@ -59,9 +60,18 @@ func TestSimpleVersioningVersionCount(t *testing.T) {
t.Error(err)
}
fs := fs.NewFilesystem(fs.FilesystemTypeBasic, dir)
cfg := config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: dir,
Versioning: config.VersioningConfiguration{
Params: map[string]string{
"keep": "2",
},
},
}
fs := cfg.Filesystem()
v := newSimple(fs, map[string]string{"keep": "2"})
v := newSimple(cfg)
path := "test"

View File

@ -15,6 +15,7 @@ import (
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
@ -32,16 +33,18 @@ type interval struct {
type staggered struct {
suture.Service
cleanInterval int64
folderFs fs.Filesystem
versionsFs fs.Filesystem
interval [4]interval
mutex sync.Mutex
cleanInterval int64
folderFs fs.Filesystem
versionsFs fs.Filesystem
interval [4]interval
copyRangeMethod fs.CopyRangeMethod
mutex sync.Mutex
testCleanDone chan struct{}
}
func newStaggered(folderFs fs.Filesystem, params map[string]string) Versioner {
func newStaggered(cfg config.FolderConfiguration) Versioner {
params := cfg.Versioning.Params
maxAge, err := strconv.ParseInt(params["maxAge"], 10, 0)
if err != nil {
maxAge = 31536000 // Default: ~1 year
@ -53,11 +56,11 @@ func newStaggered(folderFs fs.Filesystem, params map[string]string) Versioner {
// Backwards compatibility
params["fsPath"] = params["versionsPath"]
versionsFs := fsFromParams(folderFs, params)
versionsFs := versionerFsFromFolderCfg(cfg)
s := &staggered{
cleanInterval: cleanInterval,
folderFs: folderFs,
folderFs: cfg.Filesystem(),
versionsFs: versionsFs,
interval: [4]interval{
{30, 60 * 60}, // first hour -> 30 sec between versions
@ -65,7 +68,8 @@ func newStaggered(folderFs fs.Filesystem, params map[string]string) Versioner {
{24 * 60 * 60, 30 * 24 * 60 * 60}, // next 30 days -> 1 day between versions
{7 * 24 * 60 * 60, maxAge}, // next year -> 1 week between versions
},
mutex: sync.NewMutex(),
copyRangeMethod: cfg.CopyRangeMethod,
mutex: sync.NewMutex(),
}
s.Service = util.AsService(s.serve, s.String())
@ -216,7 +220,7 @@ func (v *staggered) Archive(filePath string) error {
v.mutex.Lock()
defer v.mutex.Unlock()
if err := archiveFile(v.folderFs, v.versionsFs, filePath, TagFilename); err != nil {
if err := archiveFile(v.copyRangeMethod, v.folderFs, v.versionsFs, filePath, TagFilename); err != nil {
return err
}
@ -230,7 +234,7 @@ func (v *staggered) GetVersions() (map[string][]FileVersion, error) {
}
func (v *staggered) Restore(filepath string, versionTime time.Time) error {
return restoreFile(v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
return restoreFile(v.copyRangeMethod, v.versionsFs, v.folderFs, filepath, versionTime, TagFilename)
}
func (v *staggered) String() string {

View File

@ -7,6 +7,7 @@
package versioner
import (
"github.com/syncthing/syncthing/lib/config"
"sort"
"strconv"
"testing"
@ -96,9 +97,17 @@ func TestStaggeredVersioningVersionCount(t *testing.T) {
}
sort.Strings(delete)
v := newStaggered(fs.NewFilesystem(fs.FilesystemTypeFake, "testdata"), map[string]string{
"maxAge": strconv.Itoa(365 * 86400),
}).(*staggered)
cfg := config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: "testdata",
Versioning: config.VersioningConfiguration{
Params: map[string]string{
"maxAge": strconv.Itoa(365 * 86400),
},
},
}
v := newStaggered(cfg).(*staggered)
rem := v.toRemove(versionsWithMtime, now)
sort.Strings(rem)

View File

@ -14,6 +14,7 @@ import (
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/util"
)
@ -25,19 +26,21 @@ func init() {
type trashcan struct {
suture.Service
folderFs fs.Filesystem
versionsFs fs.Filesystem
cleanoutDays int
folderFs fs.Filesystem
versionsFs fs.Filesystem
cleanoutDays int
copyRangeMethod fs.CopyRangeMethod
}
func newTrashcan(folderFs fs.Filesystem, params map[string]string) Versioner {
cleanoutDays, _ := strconv.Atoi(params["cleanoutDays"])
func newTrashcan(cfg config.FolderConfiguration) Versioner {
cleanoutDays, _ := strconv.Atoi(cfg.Versioning.Params["cleanoutDays"])
// On error we default to 0, "do not clean out the trash can"
s := &trashcan{
folderFs: folderFs,
versionsFs: fsFromParams(folderFs, params),
cleanoutDays: cleanoutDays,
folderFs: cfg.Filesystem(),
versionsFs: versionerFsFromFolderCfg(cfg),
cleanoutDays: cleanoutDays,
copyRangeMethod: cfg.CopyRangeMethod,
}
s.Service = util.AsService(s.serve, s.String())
@ -48,7 +51,7 @@ func newTrashcan(folderFs fs.Filesystem, params map[string]string) Versioner {
// Archive moves the named file away to a version archive. If this function
// returns nil, the named file does not exist any more (has been archived).
func (t *trashcan) Archive(filePath string) error {
return archiveFile(t.folderFs, t.versionsFs, filePath, func(name, tag string) string {
return archiveFile(t.copyRangeMethod, t.folderFs, t.versionsFs, filePath, func(name, tag string) string {
return name
})
}
@ -144,7 +147,7 @@ func (t *trashcan) Restore(filepath string, versionTime time.Time) error {
return name
}
err := restoreFile(t.versionsFs, t.folderFs, filepath, versionTime, tagger)
err := restoreFile(t.copyRangeMethod, t.versionsFs, t.folderFs, filepath, versionTime, tagger)
if taggedName == "" {
return err
}

View File

@ -7,6 +7,7 @@
package versioner
import (
"github.com/syncthing/syncthing/lib/config"
"io/ioutil"
"os"
"path/filepath"
@ -53,7 +54,17 @@ func TestTrashcanCleanout(t *testing.T) {
}
}
versioner := newTrashcan(fs.NewFilesystem(fs.FilesystemTypeBasic, "testdata"), map[string]string{"cleanoutDays": "7"}).(*trashcan)
cfg := config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: "testdata",
Versioning: config.VersioningConfiguration{
Params: map[string]string{
"cleanoutDays": "7",
},
},
}
versioner := newTrashcan(cfg).(*trashcan)
if err := versioner.cleanoutArchive(); err != nil {
t.Fatal(err)
}
@ -90,15 +101,23 @@ func TestTrashcanArchiveRestoreSwitcharoo(t *testing.T) {
t.Fatal(err)
}
folderFs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir1)
cfg := config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: tmpDir1,
Versioning: config.VersioningConfiguration{
Params: map[string]string{
"fsType": "basic",
"fsPath": tmpDir2,
},
},
}
folderFs := cfg.Filesystem()
versionsFs := fs.NewFilesystem(fs.FilesystemTypeBasic, tmpDir2)
writeFile(t, folderFs, "file", "A")
versioner := newTrashcan(folderFs, map[string]string{
"fsType": "basic",
"fsPath": tmpDir2,
})
versioner := newTrashcan(cfg)
if err := versioner.Archive("file"); err != nil {
t.Fatal(err)

View File

@ -14,6 +14,8 @@ import (
"time"
"github.com/pkg/errors"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/util"
@ -129,7 +131,7 @@ func retrieveVersions(fileSystem fs.Filesystem) (map[string][]FileVersion, error
type fileTagger func(string, string) string
func archiveFile(srcFs, dstFs fs.Filesystem, filePath string, tagger fileTagger) error {
func archiveFile(method fs.CopyRangeMethod, srcFs, dstFs fs.Filesystem, filePath string, tagger fileTagger) error {
filePath = osutil.NativeFilename(filePath)
info, err := srcFs.Lstat(filePath)
if fs.IsNotExist(err) {
@ -170,7 +172,7 @@ func archiveFile(srcFs, dstFs fs.Filesystem, filePath string, tagger fileTagger)
ver := tagger(file, now.Format(TimeFormat))
dst := filepath.Join(inFolderPath, ver)
l.Debugln("archiving", filePath, "moving to", dst)
err = osutil.RenameOrCopy(srcFs, dstFs, filePath, dst)
err = osutil.RenameOrCopy(method, srcFs, dstFs, filePath, dst)
mtime := info.ModTime()
// If it's a trashcan versioner type thing, then it does not have version time in the name
@ -184,7 +186,7 @@ func archiveFile(srcFs, dstFs fs.Filesystem, filePath string, tagger fileTagger)
return err
}
func restoreFile(src, dst fs.Filesystem, filePath string, versionTime time.Time, tagger fileTagger) error {
func restoreFile(method fs.CopyRangeMethod, src, dst fs.Filesystem, filePath string, versionTime time.Time, tagger fileTagger) error {
tag := versionTime.In(time.Local).Truncate(time.Second).Format(TimeFormat)
taggedFilePath := tagger(filePath, tag)
@ -200,7 +202,7 @@ func restoreFile(src, dst fs.Filesystem, filePath string, versionTime time.Time,
return errors.Wrap(err, "removing existing symlink")
}
case info.IsRegular():
if err := archiveFile(dst, src, filePath, tagger); err != nil {
if err := archiveFile(method, dst, src, filePath, tagger); err != nil {
return errors.Wrap(err, "archiving existing file")
}
default:
@ -247,12 +249,14 @@ func restoreFile(src, dst fs.Filesystem, filePath string, versionTime time.Time,
}
_ = dst.MkdirAll(filepath.Dir(filePath), 0755)
err := osutil.RenameOrCopy(src, dst, sourceFile, filePath)
err := osutil.RenameOrCopy(method, src, dst, sourceFile, filePath)
_ = dst.Chtimes(filePath, sourceMtime, sourceMtime)
return err
}
func fsFromParams(folderFs fs.Filesystem, params map[string]string) (versionsFs fs.Filesystem) {
func versionerFsFromFolderCfg(cfg config.FolderConfiguration) (versionsFs fs.Filesystem) {
params := cfg.Versioning.Params
folderFs := cfg.Filesystem()
if params["fsType"] == "" && params["fsPath"] == "" {
versionsFs = fs.NewFilesystem(folderFs.Type(), filepath.Join(folderFs.URI(), ".stversions"))

View File

@ -14,7 +14,6 @@ import (
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/fs"
)
type Versioner interface {
@ -29,7 +28,7 @@ type FileVersion struct {
Size int64 `json:"size"`
}
type factory func(filesystem fs.Filesystem, params map[string]string) Versioner
type factory func(cfg config.FolderConfiguration) Versioner
var factories = make(map[string]factory)
@ -40,11 +39,11 @@ const (
timeGlob = "[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]-[0-9][0-9][0-9][0-9][0-9][0-9]" // glob pattern matching TimeFormat
)
func New(fs fs.Filesystem, cfg config.VersioningConfiguration) (Versioner, error) {
fac, ok := factories[cfg.Type]
func New(cfg config.FolderConfiguration) (Versioner, error) {
fac, ok := factories[cfg.Versioning.Type]
if !ok {
return nil, fmt.Errorf("requested versioning type %q does not exist", cfg.Type)
}
return fac(fs, cfg.Params), nil
return fac(cfg), nil
}