Twitter DiscoBot
This page contains historical information. It is provided for reference but does not reflect current information or policy. |
This was a demonstration and learning exercise to help generate some excitement for the Space Automation project.
What It Does
There is a multi-colored, spinning, disco-light-thingy sitting on top of the network rack. Every time the string 'i3detroit' gets mentioned on twitter, the thigy lights up an spins for 30 seconds.
How it works
A python script on the debian server talks to one of these things over RS485. Relay 0 on the i/o box controls power to the disco-light-thingy.
twitter_disco.py
import twitter import RelayDuino import time twitter = twitter.Api() relay = RelayDuino.RelayDuino('/dev/ttyUSB0').relay[0] relay.set('off') last_id = twitter.Search('i3detroit').max_id while True: result_set = twitter.Search('i3detroit', since_id=last_id) print "got %d new statuses" % len(result_set.results) for status in result_set.results: print "Got a mention, let's party!" print status.from_user_id print status.text relay.set('on') time.sleep(30) relay.set('off') time.sleep(30) last_id = result_set.max_id print "last_id == %s" % str(last_id) time.sleep(30)
RelayDuino.py
import sys import serial class RelayDuinoErr(Exception): pass class RelayState(object): def __init__(self, value): if isinstance(value, bool): self.value = value elif value in ('ON', 'On', 'on', 'TRUE', 'True', 'true', '1', 1): self.value = True elif value in ('OFF', 'Off', 'off', 'FALSE', 'False', 'false', '0', 0): self.value = False else: raise ValueError("'%s' is not a valid relay state" % value) def __str__(self): if self.value: return 'ON' else: return 'OFF' def __eq__(self, other): return self.value == RelayState(other).value def __ne__(self, other): return not self == other def __nonzero__(self): return self.value class Relay(object): def __init__(self, relayduino, index): self.relayduino = relayduino self.index = index def get_state(self): response = self.relayduino.command("RS %d" % self.index) return RelayState(response) def set_state(self, state): if RelayState(state): self.relayduino.command("ON %d" % self.index) else: self.relayduino.command("OFF %d" % self.index) get = get_state set = set_state state = property(get_state, set_state) def toggle(self): self.state = not self.state class DigitalInputState(object): def __init__(self, value): if value in (1, 0, '1', '0', True, False): self.value = int(value) elif isinstance(value, str): if value.lower() in ('hi', 'high', 'set'): self.value = 1 elif value.lower() in ('lo', 'low', 'clear'): self.value = 0 else: raise ValueError("'%s' is not a valid relay state" % value) else: raise ValueError("'%s' is not a valid relay state" % value) def __eq__(self, other): return self.value == DigitalInputState(other).value def __ne__(self, other): return not self == other def __nonzero__(self): return self.value class DigitalInput(object): def __init__(self, relayduino, index): self.relayduino = relayduino self.index = index def get_state(self): response = self.relayduino.command("IS %d" % self.index) return DigitalInputState(response) get = get_state state = property(get_state) class AnalogInput(object): def __init__(self, relayduino, index): self.relayduino = relayduino self.index = index def get_value(self): response = self.relayduino.command("IS %d" % self.index) return float(response) get = get_value value = property(get_value) class FractionalAnalogInput(AnalogInput): def get_value(self): raw = super(FractionalAnalogInput, self).get_value() return float(raw) / 1024 class RelayDuino(object): """Library with handy functions to connect an Ocean Controls KTA-223 Relay Box""" delim = '\x0D' default_baudrate = 115200 def autobaud(self): if self.is_open: self.port.close() for rate in (1200, 2400, 4800, 9600, 14400, 19200, 28800, 38400, 57600, 115200): try: self._open(rate) print "connected at %d" % rate break except RelayDuinoErr, ex: print "failed to connect at %d\n %s" % (rate, str(ex)) if self.port: self.port.close() time.sleep(0.500) def __init__ (self, comport_name=None , device_address=0, debug=False): self.comport_name = comport_name self.device_address = device_address self.port = None self.debug_flag = debug self.relay = tuple([Relay(self, idx+1) for idx in range(8)]) self.input = tuple([DigitalInput(self, idx+1) for idx in range(4)]) self.analog = tuple([AnalogInput(self, idx+1) for idx in range(3)]) self.analog_fractional = tuple([FractionalAnalogInput(self, idx+1) for idx in range(3)]) def _open(self, rate=None): if rate==None: rate = self.default_baudrate try: self.port = serial.Serial(self.comport_name, baudrate=rate, timeout=0.100, dsrdtr=False, rtscts=False) self.port.flushInput() self.port.write("@00 RS 0" + self.delim) self.port.setRTS(False) response = self.port.readline(None, self.delim) pass except Exception, ex: raise RelayDuinoErr("Can't talk to relay box on port '%s' (%s)" % (self.comport_name, str(ex))) else: pass if response.startswith('#'): pass else: raise RelayDuinoErr("Can't talk to relay box on port '%s' response=(%s)(%d) port=<%s>" % (self.comport_name, response, len(response), str(self.port))) pass def command(self, cmd): if not self.is_open: self._open() send_str = ("@%02d %s" + self.delim) % (self.device_address, cmd) if self.debug_flag: sys.stderr.write(("tx: %s\n") % repr(send_str)) self.port.setRTS(True) self.port.write(send_str) self.port.setRTS(False) response = self.port.readline(None, self.delim) if self.debug_flag: sys.stderr.write("rx: %s\n" % repr(response)) return response[5:-1] def __del__(self): try: self.port.close() except: pass @property def is_open(self): return self.port != None and self.port.isOpen() def ss(self): return self.command("SS") if __name__ == '__main__': import time kta223 = RelayDuino(sys.argv[1], 0) #kta223.autobaud() #if not kta223.is_open: sys.exit() for io_list in (kta223.relay, kta223.input, kta223.analog): for n in range(len(io_list)): print "%s number %d is %s" % (type(io_list[n]), n, io_list[n].get()) kta223.relay[0].set('on') time.sleep(1) kta223.relay[0].set('off')