diff --git a/modules/log/event.go b/modules/log/event.go index 6975bf749d83..00a66c306abe 100644 --- a/modules/log/event.go +++ b/modules/log/event.go @@ -143,7 +143,7 @@ type MultiChannelledLog struct { name string bufferLength int64 queue chan *Event - mutex sync.Mutex + rwmutex sync.RWMutex loggers map[string]EventLogger flush chan bool close chan bool @@ -173,10 +173,10 @@ func NewMultiChannelledLog(name string, bufferLength int64) *MultiChannelledLog // AddLogger adds a logger to this MultiChannelledLog func (m *MultiChannelledLog) AddLogger(logger EventLogger) error { - m.mutex.Lock() + m.rwmutex.Lock() name := logger.GetName() if _, has := m.loggers[name]; has { - m.mutex.Unlock() + m.rwmutex.Unlock() return ErrDuplicateName{name} } m.loggers[name] = logger @@ -186,7 +186,7 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error { if logger.GetStacktraceLevel() < m.stacktraceLevel { m.stacktraceLevel = logger.GetStacktraceLevel() } - m.mutex.Unlock() + m.rwmutex.Unlock() go m.Start() return nil } @@ -195,15 +195,15 @@ func (m *MultiChannelledLog) AddLogger(logger EventLogger) error { // NB: If you delete the last sublogger this logger will simply drop // log events func (m *MultiChannelledLog) DelLogger(name string) bool { - m.mutex.Lock() + m.rwmutex.Lock() logger, has := m.loggers[name] if !has { - m.mutex.Unlock() + m.rwmutex.Unlock() return false } delete(m.loggers, name) m.internalResetLevel() - m.mutex.Unlock() + m.rwmutex.Unlock() logger.Flush() logger.Close() return true @@ -211,15 +211,15 @@ func (m *MultiChannelledLog) DelLogger(name string) bool { // GetEventLogger returns a sub logger from this MultiChannelledLog func (m *MultiChannelledLog) GetEventLogger(name string) EventLogger { - m.mutex.Lock() - defer m.mutex.Unlock() + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() return m.loggers[name] } // GetEventLoggerNames returns a list of names func (m *MultiChannelledLog) GetEventLoggerNames() []string { - m.mutex.Lock() - defer m.mutex.Unlock() + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() var keys []string for k := range m.loggers { keys = append(keys, k) @@ -228,12 +228,12 @@ func (m *MultiChannelledLog) GetEventLoggerNames() []string { } func (m *MultiChannelledLog) closeLoggers() { - m.mutex.Lock() + m.rwmutex.Lock() for _, logger := range m.loggers { logger.Flush() logger.Close() } - m.mutex.Unlock() + m.rwmutex.Unlock() m.closed <- true } @@ -249,8 +249,8 @@ func (m *MultiChannelledLog) Resume() { // ReleaseReopen causes this logger to tell its subloggers to release and reopen func (m *MultiChannelledLog) ReleaseReopen() error { - m.mutex.Lock() - defer m.mutex.Unlock() + m.rwmutex.Lock() + defer m.rwmutex.Unlock() var accumulatedErr error for _, logger := range m.loggers { if err := logger.ReleaseReopen(); err != nil { @@ -266,13 +266,13 @@ func (m *MultiChannelledLog) ReleaseReopen() error { // Start processing the MultiChannelledLog func (m *MultiChannelledLog) Start() { - m.mutex.Lock() + m.rwmutex.Lock() if m.started { - m.mutex.Unlock() + m.rwmutex.Unlock() return } m.started = true - m.mutex.Unlock() + m.rwmutex.Unlock() paused := false for { if paused { @@ -286,11 +286,11 @@ func (m *MultiChannelledLog) Start() { m.closeLoggers() return } - m.mutex.Lock() + m.rwmutex.RLock() for _, logger := range m.loggers { logger.Flush() } - m.mutex.Unlock() + m.rwmutex.RUnlock() case <-m.close: m.closeLoggers() return @@ -307,24 +307,24 @@ func (m *MultiChannelledLog) Start() { m.closeLoggers() return } - m.mutex.Lock() + m.rwmutex.RLock() for _, logger := range m.loggers { err := logger.LogEvent(event) if err != nil { fmt.Println(err) } } - m.mutex.Unlock() + m.rwmutex.RUnlock() case _, ok := <-m.flush: if !ok { m.closeLoggers() return } - m.mutex.Lock() + m.rwmutex.RLock() for _, logger := range m.loggers { logger.Flush() } - m.mutex.Unlock() + m.rwmutex.RUnlock() case <-m.close: m.closeLoggers() return @@ -359,11 +359,15 @@ func (m *MultiChannelledLog) Flush() { // GetLevel gets the level of this MultiChannelledLog func (m *MultiChannelledLog) GetLevel() Level { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() return m.level } // GetStacktraceLevel gets the level of this MultiChannelledLog func (m *MultiChannelledLog) GetStacktraceLevel() Level { + m.rwmutex.RLock() + defer m.rwmutex.RUnlock() return m.stacktraceLevel } @@ -384,8 +388,8 @@ func (m *MultiChannelledLog) internalResetLevel() Level { // ResetLevel will reset the level of this MultiChannelledLog func (m *MultiChannelledLog) ResetLevel() Level { - m.mutex.Lock() - defer m.mutex.Unlock() + m.rwmutex.Lock() + defer m.rwmutex.Unlock() return m.internalResetLevel() } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index bbe8a5ecbd3b..06cb7a7a96b1 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -16,7 +16,6 @@ import ( func TestPersistableChannelQueue(t *testing.T) { handleChan := make(chan *testData) handle := func(data ...Data) { - assert.True(t, len(data) == 2) for _, datum := range data { testDatum := datum.(*testData) handleChan <- testDatum