- websocket_helper.py 这个握手协议实现得非常简单
import sys

# MicroPython ships "u"-prefixed variants of these modules; fall back to the
# standard names when they are not available. Only ImportError is caught so
# that any other failure during import is not silently hidden.
try:
    import ubinascii as binascii
except ImportError:
    import binascii
try:
    import uhashlib as hashlib
except ImportError:
    import hashlib

# Set to a truthy value to print the parsed headers and the handshake reply.
DEBUG = 0
def server_handshake(sock):
    """Perform the server side of the WebSocket opening handshake.

    Reads the HTTP upgrade request from ``sock``, extracts the
    ``Sec-WebSocket-Key`` header and sends back the
    ``101 Switching Protocols`` response with the matching
    ``Sec-WebSocket-Accept`` value.

    Raises:
        OSError: if the connection ends before the blank line that terminates
            the headers, or if no ``Sec-WebSocket-Key`` header was present.
    """
    clr = sock.makefile("rwb", 0)
    # The request line itself is not inspected; only the headers matter here.
    clr.readline()
    webkey = None
    while True:
        line = clr.readline()
        if not line:
            raise OSError("EOF in headers")
        if line == b"\r\n":
            break
        parts = line.split(b":", 1)
        if len(parts) != 2:
            # A line without a colon is not a header; skip it instead of
            # failing on the name/value unpacking.
            continue
        h = parts[0].strip()
        v = parts[1].strip()
        if DEBUG:
            print((h, v))
        # HTTP header names are case-insensitive.
        if h.lower() == b"sec-websocket-key":
            webkey = v
    if not webkey:
        raise OSError("Not a websocket request")
    if DEBUG:
        print("Sec-WebSocket-Key:", webkey, len(webkey))
    # Accept value = base64(sha1(key + fixed GUID)); b2a_base64 appends a
    # newline, which is stripped with [:-1].
    respkey = webkey + b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    respkey = hashlib.sha1(respkey).digest()
    respkey = binascii.b2a_base64(respkey)[:-1]
    resp = (
        b"HTTP/1.1 101 Switching Protocols\r\n"
        b"Upgrade: websocket\r\n"
        b"Connection: Upgrade\r\n"
        b"Sec-WebSocket-Accept: %s\r\n"
        b"\r\n"
    ) % respkey
    if DEBUG:
        print(resp)
    sock.send(resp)
def client_handshake(sock):
    """Send a fixed WebSocket upgrade request and skip the response headers.

    The request (host and key included) is hard-coded. The status line and
    every response header are read and discarded; the reply is not validated.

    Raises:
        OSError: if the connection ends before the blank line that terminates
            the response headers.
    """
    cl = sock.makefile("rwb", 0)
    cl.write(
        b"GET / HTTP/1.1\r\n"
        b"Host: echo.websocket.org\r\n"
        b"Connection: Upgrade\r\n"
        b"Upgrade: websocket\r\n"
        b"Sec-WebSocket-Key: foo\r\n"
        b"\r\n"
    )
    # Status line; its content is not checked.
    cl.readline()
    while True:
        line = cl.readline()
        if not line:
            # readline() returns an empty result at EOF; without this check
            # the loop would never terminate on a dropped connection.
            raise OSError("EOF in headers")
        if line == b"\r\n":
            break
mywebsocket.py:主要负责建立 socket 监听、接收并解码 websocket 数据,同时驱动 SSD1306 显示屏。
from machine import I2C,Pin
from ssd1306 import SSD1306_I2C
from neopixel import NeoPixel
import time

# I2C bus for the OLED. The pull-up is passed by keyword: the second
# positional argument of Pin() is the mode, not the pull setting.
sda_pin=Pin(15, pull=Pin.PULL_UP)
scl_pin=Pin(16, pull=Pin.PULL_UP)
# NOTE(review): 800 kHz is above the usual 400 kHz fast-mode rate; lower it
# if the display misbehaves.
i2c = I2C(1,sda=sda_pin, scl=scl_pin, freq=800_000)
# Print the addresses of the devices that answered on the bus.
print(i2c.scan())
oled = SSD1306_I2C(128, 64, i2c, addr=0x3c)

