aboutsummaryrefslogtreecommitdiff
path: root/lib/model/indexhandler_test.go
blob: bf91565099574dad3b5e27a5877c1c84c9b64630 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright (C) 2024 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package model_test

import (
	"context"
	"fmt"
	"io"
	"sync"
	"testing"

	"github.com/syncthing/syncthing/lib/db"
	"github.com/syncthing/syncthing/lib/model/mocks"
	"github.com/syncthing/syncthing/lib/protocol"
	protomock "github.com/syncthing/syncthing/lib/protocol/mocks"
	"github.com/syncthing/syncthing/lib/testutil"
)

func TestIndexhandlerConcurrency(t *testing.T) {
	// Verify that sending a lot of index update messages using the
	// FileInfoBatch works and doesn't trigger the race detector.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ar, aw := io.Pipe()
	br, bw := io.Pipe()
	ci := &protomock.ConnectionInfo{}

	m1 := &mocks.Model{}
	c1 := protocol.NewConnection(protocol.EmptyDeviceID, ar, bw, testutil.NoopCloser{}, m1, ci, protocol.CompressionNever, nil, nil)
	c1.Start()
	defer c1.Close(io.EOF)

	m2 := &mocks.Model{}
	c2 := protocol.NewConnection(protocol.EmptyDeviceID, br, aw, testutil.NoopCloser{}, m2, ci, protocol.CompressionNever, nil, nil)
	c2.Start()
	defer c2.Close(io.EOF)

	c1.ClusterConfig(protocol.ClusterConfig{})
	c2.ClusterConfig(protocol.ClusterConfig{})
	c1.Index(ctx, "foo", nil)
	c2.Index(ctx, "foo", nil)

	const msgs = 5e2
	const files = 1e3

	recvdEntries := 0
	recvdBatches := 0
	var wg sync.WaitGroup
	m2.IndexUpdateCalls(func(_ protocol.Connection, idxUp *protocol.IndexUpdate) error {
		for j := 0; j < files; j++ {
			if n := idxUp.Files[j].Name; n != fmt.Sprintf("f%d-%d", recvdBatches, j) {
				t.Error("wrong filename", n)
			}
			recvdEntries++
		}
		recvdBatches++
		wg.Done()
		return nil
	})

	b1 := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
		return c1.IndexUpdate(ctx, "foo", fs)
	})
	sentEntries := 0
	for i := 0; i < msgs; i++ {
		for j := 0; j < files; j++ {
			b1.Append(protocol.FileInfo{
				Name:   fmt.Sprintf("f%d-%d", i, j),
				Blocks: []protocol.BlockInfo{{Hash: make([]byte, 32)}},
			})
			sentEntries++
		}
		wg.Add(1)
		if err := b1.Flush(); err != nil {
			t.Fatal(err)
		}
	}

	// Every sent IndexUpdate should be matched by a corresponding index
	// message on the other side. Use the waitgroup to wait for this to
	// complete, as otherwise the Close below can race with the last
	// outgoing index message and the count between sent and received is
	// wrong.
	wg.Wait()

	c1.Close(io.EOF)
	c2.Close(io.EOF)
	<-c1.Closed()
	<-c2.Closed()

	if recvdEntries != sentEntries {
		t.Error("didn't receive all expected messages", recvdEntries, sentEntries)
	}
}