1
0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo.git synced 2024-12-13 11:42:23 -05:00
forgejo/modules/eventsource/manager.go
zeripath 8e32eeb5de
Hold the event source when there are no listeners (#15725)
* Hold the event source when there are no listeners

The event source does not need to run when there are no listeners. Therefore
pause it when there are none.

* add some more logging

Signed-off-by: Andrew Thornton <art27@cantab.net>
2021-05-15 23:46:13 +02:00

90 lines
1.9 KiB
Go

// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package eventsource
import (
"sync"
)
// Manager manages the eventsource Messengers
type Manager struct {
mutex sync.Mutex
messengers map[int64]*Messenger
connection chan struct{}
}
var manager *Manager
func init() {
manager = &Manager{
messengers: make(map[int64]*Messenger),
connection: make(chan struct{}, 1),
}
}
// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
return manager
}
// Register message channel
func (m *Manager) Register(uid int64) <-chan *Event {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
if !ok {
messenger = NewMessenger(uid)
m.messengers[uid] = messenger
}
select {
case m.connection <- struct{}{}:
default:
}
m.mutex.Unlock()
return messenger.Register()
}
// Unregister message channel
func (m *Manager) Unregister(uid int64, channel <-chan *Event) {
m.mutex.Lock()
defer m.mutex.Unlock()
messenger, ok := m.messengers[uid]
if !ok {
return
}
if messenger.Unregister(channel) {
delete(m.messengers, uid)
}
}
// UnregisterAll message channels
func (m *Manager) UnregisterAll() {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, messenger := range m.messengers {
messenger.UnregisterAll()
}
m.messengers = map[int64]*Messenger{}
}
// SendMessage sends a message to a particular user
func (m *Manager) SendMessage(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessage(message)
}
}
// SendMessageBlocking sends a message to a particular user
func (m *Manager) SendMessageBlocking(uid int64, message *Event) {
m.mutex.Lock()
messenger, ok := m.messengers[uid]
m.mutex.Unlock()
if ok {
messenger.SendMessageBlocking(message)
}
}