# Single 3-byte-per-pixel NeoPixel on pin 48.
pin_48 = Pin(48)
np = NeoPixel(pin_48, 1,bpp=3, timing=1)

import socket
import websocket_helper
def display_hellowold():
    """Clear the OLED, draw the splash graphic and banner text, then refresh."""
    oled.fill(0)

    # Splash graphic: a filled 32x32 square hollowed out into a frame,
    # three vertical bars inside it and a small filled rectangle.
    for x, y, w, h, colour in (
        (0, 0, 32, 32, 1),
        (2, 2, 28, 28, 0),
    ):
        oled.fill_rect(x, y, w, h, colour)
    for x, y in ((9, 8), (16, 2), (23, 8)):
        oled.vline(x, y, 22, 1)
    oled.fill_rect(26, 24, 2, 4, 1)

    # Banner lines as (text, x, y), drawn in this order.
    banner = (
        ('MicroPython', 40, 0),
        ('SSD1306', 40, 12),
        ('OLED 128x64', 40, 24),
        ('Banana Pi ', 0, 36),
        ('BPI leaf S3', 40, 45),
        ('bbs.elecfans.com', 0, 54),
    )
    for caption, x, y in banner:
        oled.text(caption, x, y, 1)

    oled.show()
# `network` only exists on MicroPython boards; when it is missing the name
# stays undefined and the WiFi setup later in the file is expected to cope.
try:
    import network
except ImportError:
    pass
import sys
import os
# Prefer MicroPython's ustruct, fall back to the standard struct module.
try:
    import ustruct as struct
except ImportError:
    import struct

# Set to True to print frame header details while reading frames.
DEBUG = False
class websocket:
    """Minimal WebSocket frame reader/writer on top of a connected socket.

    The opening handshake must already have been done on the socket. Only
    single frames are handled: fragmentation and ping/pong are not treated
    specially.
    """

    def __init__(self, s):
        # s: connected socket-like object providing send(), recv() and close().
        self.s = s

    def write(self, data):
        """Send ``data`` as one unmasked binary frame (first byte 0x82).

        ``str`` input is encoded as UTF-8 first so that the length in the
        header counts bytes rather than characters.
        """
        if isinstance(data, str):
            data = data.encode("utf-8")
        size = len(data)
        if size < 126:
            hdr = struct.pack(">BB", 0x82, size)
        elif size < 65536:
            hdr = struct.pack(">BBH", 0x82, 126, size)
        else:
            # Payloads that do not fit in 16 bits use the 64-bit length form.
            hdr = struct.pack(">BBQ", 0x82, 127, size)
        self.s.send(hdr)
        self.s.send(data)

    def recvexactly(self, sz):
        """Receive up to ``sz`` bytes, stopping early only if recv() returns nothing.

        Returns the bytes received; the result is shorter than ``sz`` when
        the peer closed the connection first.
        """
        chunks = []
        while sz:
            data = self.s.recv(sz)
            if not data:
                break
            chunks.append(data)
            sz -= len(data)
        return b"".join(chunks)

    def _eof(self):
        # The peer went away in the middle of a frame: close the socket and
        # report it the same way as a close frame.
        self.s.close()
        return ''

    def read(self):
        """Read one frame and return its payload as a string.

        Returns ``''`` after closing the socket when a close frame (opcode 8)
        arrives or when the connection ends before a full frame was received.
        The payload is decoded as UTF-8; if that fails, each byte is mapped
        to the character with the same code point.
        """
        hdr = self.recvexactly(2)
        if len(hdr) != 2:
            return self._eof()
        firstbyte, secondbyte = struct.unpack(">BB", hdr)
        mskenable = bool(secondbyte & 0x80)
        length = secondbyte & 0x7f
        if DEBUG:
            print('test length=%d' % length)
            print('mskenable=' + str(mskenable))
        # 126 and 127 are markers for an extended length field. This must be
        # an if/elif: a 16-bit extended length may itself equal 127.
        if length == 126:
            hdr = self.recvexactly(2)
            if len(hdr) != 2:
                return self._eof()
            (length,) = struct.unpack(">H", hdr)
        elif length == 127:
            hdr = self.recvexactly(8)
            if len(hdr) != 8:
                return self._eof()
            (length,) = struct.unpack(">Q", hdr)
        if DEBUG:
            print('length=%d' % length)
        opcode = firstbyte & 0x0f
        if opcode == 8:
            self.s.close()
            return ''
        fin = bool(firstbyte & 0x80)
        if DEBUG:
            print('fin='+str(fin))
            print('opcode=%d'%opcode)
        msk = None
        if mskenable:
            msk = self.recvexactly(4)
            if len(msk) != 4:
                return self._eof()
        payload = bytearray(self.recvexactly(length))
        if len(payload) != length:
            return self._eof()
        if msk is not None:
            # Unmask in place: byte i is XORed with mask byte i % 4.
            for i, byte in enumerate(payload):
                payload[i] = byte ^ msk[i % 4]
        try:
            return bytes(payload).decode("utf-8")
        except UnicodeError:
            # Not valid UTF-8: keep the byte-per-character mapping.
            return ''.join([chr(byte) for byte in payload])
