| | 接上文| 【Banana PI Leaf S3开发板试用体验】Demo例程加载 |
------------------------------------------------------------------------------------------------- |
根据大佬们的支持和群内容: |
目标:在此前 boot.py(已配置 WiFi STA、GPIO48 上的 NeoPixel 彩灯和 SSD1306_I2C 显示屏)的基础上,利用开发板的 RTC、Pin13 输出、板载 NeoPixel LED(GPIO48)和板载按键,通过 MQTT 网络经光耦中间继电器控制加热设备。
需要准备:RTC 时钟(可用 ntptime 或时间网站校对时间)、NeoPixel 彩灯、显示屏(lcd)、光耦中间继电器。
参考| 【Banana PI Leaf S3开发板试用体验】+OLED屏显示驱动 |
| | 【Banana PI Leaf S3开发板试用体验】Demo例程加载 |
| | 【Banana PI Leaf S3开发板试用体验】之开箱.点灯快速上手 |
| | 【Banana PI Leaf S3开发板试用体验】BananaPi Leaf-S3开发板评测------乐鑫官方v4.4.2版本cSDK |
| | 【Banana PI Leaf S3开发板试用体验】8X8点阵模块显示测试 | 李唐 |
| | 【Banana PI Leaf S3开发板试用体验】DAYU200——TCP控制三色灯 |
| | 【Banana PI Leaf S3开发板试用体验】BananaPi Leaf-S3开发板评测------MQTT消息收发 ! |
| | 【Banana PI Leaf S3开发板试用体验】开箱+环境搭建初步测试 ! |
| | 【Banana PI Leaf S3开发板试用体验】websocket远程控制LED |
| | 【Banana PI Leaf S3开发板试用体验】Banana PI Leaf S3 开发板远程交互开发方法 ! |
| | 【Banana PI Leaf S3开发板试用体验】温湿度计 |
| | 【Banana PI Leaf S3开发板试用体验】使用SSD1306 OLED显示屏 |
| | 【Banana PI Leaf S3开发板试用体验】基于MicroPython的OLED显示 |
| | 【Banana PI Leaf S3开发板试用体验】WS2812灯环灯带控制 |
| | 【Banana PI Leaf S3开发板试用体验】基于Banana PI Leaf S3的天气预报系统 |
| | 【Banana PI Leaf S3开发板试用体验】开发环境构建 |
| | 【Banana PI Leaf S3开发板试用体验】IDF 建立hello world! |
代码:
import _thread
from machine import I2C,Pin,RTC
from ssd1306 import SSD1306_I2C
from neopixel import NeoPixel
import urequests
import ujson
import utime
from simple import MQTTClient
# Module-level state shared by msg_callback, threadMQTT and the main loop.

# Offset added to utime.time() before it is compared with the server's Unix
# timestamp in the main loop.
# NOTE(review): 946656000 == 946684800 - 8*3600, i.e. presumably the 1970->2000
# epoch gap shifted for a UTC+8 local clock - confirm.
rtc_originally = 946656000
# NOTE(review): this string literal is truncated in the published listing
# (everything after "http:" is missing); restore the full time-API URL before running.
url ="http:
# Only referenced from commented-out code in msg_callback within this listing.
publish_topic='home/garden/fountain'
# Lock taken around the MQTT publish in threadMQTT and around the HTTP requests in the main loop.
gLock = _thread.allocate_lock()
# Payload published on topic 'Temp' by threadMQTT; the main loop later stores the fetched 'qw' value here.
qw1='e'
# Digital output on pin 13; msg_callback drives it to 1 or 0.
b_OUT = Pin(13, Pin.OUT)
rtc = RTC()
print(rtc.datetime())
print("utime.time()",utime.time())
# utime.time() value recorded at the last time-sync attempt; starts at 0.
update_timertc = 0# utime.ticks_ms() - web_query_delay
pin_48 =Pin(48) # the single on-board NeoPixel LED of the BPI-Leaf-S3 is on GPIO48
np =NeoPixel(pin_48, 1,bpp=3, timing=1)# initialise the NeoPixel driver (one pixel)
print("_thread.get_ident()", _thread.get_ident())
print("utime.time()",utime.time())
def msg_callback(topic, msg):
    """Handle one incoming MQTT message by switching the output pin and the LED.

    ``b'on'`` sets the pixel to (20, 20, 0) and drives ``b_OUT`` to 1;
    ``b'off'`` sets the pixel to (0, 0, 20) and drives ``b_OUT`` to 0.
    Any other payload is only logged as ``l-?``.

    Args:
        topic: Topic the message arrived on (only printed).
        msg: Raw payload, compared against ``b'on'`` / ``b'off'``.
    """
    print("接收到数据", (topic, msg))
    # The original listing also carried a disabled (commented-out) echo of the
    # command back to publish_topic under gLock; it is intentionally not active.
    if msg == b'on':
        colour, level, tag = (20, 20, 0), 1, "l-on"
    elif msg == b'off':
        colour, level, tag = (0, 0, 20), 0, "l-off"
    else:
        print("l-?")
        return
    np[0] = colour   # stage the new pixel colour
    b_OUT(level)     # switch the pin-13 output
    np.write()       # push the staged colour to the LED
    print(tag)
