forked from gitea/gitea
		
	modules/sync: add UniqueQueue
This commit is contained in:
		
							parent
							
								
									43297148b2
								
							
						
					
					
						commit
						c1ecb6c60a
					
				| @ -20,8 +20,11 @@ import ( | ||||
| 	"github.com/gogits/gogs/modules/log" | ||||
| 	"github.com/gogits/gogs/modules/process" | ||||
| 	"github.com/gogits/gogs/modules/setting" | ||||
| 	"github.com/gogits/gogs/modules/sync" | ||||
| ) | ||||
| 
 | ||||
| var PullRequestQueue = sync.NewUniqueQueue(setting.Repository.PullRequestQueueLength) | ||||
| 
 | ||||
| type PullRequestType int | ||||
| 
 | ||||
| const ( | ||||
| @ -537,8 +540,6 @@ func (pr *PullRequest) UpdateCols(cols ...string) error { | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| var PullRequestQueue = NewUniqueQueue(setting.Repository.PullRequestQueueLength) | ||||
| 
 | ||||
| // UpdatePatch generates and saves a new patch. | ||||
| func (pr *PullRequest) UpdatePatch() (err error) { | ||||
| 	if err = pr.GetHeadRepo(); err != nil { | ||||
|  | ||||
| @ -10,10 +10,8 @@ import ( | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/Unknwon/com" | ||||
| 	"github.com/go-xorm/xorm" | ||||
| 	gouuid "github.com/satori/go.uuid" | ||||
| 
 | ||||
| @ -22,8 +20,11 @@ import ( | ||||
| 	"github.com/gogits/gogs/modules/httplib" | ||||
| 	"github.com/gogits/gogs/modules/log" | ||||
| 	"github.com/gogits/gogs/modules/setting" | ||||
| 	"github.com/gogits/gogs/modules/sync" | ||||
| ) | ||||
| 
 | ||||
| var HookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) | ||||
| 
 | ||||
| type HookContentType int | ||||
| 
 | ||||
| const ( | ||||
| @ -500,64 +501,6 @@ func PrepareWebhooks(repo *Repository, event HookEventType, p api.Payloader) err | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // UniqueQueue represents a queue that guarantees only one instance of same ID is in the line. | ||||
| type UniqueQueue struct { | ||||
| 	lock sync.Mutex | ||||
| 	ids  map[string]bool | ||||
| 
 | ||||
| 	queue chan string | ||||
| } | ||||
| 
 | ||||
| func (q *UniqueQueue) Queue() <-chan string { | ||||
| 	return q.queue | ||||
| } | ||||
| 
 | ||||
| func NewUniqueQueue(queueLength int) *UniqueQueue { | ||||
| 	if queueLength <= 0 { | ||||
| 		queueLength = 100 | ||||
| 	} | ||||
| 
 | ||||
| 	return &UniqueQueue{ | ||||
| 		ids:   make(map[string]bool), | ||||
| 		queue: make(chan string, queueLength), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (q *UniqueQueue) Remove(id interface{}) { | ||||
| 	q.lock.Lock() | ||||
| 	defer q.lock.Unlock() | ||||
| 	delete(q.ids, com.ToStr(id)) | ||||
| } | ||||
| 
 | ||||
| func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { | ||||
| 	newid := com.ToStr(id) | ||||
| 
 | ||||
| 	if q.Exist(id) { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	q.lock.Lock() | ||||
| 	q.ids[newid] = true | ||||
| 	if fn != nil { | ||||
| 		fn() | ||||
| 	} | ||||
| 	q.lock.Unlock() | ||||
| 	q.queue <- newid | ||||
| } | ||||
| 
 | ||||
| func (q *UniqueQueue) Add(id interface{}) { | ||||
| 	q.AddFunc(id, nil) | ||||
| } | ||||
| 
 | ||||
| func (q *UniqueQueue) Exist(id interface{}) bool { | ||||
| 	q.lock.Lock() | ||||
| 	defer q.lock.Unlock() | ||||
| 
 | ||||
| 	return q.ids[com.ToStr(id)] | ||||
| } | ||||
| 
 | ||||
| var HookQueue = NewUniqueQueue(setting.Webhook.QueueLength) | ||||
| 
 | ||||
| func (t *HookTask) deliver() { | ||||
| 	t.IsDelivered = true | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										70
									
								
								modules/sync/unique_queue.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										70
									
								
								modules/sync/unique_queue.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,70 @@ | ||||
| // Copyright 2016 The Gogs Authors. All rights reserved. | ||||
| // Use of this source code is governed by a MIT-style | ||||
| // license that can be found in the LICENSE file. | ||||
| 
 | ||||
| package sync | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/Unknwon/com" | ||||
| ) | ||||
| 
 | ||||
| // UniqueQueue is a queue which guarantees only one instance of same | ||||
| // identity is in the line. Instances with same identity will be | ||||
| // discarded if there is already one in the line. | ||||
| // | ||||
| // This queue is particularly useful for preventing duplicated task | ||||
| // of same purpose. | ||||
| type UniqueQueue struct { | ||||
| 	table *StatusTable | ||||
| 	queue chan string | ||||
| } | ||||
| 
 | ||||
| // NewUniqueQueue initializes and returns a new UniqueQueue object. | ||||
| func NewUniqueQueue(queueLength int) *UniqueQueue { | ||||
| 	if queueLength <= 0 { | ||||
| 		queueLength = 100 | ||||
| 	} | ||||
| 
 | ||||
| 	return &UniqueQueue{ | ||||
| 		table: NewStatusTable(), | ||||
| 		queue: make(chan string, queueLength), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Queue returns channel of queue for retrieving instances. | ||||
| func (q *UniqueQueue) Queue() <-chan string { | ||||
| 	return q.queue | ||||
| } | ||||
| 
 | ||||
| // Exist returns true if there is an instance with given indentity | ||||
| // exists in the queue. | ||||
| func (q *UniqueQueue) Exist(id interface{}) bool { | ||||
| 	return q.table.IsRunning(com.ToStr(id)) | ||||
| } | ||||
| 
 | ||||
| // AddFunc adds new instance to the queue with a custom runnable function, | ||||
| // the queue is blocked until the function exits. | ||||
| func (q *UniqueQueue) AddFunc(id interface{}, fn func()) { | ||||
| 	if q.Exist(id) { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	idStr := com.ToStr(id) | ||||
| 	q.table.lock.Lock() | ||||
| 	q.table.pool[idStr] = true | ||||
| 	if fn != nil { | ||||
| 		fn() | ||||
| 	} | ||||
| 	q.table.lock.Unlock() | ||||
| 	q.queue <- idStr | ||||
| } | ||||
| 
 | ||||
| // Add adds new instance to the queue. | ||||
| func (q *UniqueQueue) Add(id interface{}) { | ||||
| 	q.AddFunc(id, nil) | ||||
| } | ||||
| 
 | ||||
| // Remove removes instance from the queue. | ||||
| func (q *UniqueQueue) Remove(id interface{}) { | ||||
| 	q.table.Stop(com.ToStr(id)) | ||||
| } | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Unknwon
						Unknwon