author     Michael Anthony Knyszek <mknyszek@google.com>  2020-08-03 20:11:04 +0000
committer  Michael Knyszek <mknyszek@google.com>          2020-10-26 18:28:14 +0000
commit     fe7ff71185cf30f9bdee3e8d8897e8b6069ad02e (patch)
tree       77d824e50c2b2777f0f9d17e76149520e50e6e5e
parent     2159c26ceb32bbfa86036431750c0752fca84ef6 (diff)
runtime: add consistent heap statistics
This change adds a global set of heap statistics which are similar
to existing memory statistics. The purpose of these new statistics
is to be able to read them and get a consistent result without
stopping the world. The goal is to eventually replace as many of the
existing memstats statistics with the sharded ones as possible.

The consistent memory statistics use a tailor-made synchronization
mechanism to allow writers (allocators) to proceed with minimal
synchronization by using a sequence counter and a global generation
counter to determine which set of statistics to update. Readers
increment the global generation counter to effectively grab a snapshot
of the statistics, and then iterate over all Ps using the sequence
counter to ensure that they may safely read the snapshotted statistics.
To keep statistics fresh, the reader also has a responsibility to merge
sets of statistics.

These consistent statistics are computed, but otherwise unused for now.
Upcoming changes will integrate them with the rest of the codebase and
will begin to phase out existing statistics.

Change-Id: I637a11f2439e2049d7dccb8650c5d82500733ca5
Reviewed-on: https://go-review.googlesource.com/c/go/+/247037
Run-TryBot: Michael Knyszek <mknyszek@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
-rw-r--r--  src/runtime/mcache.go        4
-rw-r--r--  src/runtime/mgcscavenge.go  11
-rw-r--r--  src/runtime/mheap.go        34
-rw-r--r--  src/runtime/mstats.go      184
4 files changed, 230 insertions, 3 deletions
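The synchronization scheme described in the commit message can be modeled in isolation. What follows is a minimal standalone sketch of the idea only, not the code added by this change: the names (statsModel, shard, delta) are hypothetical, a plain slice of shards stands in for the Ps and their mcaches, and sync/atomic is used directly. Like the real mechanism, it assumes a single reader and at most one writer per shard at a time.

package main

import (
	"fmt"
	"sync/atomic"
)

// delta is a stand-in for heapStatsDelta: a group of counters that
// must be read consistently with one another.
type delta struct {
	committed int64
	released  int64
}

// merge adds the deltas in b into a.
func (a *delta) merge(b *delta) {
	a.committed += b.committed
	a.released += b.released
}

// shard is a stand-in for the per-P mcache: just the even/odd
// sequence counter (odd means a write is in progress).
type shard struct {
	seq uint32
}

// statsModel mirrors the shape of consistentHeapStats: a ring buffer
// of three deltas plus a generation counter selecting the slot that
// writers currently update.
type statsModel struct {
	stats [3]delta
	gen   uint32
}

// acquire marks the shard as writing (seq becomes odd) and returns
// the slot for the current generation.
func (m *statsModel) acquire(s *shard) *delta {
	if atomic.AddUint32(&s.seq, 1)%2 == 0 {
		panic("acquire: expected an odd sequence number")
	}
	return &m.stats[atomic.LoadUint32(&m.gen)%3]
}

// release marks the shard as no longer writing (seq becomes even).
func (m *statsModel) release(s *shard) {
	if atomic.AddUint32(&s.seq, 1)%2 != 0 {
		panic("release: expected an even sequence number")
	}
}

// read rotates gen so that new writers move to a fresh slot, waits for
// in-flight writers to finish, folds the slot from two generations back
// into the slot just snapshotted, clears it for a future rotation, and
// returns the aggregate. Only one reader may run at a time.
func (m *statsModel) read(shards []*shard) delta {
	currGen := atomic.LoadUint32(&m.gen)
	atomic.StoreUint32(&m.gen, (currGen+1)%3) // move writers along
	prevGen := (currGen + 2) % 3              // i.e. (currGen-1) mod 3
	for _, s := range shards {
		for atomic.LoadUint32(&s.seq)%2 != 0 {
			// Spin: this shard may still be writing to stats[currGen].
		}
	}
	m.stats[currGen].merge(&m.stats[prevGen])
	m.stats[prevGen] = delta{}
	return m.stats[currGen]
}

func main() {
	var m statsModel
	p := &shard{}

	// Writer side: acquire, update atomically, release.
	d := m.acquire(p)
	atomic.AddInt64(&d.committed, 4096)
	m.release(p)

	// Reader side: take a consistent snapshot.
	fmt.Printf("%+v\n", m.read([]*shard{p}))
}

The three slots play the same roles as in the change below: one slot is being written, one holds the data just snapshotted, and one is kept clear so writers can move into it the moment gen is rotated.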
diff --git a/src/runtime/mcache.go b/src/runtime/mcache.go
index 5564e4a47d..e27a1c9ec0 100644
--- a/src/runtime/mcache.go
+++ b/src/runtime/mcache.go
@@ -61,6 +61,10 @@ type mcache struct {
// in this mcache are stale and need to be flushed so they
// can be swept. This is done in acquirep.
flushGen uint32
+
+ // statsSeq is a counter indicating whether this P is currently
+ // writing any stats. Its value is even when not, odd when it is.
+ statsSeq uint32
}
// A gclink is a node in a linked list of blocks, like mlink,
diff --git a/src/runtime/mgcscavenge.go b/src/runtime/mgcscavenge.go
index 8b1a0be353..5843ada981 100644
--- a/src/runtime/mgcscavenge.go
+++ b/src/runtime/mgcscavenge.go
@@ -711,7 +711,16 @@ func (p *pageAlloc) scavengeRangeLocked(ci chunkIdx, base, npages uint) uintptr
// Update global accounting only when not in test, otherwise
// the runtime's accounting will be wrong.
- atomic.Xadd64(&memstats.heap_released, int64(npages)*pageSize)
+ nbytes := int64(npages) * pageSize
+ atomic.Xadd64(&memstats.heap_released, nbytes)
+
+ // Update consistent accounting too.
+ c := getMCache()
+ stats := memstats.heapStats.acquire(c)
+ atomic.Xaddint64(&stats.committed, -nbytes)
+ atomic.Xaddint64(&stats.released, nbytes)
+ memstats.heapStats.release(c)
+
return addr
}
diff --git a/src/runtime/mheap.go b/src/runtime/mheap.go
index 87d2fd495b..d17b6fa284 100644
--- a/src/runtime/mheap.go
+++ b/src/runtime/mheap.go
@@ -1239,6 +1239,22 @@ HaveSpan:
// Manually managed memory doesn't count toward heap_sys.
memstats.heap_sys.add(-int64(nbytes))
}
+ // Update consistent stats.
+ c := getMCache()
+ stats := memstats.heapStats.acquire(c)
+ atomic.Xaddint64(&stats.committed, int64(scav))
+ atomic.Xaddint64(&stats.released, -int64(scav))
+ switch typ {
+ case spanAllocHeap:
+ atomic.Xaddint64(&stats.inHeap, int64(nbytes))
+ case spanAllocStack:
+ atomic.Xaddint64(&stats.inStacks, int64(nbytes))
+ case spanAllocPtrScalarBits:
+ atomic.Xaddint64(&stats.inPtrScalarBits, int64(nbytes))
+ case spanAllocWorkBuf:
+ atomic.Xaddint64(&stats.inWorkBufs, int64(nbytes))
+ }
+ memstats.heapStats.release(c)
// Publish the span in various locations.
@@ -1316,6 +1332,10 @@ func (h *mheap) grow(npage uintptr) bool {
// size which is always > physPageSize, so it's safe to
// just add directly to heap_released.
atomic.Xadd64(&memstats.heap_released, int64(asize))
+ c := getMCache()
+ stats := memstats.heapStats.acquire(c)
+ atomic.Xaddint64(&stats.released, int64(asize))
+ memstats.heapStats.release(c)
// Recalculate nBase.
// We know this won't overflow, because sysAlloc returned
@@ -1415,6 +1435,20 @@ func (h *mheap) freeSpanLocked(s *mspan, typ spanAllocType) {
// Manually managed memory doesn't count toward heap_sys, so add it back.
memstats.heap_sys.add(int64(nbytes))
}
+ // Update consistent stats.
+ c := getMCache()
+ stats := memstats.heapStats.acquire(c)
+ switch typ {
+ case spanAllocHeap:
+ atomic.Xaddint64(&stats.inHeap, -int64(nbytes))
+ case spanAllocStack:
+ atomic.Xaddint64(&stats.inStacks, -int64(nbytes))
+ case spanAllocPtrScalarBits:
+ atomic.Xaddint64(&stats.inPtrScalarBits, -int64(nbytes))
+ case spanAllocWorkBuf:
+ atomic.Xaddint64(&stats.inWorkBufs, -int64(nbytes))
+ }
+ memstats.heapStats.release(c)
// Mark the space as free.
h.pages.free(s.base(), s.npages)
diff --git a/src/runtime/mstats.go b/src/runtime/mstats.go
index a6e38d1c1b..76546c0f0c 100644
--- a/src/runtime/mstats.go
+++ b/src/runtime/mstats.go
@@ -148,6 +148,9 @@ type mstats struct {
// unlike heap_live, heap_marked does not change until the
// next mark termination.
heap_marked uint64
+
+ // heapStats is a set of statistics that are updated atomically.
+ heapStats consistentHeapStats
}
var memstats mstats
@@ -426,10 +429,20 @@ type MemStats struct {
}
func init() {
- if unsafe.Offsetof(memstats.heap_live)%8 != 0 {
- println(unsafe.Offsetof(memstats.heap_live))
+ if offset := unsafe.Offsetof(memstats.heap_live); offset%8 != 0 {
+ println(offset)
throw("memstats.heap_live not aligned to 8 bytes")
}
+ if offset := unsafe.Offsetof(memstats.heapStats); offset%8 != 0 {
+ println(offset)
+ throw("memstats.heapStats not aligned to 8 bytes")
+ }
+ // Ensure the size of heapStatsDelta causes adjacent fields/slots (e.g.
+ // [3]heapStatsDelta) to be 8-byte aligned.
+ if size := unsafe.Sizeof(heapStatsDelta{}); size%8 != 0 {
+ println(size)
+ throw("heapStatsDelta not a multiple of 8 bytes in size")
+ }
}
// ReadMemStats populates m with memory allocator statistics.
@@ -687,3 +700,170 @@ func (s *sysMemStat) add(n int64) {
throw("sysMemStat overflow")
}
}
+
+// heapStatsDelta contains deltas of various runtime memory statistics
+// that need to be updated together in order for them to be kept
+// consistent with one another.
+type heapStatsDelta struct {
+ committed int64 // byte delta of memory committed
+ released int64 // byte delta of memory released to the OS
+ inHeap int64 // byte delta of memory placed in the heap
+ inStacks int64 // byte delta of memory reserved for stacks
+ inWorkBufs int64 // byte delta of memory reserved for work bufs
+ inPtrScalarBits int64 // byte delta of memory reserved for unrolled GC prog bits
+}
+
+// merge adds in the deltas from b into a.
+func (a *heapStatsDelta) merge(b *heapStatsDelta) {
+ a.committed += b.committed
+ a.released += b.released
+ a.inHeap += b.inHeap
+ a.inStacks += b.inStacks
+ a.inWorkBufs += b.inWorkBufs
+ a.inPtrScalarBits += b.inPtrScalarBits
+}
+
+// consistentHeapStats represents a set of various memory statistics
+// whose updates must be viewed completely to get a consistent
+// state of the world.
+//
+// To write updates to memory stats use the acquire and release
+// methods. To obtain a consistent global snapshot of these statistics,
+// use read.
+type consistentHeapStats struct {
+ // stats is a ring buffer of heapStatsDelta values.
+ // Writers always atomically update the delta at index gen.
+ //
+ // Readers operate by rotating gen (0 -> 1 -> 2 -> 0 -> ...)
+ // and synchronizing with writers by observing each mcache's
+ // statsSeq field. If the reader observes a P (to which the
+ // mcache is bound) not writing, it can be sure that it will
+ // pick up the new gen value the next time it writes.
+ // The reader then takes responsibility by clearing space
+ // in the ring buffer for the next reader to rotate gen to
+ // that space (i.e. it merges in values from index (gen-2) mod 3
+ // to index (gen-1) mod 3, then clears the former).
+ //
+ // Note that this means only one reader can be reading at a time.
+ // There is no way for readers to synchronize.
+ //
+ // This process is why we need a ring buffer of size 3 instead
+ // of 2: one is for the writers, one contains the most recent
+ // data, and the last one is clear so writers can begin writing
+ // to it the moment gen is updated.
+ stats [3]heapStatsDelta
+
+ // gen represents the current index into which writers
+ // are writing, and can take on the value of 0, 1, or 2.
+ // This value is updated atomically.
+ gen uint32
+}
+
+// acquire returns a heapStatsDelta to be updated. In effect,
+// it acquires the shard for writing. release must be called
+// as soon as the relevant deltas are updated. c must be
+// a valid mcache not being used by any other thread.
+//
+// The returned heapStatsDelta must be updated atomically.
+//
+// Note, however, that it is unsafe to call acquire concurrently
+// with another writer using the same mcache: for any given
+// mcache there must be at most one writer at a time.
+func (m *consistentHeapStats) acquire(c *mcache) *heapStatsDelta {
+ seq := atomic.Xadd(&c.statsSeq, 1)
+ if seq%2 == 0 {
+ // Should have been incremented to odd.
+ print("runtime: seq=", seq, "\n")
+ throw("bad sequence number")
+ }
+ gen := atomic.Load(&m.gen) % 3
+ return &m.stats[gen]
+}
+
+// release indicates that the writer is done modifying
+// the delta. The value returned by the corresponding
+// acquire must no longer be accessed or modified after
+// release is called.
+//
+// The mcache passed here must be the same as the one
+// passed to acquire.
+func (m *consistentHeapStats) release(c *mcache) {
+ seq := atomic.Xadd(&c.statsSeq, 1)
+ if seq%2 != 0 {
+ // Should have been incremented to even.
+ print("runtime: seq=", seq, "\n")
+ throw("bad sequence number")
+ }
+}
+
+// unsafeRead aggregates the delta for this shard into out.
+//
+// Unsafe because it does so without any synchronization. The
+// only safe time to call this is if the world is stopped or
+// we're freezing the world or going down anyway (and we just
+// want _some_ estimate).
+func (m *consistentHeapStats) unsafeRead(out *heapStatsDelta) {
+ for i := range m.stats {
+ out.merge(&m.stats[i])
+ }
+}
+
+// unsafeClear clears the shard.
+//
+// Unsafe because the world must be stopped and values should
+// be donated elsewhere before clearing.
+func (m *consistentHeapStats) unsafeClear() {
+ for i := range m.stats {
+ m.stats[i] = heapStatsDelta{}
+ }
+}
+
+// read takes a globally consistent snapshot of m
+// and puts the aggregated value in out. Even though out is a
+// heapStatsDelta, the resulting values should be complete and
+// valid statistic values.
+//
+// Not safe to call concurrently.
+func (m *consistentHeapStats) read(out *heapStatsDelta) {
+ // Getting preempted after this point is not safe because
+ // we read allp. We need to make sure a STW can't happen
+ // so it doesn't change out from under us.
+ mp := acquirem()
+
+ // Rotate gen, effectively taking a snapshot of the state of
+ // these statistics at the point of the exchange by moving
+ // writers to the next set of deltas.
+ //
+ // This exchange is safe to do because we won't race
+ // with anyone else trying to update this value.
+ currGen := atomic.Load(&m.gen)
+ atomic.Xchg(&m.gen, (currGen+1)%3)
+ prevGen := currGen - 1
+ if currGen == 0 {
+ prevGen = 2
+ }
+ for _, p := range allp {
+ c := p.mcache
+ if c == nil {
+ continue
+ }
+ // Spin until there are no more writers.
+ for atomic.Load(&c.statsSeq)%2 != 0 {
+ }
+ }
+
+ // At this point we've observed that each sequence
+ // number is even, so any future writers will observe
+ // the new gen value. That means it's safe to read from
+ // the other deltas in the stats buffer.
+
+ // Perform our responsibilities and free up
+ // stats[prevGen] for the next time we want to take
+ // a snapshot.
+ m.stats[currGen].merge(&m.stats[prevGen])
+ m.stats[prevGen] = heapStatsDelta{}
+
+ // Finally, copy out the complete delta.
+ *out = m.stats[currGen]
+ releasem(mp)
+}
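As a concrete check on the rotation performed by read above: when gen is 2, the exchange moves writers to slot 0; currGen is 2 and prevGen is 1, so the reader merges stats[1] into stats[2], clears stats[1], and copies stats[2] out. Relative to the new gen value, those are the indices (gen-2) mod 3 and (gen-1) mod 3 mentioned in the comment on the stats field. A tiny illustrative snippet (not part of the change) that enumerates all three rotations:

package main

import "fmt"

func main() {
	for currGen := uint32(0); currGen < 3; currGen++ {
		nextGen := (currGen + 1) % 3 // slot writers move to after the exchange
		prevGen := (currGen + 2) % 3 // slot merged into stats[currGen], then cleared
		fmt.Printf("gen %d -> %d: merge stats[%d] into stats[%d], clear stats[%d]\n",
			currGen, nextGen, prevGen, currGen, prevGen)
	}
}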