aboutsummaryrefslogtreecommitdiff
path: root/lib/model/folder_summary.go
blob: 95653c1de2a7d75de37f32a5eb7ee98bed236731 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

//go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
//go:generate counterfeiter -o mocks/folderSummaryService.go --fake-name FolderSummaryService . FolderSummaryService

package model

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/thejerf/suture/v4"

	"github.com/syncthing/syncthing/lib/config"
	"github.com/syncthing/syncthing/lib/db"
	"github.com/syncthing/syncthing/lib/events"
	"github.com/syncthing/syncthing/lib/protocol"
	"github.com/syncthing/syncthing/lib/svcutil"
	"github.com/syncthing/syncthing/lib/sync"
)

type FolderSummaryService interface {
	suture.Service
	Summary(folder string) (*FolderSummary, error)
}

// The folderSummaryService adds summary information events (FolderSummary and
// FolderCompletion) into the event stream at certain intervals.
type folderSummaryService struct {
	*suture.Supervisor

	cfg       config.Wrapper
	model     Model
	id        protocol.DeviceID
	evLogger  events.Logger
	immediate chan string

	// For keeping track of folders to recalculate for
	foldersMut sync.Mutex
	folders    map[string]struct{}
}

func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID, evLogger events.Logger) FolderSummaryService {
	service := &folderSummaryService{
		Supervisor: suture.New("folderSummaryService", svcutil.SpecWithDebugLogger(l)),
		cfg:        cfg,
		model:      m,
		id:         id,
		evLogger:   evLogger,
		immediate:  make(chan string),
		folders:    make(map[string]struct{}),
		foldersMut: sync.NewMutex(),
	}

	service.Add(svcutil.AsService(service.listenForUpdates, fmt.Sprintf("%s/listenForUpdates", service)))
	service.Add(svcutil.AsService(service.calculateSummaries, fmt.Sprintf("%s/calculateSummaries", service)))

	return service
}

func (c *folderSummaryService) String() string {
	return fmt.Sprintf("FolderSummaryService@%p", c)
}

// FolderSummary replaces the previously used map[string]interface{}, and needs
// to keep the structure/naming for api backwards compatibility
type FolderSummary struct {
	Errors     int `json:"errors"`
	PullErrors int `json:"pullErrors"` // deprecated

	Invalid string `json:"invalid"` // deprecated

	GlobalFiles       int   `json:"globalFiles"`
	GlobalDirectories int   `json:"globalDirectories"`
	GlobalSymlinks    int   `json:"globalSymlinks"`
	GlobalDeleted     int   `json:"globalDeleted"`
	GlobalBytes       int64 `json:"globalBytes"`
	GlobalTotalItems  int   `json:"globalTotalItems"`

	LocalFiles       int   `json:"localFiles"`
	LocalDirectories int   `json:"localDirectories"`
	LocalSymlinks    int   `json:"localSymlinks"`
	LocalDeleted     int   `json:"localDeleted"`
	LocalBytes       int64 `json:"localBytes"`
	LocalTotalItems  int   `json:"localTotalItems"`

	NeedFiles       int   `json:"needFiles"`
	NeedDirectories int   `json:"needDirectories"`
	NeedSymlinks    int   `json:"needSymlinks"`
	NeedDeletes     int   `json:"needDeletes"`
	NeedBytes       int64 `json:"needBytes"`
	NeedTotalItems  int   `json:"needTotalItems"`

	ReceiveOnlyChangedFiles       int   `json:"receiveOnlyChangedFiles"`
	ReceiveOnlyChangedDirectories int   `json:"receiveOnlyChangedDirectories"`
	ReceiveOnlyChangedSymlinks    int   `json:"receiveOnlyChangedSymlinks"`
	ReceiveOnlyChangedDeletes     int   `json:"receiveOnlyChangedDeletes"`
	ReceiveOnlyChangedBytes       int64 `json:"receiveOnlyChangedBytes"`
	ReceiveOnlyTotalItems         int   `json:"receiveOnlyTotalItems"`

	InSyncFiles int   `json:"inSyncFiles"`
	InSyncBytes int64 `json:"inSyncBytes"`

	State        string    `json:"state"`
	StateChanged time.Time `json:"stateChanged"`
	Error        string    `json:"error"`

	Version        int64                       `json:"version"` // deprecated
	Sequence       int64                       `json:"sequence"`
	RemoteSequence map[protocol.DeviceID]int64 `json:"remoteSequence"`

	IgnorePatterns bool   `json:"ignorePatterns"`
	WatchError     string `json:"watchError"`
}

