diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go index a8c904e22cec..4d80d43c91ce 100644 --- a/modules/notification/ui/ui.go +++ b/modules/notification/ui/ui.go @@ -6,14 +6,16 @@ package ui import ( "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification/base" + "code.gitea.io/gitea/modules/queue" ) type ( notificationService struct { base.NullNotifier - issueQueue chan issueNotificationOpts + issueQueue queue.Queue } issueNotificationOpts struct { @@ -29,19 +31,24 @@ var ( // NewNotifier create a new notificationService notifier func NewNotifier() base.Notifier { - return ¬ificationService{ - issueQueue: make(chan issueNotificationOpts, 100), - } + ns := ¬ificationService{} + ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{}) + return ns } -func (ns *notificationService) Run() { - for opts := range ns.issueQueue { +func (ns *notificationService) handle(data ...queue.Data) { + for _, datum := range data { + opts := datum.(issueNotificationOpts) if err := models.CreateOrUpdateIssueNotifications(opts.issueID, opts.commentID, opts.notificationAuthorID); err != nil { log.Error("Was unable to create issue notification: %v", err) } } } +func (ns *notificationService) Run() { + graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run) +} + func (ns *notificationService) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, issue *models.Issue, comment *models.Comment) { var opts = issueNotificationOpts{ @@ -51,35 +58,39 @@ func (ns *notificationService) NotifyCreateIssueComment(doer *models.User, repo if comment != nil { opts.commentID = comment.ID } - ns.issueQueue <- opts + _ = ns.issueQueue.Push(opts) } func (ns *notificationService) NotifyNewIssue(issue *models.Issue) { - ns.issueQueue <- issueNotificationOpts{ + _ = ns.issueQueue.Push(issueNotificationOpts{ issueID: issue.ID, notificationAuthorID: issue.Poster.ID, - } + }) } func (ns *notificationService) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, isClosed bool) { - ns.issueQueue <- issueNotificationOpts{ + _ = ns.issueQueue.Push(issueNotificationOpts{ issueID: issue.ID, notificationAuthorID: doer.ID, - } + }) } func (ns *notificationService) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) { - ns.issueQueue <- issueNotificationOpts{ + _ = ns.issueQueue.Push(issueNotificationOpts{ issueID: pr.Issue.ID, notificationAuthorID: doer.ID, - } + }) } func (ns *notificationService) NotifyNewPullRequest(pr *models.PullRequest) { - ns.issueQueue <- issueNotificationOpts{ + if err := pr.LoadIssue(); err != nil { + log.Error("Unable to load issue: %d for pr: %d: Error: %v", pr.IssueID, pr.ID, err) + return + } + _ = ns.issueQueue.Push(issueNotificationOpts{ issueID: pr.Issue.ID, notificationAuthorID: pr.Issue.PosterID, - } + }) } func (ns *notificationService) NotifyPullRequestReview(pr *models.PullRequest, r *models.Review, c *models.Comment) { @@ -90,5 +101,5 @@ func (ns *notificationService) NotifyPullRequestReview(pr *models.PullRequest, r if c != nil { opts.commentID = c.ID } - ns.issueQueue <- opts + _ = ns.issueQueue.Push(opts) }