print('my prog start...')

# Best-effort WiFi bring-up: a failure is reported and the script carries on
# instead of being silently ignored.
try:
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(True)
    sta_if.connect('ssid','pwd')
    # Wait until an address has been assigned; sleep between polls so the
    # loop does not spin at full speed.
    while sta_if.ifconfig()[0] == '0.0.0.0':
        time.sleep(0.1)
    print('succ connect wifi ap,get ipaddr:')
    print(sta_if.ifconfig())
except Exception as exc:
    print('wifi setup failed:', exc)

sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 8008))
sock.listen(5)
print('websokcet listen at 8008...')

# Show the splash screen briefly, then the waiting message.
display_hellowold()
time.sleep(1)
oled.fill(0)
oled.text('client connect...:',0,0,1)
oled.show()

# Serve one client at a time; go back to accept() when it disconnects.
while True:
    conn, address = sock.accept()
    print('client connect...:')
    print(address)
    try:
        websocket_helper.server_handshake(conn)
        ws = websocket(conn)
        print('websocket connect succ')
        while True:
            text = ws.read()
            # An empty string means the connection was closed.
            if text == '':
                break
            print(text)
            oled.fill(0)
            oled.text('recv message:',0,0,1)
            oled.text(text,0,16,1)
            if text == 'LED ON':
                np[0] = (25,25,25)
                np.write()
                oled.text('open led',0,24,1)
            elif text == 'LED OFF':
                np[0] = (0,0,0)
                np.write()
                oled.text('close led',0,24,1)
            oled.show()
    except Exception as exc:
        # A failed handshake or a broken client must not stop the server.
        print('client error:', exc)
    finally:
        # Always release the client socket before accepting the next one.
        conn.close()
接收到的消息会更新到OLED屏上。如果是开灯或关灯的命令,则执行开关LED灯的动作,并在OLED屏上显示结果。
if text == 'LED ON':
    np[0] = (25,25,25)  # equal R, G and B values give white light
    np.write()  # push the colour to the LED
    oled.text('open led',0,24,1)
elif text == 'LED OFF':
    np[0] = (0,0,0)  # all channels at zero switch the LED off
    np.write()  # push the colour to the LED
    oled.text('close led',0,24,1)
【调试】打开websocket在线调试工具:
发送命令后,开发板上会显示收到的信息,同时LED灯也会随命令亮起或熄灭。
最后感谢网友的帖子
esp8266开发——使用micropython下的websocket