func (c *folderSummaryService) Summary(folder string) (*FolderSummary, error) {
	res := new(FolderSummary)

	var local, global, need, ro db.Counts
	var ourSeq int64
	var remoteSeq map[protocol.DeviceID]int64
	errors, err := c.model.FolderErrors(folder)
	if err == nil {
		var snap *db.Snapshot
		if snap, err = c.model.DBSnapshot(folder); err == nil {
			global = snap.GlobalSize()
			local = snap.LocalSize()
			need = snap.NeedSize(protocol.LocalDeviceID)
			ro = snap.ReceiveOnlyChangedSize()
			ourSeq = snap.Sequence(protocol.LocalDeviceID)
			remoteSeq = snap.RemoteSequences()
			snap.Release()
		}
	}
	// For API backwards compatibility (SyncTrayzor needs it) an empty folder
	// summary is returned for not running folders, an error might actually be
	// more appropriate
	if err != nil && err != ErrFolderPaused && err != ErrFolderNotRunning {
		return nil, err
	}

	res.Errors = len(errors)
	res.PullErrors = len(errors) // deprecated

	res.Invalid = "" // Deprecated, retains external API for now

	res.GlobalFiles, res.GlobalDirectories, res.GlobalSymlinks, res.GlobalDeleted, res.GlobalBytes, res.GlobalTotalItems = global.Files, global.Directories, global.Symlinks, global.Deleted, global.Bytes, global.TotalItems()

	res.LocalFiles, res.LocalDirectories, res.LocalSymlinks, res.LocalDeleted, res.LocalBytes, res.LocalTotalItems = local.Files, local.Directories, local.Symlinks, local.Deleted, local.Bytes, local.TotalItems()

	fcfg, haveFcfg := c.cfg.Folder(folder)

	if haveFcfg && fcfg.IgnoreDelete {
		need.Deleted = 0
	}

	need.Bytes -= c.model.FolderProgressBytesCompleted(folder)
	// This may happen if we are in progress of pulling files that were
	// deleted globally after the pull started.
	if need.Bytes < 0 {
		need.Bytes = 0
	}
	res.NeedFiles, res.NeedDirectories, res.NeedSymlinks, res.NeedDeletes, res.NeedBytes, res.NeedTotalItems = need.Files, need.Directories, need.Symlinks, need.Deleted, need.Bytes, need.TotalItems()

	if haveFcfg && (fcfg.Type == config.FolderTypeReceiveOnly || fcfg.Type == config.FolderTypeReceiveEncrypted) {
		// Add statistics for things that have changed locally in a receive
		// only or receive encrypted folder.
		res.ReceiveOnlyChangedFiles = ro.Files
		res.ReceiveOnlyChangedDirectories = ro.Directories
		res.ReceiveOnlyChangedSymlinks = ro.Symlinks
		res.ReceiveOnlyChangedDeletes = ro.Deleted
		res.ReceiveOnlyChangedBytes = ro.Bytes
		res.ReceiveOnlyTotalItems = ro.TotalItems()
	}

	res.InSyncFiles, res.InSyncBytes = global.Files-need.Files, global.Bytes-need.Bytes

	res.State, res.StateChanged, err = c.model.State(folder)
	if err != nil {
		res.Error = err.Error()
	}

	res.Version = ourSeq // legacy
	res.Sequence = ourSeq
	res.RemoteSequence = remoteSeq

	ignorePatterns, _, _ := c.model.CurrentIgnores(folder)
	res.IgnorePatterns = false
	for _, line := range ignorePatterns {
		if len(line) > 0 && !strings.HasPrefix(line, "//") {
			res.IgnorePatterns = true
			break
		}
	}

	err = c.model.WatchError(folder)
	if err != nil {
		res.WatchError = err.Error()
	}

	return res, nil
}

