forked from hswaw/hscloud
lelegram: stuckness fixes, timeout
Change-Id: I3c1ad4e589ea66db846a56aab8a2c1698bdee539master
parent
400ac7a88d
commit
83e2690070
|
@ -8,7 +8,7 @@ local kube = import "../../kube/kube.libsonnet";
|
||||||
},
|
},
|
||||||
image: {
|
image: {
|
||||||
teleimg: "registry.k0.hswaw.net/q3k/teleimg:1578259776-a07688fe74efe1e190d58092a9f50d4275a15e3d",
|
teleimg: "registry.k0.hswaw.net/q3k/teleimg:1578259776-a07688fe74efe1e190d58092a9f50d4275a15e3d",
|
||||||
lelegram: "registry.k0.hswaw.net/q3k/lelegram:1578255597-a885488fd0f9fcd271f6a02416aae5bb3fd9c9ac",
|
lelegram: "registry.k0.hswaw.net/q3k/lelegram:1579785132-a2ee865a0cb74f88ba4b9861598e1efe78c4f066",
|
||||||
},
|
},
|
||||||
bridge: {
|
bridge: {
|
||||||
telegram: "-1001345766954",
|
telegram: "-1001345766954",
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -52,6 +53,8 @@ type ircconn struct {
|
||||||
connected int64
|
connected int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var reIRCNick = regexp.MustCompile(`[^a-z0-09]`)
|
||||||
|
|
||||||
// Say is called by the Manager when a message should be sent out by the
|
// Say is called by the Manager when a message should be sent out by the
|
||||||
// connection.
|
// connection.
|
||||||
func (i *ircconn) Say(msg *controlMessage) {
|
func (i *ircconn) Say(msg *controlMessage) {
|
||||||
|
@ -100,10 +103,14 @@ func NewConn(server, channel, user string, backup bool, h func(e *event)) (*ircc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate IRC nick from username.
|
// Generate IRC nick from username.
|
||||||
nick := user
|
nick := reIRCNick.ReplaceAllString(user, "")
|
||||||
if len(nick) > 13 {
|
if len(nick) > 13 {
|
||||||
nick = nick[:13]
|
nick = nick[:13]
|
||||||
}
|
}
|
||||||
|
if len(nick) == 0 {
|
||||||
|
glog.Errorf("Could not create IRC nick for %q", user)
|
||||||
|
nick = "wtf"
|
||||||
|
}
|
||||||
nick += "[t]"
|
nick += "[t]"
|
||||||
|
|
||||||
// Configure IRC client to populate the IRC Queue.
|
// Configure IRC client to populate the IRC Queue.
|
||||||
|
@ -186,7 +193,7 @@ func (i *ircconn) loop(ctx context.Context) {
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("IRC/%s: WriteMessage: %v", i.user, err)
|
glog.Errorf("IRC/%s: WriteMessage: %v", i.user, err)
|
||||||
die()
|
die(err)
|
||||||
s.done <- err
|
s.done <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,16 +8,23 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Control: send a message to IRC.
|
// Control: send a message to IRC.
|
||||||
func (m *Manager) SendMessage(user, text string) error {
|
func (m *Manager) SendMessage(ctx context.Context, user, text string) error {
|
||||||
done := make(chan error)
|
done := make(chan error)
|
||||||
m.ctrl <- &control{
|
|
||||||
|
msg := &control{
|
||||||
message: &controlMessage{
|
message: &controlMessage{
|
||||||
from: user,
|
from: user,
|
||||||
message: text,
|
message: text,
|
||||||
done: done,
|
done: done,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return <-done
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case m.ctrl <- msg:
|
||||||
|
return <-done
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Control: subscribe to notifiactions.
|
// Control: subscribe to notifiactions.
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"code.hackerspace.pl/hscloud/go/mirko"
|
"code.hackerspace.pl/hscloud/go/mirko"
|
||||||
|
|
||||||
|
@ -155,12 +156,14 @@ func (s *server) bridge(ctx context.Context) {
|
||||||
// This blocks until success or failure, making sure the log stays
|
// This blocks until success or failure, making sure the log stays
|
||||||
// totally ordered in the face of some of our IRC connections being
|
// totally ordered in the face of some of our IRC connections being
|
||||||
// dead/slow.
|
// dead/slow.
|
||||||
err := s.mgr.SendMessage(m.user, text)
|
ctxT, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||||
|
err := s.mgr.SendMessage(ctxT, m.user, text)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Attempting redelivery of %v after error: %v...", m, err)
|
glog.Warningf("Attempting redelivery of %v after error: %v...", m, err)
|
||||||
err = s.mgr.SendMessage(m.user, text)
|
err = s.mgr.SendMessage(ctx, m.user, text)
|
||||||
glog.Errorf("Redelivery of %v failed: %v...", m, err)
|
glog.Errorf("Redelivery of %v failed: %v...", m, err)
|
||||||
}
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
case n := <-s.ircLog:
|
case n := <-s.ircLog:
|
||||||
// Notification from IRC (message or new nickmap)
|
// Notification from IRC (message or new nickmap)
|
||||||
|
|
|
@ -38,6 +38,11 @@ func (s *server) telegramConnection(ctx context.Context) error {
|
||||||
glog.Infof("[ignored group %d] <%s> %v", u.Message.Chat.ID, u.Message.From, u.Message.Text)
|
glog.Infof("[ignored group %d] <%s> %v", u.Message.Chat.ID, u.Message.From, u.Message.Text)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
date := time.Unix(int64(u.Message.Date), 0)
|
||||||
|
if time.Since(date) > 2*time.Minute {
|
||||||
|
glog.Infof("[old message] <%s> %v", u.Message.From, u.Message.Text)
|
||||||
|
continue
|
||||||
|
}
|
||||||
if msg := plainFromTelegram(s.tel.Self.ID, &u); msg != nil {
|
if msg := plainFromTelegram(s.tel.Self.ID, &u); msg != nil {
|
||||||
s.telLog <- msg
|
s.telLog <- msg
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue