package tx
import (
fm "file_manager"
type ConCurrencyManager struct {
lock_table *LockTable
lock_map map[fm.BlockId]string
func NewConcurrencyManager() *ConCurrencyManager {
concurrency_mgr := &ConCurrencyManager{
lock_table: GetLockTableInstance(),
lock_map: make(map[fm.BlockId]string),
return concurrency_mgr
func (c *ConCurrencyManager) SLock(blk *fm.BlockId) error {
_, ok := c.lock_map[*blk]
if !ok {
err := c.lock_table.SLock(blk)
if err != nil {
return err
c.lock_map[*blk] = "S"
return nil
func (c *ConCurrencyManager) XLock(blk *fm.BlockId) error {
if !c.hasXLock(blk) {
//c.SLock(blk) //判断区块是否已经被加上共享锁,如果别人已经获得共享锁那么就会挂起
err := c.lock_table.XLock(blk)
if err != nil {
return err
c.lock_map[*blk] = "X"
return nil
func (c *ConCurrencyManager) Release() {
for key, _ := range c.lock_map {
func (c *ConCurrencyManager) hasXLock(blk *fm.BlockId) bool {
lock_type, ok := c.lock_map[*blk]
return ok && lock_type == "X"
blk1 := fm.NewBlockId("testfile", 1)
blk2 := fm.NewBlockId("testfile", 1)
blk1 和 blk2 是两个地址不同的指针,它们的指针值不同,但指向的却是同一个区块(文件名和区块号都相同)。因此如果用指针作为 map 的 key,同一个区块就会对应两个不同的键,从而出现问题。我们上一节实现的 lock_table 也有这个问题,所以修改后的代码也放在下面;其中还修正了一些与多线程相关的问题,并在代码里做了注释。修改后 lock_table.go 的内容如下:
package tx
import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	fm "file_manager"
)
// MAX_WAITING_TIME is the maximum time, in seconds, that a lock request waits
// before giving up.
const (
	MAX_WAITING_TIME = 10 // use 3 while testing; set to 10 for real use
)
type LockTable struct {
lock_map map[fm.BlockId]int64 //将锁和区块对应起来
notify_chan map[fm.BlockId]chan struct{} //用于实现超时回退的管道
notify_wg map[fm.BlockId]*sync.WaitGroup //用于实现唤醒通知
method_lock sync.Mutex //实现方法调用的线程安全,相当于java的synchronize关键字
var (
	// lock_table_instance is the package-wide LockTable handed out by
	// GetLockTableInstance; it stays nil until first requested.
	lock_table_instance *LockTable
	// lock guards the creation of lock_table_instance.
	lock = new(sync.Mutex)
)
func GetLockTableInstance() *LockTable {
defer lock.Unlock()
if lock_table_instance == nil {
lock_table_instance = NewLockTable()
return lock_table_instance
func (l *LockTable) waitGivenTimeOut(blk *fm.BlockId) {
wg, ok := l.notify_wg[*blk]
if !ok {
var new_wg sync.WaitGroup
l.notify_wg[*blk] = &new_wg
wg = &new_wg
defer wg.Done()
l.method_lock.Unlock() //挂起前释放方法锁
select {
case <-time.After(MAX_WAITING_TIME * time.Second):
fmt.Println("routine wake up for timeout")
case <-l.notify_chan[*blk]:
fmt.Println("routine wake up by notify channel")
l.method_lock.Lock() //唤起后加上方法锁
func (l *LockTable) notifyAll(blk *fm.BlockId) {
s := fmt.Sprintf("close channle for blk :%v\n", *blk)
channel, ok := l.notify_chan[*blk]
if ok {
delete(l.notify_chan, *blk)
mark := rand.Intn(10000)
s := fmt.Sprintf("delete blk: %v and launch rotinue to create it, mark: %d\n", *blk, mark)
go func(blk_unlock fm.BlockId, ran_num int) {
//注意这个线程不一定得到及时调度,因此可能不能及时创建channel对象从而导致close closed channel panic
s := fmt.Sprintf("wait group for blk: %v, with mark:%d\n", blk_unlock, ran_num)
l.notify_chan[blk_unlock] = make(chan struct{})
s = fmt.Sprintf("create notify channel for %v\n", blk_unlock)
}(*blk, mark)
} else {
s = fmt.Sprintf("channel for %v is already closed\n", *blk)
func NewLockTable() *LockTable {
lock_table := &LockTable{
lock_map: make(map[fm.BlockId]int64),
notify_chan: make(map[fm.BlockId]chan struct{}),
notify_wg: make(map[fm.BlockId]*sync.WaitGroup),
return lock_table
func (l *LockTable) initWaitingOnBlk(blk *fm.BlockId) {
_, ok := l.notify_chan[*blk]
if !ok {
l.notify_chan[*blk] = make(chan struct{})
_, ok = l.notify_wg[*blk]
if !ok {
l.notify_wg[*blk] = &sync.WaitGroup{}
func (l *LockTable) SLock(blk *fm.BlockId) error {
defer l.method_lock.Unlock()
start := time.Now()
for l.hasXlock(blk) && !l.waitingTooLong(start) {
if l.hasXlock(blk) {
fmt.Println("slock fail for xlock")
return errors.New("SLock Exception: XLock on given blk")
val := l.getLockVal(blk)
l.lock_map[*blk] = val + 1
return nil
func (l *LockTable) XLock(blk *fm.BlockId) error {
defer l.method_lock.Unlock()
start := time.Now()
for l.hasOtherSLocks(blk) && !l.waitingTooLong(start) {
fmt.Println("get xlock fail and sleep")
if l.hasOtherSLocks(blk) {
return errors.New("XLock error: SLock on given blk")
l.lock_map[*blk] = -1
return nil
func (l *LockTable) UnLock(blk *fm.BlockId) {
defer l.method_lock.Unlock()
val := l.getLockVal(blk)
if val > 1 {
l.lock_map[*blk] = val - 1
} else {
delete(l.lock_map, *blk)
s := fmt.Sprintf("unlock by blk: +%v\n", *blk)
func (l *LockTable) hasXlock(blk *fm.BlockId) bool {
return l.getLockVal(blk) < 0
func (l *LockTable) hasOtherSLocks(blk *fm.BlockId) bool {
return l.getLockVal(blk) >= 1
func (l *LockTable) waitingTooLong(start time.Time) bool {
elapsed := time.Since(start).Seconds()
if elapsed >= MAX_WAITING_TIME {
return true
return false
func (l *LockTable) getLockVal(blk *fm.BlockId) int64 {
val, ok := l.lock_map[*blk]
if !ok {
l.lock_map[*blk] = 0
return 0
return val
package tx
import (
bm "buffer_manager"
fm "file_manager"
lm "log_manager"
func TestCurrencyManager(_ *testing.T) {
file_manager, _ := fm.NewFileManager("txtest", 400)
log_manager, _ := lm.NewLogManager(file_manager, "logfile")
buffer_manager := bm.NewBufferManager(file_manager, log_manager, 3)
//tx.NewTransation(file_manager, log_manager, buffer_manager)
go func() {
txA := NewTransation(file_manager, log_manager, buffer_manager)
blk1 := fm.NewBlockId("testfile", 1)
blk2 := fm.NewBlockId("testfile", 2)
fmt.Println("Tx A: rquest slock 1")
txA.GetInt(blk1, 0) //如果返回错误,我们应该放弃执行下面操作并执行回滚,这里为了测试而省略
fmt.Println("Tx A: receive slock 1")
time.Sleep(2 * time.Second)
fmt.Println("Tx A: request slock 2")
txA.GetInt(blk2, 0)
fmt.Println("Tx A: receive slock 2")
fmt.Println("Tx A: Commit")
go func() {
time.Sleep(1 * time.Second)
txB := NewTransation(file_manager, log_manager, buffer_manager)
blk1 := fm.NewBlockId("testfile", 1)
blk2 := fm.NewBlockId("testfile", 2)
fmt.Println("Tx B: rquest xlock 2")
txB.SetInt(blk2, 0, 0, false)
fmt.Println("Tx B: receive xlock 2")
time.Sleep(2 * time.Second)
fmt.Println("Tx B: request slock 1")
txB.GetInt(blk1, 0)
fmt.Println("Tx B: receive slock 1")
fmt.Println("Tx B: Commit")
go func() {
time.Sleep(2 * time.Second)
txC := NewTransation(file_manager, log_manager, buffer_manager)
blk1 := fm.NewBlockId("testfile", 1)
blk2 := fm.NewBlockId("testfile", 2)
fmt.Println("Tx C: rquest xlock 1")
txC.SetInt(blk1, 0, 0, false)
fmt.Println("Tx C: receive xlock 1")
time.Sleep(1 * time.Second)
fmt.Println("Tx C: request slock 2")
txC.GetInt(blk2, 0)
fmt.Println("Tx C: receive slock 2")
fmt.Println("Tx C: Commit")
time.Sleep(20 * time.Second)
Tx A: rquest slock 1
Tx A: receive slock 1
Tx B: rquest xlock 2
Tx B: receive xlock 2
Tx A: request slock 2
Tx C: rquest xlock 1
get xlock fail and sleep
Tx B: request slock 1
Tx B: receive slock 1
Tx B: Commit
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
delete blk: {testfile 2} and launch rotinue to create it, mark: 8081
routine wake up by notify channel
Tx A: receive slock 2
Tx A: Commit
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
channel for {testfile 2} is already closed
unlock by blk: +{testfile 1}
close channle for blk :{testfile 1}
delete blk: {testfile 1} and launch rotinue to create it, mark: 7887
wait group for blk: {testfile 2}, with mark:8081
create notify channel for {testfile 2}
routine wake up by notify channel
Tx C: receive xlock 1
wait group for blk: {testfile 1}, with mark:7887
create notify channel for {testfile 1}
transation 2 committed
transation 1 committed
Tx C: request slock 2
Tx C: receive slock 2
Tx C: Commit
unlock by blk: +{testfile 1}
close channle for blk :{testfile 1}
delete blk: {testfile 1} and launch rotinue to create it, mark: 1847
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
delete blk: {testfile 2} and launch rotinue to create it, mark: 4059
wait group for blk: {testfile 1}, with mark:1847
create notify channel for {testfile 1}
wait group for blk: {testfile 2}, with mark:4059
create notify channel for {testfile 2}
transation 3 committed
Tx A: rquest slock 1
Tx A: receive slock 1
Tx B: rquest xlock 2
Tx B: receive xlock 2
Tx A: request slock 2
Tx A: request slock 2
Tx C: rquest xlock 1
get xlock fail and sleep
Tx B: request slock 1
Tx B: receive slock 1
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
delete blk: {testfile 2} and launch rotinue to create it, mark: 8081
routine wake up by notify channel
Tx A: receive slock 2
Tx A: Commit
“routine wake up by notify channel”这句输出表示交易A所在的线程被唤醒,随后交易A获得区块2的共享锁并完成交易,执行Commit操作,释放它加在区块1和区块2上的锁;它首先释放的也是加在区块2上的锁:
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
channel for {testfile 2} is already closed
unlock by blk: +{testfile 1}
close channle for blk :{testfile 1}
delete blk: {testfile 1} and launch rotinue to create it, mark: 7887
wait group for blk: {testfile 2}, with mark:8081
create notify channel for {testfile 2}
routine wake up by notify channel
Tx C: receive xlock 1
wait group for blk: {testfile 1}, with mark:7887
create notify channel for {testfile 1}
transation 2 committed
transation 1 committed
Tx C: request slock 2
Tx C: receive slock 2
Tx C: Commit
unlock by blk: +{testfile 1}
close channle for blk :{testfile 1}
delete blk: {testfile 1} and launch rotinue to create it, mark: 1847
unlock by blk: +{testfile 2}
close channle for blk :{testfile 2}
delete blk: {testfile 2} and launch rotinue to create it, mark: 4059
wait group for blk: {testfile 1}, with mark:1847
create notify channel for {testfile 1}
wait group for blk: {testfile 2}, with mark:4059
create notify channel for {testfile 2}
transation 3 committed
在区块的锁释放后,lock_table会再次启动两个线程去创建这两个区块对应的管道对象:用于创建区块1管道的线程标号为1847,用于创建区块2管道的线程标号为4059。这两个线程分别执行并为对应区块创建管道对象,此时所有交易完成。从上面输出的信息可以看到,并发管理器能够有效地针对不同线程中的交易,在读写对应区块时准确加锁,保证区块的读写顺序满足可序列化原则,进而确保在多并发情况下,每个交易都能正确执行且不会互相影响。更详细的调试演示请在B站搜索Coding迪斯尼。代码下载路径: github 百度云盘,链接: https://pan.baidu.com/s/1VhJkALlGHbP7FvkEHoFUxw 提取码: j5mv