// listenForUpdates subscribes to the event bus and makes note of folders that
// need their data recalculated.
func (c *folderSummaryService) listenForUpdates(ctx context.Context) error {
	sub := c.evLogger.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.ClusterConfigReceived | events.FolderWatchStateChanged | events.DownloadProgress)
	defer sub.Unsubscribe()

	for {
		// This loop needs to be fast so we don't miss too many events.

		select {
		case ev, ok := <-sub.C():
			if !ok {
				<-ctx.Done()
				return ctx.Err()
			}
			c.processUpdate(ev)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func (c *folderSummaryService) processUpdate(ev events.Event) {
	var folder string

	switch ev.Type {
	case events.DeviceConnected, events.ClusterConfigReceived:
		// When a device connects we schedule a refresh of all
		// folders shared with that device.

		var deviceID protocol.DeviceID
		if ev.Type == events.DeviceConnected {
			data := ev.Data.(map[string]string)
			deviceID, _ = protocol.DeviceIDFromString(data["id"])
		} else {
			data := ev.Data.(ClusterConfigReceivedEventData)
			deviceID = data.Device
		}

		c.foldersMut.Lock()
	nextFolder:
		for _, folder := range c.cfg.Folders() {
			for _, dev := range folder.Devices {
				if dev.DeviceID == deviceID {
					c.folders[folder.ID] = struct{}{}
					continue nextFolder
				}
			}
		}
		c.foldersMut.Unlock()

		return

	case events.DownloadProgress:
		data := ev.Data.(map[string]map[string]*pullerProgress)
		c.foldersMut.Lock()
		for folder := range data {
			c.folders[folder] = struct{}{}
		}
		c.foldersMut.Unlock()
		return

	case events.StateChanged:
		data := ev.Data.(map[string]interface{})
		if data["to"].(string) != "idle" {
			return
		}
		if from := data["from"].(string); from != "syncing" && from != "sync-preparing" {
			return
		}

		// The folder changed to idle from syncing. We should do an
		// immediate refresh to update the GUI. The send to
		// c.immediate must be nonblocking so that we can continue
		// handling events.

		folder = data["folder"].(string)
		select {
		case c.immediate <- folder:
			c.foldersMut.Lock()
			delete(c.folders, folder)
			c.foldersMut.Unlock()
			return
		default:
			// Refresh whenever we do the next summary.
		}

	default:
		// The other events all have a "folder" attribute that they
		// affect. Whenever the local or remote index is updated for a
		// given folder we make a note of it.
		// This folder needs to be refreshed whenever we do the next
		// refresh.

		folder = ev.Data.(map[string]interface{})["folder"].(string)
	}

	c.foldersMut.Lock()
	c.folders[folder] = struct{}{}
	c.foldersMut.Unlock()
}

// calculateSummaries periodically recalculates folder summaries and
// completion percentage, and sends the results on the event bus.
func (c *folderSummaryService) calculateSummaries(ctx context.Context) error {
	const pumpInterval = 2 * time.Second
	pump := time.NewTimer(pumpInterval)

	for {
		select {
		case <-pump.C:
			t0 := time.Now()
			for _, folder := range c.foldersToHandle() {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
				c.sendSummary(ctx, folder)
			}

			// We don't want to spend all our time calculating summaries. Lets
			// set an arbitrary limit at not spending more than about 30% of
			// our time here...
			wait := 2*time.Since(t0) + pumpInterval
			pump.Reset(wait)

		case folder := <-c.immediate:
			c.sendSummary(ctx, folder)

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// foldersToHandle returns the list of folders needing a summary update, and
// clears the list.
func (c *folderSummaryService) foldersToHandle() []string {
	c.foldersMut.Lock()
	res := make([]string, 0, len(c.folders))
	for folder := range c.folders {
		res = append(res, folder)
		delete(c.folders, folder)
	}
	c.foldersMut.Unlock()
	return res
}

type FolderSummaryEventData struct {
	Folder  string         `json:"folder"`
	Summary *FolderSummary `json:"summary"`
}

// sendSummary send the summary events for a single folder
func (c *folderSummaryService) sendSummary(ctx context.Context, folder string) {
	// The folder summary contains how many bytes, files etc
	// are in the folder and how in sync we are.
	data, err := c.Summary(folder)
	if err != nil {
		return
	}
	c.evLogger.Log(events.FolderSummary, FolderSummaryEventData{
		Folder:  folder,
		Summary: data,
	})

	metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeFiles).Set(float64(data.GlobalFiles))
	metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDirectories).Set(float64(data.GlobalDirectories))
	metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeSymlinks).Set(float64(data.GlobalSymlinks))
	metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeDeleted).Set(float64(data.GlobalDeleted))
	metricFolderSummary.WithLabelValues(folder, metricScopeGlobal, metricTypeBytes).Set(float64(data.GlobalBytes))

	metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeFiles).Set(float64(data.LocalFiles))
	metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDirectories).Set(float64(data.LocalDirectories))
	metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeSymlinks).Set(float64(data.LocalSymlinks))
	metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeDeleted).Set(float64(data.LocalDeleted))
	metricFolderSummary.WithLabelValues(folder, metricScopeLocal, metricTypeBytes).Set(float64(data.LocalBytes))

	metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeFiles).Set(float64(data.NeedFiles))
	metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDirectories).Set(float64(data.NeedDirectories))
	metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeSymlinks).Set(float64(data.NeedSymlinks))
	metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeDeleted).Set(float64(data.NeedDeletes))
	metricFolderSummary.WithLabelValues(folder, metricScopeNeed, metricTypeBytes).Set(float64(data.NeedBytes))

	for _, devCfg := range c.cfg.Folders()[folder].Devices {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if devCfg.DeviceID.Equals(c.id) {
			// We already know about ourselves.
			continue
		}

		// Get completion percentage of this folder for the
		// remote device.
		comp, err := c.model.Completion(devCfg.DeviceID, folder)
		if err != nil {
			l.Debugf("Error getting completion for folder %v, device %v: %v", folder, devCfg.DeviceID, err)
			continue
		}
		ev := comp.Map()
		ev["folder"] = folder
		ev["device"] = devCfg.DeviceID.String()
		c.evLogger.Log(events.FolderCompletion, ev)
	}
}