# Create the MQTT client, connect with a persistent session, then register the
# handler before subscribing to the "led" command topic.
client = MQTTClient("l11", "gwcat", port=1883, user="admin", password="YWOA")
connect_result = client.connect(clean_session=False)
print("client.connect(clean_session=False)", connect_result)
client.set_callback(msg_callback)
client.subscribe("led")
def threadMQTT():
    """Background worker: when the RTC minute changes, publish ``qw1`` and redraw the clock line.

    Every 2 seconds the RTC is sampled once. When field 5 of the RTC tuple (the
    minute, matching the tuple layout written in the main loop) differs from the
    last seen value, the current ``qw1`` is published on topic ``'Temp'`` while
    holding ``gLock``, and the bottom OLED line is redrawn with day and
    hour:minute.

    The lock is released in a ``finally`` clause so that a failing publish
    cannot leave ``gLock`` held and block the main loop's HTTP requests.

    NOTE(review): ``oled`` is not defined in this listing; presumably it is
    created in boot.py - confirm.
    NOTE(review): the original paste lost its indentation; the OLED refresh is
    assumed to belong to the minute-changed branch - confirm.
    """
    last_minute = rtc.datetime()[5]
    # utime.sleep(2) is evaluated as a print argument on purpose: it delays the
    # start by 2 s and prints None, exactly as the original line did.
    print("in_new_thread", _thread.get_ident(), utime.sleep(2))
    while True:
        utime.sleep(2)
        # Take one snapshot so the comparison, the stored minute and the
        # displayed values all come from the same instant.
        now = rtc.datetime()
        if now[5] == last_minute:
            continue
        print("mq", last_minute, now[5])
        last_minute = now[5]

        gLock.acquire()
        try:
            client.publish(topic='Temp', msg=qw1, retain=True, qos=1)
            print("publish Temp message", qw1)
        finally:
            gLock.release()

        oled.fill_rect(1, 56, 128, 64, 0)  # blank the bottom text line
        oled.text("Date:" + str(now[2]), 0, 56)
        oled.text(str(now[4]) + ':' + str(now[5]), 60, 56)
        oled.show()
        print("oled.text Date:", now)
