forked from hswaw/hscloud
111 lines
2.7 KiB
Go
111 lines
2.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
)
|
|
|
|
// dispatcher is responsible for dispatching incoming SMS messages to subscribers
|
|
// that have chosen to receive them, filtering accordingly.
|
|
type dispatcher struct {
|
|
// New SMS messages to be dispatched.
|
|
incoming chan *sms
|
|
// New subscribers to send messages to.
|
|
subscribers chan *subscriber
|
|
}
|
|
|
|
// newDispatcher creates a new dispatcher.
|
|
func newDispatcher() *dispatcher {
|
|
return &dispatcher{
|
|
incoming: make(chan *sms),
|
|
subscribers: make(chan *subscriber),
|
|
}
|
|
}
|
|
|
|
// sms received from the upstream provider.
|
|
type sms struct {
|
|
from string
|
|
body string
|
|
timestamp time.Time
|
|
}
|
|
|
|
// subscriber that wants to receive messages with a given body filter.
|
|
type subscriber struct {
|
|
// regexp to filter message body by
|
|
re *regexp.Regexp
|
|
// channel to which messages will be sent, must be emptied regularly by the
|
|
// subscriber.
|
|
data chan *sms
|
|
// channel that needs to be closed when the subscriber doesn't want to receive
|
|
// any more messages.
|
|
cancel chan struct{}
|
|
}
|
|
|
|
func (p *dispatcher) publish(msg *sms) {
|
|
p.incoming <- msg
|
|
}
|
|
|
|
func (p *dispatcher) subscribe(sub *subscriber) {
|
|
p.subscribers <- sub
|
|
}
|
|
|
|
func (p *dispatcher) run(ctx context.Context) {
|
|
// Map of internal IDs to subscribers. Internal IDs are used to remove
|
|
// canceled subscribers easily.
|
|
subscriberMap := make(map[int64]*subscriber)
|
|
// Internal channel that will emit SIDs of subscribers that needs to be
|
|
// removed.
|
|
subscriberCancel := make(chan int64)
|
|
|
|
for {
|
|
select {
|
|
|
|
// Should the processor close?
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
// Do we need to remove a given subscriber?
|
|
case sid := <-subscriberCancel:
|
|
delete(subscriberMap, sid)
|
|
|
|
// Do we have a new subscriber?
|
|
case sub := <-p.subscribers:
|
|
// Generate a SID. A UNIX nanosecond timestamp is enough, since
|
|
// we're not running in parallel.
|
|
sid := time.Now().UnixNano()
|
|
glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re)
|
|
|
|
// Add to subscriber map.
|
|
subscriberMap[sid] = sub
|
|
|
|
// On sub.cancel closed, emit info that we need to delete that
|
|
// subscriber.
|
|
go func() {
|
|
_, _ = <-sub.cancel
|
|
subscriberCancel <- sid
|
|
}()
|
|
|
|
// Do we have a new message to dispatch?
|
|
case in := <-p.incoming:
|
|
for sid, s := range subscriberMap {
|
|
glog.V(10).Infof("Considering %x", sid)
|
|
// If this subscriber doesn't care, ignore.
|
|
if !s.re.MatchString(in.body) {
|
|
continue
|
|
}
|
|
|
|
// Send, non-blocking, to subscriber. This ensures that we
|
|
// don't get stuck if a subscriber doesn't drain fast enough.
|
|
go func(to *subscriber, sid int64) {
|
|
glog.V(10).Infof("Dispatching to %x, %v", sid, to.data)
|
|
to.data <- in
|
|
glog.V(10).Infof("Dispatched to %x", sid)
|
|
}(s, sid)
|
|
}
|
|
}
|
|
}
|
|
}
|