Showing Household Power Use on a 128×32 Display

oled_stuff

Displaying Household Power Use on an SSD1306 OLED

For christmas I was given a nifty little 128×32 monochrome OLED display. What follows are a few experiments drawing to the display, culminating in a live display of household power usage provided by a Zigbee SE device.

Wiring

For this project we’ll be talking directly to the display using an FT232H Breakout. To communicate with the I2C display the FT232H needs:

  • D0 connected the I2C clock line (blue)
  • Both D1 and D2 together as the data line (yellow)
  • D0, and D1+D2 pulled up to +5V

FT232H wired for I2C

Connecting to the FT232H and Display

For this project I’m using Adafruit’s drivers for both the FTDI and the display. The latter requires a bit of care adjustment – by default it will automatically search for a GPIO pin to use on the display’s RST pin, a process which fails unless running on a Raspberry Pi or Beaglebone. As a workaround we can explicitly set gpio something other None.

In [1]:
from Adafruit_GPIO import FT232H #Multi-purpose board used for I2C
from Adafruit_SSD1306 import SSD1306_128_32 #Library for this display

ft232 = FT232H.FT232H()

#This snippet lists i2c devices, confirming SSD1306's address of 60
i2c_hits = [x for x in range(8, 120) if FT232H.I2CDevice(ft232, x).ping()]
print 'i2c devices found at: {}'.format(i2c_hits)
addr = i2c_hits[0]

try:
    print 'Initializing display with gpio=None'
    disp = SSD1306_128_32(rst=None, i2c=ft232, i2c_address=addr)
except Exception as err:
    if err.message == 'Could not determine platform.':
        print 'Failed: {}'.format(err.message)
        print 'Initializing display with gpio=-1'
        disp = SSD1306_128_32(rst=None, gpio=-1, i2c=ft232, i2c_address=addr)

disp.begin()   #Initialize the display device
disp.clear()   #Zero out the image buffer
disp.display() #Send buffer to device 
i2c devices found at: [60]
Initializing display with gpio=None
Failed: Could not determine platform.
Initializing display with gpio=-1

Making and Displaying Images

Adafruit’s display libary makes good use of PIL/pillow to send 1-bit images to the device

In [2]:
from PIL import Image, ImageDraw, ImageFont #Used to make images for display

im = Image.new('1', (disp.width, disp.height)) #Create a 1-bit image 
draw = ImageDraw.Draw(im) #Create an artist

Test 1: Filling half the screen

To test that the hardware is working we’ll first try to draw a simple rectangle.

Simple Display Test

