aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Borg <jakob@kastelo.net>2023-05-25 09:04:14 +0200
committerJakob Borg <jakob@kastelo.net>2023-05-25 09:04:30 +0200
commit795cbff55db41571355f408f9f4fffce5adc74c4 (patch)
treef4f0050c876d1ddca169cf10a12baa548fcdaa4f
parent962fedc37848237c3e52a652542d7574b95ed4c7 (diff)
downloadsyncthing-795cbff55db41571355f408f9f4fffce5adc74c4.tar.gz
syncthing-795cbff55db41571355f408f9f4fffce5adc74c4.zip
wip
-rw-r--r--lib/connections/service.go23
-rw-r--r--lib/connections/structs.go19
-rw-r--r--lib/model/model.go38
-rw-r--r--test/h1/config.xml6
-rw-r--r--test/h2/config.xml7
5 files changed, 64 insertions, 29 deletions
diff --git a/lib/connections/service.go b/lib/connections/service.go
index f832c5685..4786eec54 100644
--- a/lib/connections/service.go
+++ b/lib/connections/service.go
@@ -68,7 +68,6 @@ var (
const (
perDeviceWarningIntv = 15 * time.Minute
tlsHandshakeTimeout = 10 * time.Second
- minConnectionReplaceAge = 10 * time.Second
minConnectionLoopSleep = 5 * time.Second
stdConnectionLoopSleep = time.Minute
worstDialerPriority = math.MaxInt32
@@ -318,17 +317,17 @@ func (s *service) connectionCheckEarly(remoteID protocol.DeviceID, c internalCon
return errNetworkNotAllowed
}
- // Lower priority is better, just like nice etc.
- if ct, ok := s.model.Connection(remoteID); ok {
- if ct.Priority() > c.priority || time.Since(ct.Statistics().StartedAt) > minConnectionReplaceAge {
- l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c)
- } else if cfg.MultipleConnections <= s.connectionsForDevice(cfg.DeviceID) {
- // We should not already be connected to the other party. TODO: This
- // could use some better handling. If the old connection is dead but
- // hasn't timed out yet we may want to drop *that* connection and keep
- // this one. But in case we are two devices connecting to each other
- // in parallel we don't want to do that or we end up with no
- // connections still established...
+ if existing := s.connectionsForDevice(cfg.DeviceID); existing > 0 {
+ // Check if the new connection is better than an existing one. Lower
+ // priority is better, just like `nice` etc.
+ if ct, ok := s.model.Connection(remoteID); ok {
+ if ct.Priority() > c.priority {
+ l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c)
+ return nil
+ }
+ }
+ if existing >= cfg.MultipleConnections {
+ // We're not allowed to accept any more connections to this device.
return errDeviceAlreadyConnected
}
}
diff --git a/lib/connections/structs.go b/lib/connections/structs.go
index 2040fcb08..10feadaa3 100644
--- a/lib/connections/structs.go
+++ b/lib/connections/structs.go
@@ -93,21 +93,28 @@ func (t connType) Transport() string {
func newInternalConn(tc tlsConn, connType connType, isLocal bool, priority int) internalConn {
now := time.Now()
- buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano()))
- hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr())))
- buf = append(buf, hash[:]...)
- connectionID := base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)
-
return internalConn{
tlsConn: tc,
connType: connType,
isLocal: isLocal,
priority: priority,
establishedAt: now.Truncate(time.Second),
- connectionID: connectionID,
+ connectionID: newConnectionID(tc, connType, now),
}
}
+// newConnection generates a connection ID. The connection ID is designed to
+// be 1) unique for each connection (even those reusing the same socket
+// address on both sides), 2) sortable so that the connection with the
+// lowest ID will be the primary one. This also coincides with being the
+// oldest connection.
+func newConnectionID(tc tlsConn, connType connType, now time.Time) string {
+ buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano()))
+ hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr())))
+ buf = append(buf, hash[:]...)
+ return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)
+}
+
func (c internalConn) Close() error {
// *tls.Conn.Close() does more than it says on the tin. Specifically, it
// sends a TLS alert message, which might block forever if the
diff --git a/lib/model/model.go b/lib/model/model.go
index 967a9f629..7d15f9130 100644
--- a/lib/model/model.go
+++ b/lib/model/model.go
@@ -163,6 +163,7 @@ type model struct {
pmut sync.RWMutex
conns map[string]protocol.Connection // connection ID -> connection
deviceConns map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice)
+ promotedConn map[protocol.DeviceID]string // device -> last promoted connection ID
connRequestLimiters map[protocol.DeviceID]*util.Semaphore
closed map[string]chan struct{} // connection ID -> closed channel
helloMessages map[protocol.DeviceID]protocol.Hello
@@ -248,6 +249,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
pmut: sync.NewRWMutex(),
conns: make(map[string]protocol.Connection),
deviceConns: make(map[protocol.DeviceID][]string),
+ promotedConn: make(map[protocol.DeviceID]string),
connRequestLimiters: make(map[protocol.DeviceID]*util.Semaphore),
closed: make(map[string]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
@@ -1912,6 +1914,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
if len(remainingConns) == 0 {
// All device connections closed
delete(m.deviceConns, deviceID)
+ delete(m.promotedConn, deviceID)
delete(m.connRequestLimiters, deviceID)
delete(m.helloMessages, deviceID)
delete(m.remoteFolderStates, deviceID)
@@ -2433,10 +2436,33 @@ func (m *model) promoteConnections() {
defer m.pmut.Unlock()
for deviceID, connIDs := range m.deviceConns {
+ // Figure out the best current connection priority for this device.
+ bestPriority := m.conns[connIDs[0]].Priority()
+ for _, connID := range connIDs[1:] {
+ priority := m.conns[connID].Priority()
+ if priority < bestPriority {
+ bestPriority = priority
+ }
+ }
+
+ // Close connections with a worse connection priority than best.
+ closing := make(map[string]bool)
+ for _, connID := range connIDs {
+ if m.conns[connID].Priority() > bestPriority {
+ l.Infoln("Closing connection", connID, "to", deviceID.Short(), "because it has a worse connection priority than the best connection")
+ go m.conns[connID].Close(errReplacingConnection)
+ closing[connID] = true
+ }
+ }
+
cm, passwords := m.generateClusterConfigFRLocked(deviceID)
- if idxh, ok := m.indexHandlers[deviceID]; !ok || idxh.conn.ConnectionID() != connIDs[0] {
- // Primary device lacks an index handler. We should promote the
- // primary connection to be the index handling one.
+ if !closing[connIDs[0]] && m.promotedConn[deviceID] != connIDs[0] {
+ // The last promoted connection is not the current primary; we
+ // should promote the primary connection to be the index
+ // handling one. We do this by sending a ClusterConfig on it,
+ // which will cause the other side to start sending us index
+ // messages there. (On our side, we manage index handlers based
+ // on where we get ClusterConfigs from the peer.)
l.Infoln("Promoting connection", connIDs[0], "to", deviceID.Short())
conn := m.conns[connIDs[0]]
if conn.Statistics().StartedAt.IsZero() {
@@ -2444,11 +2470,15 @@ func (m *model) promoteConnections() {
conn.Start()
}
conn.ClusterConfig(cm)
+ m.promotedConn[deviceID] = connIDs[0]
}
- // Make sure any new connections also get started, and a
+ // Make sure any other new connections also get started, and a
// secondary-marked ClusterConfig.
for _, connID := range connIDs[1:] {
+ if closing[connID] {
+ continue
+ }
conn := m.conns[connID]
if conn.Statistics().StartedAt.IsZero() {
conn.SetFolderPasswords(passwords)
diff --git a/test/h1/config.xml b/test/h1/config.xml
index 8dce0e261..7e2f87944 100644
--- a/test/h1/config.xml
+++ b/test/h1/config.xml
@@ -1,5 +1,5 @@
<configuration version="37">
- <folder id="default" label="" path="s1?files=10000&amp;sizeavg=25000000" type="sendreceive" rescanIntervalS="3600" fsWatcherEnabled="false" fsWatcherDelayS="10" ignorePerms="false" autoNormalize="true">
+ <folder id="default" label="" path="s1" type="sendreceive" rescanIntervalS="3600" fsWatcherEnabled="false" fsWatcherDelayS="10" ignorePerms="false" autoNormalize="true">
<filesystemType>fake</filesystemType>
<device id="I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU" introducedBy="">
<encryptionPassword></encryptionPassword>
@@ -123,9 +123,9 @@
<connectionLimitEnough>0</connectionLimitEnough>
<connectionLimitMax>0</connectionLimitMax>
<insecureAllowOldTLSVersions>false</insecureAllowOldTLSVersions>
- <connectionPriorityTcpLan>20</connectionPriorityTcpLan>
+ <connectionPriorityTcpLan>10</connectionPriorityTcpLan>
<connectionPriorityQuicLan>20</connectionPriorityQuicLan>
- <connectionPriorityTcpWan>40</connectionPriorityTcpWan>
+ <connectionPriorityTcpWan>30</connectionPriorityTcpWan>
<connectionPriorityQuicWan>40</connectionPriorityQuicWan>
<connectionPriorityRelay>50</connectionPriorityRelay>
<connectionPriorityUpgradeThreshold>0</connectionPriorityUpgradeThreshold>
diff --git a/test/h2/config.xml b/test/h2/config.xml
index 704dafbd2..a56c6335e 100644
--- a/test/h2/config.xml
+++ b/test/h2/config.xml
@@ -44,8 +44,7 @@
</xattrFilter>
</folder>
<device id="I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU" name="s1" compression="metadata" introducer="false" skipIntroductionRemovals="false" introducedBy="">
- <address>tcp://127.0.0.1:22001</address>
- <address>quic://127.0.0.1:22001</address>
+ <address>dynamic</address>
<paused>false</paused>
<autoAcceptFolders>false</autoAcceptFolders>
<maxSendKbps>0</maxSendKbps>
@@ -121,9 +120,9 @@
<connectionLimitEnough>0</connectionLimitEnough>
<connectionLimitMax>0</connectionLimitMax>
<insecureAllowOldTLSVersions>false</insecureAllowOldTLSVersions>
- <connectionPriorityTcpLan>20</connectionPriorityTcpLan>
+ <connectionPriorityTcpLan>10</connectionPriorityTcpLan>
<connectionPriorityQuicLan>20</connectionPriorityQuicLan>
- <connectionPriorityTcpWan>40</connectionPriorityTcpWan>
+ <connectionPriorityTcpWan>30</connectionPriorityTcpWan>
<connectionPriorityQuicWan>40</connectionPriorityQuicWan>
<connectionPriorityRelay>50</connectionPriorityRelay>
<connectionPriorityUpgradeThreshold>0</connectionPriorityUpgradeThreshold>