#!/usr/bin/env python # Slinke Class implements serial communication with the Nirvis Slink-e # device. Uses helper class slinke_devfile to read Nirvis remote # control ("button") code files. User can supply an alternate helper # class that provides 'load', 'store', 'add_button', and # 'button_to_str', methods. Included is a simple pickle based helper # that eases basic learning and playback of codes. import os, sys, time, string, select, re class slinke: # PUBLIC def __init__(self, file=None, dev='/dev/ttyS0', controlS=0, helpclass=None): if helpclass == None: helpclass = slinke_devfile self.helper = helpclass() self.load = self.helper.load self.store = self.helper.store self.delay = 0 self.dev = dev self.debug = 0 if file: self.load(file) self.S = os.open(self.dev, 2) self.serial_condition(flow=0) self.serial_timeout = 2 self.tx(cmd_resume) cc = 124 if controlS: cc = 0 print 'Control-S (baseband) mode' cc, cp = self.txrx(c_set_carrier, 0, cc) self.id() self.serial_condition(flow=1) self.serial_timeout = 60 def id(self): v, = self.txrx(c_get_version) print 'Slink-e Version %d.%d' % (v >> 4, v & 0xF) def learn(self, cmdname): self.helper.add_button(cmdname, self.capture()) # send out a command and return its response, takes optional "port=" argument. def txrx(self, cd, *args, **kw): port = cd.port if kw.has_key('port'): port = kw['port'] if not cd.tx: print 'txrx: cd not tx!'; sys.exit(1) if len(args) != cd.cnt: print 'txrx: arg cnt(%d) not cd.cnt(%d)!' % (len(args), cd.cnt) print cd.__dict__ sys.exit(1) self.tx((port << 5) | cd.c0, cd.c1) for j in args: self.tx(j) if cd.r: r = () while 1: p, id, m = self.getmsg() if ((port<<5) | cd.r.c0) == ord(m[0]) \ and cd.r.c1 == ord(m[1]): for i in range(cd.r.cnt): r = r + (ord(m[2+i]),) break else: print '''Unexpected response '%s' on port %d:''' % (id, p), for i in m: print '%2x ' % ord(i), print return r # output a space separated list of buttons # '-sleep #' is special and sleeps for a given number of seconds def do(self, args): a = string.split(args) i = 0 while i < len(a): if a[i] == '-sleep': time.sleep(string.atof(a[i+1])) print "sleeping", a[i+1] i=i+1 else: self.irport_send_cmd(a[i]) if self.delay: time.sleep(self.delay) i=i+1 # set the intercode delay in seconds def set_delay(self, delay): self.delay = delay # pull in data from the parallel port def port_receive(self, port): while 1: m, id = self.rx(port) if id == 'port recv': break print 'Unexpected message:', id r = () for i in m[1:]: r = r + (ord(i),) return r # debugging on prints incoming messages def set_debug(self, on): self.debug = on # PRIVATE def serial_condition(self, flow): flag='-' if flow: flag='' os.system('stty %scrtscts -echo raw 38400 < %s' % (flag, self.dev)) def capture(self): cmd = '' while 1: m, id = self.rx(port_ir) if id == 'port recv': cmd = cmd + m[1:] if id == r_last_rx_port_equals.text: break return cmd def irport_send_cmd(self, cmdname): self.irport_send_str(self.helper.button_to_str(cmdname)) def irport_send_str(self, str): l = len(str) o = 0 while l - o: c = MIN(l-o, 30) self.tx((port_ir<<5) + c) self.tx(str[o:o+c]) o = o + c self.tx(port_ir<<5) def tx(self, *args): for j in args: if type(j) == type(0): os.write(self.S, chr(j)) elif type(j) == type('str'): os.write(self.S, j) else: for i in j: self.tx(i) def rx(self, port): while 1: p, id, m = self.getmsg() if self.debug: print p, id if p == port: return m, id def getmsg(self): c, id, b = self.gethead() r = '' while len(r) < c: r = r + os.read(self.S, c-len(r)) if len(r) != c: print "SIZE MISMATCH", len(r), c m = b+r return ord(b[0]) >> 5, id, m def gethead(self): l = select.select([self.S],[],[], self.serial_timeout) if l == ([],[],[]): raise SLINKE_TIMEOUT ch0 = os.read(self.S, 1) ch1 = '' r = 0, 'unknown' c = ord(ch0) & 0x1F p = ord(ch0) >> 5 if c == 0: r = 0, 'rx end' elif c > 0 and c < 31: r = c, 'port recv' else: ch1 = os.read(self.S, 1) b1 = ord(ch1) if c == 31: d = findresponse(p, b1) r = d.cnt, d.text return r + (ch0 + ch1,) class tt: pass Respmap = {} def addmap(d): if not d.tx: Respmap[(d.port, d.c1)] = d def findresponse(port, c1): k = (port, c1) kn = (None, c1) if Respmap.has_key(k): return Respmap[k] elif Respmap.has_key(kn): return Respmap[kn] else: print 'findresponse: response not found', port, c1 # manufacture symbols and insert in address space def dx(c1, c0=0x1f, tx=0, t='', cnt=0, r=None): d = tt() d.tx = tx d.str = t d.port = Curport d.c0 = c0 d.c1 = c1 d.r = r d.cnt = cnt d.text = t addmap(d) var = re.sub(' ','_',t) if d.tx: var='c_' + var else: var='r_' + var globals()[var]=d port_ir = 4 port_par = 5 port_ser = 6 port_sys = 7 # add commands as needed Curport = port_ir dx(0x01, t='last rx port equals', cnt=1) dx(0x04, t='period_equals', cnt=2) dx(0x06, t='carrier_equals', cnt=2) dx(0x08, t='tx ports equal', cnt=1) dx(0x09, t='rx ports equal', cnt=1) dx(0x0a, t='ir routing equals', cnt=16) dx(0x0b, t='rx polarities equals', cnt=1) dx(0x0c, t='timeout equals', cnt=2) dx(0x0e, t='minimum equals', cnt=1) dx(0x82, t='invalid sample period') dx(0x5, t='get period', r=r_period_equals, tx=1) dx(0x6, t='set carrier', r=r_carrier_equals, tx=1, cnt=2) dx(0xd, t='get timeout', r=r_timeout_equals, tx=1) dx(0xf, t='get minimum', r=r_minimum_equals, tx=1) dx(0x10,t='get ir routing', r=r_ir_routing_equals, tx=1) dx(0x11,t='get rx polarities', r=r_rx_polarities_equals, tx=1) dx(0x12,t='get rx ports', r=r_rx_ports_equal, tx=1) dx(0x13,t='get tx ports', r=r_tx_ports_equal, tx=1) Curport = port_par dx(0x10, t='handshaking equals', cnt=1) dx(0x12, t='direction equals', cnt=1) dx(0x11, t='get handshaking', r=r_handshaking_equals, tx=1) dx(0x13, t='get direction', r=r_direction_equals, tx=1) dx(0x14, t='sample port', tx=1) Curport = port_ser dx(0x08, t='baud rate quals', cnt=1) dx(0x12, t='configuration direction equals', cnt=1) dx(0x83, t='port receive buffer overflow') dx(0x84, t='baud rate illegal') dx(0x85, t='port receive framing error') dx(0x86, t='port receive buffer overrun') Curport = port_sys dx(0x0b, t='version equals', cnt=1) dx(0x0c, t='serial equals', cnt=8) dx(0x0e, t='defaults saved') dx(0x0f, t='defaults loaded') dx(0x8f, t='SEEPROM write error') dx(0xb, t='get version', r=r_version_equals, tx=1) dx(0xc, t='get serial', r=r_serial_equals, tx=1) Curport = None dx(0x02, t='port disabled') dx(0x03, t='port enabled') dx(0x80, t='S-Link rx error') dx(0x81, t='transmission timeout') dx(0xff, t='illegal command') dx(0x2, t='disable port', r=r_port_disabled, tx=1) dx(0x3, t='enable port', r=r_port_enabled, tx=1) cmd_resume=(0xff, 0xaa) SLINKE_TIMEOUT='serial timeout' def MIN(a,b): if a < b: return a else: return b # handle slinke device files class slinke_devfile: # sample period in microseconds def __init__(self, file=None, sample_period=100): self.codes = {} self.vars = {} if file: self.load(file) self.set_sample_period(sample_period) def load(self, file): self.doload(file) self.setvars() def add_button(self, button, code): print 'WARNING: slinke_devfile.add_button() called! Ignored' def store(self, file): print 'WARNING: slinke_devfile.store() called! Ignored' def set_sample_period(self, sper): self.sper = sper # return the character sequence to output for a given remote button def button_to_str(self, button): l = self.ooseq(button) c = [] for i in l: self.rc(c, i) ch = '' for i in c: ch = ch + chr(i) return ch def getv(self, v, default=None): i = default if self.vars.has_key(v): i = string.atof(self.vars[v]) if i == None: print 'getv: variable %s not found' % v sys.exit(1) return i def gets(self, v, default=None): i = default if self.vars.has_key(v): i = self.vars[v] if i == None: print 'gets: variable %s not found' % v sys.exit(1) return i def gettuple(self, v, default=None): t = () if self.vars.has_key(v): for j in string.split(self.vars[v]): t = t + (string.atof(j),) else: t = default if t == None: print 'gettuple: variable %s not found' % v sys.exit(1) return t def setvars(self): self.carrier = self.getv('carrier', 0) self.zero = self.gettuple('zero') self.one = self.gettuple('one') self.start = self.gettuple('start',()) self.stop = self.gettuple('stop',()) self.repeat = self.getv('repeat', 1) self.pause = self.getv('pause', 0) self.sleep = self.getv('sleep', 0) self.prefix = self.gets('prefix','') self.suffix = self.gets('suffix','') def doload(self, file): f = open(file) mydir, myfile = os.path.split(file) while 1: l = f.readline() if not l: break l = re.sub('#.*$','',l) l = string.strip(l) if not l: continue if l[0] in '01': p, n = string.split(l,':') self.codes[n] = p else: n, p = string.split(l,'=') self.vars[n] = p if n == 'include': self.doload(os.path.join(mydir,p)) def rc(self, c, t): on = 0 if t > 0: on = 0200 t = abs(t)/self.sper if t != int(t): print 'rc: time not multiple of sper:', t t = int(t) while (t > 0): o = MIN(t, 0177) c.append(o | on) t = t - o def ooseq(self, button): if not self.codes.has_key(button): print 'ooseq: could not find code for', button sys.exit(1) seq = [] for i in range(self.repeat): self.ooseqcode(seq, button) seq.append(self.pause) seq.append(self.sleep) return seq def ooseqcode(self, seq, button): self.aptup(seq, self.start) self.ap01(seq, self.prefix) self.ap01(seq, self.codes[button]) self.ap01(seq, self.suffix) self.aptup(seq, self.stop) return seq def ap01(self, l, s): for i in s: if i == '0': self.aptup(l, self.zero) if i == '1': self.aptup(l, self.one) def aptup(self, l, t): for i in t: l.append(i) # simple pickle based helper for learning and playing back commands import pickle class pickled_helper: def __init__(self): self.cmds = {} def load(self, file): if os.path.exists(file): f = open(file) self.cmds = pickle.load(f) f.close() else: self.cmds = {} def store(self, file): f = open(file, 'w') pickle.dump(self.cmds, f) f.close() def button_to_str(self, button): return self.cmds[button] def add_button(self, button, str): self.cmds[button] = str def dev_file_test_main(): s = slinke_devfile(sys.argv[1]) print s.get_char_seq('display') def usage(): print '''Slinke control program, usage: slinke.py [options] learn | do | tty | test | nop [buttons] commands: learn