In [3]:
xy0 = (0,0)
xy1 = (disp.width//2, disp.height)

draw.rectangle([xy0, xy1], fill=1)
disp.image(im)
disp.display()

Test 2: Drawing Text

To display power information we’ll need to display text. To that end we’ll use draw.text(). As an example, we’ll try to make a simple clock. PIL‘s built in font is 11 pixels high, not well suited to our 128×32 display, so we’ll use one of Google’s fonts.

Drawing Text

In [4]:
USE_EXTERNAL_FONT = True

if USE_EXTERNAL_FONT:
    import requests
    import tempfile

    FONT_URL = 'https://github.com/google/fonts/raw/master/apache/robotomono/RobotoMono-Regular.ttf'

    fontfile = tempfile.TemporaryFile()
    fontfile.write(requests.get(FONT_URL).content)

    fontfile.seek(0)
    font = ImageFont.truetype(fontfile, 15)
    fontfile.close()

else:
    font = ImageFont.load_default()
In [5]:
draw.rectangle([(0,0), (disp.width, disp.height)], fill=0)
draw.multiline_text((0,0), 'Hello,\nOLED!', font=font, fill=1, spacing=1)
disp.image(im)
disp.display()

Bonus: Making a Clock

As a final exercise in drawing to the display we’ll make a little clock.

Simple Clock

In [6]:
import time
from math import cos, sin, pi

TIME_TO_RUN = 60  #For this demo, only run 1 minute

CLOCK_XY1 = (disp.width-disp.height, 0)  #upper left corner
CLOCK_XY2 = (disp.width-1, disp.height-1)#bottom right corner
CLOCK_CENTER = (disp.width-(disp.height-1)//2, (disp.height-1)//2)


def draw_radial_line(degrees, length):
    xyxy = list(CLOCK_CENTER + CLOCK_CENTER)
    xyxy[2] += length * sin(degrees*pi/180)
    xyxy[3] -= length * cos(degrees*pi/180)
    draw.line(xyxy, fill=1)

for _ in range(TIME_TO_RUN):
    now = time.localtime()
    clock = time.strftime('%m/%d/%y\n%H:%M:%S', now) #Format date and time

    draw.rectangle([(0,0), (disp.width, disp.height)], fill=0)
    draw.multiline_text((0,0), clock, font=font, fill=1)
    
    #Clock outline
    draw.ellipse([(disp.width-disp.height, 0), (disp.width-1, disp.height-1)], outline=1, width=0)
    
    #Second Hand
    deg = now.tm_sec/60. * 360
    rad = (disp.height-1) * .5 * .9
    draw_radial_line(deg, rad)
    
    #Minute Hand
    deg = deg/60 + now.tm_min*6
    rad = (disp.height-1) * .5 * .75
    draw_radial_line(deg, rad)

    #Hour Hand
    deg = deg/12 + (now.tm_hour%12)*30
    rad = (disp.height-1) * .5 * .5
    draw_radial_line(deg, rad)

    disp.image(im)
    disp.display()

    #Sleep for the remainder of this second
    t0 = time.time()
    time.sleep(1 - t0%1)

Getting Power Data

Now that the display is working it’s time to try pulling data off the smart meter. I’m using the Rainforeast Eagle and evilpete’s RainEagle module.

The code beleow populates a dictionary with constants necessary instantiate an Eagle object. EAGLE_CLOUD_ID and EAGLE_INSTALL_CODE can be found on the device.

In [7]:
from RainEagle import Eagle

EAGLE_CLOUD_ID = '012abc'
EAGLE_INSTALL_CODE = '0123456789abcdef'

eagle_kws = {'username':EAGLE_CLOUD_ID,
             'password':EAGLE_INSTALL_CODE,
             'checkfirmware':False,
             'addr':'eagle-{}.local'.format(EAGLE_CLOUD_ID)}

eagle = Eagle(**eagle_kws)

The device itself has some quirks. Power demand is given in terms of hexadecimal strings along with a multiplier and divisor. Time stamps are given in seconds since 01/01/2000 GMT. Below are convenience functions that will make the data easier to work with.

In [8]:
Y2K = 946684800 # Epoch time of 01-01-2000 GMT

def unhex(d):
    'recursively replace hex strings with ints'
    if isinstance(d, str):
        if d.startswith('0x'):
            return int(d,0)
    elif isinstance(d, list):
        for i, di in enumerate(d):
            d[i] = unhex(di)
    elif isinstance(d, dict):
        for k,v in d.items():
            d[k] = unhex(v)
    return d

def extract_series(l):
    'Extract time series data to time-keyed dict.'
    if not isinstance(l, list):
        if isinstance(l, dict) and len(l)==1:
            return extract_series(l.values()[0])
        else:
            raise ValueError('{} is not a list.')
    
    out = {}
    for li in l:
        this_row = {}
        for k in ['SummationDelivered', 'SummationReceived', 'Demand']:
            if li.has_key(k):
                this_row[k] = 1.0 * li[k] * li['Multiplier'] / li['Divisor']
        if len(this_row) > 0:
            out[li['TimeStamp'] + Y2K] = this_row

    return out

def convert_demand(d):
    'For single-frame data, return time and load'
    if isinstance(d, dict) and len(d)==1:
        return convert_demand(d.values()[0])

    t = d['TimeStamp'] + Y2K
    load = 1.0 * d['Demand'] * d['Multiplier'] / d['Divisor']
    
    return t, load

Text-Only Power Display

The simplest power display will simply fetch current demand data and send it to the display in text form.

Text simulation (I forgot to take a real picture)

In [9]:
TIME_STEP = 2.5 #Time between refreshes, seconds

for _ in range(60):
    #Generate time string
    now = time.localtime()
    clock = time.strftime('%H:%M:%S', now)
    
    #Generate power use string
    demand_data = unhex(eagle.get_instantaneous_demand())
    _, load = convert_demand(demand_data)
    load_s = '{:.3f} kW'.format(load)

    #blank the screen
    draw.rectangle((-1, -1, disp.width+1, disp.height+1), fill=0)
    
    #draw text
    draw.text((0,0), '\n'.join([clock, load_s]), font=font, fill=1)

    disp.image(im)
    disp.display()
    
    t0 = time.time()
    time.sleep(TIME_STEP - (t0%TIME_STEP))

Graphing Yearly Usage

The EAGLE also stores long-term usage data. This code will let us plot daily, weekly, monthly, or yearly power use. (Note: Unlike other data, these are given as decimals.)

Yearly Power Use Shown Above: Yearly power use, with A/C driven peak in the summer.

In [10]:
def line_plot(xdata, ydata, artist, bounds):
    xmin = min(xdata) * 1.0
    xmax = max(xdata) * 1.0
    ymin = min(ydata) * 1.0
    ymax = max(ydata) * 1.0
    
    w_span = bounds[1][0] - bounds[0][0]
    h_span = bounds[1][1] - bounds[0][1]

    xpts = map(lambda x: bounds[0][0] + int((x-xmin)/(xmax-xmin) * w_span), xdata)
    ypts = map(lambda y: bounds[0][1] + int((1-(y-0)/(ymax-0)) * h_span), ydata)
    
    artist.line(zip(xpts, ypts), fill=1)
In [11]:
totals = unhex(eagle.get_summation_values(interval = 'year')) #or 'day', 'week', or 'month'

time_list = [x['TimeStamp'] + Y2K for x in totals['Reading']]
load_list = [float(x['Value']) for x in totals['Reading']]
In [12]:
draw.rectangle((-1, -1, disp.width+1, disp.height+1), outline=0,fill=0)
line_plot(time_list, load_list, draw, [(0,0), (disp.width, disp.height)])
disp.image(im)
disp.display()

Comparing Load to Recent Average

The EAGLE stores ‘summations,’ essentially meter readings. From these it’s possbile to calculate average power use over a period and make a display expressing current use relative to that baseline.

Short Term Average

In [ ]:
#Note - currently inefficient due to a bug in RainEagle
TIME_SPAN = 3600 * 24 * 1# 7-day average

TIME_STEP = 1.0 #Seconds
AVG_UPDATE_TIME = 60.

last_avg_update = 0.0

while True:
    
    if time.time() - last_avg_update > AVG_UPDATE_TIME:
        last_avg_update = time.time()
        
        start_time = int(time.time() - Y2K) - TIME_SPAN
        
        #all_delta = '0x{:x}'.format(int(end_time))
        hist = extract_series(unhex(eagle.get_history_data(starttime=start_time)))
        t_min = min(hist.keys())
        t_max = max(hist.keys())
        time_delta =  t_max - t_min
        summation_delta = hist[t_max]['SummationDelivered'] - hist[t_min]['SummationDelivered']

        avg_use = 3600*(summation_delta/time_delta)
        
    draw.rectangle((-1, -1, disp.width+1, disp.height+1), outline=0,fill=0)

    #Generate time string
    avg_line = 'AVG: {:.3f} kW'.format(avg_use)
    
    #Generate power use string
    demand_data = unhex(eagle.get_instantaneous_demand())
    t, load = convert_demand(demand_data)
    load_line = '    {:+.3f} kW'.format(load-avg_use)

    #draw text
    draw.text((0,0), '\n'.join([avg_line, load_line]), font=font, fill=1)
    

    disp.image(im)
    disp.display()
    
    t0 = time.time()
    time.sleep(TIME_STEP - (t0%TIME_STEP))

Live Display with Rolling Graph

For the final display we’ll combine a text-based display with a rolling line graph of recent measurements. To do so we’ll use a deque to keep a record of demand, one element per To that end we create a deque that with one element for each remaining pixel we have.

Text and Graphical Display (In this picture my tea kettle had just finished boiling)

In [ ]:
from collections import deque

example_text = time.strftime('%H:%M:%S')+'\n'+'{:.3f} kW'.format(1.234)
GRAPH_LEFT = font.getsize_multiline(example_text)[0] + 1

time_list = deque([int(time.time())] * (disp.width-GRAPH_LEFT)) #Create the time deque
load_list = deque([0.0] * (disp.width-GRAPH_LEFT)) #Create the load deque
In [ ]:
TIME_STEP = 1.0 #Seconds
for _ in range(3600):
    #Generate time string
    now = time.localtime()
    clock = time.strftime('%H:%M:%S', now)
    
    #Generate power use string
    demand_data = unhex(eagle.get_instantaneous_demand())
    t, load = convert_demand(demand_data)
    load_s = '{:.3f} kW'.format(load)

    #blank the screen
    draw.rectangle((-1, -1, disp.width+1, disp.height+1), fill=0)
    
    #draw text
    draw.text((0,0), '\n'.join([clock, load_s]), font=font, fill=1)
    
    #if data new, Shift the deque by one, update the max value
    if t > time_list[-1]:
        time_list.popleft()
        time_list.append(t)
        load_list.popleft()
        load_list.append(load)
    line_plot(time_list, load_list, draw, [(GRAPH_LEFT,0),(disp.width,disp.height)])

    disp.image(im)
    disp.display()
    
    t0 = time.time()
    time.sleep(TIME_STEP - (t0%TIME_STEP))

Final Notes

This project was made to run while connected to a standard computer (or Raspberry Pi), but it would be nice to have a standalone display. In my next project I’ll be trying to get similar code running on an ESP8266.

Joseph Albert

2 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *