譯者注:想要程序的性能得到提升嗎?嘗試下通過找到慢的東西,然后用更快的東西來代替它。作者以尋找網狀圖的直徑為例,僅僅七步性能就提升了100倍,值得學習。
我有一種熱衷于讓事情變得更快的強迫癥。這里有一個故事,那就是利用編譯器中的神奇工具,把GO語言的代碼運行速度提高100倍。這并不是讓代碼變得怪異和不可維護來達到目的,而是通過找到慢的東西,然后用更快的東西來代替它。
為了進行這個練習,我想試著確定一個圖形的“直徑”是很有趣的。這與工作中所做的事情很相似(比如在ravelin.com網站上用圖表來捕捉壞人),而且要花很長時間才能計算出來。
這里的直徑是圖中所有點對間短路徑長度的大值。圖中的任意兩點之間有一條短路徑。如果一對點的短路徑要比其他所有點對長,那么這條路徑的長度就是直徑。
首先每個節點都由一個字符串來標識。
type nodeId string
每個節點都有許多與之直接相連的相鄰節點。當把這些放在map集合中,就可以很容易地添加相鄰的節點,而不會有重復的風險。
type node struct { id nodeId adj map[nodeId]*node }
通過id找到的所有的節點(至少當從輸入列表的邊緣構建結構時)使用map集合保存。
type nodes map[nodeId]*node
為了找到兩個節點之間的短路徑,可以使用廣度優先搜索(BFS)算法。現在,從任何節點開始的BFS將找到從該節點到其他所有節點的短路徑,因此只需要在每個節點上運行BFS,就能找到它的直徑。下面是執行此操作的代碼。當訪問每個節點時,關鍵元素在某個地方進行跟蹤,以獲取已經完成了多少步驟,以及接下來要考慮的節點隊列。
// diameter is the maximum length of a shortest path in the network func (nodes nodes) diameter() int { var diameter int for id := range nodes {
df := nodes.longestShortestPath(id) if df > diameter {
diameter = df
}
} return diameter
} type bfsNode struct { // bfs tracking data parent *node
depth int } func (nodes nodes) longestShortestPath(start nodeId) int {
q := list.New()
bfsData := make(map[nodeId]bfsNode, len(nodes))
n := nodes.get(start)
bfsData[n.id] = bfsNode{parent: n, depth: 0}
q.PushBack(n) for {
elt := q.Front() if elt == nil { break }
n = q.Remove(elt).(*node) for id, m := range n.adj {
bm := bfsData[id] if bm.parent == nil {
bfsData[id] = bfsNode{parent: n, depth: bfsData[n.id].depth + 1}
q.PushBack(m)
}
}
} return bfsData[n.id].depth
}
個版本的這段代碼在這里:
https://github.com/philpearl/graphblog/commit/f4742fb1c65a896562052990780fe27b9ce85e3f
如果為一個有9997條邊的網狀圖運行一個基準調用diameter(),將得到以下結果。
BenchmarkDiameter/diameter-8 1 38108360293 ns/op 9170172832 B/op 82451888 allocs/op
1次迭代需要38s,分配8200萬個對象。所以要讓它更快一點是值得考慮的。因此使用cpu分析(go test -bench . -cpuprofile cpu.prof),然后運行profiler來創建a.svg顯示時間的去向(go tool pprof -svg graphblog.test cpu.prof > cpu1.svg)。這里有一個有趣的部分。
許多CPU使用都是在longestShortestPath()的map中分配和迭代的。也有一部分時間消耗在將數據項插入到list。如果仔細查看這個配置文件,會看到其實更多的時間是花在垃圾回收上。
因此,如果能夠刪除或改進map和list,或者減少消耗,那就好了。
對于改進的個設想是將字符串節點id轉換為int32。int32應該更小,更容易散列和比較,因此可能會使map訪問更快。這里保留了一個“符號表”,從字符串節點名映射到一個int32。個ID為0,并將為每個新節點增加一個ID。這是符號表。
type nodeName string type symbolTable map[nodeName]nodeId
func (s symbolTable) getId(name nodeName) nodeId { id, ok := s[name] if !ok { id = nodeId(len(s))
s[name] = id } return id }
然后需要將節點和symbolTable封裝成一個graph結構體。
type graph struct {
symbolTable
nodes
} func New() *graph { return &graph{
symbolTable: make(symbolTable),
nodes: make(nodes),
}
} func (g *graph) addEdge(a, b nodeName) {
aid := g.symbolTable.getId(a)
bid := g.symbolTable.getId(b)
g.nodes.addEdge(aid, bid)
}
這是有對一系列改變的完整提交。如果再次運行基準測試,我看到運行時間大約是30s,整整減少8s。
BenchmarkDiameter/diameter-8 1 29804044414 ns/op 7311829424 B/op 82451563 allocs/op
如果查看CPU的profile,會發現并沒有什么需要改變,map仍然是問題的主要部分。但現在nodeid是從0開始的連續整數。可以用切片來保存節點嗎?nodeId作為slice中的索引,查找節點數據會非常快。如果有一個固定數目的節點(或一個合理的上界),這是一個可以想象的簡單改變。可以預先分配適當大小的切片,然后只需要改變一些類型。
type nodes []node func (nl nodes) init() { for i := range nl {
nl[i].id = nodeId(i)
nl[i].adj = make(map[nodeId]*node)
}
} func (nl nodes) get(id nodeId) *node { return &nl[id]
}
測試結果再次提高,時間從原來的38s降到14s。
BenchmarkDiameter/diameter-8 1 13927658892 ns/op 5563335728 B/op 81779354 allocs/op
這一次性能分析數據發生了改變。
現在存在巨大的時間消耗有
現在把大的目標放在首位,然后把它替換掉。
下面將列出可以重用的從一個BFS運行到另一個BFS的list。該list需要支持從開始的節點出發并將節點保留到后。它只需要知道如何存儲指向節點結構的指針。如果它能重新使用list元素,從而減少分配的數量,那就更好了,所以對此添加了一個內部的免費元素list。
由于這個list比container/list更簡單,所以在可能的情況下重新使用它的元素,并且不使用接口。甚至在BFS運行之間嘗試重用它,在此之前可能是一個更快的替換。
package graphblog type listElt struct {
next *listElt
node *node
} type list struct {
head *listElt
tail *listElt
free *listElt
} func (l *list) getHead() *node {
elt := l.head if elt == nil { return nil } // Remove elt from the list l.head = elt.next if l.head == nil {
l.tail = nil } // Add elt to the free list elt.next = l.free
l.free = elt
n := elt.node
elt.node = nil return n
} func (l *list) pushBack(n *node) { // Get a free listElt to use to point to this node elt := l.free if elt == nil {
elt = &listElt{}
} else {
l.free = elt.next
elt.next = nil } // Add the element to the tail of the list elt.node = n if l.tail == nil {
l.tail = elt
l.head = elt
} else {
l.tail.next = elt
l.tail = elt
}
}
測試結果確實有所改善,從14s下降到了9.5s。
BenchmarkDiameter/diameter-8 1 9453521303 ns/op 1762047056 B/op 7737600 allocs/op
這個概要數據顯示仍然是GC在浪費時間。這可能發生在后臺CPU上,因此對計算直徑所花費的時間幾乎沒有影響。但排序將降低這段代碼的總體影響。
我們可以簡單地在diameter()中分配該list,并將其作為參數傳遞給longestShortestPath()。這不會對整體的時間造成很大的影響,但會極大地減少分配。
BenchmarkDiameter/diameter-8 1 9211307204 ns/op 1638428512 B/op 11301 allocs/op
現在map迭代是消耗時間多的,下面用切片替換相鄰的節點map。我們不期望每個節點都有上百個相鄰節點,因此就算不能比以前更快,對重復的線性搜索也至少和使用map一樣快。而且只需要相鄰的節點id,而不是指向節點的指針,所以在同一時間改變它,就能把時間降到了8s以下。
BenchmarkDiameter/diameter-8 1 7815493067 ns/op 1638427296 B/op 11273 allocs/op
下一個改進的備選內容是針對longestShortestPath()中分配的狀態數據,以跟蹤次搜索的進度。可以重用它嗎?可以的,通過將內存分配給diameter()而不是longestShortestPath()來重新使用內存,但是需要重置運行之間的內容,因此從表面上看可能正在做更多的工作,但不妨一試。
事實證明,這是一個巨大的勝利,因為現在降到了1.8s.
BenchmarkDiameter/diameter-8 1 1840585891 ns/op 185584 B/op 1233 allocs/op
現在這個profile.svg顯示的時間并不多,這些成果歸功于編寫的代碼。
大多數時間消耗在longestShortestPath()中。如果仔細看看就會發并沒有真正地在bfsNode結構中使用parent字段。只是用它來表示在搜索之前是否訪問過這個節點。可以嘗試刪除該字段,并使用-1的深度來表示節點沒有被檢查。這會將時間消耗降到1.5s(注意我在基準命令行中添加了-benchtime 10s)。
BenchmarkDiameter/diameter-8 10 1543700500 ns/op 101552 B/op 1229 allocs/op
如果假設直徑不大于32000,那么可以用一個int16替換int深度。這減少了需要更新的內存數量,并將時間減少到1.4s以下。
BenchmarkDiameter/diameter-8 10 1389635165 ns/op 40112 B/op 1229 allocs/op
現在已經實現想法的關鍵點,后可以做的一件事是讓代碼更容易運行。現在是從一個沒有變化的graph的每個結點上運行BFS。如果將這些節點拆分,那么就可以在不同的CPU核心上運行來自不同節點的BFS,這將會讓執行時間從原來的38秒降到0.3s。
BenchmarkDiameter/diameter-8 5 303704709 ns/op 309097 B/op 8969 allocs/op
所有這些代碼都可以在https://github.com/philpearl/graphblog上看到,過程中的每一個階段作為一次單獨的提交,并在后的提交中添加profiling.svgs文件。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。