76 lines
2.3 KiB
Python
Executable File
76 lines
2.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import datetime
|
|
import sys
|
|
|
|
from asterisk.agi import AGI
|
|
|
|
# Use next matched number after TIMEOUT seconds
|
|
TIMEOUT = 30
|
|
|
|
def part_match(value, e):
|
|
"""Matches single part of cron-like range expression"""
|
|
if e == '*':
|
|
return True
|
|
elif '/' in e:
|
|
everything, diff = e.split('/')
|
|
return part_match(value, everything) and value % int(diff) == 0
|
|
elif '-' in e:
|
|
start, end = map(int, e.split('-', 1))
|
|
return value >= start and value <= end
|
|
else:
|
|
return int(e) == value
|
|
|
|
def cron_match(value, expr):
|
|
"""Matches cron-like range expression"""
|
|
return any(part_match(value, e) for e in expr.split(','))
|
|
|
|
def cronline_match(dt, expr):
|
|
"""Matches cron-like expression to provided datetime"""
|
|
return all([
|
|
cron_match(t, e)
|
|
for t, e in
|
|
zip([dt.minute, dt.hour, dt.day, dt.month, dt.weekday()], expr.split())
|
|
])
|
|
|
|
def phonetab_list(fname, dt):
|
|
"""Returns matches for provided datetime from phonetab file"""
|
|
with open(fname) as fd:
|
|
return [
|
|
line.rsplit(None, 1)[1]
|
|
for line in fd
|
|
if line.strip()
|
|
and not line.startswith('#')
|
|
and cronline_match(dt, line.rsplit(None, 1)[0])
|
|
]
|
|
|
|
if __name__ == "__main__":
|
|
agi = AGI()
|
|
agi.verbose('Testing shite')
|
|
numbers = phonetab_list(sys.argv[1], datetime.datetime.now())
|
|
agi.verbose(str(numbers))
|
|
|
|
for num in numbers:
|
|
# Default to SIP/...@trunk-out if no type specified, multiple numbers
|
|
# can be comma-separated
|
|
# Example:
|
|
# 123456789,SIP/2137@local -> SIP/123456789@trunk-out&SIP/2137@local
|
|
numlist = [(n if '/' in n else 'SIP/%s@trunk-out' % n)for n in num.split(',')]
|
|
dialpath = '&'.join(numlist)
|
|
|
|
agi.verbose('Dialing %s' % (dialpath,))
|
|
res = agi.appexec('Dial', '%s,%d,m' % (dialpath, TIMEOUT))
|
|
agi.verbose('Response: %r' % res)
|
|
|
|
dialstatus = agi.get_variable('DIALSTATUS')
|
|
agi.verbose('Dialstatus: %r' % dialstatus)
|
|
|
|
if dialstatus == 'ANSWER':
|
|
break
|
|
|
|
agi.appexec("Agi", 'googletts.agi,\\"All lines are currently busy. Please try again later.\\",en')
|
|
|
|
agi.verbose('Number timed out, next...')
|
|
else:
|
|
agi.verbose('No more numbers')
|