# Launch the MQTT/OLED worker thread, then wait 10 s before the main loop starts.
worker_result = _thread.start_new_thread(threadMQTT, ())
print("_new_thread", worker_result)
utime.sleep(10)
print("main .time()", utime.time())
# main loop
# Each pass: optionally re-sync the RTC from the time API, redraw the OLED,
# fetch and display the weather data, then block in client.wait_msg().
# NOTE(review): the published listing lost its indentation; the nesting below
# must be restored before this code can run.
while True:
# Sync when more than 7200 s have passed since the last attempt, or while
# utime.time() is still below 1150.
if utime.time()-update_timertc >7200 or utime.time()<1150: # if utime.ticks_ms() - update_time >= web_query_delay:
# HTTP GET data
update_timertc = utime.time()
np[0] = (20,0,20) # LED colour shown while the time request runs (original note: equal values in all three channels give white)
np.write() # push the colour to the LED
print("unixtime.time()",utime.time())
# NOTE(review): if urequests.get raises, gLock is never released - consider try/finally.
gLock.acquire()
response = urequests.get(url)
gLock.release()
print("unixtime.time end",utime.time())
if response.status_code == 200: # query success
print(response.text)
# parse JSON
parsed =ujson.loads(response.text)
# you can also use parsed =response.json()
datetime_str =str(parsed["datetime"])
unixtime_str =str(parsed["unixtime"])# "unixtime":1666736507
# Slice the date/time fields out of the "datetime" string by fixed character positions.
year =int(datetime_str[0:4])
month =int(datetime_str[5:7])
day =int(datetime_str[8:10])
hour =int(datetime_str[11:13])
minute =int(datetime_str[14:16])
second =int(datetime_str[17:19])
subsecond =int(round(int(datetime_str[20:26]) / 10000))
# update internal RTC
# Only rewrite the RTC when local time (shifted by rtc_originally) differs
# from the server's unixtime by more than 5 seconds.
if abs(utime.time() + rtc_originally -int(unixtime_str) ) > 5:
a=rtc.datetime()
rtc.datetime((year, month, day, 0, hour, minute, second, subsecond))
update_timertc = utime.time()
print(a,"RTC updating\n",rtc.datetime())
print(utime.time(),unixtime_str,utime.time() + rtc_originally,"RTC updated\n")
else:print(utime.time(),unixtime_str,utime.time() + rtc_originally,"NO RTC updated\n")
else: # query failed, retry retry_delay ms later
update_timertc = 0# update_time =utime.ticks_ms() - web_query_delay ; # retry_delay
# # generate formated date/time strings from internal RTC
# date_str ="{:02}/{:02}/{:4}".format(rtc.datetime()[1], rtc.datetime()[2], rtc.datetime()[0])
# time_str ="{:02}:{:02}:{:02}".format(rtc.datetime()[4], rtc.datetime()[5], rtc.datetime()[6])
#
# # update SSD1306 OLED display
# oled.fill(0)
# oled.text("ESP8266 Clock", 0, 5)
# NOTE(review): datetime_str is only assigned in the status-200 branch above;
# confirm these OLED lines cannot run before a successful time query.
oled.fill_rect(0, 10, 128, 64, 0) # fill the rectangle starting at (0, 10) with colour 0, i.e. clear it
oled.text("Date:"+datetime_str[8:10], 0, 56)
oled.text(datetime_str[11:13]+':'+datetime_str[14:16], 60, 56)
oled.show()
np[0] = (00,20,20) # LED colour shown while the weather request runs
np.write() # push the colour to the LED
print(rtc.datetime())
print("tianqi.time()",utime.time())
# NOTE(review): same lock pattern as above - not released if the request raises.
gLock.acquire()
# NOTE(review): this string literal and call are truncated in the published
# listing (everything after "http:" is missing); restore the weather-API URL.
response = urequests.get('http:
gLock.release()
print("tianqi.time end",utime.time())
parsed=ujson.loads(response.text)
temp=parsed["data"]
print(parsed)
# 'qw' is stored in qw1, which threadMQTT publishes on topic 'Temp'.
qw1=temp.get('qw')
showLine1 = 'Temperature:'+qw1+ ' C'
oled.text(showLine1, 0, 15)
# Map the 'numtq' code to a short English label for the display.
tq=temp.get('numtq')
if tq == '00': wt = 'Sun'
elif tq == '01': wt = 'Cloud'
elif tq == '02': wt = 'Overcast'
elif tq == '03': wt = 'Shower'
elif tq == '14': wt = 'Light snow'
elif tq == '07': wt = 'Light rain'
elif tq == '13': wt = 'Snow flurry'
elif tq == '18':
wt = 'Fog'
elif tq == '100':
wt = 'Windy'
else:
wt='UNKNOW'
showLine2 = 'Weather:'+wt
oled.text(showLine2, 0, 30)
showLine3 = 'Date:'+temp.get('lastUpdate')
oled.text(showLine3, 0, 40)
oled.show()
# showLine4 is only printed, never drawn on the OLED.
showLine4 = 'humidity:'+temp.get('sd')+ ' %'
print(showLine1 )
print(showLine2 )
print(showLine3 )
print(showLine4 )
print(qw1)
# tq is reused here for the integer 'numfl' value; a non-zero value is printed as the range tq+2 ~ tq+3.
tq=int(temp.get('numfl'))
if tq : print("fengli",tq+2,'~',tq+3)
else:print("fengli",0,'~',2)
print("fengxiang",temp.get('numfx'))
np[0] = (00,00,00) # all channels zero: LED off
np.write() # push the colour to the LED
utime.sleep(1)
# Blocks until an MQTT message arrives; msg_callback is the registered handler.
print("client.wait_msg()", client.wait_msg(),rtc.datetime())
# try:
# print("client.wait_msg()", client.wait_msg(),rtc.datetime())
# except ( OSError): # Mandatory timeout error trapping
# print("client.connect(clean_session=False)", client.connect(clean_session=False))
# print("client.check_msg()", client.check_msg(),rtc.datetime())
# print("utime.sleep(2900)",rtc.datetime())# 7200=2905s
# update_timertc = update_timertc + 1
# if update_timertc == 4 :update_timertc = 0
串口(COM)输出信息:
服务端:
手机控制:
开发板的显示和控制均正常,达到预期效果。