Displaying Household Power Use on an SSD1306 OLED¶
For christmas I was given a nifty little 128×32 monochrome OLED display. What follows are a few experiments drawing to the display, culminating in a live display of household power usage provided by a Zigbee SE device.
Wiring¶
For this project we’ll be talking directly to the display using an FT232H Breakout. To communicate with the I2C display the FT232H needs:
- D0 connected the I2C clock line (blue)
- Both D1 and D2 together as the data line (yellow)
- D0, and D1+D2 pulled up to +5V
Connecting to the FT232H and Display¶
For this project I’m using Adafruit’s drivers for both the FTDI and the display. The latter requires a bit of care adjustment – by default it will automatically search for a GPIO pin to use on the display’s RST pin, a process which fails unless running on a Raspberry Pi or Beaglebone. As a workaround we can explicitly set gpio
something other None
.
from Adafruit_GPIO import FT232H #Multi-purpose board used for I2C
from Adafruit_SSD1306 import SSD1306_128_32 #Library for this display
ft232 = FT232H.FT232H()
#This snippet lists i2c devices, confirming SSD1306's address of 60
i2c_hits = [x for x in range(8, 120) if FT232H.I2CDevice(ft232, x).ping()]
print 'i2c devices found at: {}'.format(i2c_hits)
addr = i2c_hits[0]
try:
print 'Initializing display with gpio=None'
disp = SSD1306_128_32(rst=None, i2c=ft232, i2c_address=addr)
except Exception as err:
if err.message == 'Could not determine platform.':
print 'Failed: {}'.format(err.message)
print 'Initializing display with gpio=-1'
disp = SSD1306_128_32(rst=None, gpio=-1, i2c=ft232, i2c_address=addr)
disp.begin() #Initialize the display device
disp.clear() #Zero out the image buffer
disp.display() #Send buffer to device
Making and Displaying Images¶
Adafruit’s display libary makes good use of PIL
/pillow
to send 1-bit images to the device
from PIL import Image, ImageDraw, ImageFont #Used to make images for display
im = Image.new('1', (disp.width, disp.height)) #Create a 1-bit image
draw = ImageDraw.Draw(im) #Create an artist
Test 1: Filling half the screen¶
To test that the hardware is working we’ll first try to draw a simple rectangle.
xy0 = (0,0)
xy1 = (disp.width//2, disp.height)
draw.rectangle([xy0, xy1], fill=1)
disp.image(im)
disp.display()
Test 2: Drawing Text¶
To display power information we’ll need to display text. To that end we’ll use draw.text()
. As an example, we’ll try to make a simple clock. PIL
‘s built in font is 11 pixels high, not well suited to our 128×32 display, so we’ll use one of Google’s fonts.
USE_EXTERNAL_FONT = True
if USE_EXTERNAL_FONT:
import requests
import tempfile
FONT_URL = 'https://github.com/google/fonts/raw/master/apache/robotomono/RobotoMono-Regular.ttf'
fontfile = tempfile.TemporaryFile()
fontfile.write(requests.get(FONT_URL).content)
fontfile.seek(0)
font = ImageFont.truetype(fontfile, 15)
fontfile.close()
else:
font = ImageFont.load_default()
draw.rectangle([(0,0), (disp.width, disp.height)], fill=0)
draw.multiline_text((0,0), 'Hello,\nOLED!', font=font, fill=1, spacing=1)
disp.image(im)
disp.display()
Bonus: Making a Clock¶
As a final exercise in drawing to the display we’ll make a little clock.
import time
from math import cos, sin, pi
TIME_TO_RUN = 60 #For this demo, only run 1 minute
CLOCK_XY1 = (disp.width-disp.height, 0) #upper left corner
CLOCK_XY2 = (disp.width-1, disp.height-1)#bottom right corner
CLOCK_CENTER = (disp.width-(disp.height-1)//2, (disp.height-1)//2)
def draw_radial_line(degrees, length):
xyxy = list(CLOCK_CENTER + CLOCK_CENTER)
xyxy[2] += length * sin(degrees*pi/180)
xyxy[3] -= length * cos(degrees*pi/180)
draw.line(xyxy, fill=1)
for _ in range(TIME_TO_RUN):
now = time.localtime()
clock = time.strftime('%m/%d/%y\n%H:%M:%S', now) #Format date and time
draw.rectangle([(0,0), (disp.width, disp.height)], fill=0)
draw.multiline_text((0,0), clock, font=font, fill=1)
#Clock outline
draw.ellipse([(disp.width-disp.height, 0), (disp.width-1, disp.height-1)], outline=1, width=0)
#Second Hand
deg = now.tm_sec/60. * 360
rad = (disp.height-1) * .5 * .9
draw_radial_line(deg, rad)
#Minute Hand
deg = deg/60 + now.tm_min*6
rad = (disp.height-1) * .5 * .75
draw_radial_line(deg, rad)
#Hour Hand
deg = deg/12 + (now.tm_hour%12)*30
rad = (disp.height-1) * .5 * .5
draw_radial_line(deg, rad)
disp.image(im)
disp.display()
#Sleep for the remainder of this second
t0 = time.time()
time.sleep(1 - t0%1)
Getting Power Data¶
Now that the display is working it’s time to try pulling data off the smart meter. I’m using the Rainforeast Eagle and evilpete’s RainEagle module.
The code beleow populates a dictionary with constants necessary instantiate an Eagle
object. EAGLE_CLOUD_ID
and EAGLE_INSTALL_CODE
can be found on the device.
from RainEagle import Eagle
EAGLE_CLOUD_ID = '012abc'
EAGLE_INSTALL_CODE = '0123456789abcdef'
eagle_kws = {'username':EAGLE_CLOUD_ID,
'password':EAGLE_INSTALL_CODE,
'checkfirmware':False,
'addr':'eagle-{}.local'.format(EAGLE_CLOUD_ID)}
eagle = Eagle(**eagle_kws)
The device itself has some quirks. Power demand is given in terms of hexadecimal strings along with a multiplier and divisor. Time stamps are given in seconds since 01/01/2000 GMT. Below are convenience functions that will make the data easier to work with.
Y2K = 946684800 # Epoch time of 01-01-2000 GMT
def unhex(d):
'recursively replace hex strings with ints'
if isinstance(d, str):
if d.startswith('0x'):
return int(d,0)
elif isinstance(d, list):
for i, di in enumerate(d):
d[i] = unhex(di)
elif isinstance(d, dict):
for k,v in d.items():
d[k] = unhex(v)
return d
def extract_series(l):
'Extract time series data to time-keyed dict.'
if not isinstance(l, list):
if isinstance(l, dict) and len(l)==1:
return extract_series(l.values()[0])
else:
raise ValueError('{} is not a list.')
out = {}
for li in l:
this_row = {}
for k in ['SummationDelivered', 'SummationReceived', 'Demand']:
if li.has_key(k):
this_row[k] = 1.0 * li[k] * li['Multiplier'] / li['Divisor']
if len(this_row) > 0:
out[li['TimeStamp'] + Y2K] = this_row
return out
def convert_demand(d):
'For single-frame data, return time and load'
if isinstance(d, dict) and len(d)==1:
return convert_demand(d.values()[0])
t = d['TimeStamp'] + Y2K
load = 1.0 * d['Demand'] * d['Multiplier'] / d['Divisor']
return t, load
Text-Only Power Display¶
The simplest power display will simply fetch current demand data and send it to the display in text form.
(I forgot to take a real picture)
TIME_STEP = 2.5 #Time between refreshes, seconds
for _ in range(60):
#Generate time string
now = time.localtime()
clock = time.strftime('%H:%M:%S', now)
#Generate power use string
demand_data = unhex(eagle.get_instantaneous_demand())
_, load = convert_demand(demand_data)
load_s = '{:.3f} kW'.format(load)
#blank the screen
draw.rectangle((-1, -1, disp.width+1, disp.height+1), fill=0)
#draw text
draw.text((0,0), '\n'.join([clock, load_s]), font=font, fill=1)
disp.image(im)
disp.display()
t0 = time.time()
time.sleep(TIME_STEP - (t0%TIME_STEP))
Graphing Yearly Usage¶
The EAGLE also stores long-term usage data. This code will let us plot daily, weekly, monthly, or yearly power use. (Note: Unlike other data, these are given as decimals.)
Shown Above: Yearly power use, with A/C driven peak in the summer.
def line_plot(xdata, ydata, artist, bounds):
xmin = min(xdata) * 1.0
xmax = max(xdata) * 1.0
ymin = min(ydata) * 1.0
ymax = max(ydata) * 1.0
w_span = bounds[1][0] - bounds[0][0]
h_span = bounds[1][1] - bounds[0][1]
xpts = map(lambda x: bounds[0][0] + int((x-xmin)/(xmax-xmin) * w_span), xdata)
ypts = map(lambda y: bounds[0][1] + int((1-(y-0)/(ymax-0)) * h_span), ydata)
artist.line(zip(xpts, ypts), fill=1)
totals = unhex(eagle.get_summation_values(interval = 'year')) #or 'day', 'week', or 'month'
time_list = [x['TimeStamp'] + Y2K for x in totals['Reading']]
load_list = [float(x['Value']) for x in totals['Reading']]
draw.rectangle((-1, -1, disp.width+1, disp.height+1), outline=0,fill=0)
line_plot(time_list, load_list, draw, [(0,0), (disp.width, disp.height)])
disp.image(im)
disp.display()
Comparing Load to Recent Average¶
The EAGLE stores ‘summations,’ essentially meter readings. From these it’s possbile to calculate average power use over a period and make a display expressing current use relative to that baseline.
#Note - currently inefficient due to a bug in RainEagle
TIME_SPAN = 3600 * 24 * 1# 7-day average
TIME_STEP = 1.0 #Seconds
AVG_UPDATE_TIME = 60.
last_avg_update = 0.0
while True:
if time.time() - last_avg_update > AVG_UPDATE_TIME:
last_avg_update = time.time()
start_time = int(time.time() - Y2K) - TIME_SPAN
#all_delta = '0x{:x}'.format(int(end_time))
hist = extract_series(unhex(eagle.get_history_data(starttime=start_time)))
t_min = min(hist.keys())
t_max = max(hist.keys())
time_delta = t_max - t_min
summation_delta = hist[t_max]['SummationDelivered'] - hist[t_min]['SummationDelivered']
avg_use = 3600*(summation_delta/time_delta)
draw.rectangle((-1, -1, disp.width+1, disp.height+1), outline=0,fill=0)
#Generate time string
avg_line = 'AVG: {:.3f} kW'.format(avg_use)
#Generate power use string
demand_data = unhex(eagle.get_instantaneous_demand())
t, load = convert_demand(demand_data)
load_line = ' {:+.3f} kW'.format(load-avg_use)
#draw text
draw.text((0,0), '\n'.join([avg_line, load_line]), font=font, fill=1)
disp.image(im)
disp.display()
t0 = time.time()
time.sleep(TIME_STEP - (t0%TIME_STEP))
Live Display with Rolling Graph¶
For the final display we’ll combine a text-based display with a rolling line graph of recent measurements. To do so we’ll use a deque
to keep a record of demand, one element per To that end we create a deque
that with one element for each remaining pixel we have.
(In this picture my tea kettle had just finished boiling)
from collections import deque
example_text = time.strftime('%H:%M:%S')+'\n'+'{:.3f} kW'.format(1.234)
GRAPH_LEFT = font.getsize_multiline(example_text)[0] + 1
time_list = deque([int(time.time())] * (disp.width-GRAPH_LEFT)) #Create the time deque
load_list = deque([0.0] * (disp.width-GRAPH_LEFT)) #Create the load deque
TIME_STEP = 1.0 #Seconds
for _ in range(3600):
#Generate time string
now = time.localtime()
clock = time.strftime('%H:%M:%S', now)
#Generate power use string
demand_data = unhex(eagle.get_instantaneous_demand())
t, load = convert_demand(demand_data)
load_s = '{:.3f} kW'.format(load)
#blank the screen
draw.rectangle((-1, -1, disp.width+1, disp.height+1), fill=0)
#draw text
draw.text((0,0), '\n'.join([clock, load_s]), font=font, fill=1)
#if data new, Shift the deque by one, update the max value
if t > time_list[-1]:
time_list.popleft()
time_list.append(t)
load_list.popleft()
load_list.append(load)
line_plot(time_list, load_list, draw, [(GRAPH_LEFT,0),(disp.width,disp.height)])
disp.image(im)
disp.display()
t0 = time.time()
time.sleep(TIME_STEP - (t0%TIME_STEP))
Final Notes¶
This project was made to run while connected to a standard computer (or Raspberry Pi), but it would be nice to have a standalone display. In my next project I’ll be trying to get similar code running on an ESP8266.
2 Comments