package main import ( "context" "regexp" "time" "github.com/golang/glog" ) // dispatcher is responsible for dispatching incoming SMS messages to subscribers // that have chosen to receive them, filtering accordingly. type dispatcher struct { // New SMS messages to be dispatched. incoming chan *sms // New subscribers to send messages to. subscribers chan *subscriber } // newDispatcher creates a new dispatcher. func newDispatcher() *dispatcher { return &dispatcher{ incoming: make(chan *sms), subscribers: make(chan *subscriber), } } // sms received from the upstream provider. type sms struct { from string body string timestamp time.Time } // subscriber that wants to receive messages with a given body filter. type subscriber struct { // regexp to filter message body by re *regexp.Regexp // channel to which messages will be sent, must be emptied regularly by the // subscriber. data chan *sms // channel that needs to be closed when the subscriber doesn't want to receive // any more messages. cancel chan struct{} } func (p *dispatcher) publish(msg *sms) { p.incoming <- msg } func (p *dispatcher) subscribe(sub *subscriber) { p.subscribers <- sub } func (p *dispatcher) run(ctx context.Context) { // Map of internal IDs to subscribers. Internal IDs are used to remove // canceled subscribers easily. subscriberMap := make(map[int64]*subscriber) // Internal channel that will emit SIDs of subscribers that needs to be // removed. subscriberCancel := make(chan int64) for { select { // Should the processor close? case <-ctx.Done(): return // Do we need to remove a given subscriber? case sid := <-subscriberCancel: delete(subscriberMap, sid) // Do we have a new subscriber? case sub := <-p.subscribers: // Generate a SID. A UNIX nanosecond timestamp is enough, since // we're not running in parallel. sid := time.Now().UnixNano() glog.V(5).Infof("New subscriber %x, regexp %v", sid, sub.re) // Add to subscriber map. subscriberMap[sid] = sub // On sub.cancel closed, emit info that we need to delete that // subscriber. go func() { _, _ = <-sub.cancel subscriberCancel <- sid }() // Do we have a new message to dispatch? case in := <-p.incoming: for sid, s := range subscriberMap { glog.V(10).Infof("Considering %x", sid) // If this subscriber doesn't care, ignore. if !s.re.MatchString(in.body) { continue } // Send, non-blocking, to subscriber. This ensures that we // don't get stuck if a subscriber doesn't drain fast enough. go func(to *subscriber, sid int64) { glog.V(10).Infof("Dispatching to %x, %v", sid, to.data) to.data <- in glog.V(10).Infof("Dispatched to %x", sid) }(s, sid) } } } }