Codebase list golang-github-hashicorp-go-immutable-radix / 56d73df
New upstream version 0.0~git20170214.0.30664b8 Anthony Fok 6 years ago
4 changed file(s) with 305 addition(s) and 72 deletion(s). Raw diff Collapse all Expand all
0 # Compiled Object files, Static and Dynamic libs (Shared Objects)
1 *.o
2 *.a
3 *.so
4
5 # Folders
6 _obj
7 _test
8
9 # Architecture specific extensions/prefixes
10 *.[568vq]
11 [568vq].out
12
13 *.cgo1.go
14 *.cgo2.c
15 _cgo_defun.c
16 _cgo_gotypes.go
17 _cgo_export.*
18
19 _testmain.go
20
21 *.exe
22 *.test
23 *.prof
7070 // trackOverflow flag, which will cause us to use a more expensive
7171 // algorithm to perform the notifications. Mutation tracking is only
7272 // performed if trackMutate is true.
73 trackChannels map[*chan struct{}]struct{}
73 trackChannels map[chan struct{}]struct{}
7474 trackOverflow bool
7575 trackMutate bool
7676 }
9696 // overflow flag if we can no longer track any more. This limits the amount of
9797 // state that will accumulate during a transaction and we have a slower algorithm
9898 // to switch to if we overflow.
99 func (t *Txn) trackChannel(ch *chan struct{}) {
99 func (t *Txn) trackChannel(ch chan struct{}) {
100100 // In overflow, make sure we don't store any more objects.
101101 if t.trackOverflow {
102102 return
103103 }
104104
105 // Create the map on the fly when we need it.
106 if t.trackChannels == nil {
107 t.trackChannels = make(map[*chan struct{}]struct{})
108 }
109
110105 // If this would overflow the state we reject it and set the flag (since
111106 // we aren't tracking everything that's required any longer).
112107 if len(t.trackChannels) >= defaultModifiedCache {
108 // Mark that we are in the overflow state
113109 t.trackOverflow = true
110
111 // Clear the map so that the channels can be garbage collected. It is
112 // safe to do this since we have already overflowed and will be using
113 // the slow notify algorithm.
114 t.trackChannels = nil
114115 return
116 }
117
118 // Create the map on the fly when we need it.
119 if t.trackChannels == nil {
120 t.trackChannels = make(map[chan struct{}]struct{})
115121 }
116122
117123 // Otherwise we are good to track it.
133139 }
134140
135141 // If this node has already been modified, we can continue to use it
136 // during this transaction. If a node gets kicked out of cache then we
137 // *may* notify for its mutation if we end up copying the node again,
138 // but we don't make any guarantees about notifying for intermediate
139 // mutations that were never exposed outside of a transaction.
142 // during this transaction. We know that we don't need to track it for
143 // a node update since the node is writable, but if this is for a leaf
144 // update we track it, in case the initial write to this node didn't
145 // update the leaf.
140146 if _, ok := t.writable.Get(n); ok {
147 if t.trackMutate && forLeafUpdate && n.leaf != nil {
148 t.trackChannel(n.leaf.mutateCh)
149 }
141150 return n
142151 }
143152
144153 // Mark this node as being mutated.
145154 if t.trackMutate {
146 t.trackChannel(&(n.mutateCh))
155 t.trackChannel(n.mutateCh)
147156 }
148157
149158 // Mark its leaf as being mutated, if appropriate.
150159 if t.trackMutate && forLeafUpdate && n.leaf != nil {
151 t.trackChannel(&(n.leaf.mutateCh))
152 }
153
154 // Copy the existing node.
160 t.trackChannel(n.leaf.mutateCh)
161 }
162
163 // Copy the existing node. If you have set forLeafUpdate it will be
164 // safe to replace this leaf with another after you get your node for
165 // writing. You MUST replace it, because the channel associated with
166 // this leaf will be closed when this transaction is committed.
155167 nc := &Node{
156168 mutateCh: make(chan struct{}),
157169 leaf: n.leaf,
168180 // Mark this node as writable.
169181 t.writable.Add(nc, nil)
170182 return nc
183 }
184
185 // mergeChild is called to collapse the given node with its child. This is only
186 // called when the given node is not a leaf and has a single edge.
187 func (t *Txn) mergeChild(n *Node) {
188 // Mark the child node as being mutated since we are about to abandon
189 // it. We don't need to mark the leaf since we are retaining it if it
190 // is there.
191 e := n.edges[0]
192 child := e.node
193 if t.trackMutate {
194 t.trackChannel(child.mutateCh)
195 }
196
197 // Merge the nodes.
198 n.prefix = concat(n.prefix, child.prefix)
199 n.leaf = child.leaf
200 if len(child.edges) != 0 {
201 n.edges = make([]edge, len(child.edges))
202 copy(n.edges, child.edges)
203 } else {
204 n.edges = nil
205 }
171206 }
172207
173208 // insert does a recursive insertion
284319
285320 // Check if this node should be merged
286321 if n != t.root && len(nc.edges) == 1 {
287 nc.mergeChild()
322 t.mergeChild(nc)
288323 }
289324 return nc, n.leaf
290325 }
304339 }
305340
306341 // Copy this node. WATCH OUT - it's safe to pass "false" here because we
307 // will only ADD a leaf via nc.mergeChilde() if there isn't one due to
342 // will only ADD a leaf via nc.mergeChild() if there isn't one due to
308343 // the !nc.isLeaf() check in the logic just below. This is pretty subtle,
309344 // so be careful if you change any of the logic here.
310345 nc := t.writeNode(n, false)
313348 if newChild.leaf == nil && len(newChild.edges) == 0 {
314349 nc.delEdge(label)
315350 if n != t.root && len(nc.edges) == 1 && !nc.isLeaf() {
316 nc.mergeChild()
351 t.mergeChild(nc)
317352 }
318353 } else {
319354 nc.edges[idx].node = newChild
370405 // Commit is used to finalize the transaction and return a new tree. If mutation
371406 // tracking is turned on then notifications will also be issued.
372407 func (t *Txn) Commit() *Tree {
373 nt := t.commit()
408 nt := t.CommitOnly()
374409 if t.trackMutate {
375 t.notify()
410 t.Notify()
376411 }
377412 return nt
378413 }
379414
380 // commit is an internal helper for Commit(), useful for unit tests.
381 func (t *Txn) commit() *Tree {
415 // CommitOnly is used to finalize the transaction and return a new tree, but
416 // does not issue any notifications until Notify is called.
417 func (t *Txn) CommitOnly() *Tree {
382418 nt := &Tree{t.root, t.size}
383419 t.writable = nil
384420 return nt
447483 }
448484 }
449485
450 // notify is used along with TrackMutate to trigger notifications. This should
451 // only be done once a transaction is committed.
452 func (t *Txn) notify() {
486 // Notify is used along with TrackMutate to trigger notifications. This must
487 // only be done once a transaction is committed via CommitOnly, and it is called
488 // automatically by Commit.
489 func (t *Txn) Notify() {
490 if !t.trackMutate {
491 return
492 }
493
453494 // If we've overflowed the tracking state we can't use it in any way and
454495 // need to do a full tree compare.
455496 if t.trackOverflow {
456497 t.slowNotify()
457498 } else {
458499 for ch := range t.trackChannels {
459 close(*ch)
500 close(ch)
460501 }
461502 }
462503
604604 }
605605 }
606606
607 // isClosed returns true if the given channel is closed.
608 func isClosed(ch chan struct{}) bool {
609 select {
610 case <-ch:
611 return true
612 default:
613 return false
614 }
615 }
616
617 // hasAnyClosedMutateCh scans the given tree and returns true if there are any
618 // closed mutate channels on any nodes or leaves.
619 func hasAnyClosedMutateCh(r *Tree) bool {
620 for iter := r.root.rawIterator(); iter.Front() != nil; iter.Next() {
621 n := iter.Front()
622 if isClosed(n.mutateCh) {
623 return true
624 }
625 if n.isLeaf() && isClosed(n.leaf.mutateCh) {
626 return true
627 }
628 }
629 return false
630 }
631
607632 func TestTrackMutate_SeekPrefixWatch(t *testing.T) {
608633 for i := 0; i < 3; i++ {
609634 r := New()
645670 case 0:
646671 r = txn.Commit()
647672 case 1:
648 r = txn.commit()
649 txn.notify()
650 default:
651 r = txn.commit()
673 r = txn.CommitOnly()
674 txn.Notify()
675 default:
676 r = txn.CommitOnly()
652677 txn.slowNotify()
678 }
679 if hasAnyClosedMutateCh(r) {
680 t.Fatalf("bad")
653681 }
654682
655683 // Verify root and parent triggered, and leaf affected
699727 case 0:
700728 r = txn.Commit()
701729 case 1:
702 r = txn.commit()
703 txn.notify()
704 default:
705 r = txn.commit()
730 r = txn.CommitOnly()
731 txn.Notify()
732 default:
733 r = txn.CommitOnly()
706734 txn.slowNotify()
735 }
736 if hasAnyClosedMutateCh(r) {
737 t.Fatalf("bad")
707738 }
708739
709740 // Verify root and parent triggered, and leaf affected
784815 case 0:
785816 r = txn.Commit()
786817 case 1:
787 r = txn.commit()
788 txn.notify()
789 default:
790 r = txn.commit()
818 r = txn.CommitOnly()
819 txn.Notify()
820 default:
821 r = txn.CommitOnly()
791822 txn.slowNotify()
823 }
824 if hasAnyClosedMutateCh(r) {
825 t.Fatalf("bad")
792826 }
793827
794828 // Verify root and parent triggered, not leaf affected
832866 case 0:
833867 r = txn.Commit()
834868 case 1:
835 r = txn.commit()
836 txn.notify()
837 default:
838 r = txn.commit()
869 r = txn.CommitOnly()
870 txn.Notify()
871 default:
872 r = txn.CommitOnly()
839873 txn.slowNotify()
874 }
875 if hasAnyClosedMutateCh(r) {
876 t.Fatalf("bad")
840877 }
841878
842879 select {
887924 case 0:
888925 r = txn.Commit()
889926 case 1:
890 r = txn.commit()
891 txn.notify()
892 default:
893 r = txn.commit()
927 r = txn.CommitOnly()
928 txn.Notify()
929 default:
930 r = txn.CommitOnly()
894931 txn.slowNotify()
932 }
933 if hasAnyClosedMutateCh(r) {
934 t.Fatalf("bad")
895935 }
896936
897937 // Verify root and parent triggered, not leaf affected
935975 case 0:
936976 r = txn.Commit()
937977 case 1:
938 r = txn.commit()
939 txn.notify()
940 default:
941 r = txn.commit()
978 r = txn.CommitOnly()
979 txn.Notify()
980 default:
981 r = txn.CommitOnly()
942982 txn.slowNotify()
983 }
984 if hasAnyClosedMutateCh(r) {
985 t.Fatalf("bad")
943986 }
944987
945988 select {
10491092 txn.Insert([]byte("foobarbaz"), nil)
10501093
10511094 // Commit and make sure we overflowed but didn't take on extra stuff.
1052 r = txn.commit()
1053 if !txn.trackOverflow || len(txn.trackChannels) != defaultModifiedCache {
1095 r = txn.CommitOnly()
1096 if !txn.trackOverflow || txn.trackChannels != nil {
10541097 t.Fatalf("bad")
10551098 }
10561099
10571100 // Now do the trigger.
1058 txn.notify()
1101 txn.Notify()
1102
1103 // Make sure no closed channels escaped the transaction.
1104 if hasAnyClosedMutateCh(r) {
1105 t.Fatalf("bad")
1106 }
10591107
10601108 // Verify the watches fired as expected.
10611109 select {
10891137 t.Fatalf("bad")
10901138 }
10911139 }
1140
1141 func TestTrackMutate_mergeChild(t *testing.T) {
1142 // This case does a delete of the "acb" leaf, which causes the "aca"
1143 // leaf to get merged with the old "ac" node:
1144 //
1145 // [root] [root]
1146 // |a |a
1147 // [node] [node]
1148 // b/ \c b/ \c
1149 // (ab) [node] (ab) (aca)
1150 // a/ \b
1151 // (aca) (acb)
1152 //
1153 for i := 0; i < 3; i++ {
1154 r := New()
1155 r, _, _ = r.Insert([]byte("ab"), nil)
1156 r, _, _ = r.Insert([]byte("aca"), nil)
1157 r, _, _ = r.Insert([]byte("acb"), nil)
1158 snapIter := r.root.rawIterator()
1159
1160 // Run through all notification methods as there were bugs in
1161 // both that affected these operations. The slowNotify path
1162 // would detect copied but otherwise identical leaves as changed
1163 // and wrongly close channels. The normal path would fail to
1164 // notify on a child node that had been merged.
1165 txn := r.Txn()
1166 txn.TrackMutate(true)
1167 txn.Delete([]byte("acb"))
1168 switch i {
1169 case 0:
1170 r = txn.Commit()
1171 case 1:
1172 r = txn.CommitOnly()
1173 txn.Notify()
1174 default:
1175 r = txn.CommitOnly()
1176 txn.slowNotify()
1177 }
1178 if hasAnyClosedMutateCh(r) {
1179 t.Fatalf("bad")
1180 }
1181
1182 // Run through the old tree and make sure the exact channels we
1183 // expected were closed.
1184 for ; snapIter.Front() != nil; snapIter.Next() {
1185 n := snapIter.Front()
1186 path := snapIter.Path()
1187 switch path {
1188 case "", "a", "ac": // parent nodes all change
1189 if !isClosed(n.mutateCh) || n.leaf != nil {
1190 t.Fatalf("bad")
1191 }
1192 case "ab": // unrelated node / leaf sees no change
1193 if isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
1194 t.Fatalf("bad")
1195 }
1196 case "aca": // this node gets merged, but the leaf doesn't change
1197 if !isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
1198 t.Fatalf("bad")
1199 }
1200 case "acb": // this node / leaf gets deleted
1201 if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
1202 t.Fatalf("bad")
1203 }
1204 default:
1205 t.Fatalf("bad: %s", path)
1206 }
1207 }
1208 }
1209 }
1210
1211 func TestTrackMutate_cachedNodeChange(t *testing.T) {
1212 // This case does a delete of the "acb" leaf, which causes the "aca"
1213 // leaf to get merged with the old "ac" node:
1214 //
1215 // [root] [root]
1216 // |a |a
1217 // [node] [node]
1218 // b/ \c b/ \c
1219 // (ab) [node] (ab) (aca*) <- this leaf gets modified
1220 // a/ \b post-merge
1221 // (aca) (acb)
1222 //
1223 // Then it makes a modification to the "aca" leaf on a node that will
1224 // be in the cache, so this makes sure that the leaf watch fires.
1225 for i := 0; i < 3; i++ {
1226 r := New()
1227 r, _, _ = r.Insert([]byte("ab"), nil)
1228 r, _, _ = r.Insert([]byte("aca"), nil)
1229 r, _, _ = r.Insert([]byte("acb"), nil)
1230 snapIter := r.root.rawIterator()
1231
1232 txn := r.Txn()
1233 txn.TrackMutate(true)
1234 txn.Delete([]byte("acb"))
1235 txn.Insert([]byte("aca"), nil)
1236 switch i {
1237 case 0:
1238 r = txn.Commit()
1239 case 1:
1240 r = txn.CommitOnly()
1241 txn.Notify()
1242 default:
1243 r = txn.CommitOnly()
1244 txn.slowNotify()
1245 }
1246 if hasAnyClosedMutateCh(r) {
1247 t.Fatalf("bad")
1248 }
1249
1250 // Run through the old tree and make sure the exact channels we
1251 // expected were closed.
1252 for ; snapIter.Front() != nil; snapIter.Next() {
1253 n := snapIter.Front()
1254 path := snapIter.Path()
1255 switch path {
1256 case "", "a", "ac": // parent nodes all change
1257 if !isClosed(n.mutateCh) || n.leaf != nil {
1258 t.Fatalf("bad")
1259 }
1260 case "ab": // unrelated node / leaf sees no change
1261 if isClosed(n.mutateCh) || isClosed(n.leaf.mutateCh) {
1262 t.Fatalf("bad")
1263 }
1264 case "aca": // merge changes the node, then we update the leaf
1265 if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
1266 t.Fatalf("bad")
1267 }
1268 case "acb": // this node / leaf gets deleted
1269 if !isClosed(n.mutateCh) || !isClosed(n.leaf.mutateCh) {
1270 t.Fatalf("bad")
1271 }
1272 default:
1273 t.Fatalf("bad: %s", path)
1274 }
1275 }
1276 }
1277 }
8787 copy(n.edges[idx:], n.edges[idx+1:])
8888 n.edges[len(n.edges)-1] = edge{}
8989 n.edges = n.edges[:len(n.edges)-1]
90 }
91 }
92
93 func (n *Node) mergeChild() {
94 e := n.edges[0]
95 child := e.node
96 n.prefix = concat(n.prefix, child.prefix)
97 if child.leaf != nil {
98 n.leaf = new(leafNode)
99 *n.leaf = *child.leaf
100 } else {
101 n.leaf = nil
102 }
103 if len(child.edges) != 0 {
104 n.edges = make([]edge, len(child.edges))
105 copy(n.edges, child.edges)
106 } else {
107 n.edges = nil
10890 }
10991 }
11092