` 本帖最后由 dvd1478 于 2016-1-16 14:10 编辑
原来的项目是这样的
【NanoPi2申请】安卓手机APP控制灯光,查看房间温湿度
https://bbs.elecfans.com/jishu_530944_1_1.html
(出处: 中国电子技术论坛)
项目已变通完成,请审核 @xjallen
由于最近微信公众号很热门,我也赶一下潮流,用 NanoPi2 + Python 进行微信公众号的开发
硬件
光敏电阻——自动控制灯
热释电红外传感器——监控人体 + USB 摄像头实时抓拍,并上传给已关注的微信用户
DHT11 ——温湿度监测
很多东西都源自网络,也有自己的原创,决定把源代码上传,请大家拍砖,不说那么多,先上图
1)关注,以前按键
2)人体监控——智能防盗——自动抓拍并上传
3)灯的远程控制
4)灯的自动控制
源代码:
index.py
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import os
- import web
- from weixinInterface import WeixinInterface
# Route table: requests to /weixin are handled by the WeixinInterface class.
urls = ('/weixin', 'WeixinInterface')

if __name__ == "__main__":
    # web.py resolves the handler class name through this module's globals.
    application = web.application(urls, globals())
    application.run()
复制代码
weixinInterface.py
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import os
- import web
- import time
- import threading
- import urllib
- import urllib2
- import hashlib
- import json
- import pylibmc
- import random
- import lxml
- from lxml import etree
- from youdao import Youdao
- from xiaohuangji import Simsim_talk
- from weixin import WeiXinClient
- from weixin import APIError
- from weixin import AccessTokenError
- import NanoPi2
- #http://mp.weixin.qq.com/debug/cgi-bin/sandbox?t=sandbox/login
# WeChat credentials.
# NOTE(review): secrets are hard-coded in source; consider loading them from
# the environment or from a config file kept out of version control.
my_token = "qq1696933323_dvd1478@21cn.com"  # token entered on the WeChat public platform
my_appid = 'afdaf'  # fill in your appid
my_secret = 'afdafda'  # fill in your app secret

# Shared state flags (0 = off, 1 = on), toggled by the click handlers.
flagled = 0  # LED
flagab = 0  # body-sensor alarm
flagal = 0  # light-sensor automatic control

# GPIO pin assignment.
bodySensorPin = NanoPi2.PIN3  # body (PIR) sensor input
ledPin = NanoPi2.PIN1  # LED control output
lightSensorPin = NanoPi2.PIN2  # photoresistor input
- #硬件外设的初始化
def _hardware_init():
    """Export and configure the GPIO pins used by the LED and both sensors.

    Failures are only reported on stdout; initialisation continues regardless.
    The LED output is driven to match the current ``flagled`` state
    (HIGH switches the light off, LOW switches it on).
    """
    def _claim(pin, direction):
        # Export the pin, then set its direction, reporting each failure.
        if NanoPi2.exportGPIOPin(pin) == -1:
            print("exportGPIOPin(%d) failed" % pin)
        if NanoPi2.setGPIODirection(pin, direction) == -1:
            print("setGPIODirection(%d) failed" % pin)

    _claim(ledPin, NanoPi2.OUT)
    level = NanoPi2.HIGH if flagled == 0 else NanoPi2.LOW
    NanoPi2.setGPIOValue(ledPin, level)

    _claim(lightSensorPin, NanoPi2.IN)
    _claim(bodySensorPin, NanoPi2.IN)
- #微信的token认证
def _check_hash(data):
    """Check the WeChat server signature carried by a request.

    The token, timestamp and nonce are sorted lexicographically, concatenated
    and hashed with SHA-1; the request is genuine when the hex digest equals
    the ``signature`` parameter.

    :param data: object exposing ``signature``, ``timestamp`` and ``nonce``
        attributes (e.g. the result of ``web.input()``).
    :returns: True if the signature matches, False otherwise (including when
        any of the three parameters is missing).
    """
    signature = getattr(data, 'signature', None)
    timestamp = getattr(data, 'timestamp', None)
    nonce = getattr(data, 'nonce', None)
    if signature is None or timestamp is None or nonce is None:
        return False

    sha1 = hashlib.sha1()
    for part in sorted([my_token, timestamp, nonce]):
        # hashlib needs bytes; text values are encoded explicitly.
        sha1.update(part if isinstance(part, bytes) else part.encode('utf-8'))
    return sha1.hexdigest() == signature
- #微信的事件——按键处理
- #微信的事件——按键处理 实况截图
def _take_snapshot(addr, port, client):
    """Grab one frame from the MJPG streamer and upload it to WeChat.

    :param addr: host of the streamer's HTTP output.
    :param port: TCP port of the streamer's HTTP output.
    :param client: WeiXinClient used for the media upload.
    :returns: the parsed upload response.
    """
    snapshot_url = 'http://%s:%d/?action=snapshot' % (addr, port)
    stream = urllib2.urlopen(urllib2.Request(snapshot_url), timeout=2)
    return client.media.upload.file(type='image', jpg=stream)
-
def _do_click_SNAPSHOT(server, fromUser, toUser, xml):
    """Menu handler: reply with a live camera snapshot.

    On any failure a text reply carrying the error message is sent instead.
    """
    print(' KeyName 实况截图')
    try:
        media = _take_snapshot('127.0.0.1', 8001, server.client)
    except Exception as exc:
        failure = 'snapshot fail: ' + str(exc)
        print('%s %s' % ('_do_click_SNAPSHOT', failure))
        return server._reply_text(fromUser, toUser, failure)
    return server._reply_image(fromUser, toUser, media.media_id)
- #微信的事件——按键处理 安全预警功能
def _do_click_V1001_AUTOSAFE(server, fromUser, toUser, xml):
    """Menu handler: toggle the body-sensor alarm."""
    print(' KeyName 安全预警功能')
    reply = _do_change_ALARM(server, fromUser, toUser, xml)
    return reply
- #微信的事件——按键处理 LED控制
def _do_click_V1001_LED_ON(server, fromUser, toUser, xml):
    """Menu handler: switch the light on and record the state in ``flagled``."""
    # Without this declaration the assignment below only created a local
    # variable, so the module-level flag never changed.
    global flagled
    print(' KeyName 电灯开')
    NanoPi2.setGPIOValue(ledPin, NanoPi2.LOW)  # LOW drives the light on
    flagled = 1
    return server._reply_text(fromUser, toUser, u"电灯开")
def _do_click_V1001_LED_OFF(server, fromUser, toUser, xml):
    """Menu handler: switch the light off and record the state in ``flagled``."""
    # Without this declaration the assignment below only created a local
    # variable, so the module-level flag never changed.
    global flagled
    print(' KeyName 电灯关')
    NanoPi2.setGPIOValue(ledPin, NanoPi2.HIGH)  # HIGH drives the light off
    flagled = 0
    return server._reply_text(fromUser, toUser, u"电灯关")
- #微信的事件——按键处理 自动光控功能
def _do_click_V1001_AUTOLED(server, fromUser, toUser, xml):
    """Menu handler: toggle light-sensor driven automatic lighting."""
    print(' KeyName 自动光控功能')
    reply = _do_change_LIGHT(server, fromUser, toUser, xml)
    return reply
def _do_click_V1001_HELP(server, fromUser, toUser, xml):
    """Menu handler: reply with a short introduction to this account."""
    print(' KeyName 帮助&简介')
    intro = u'''此微信公众平台是本人业余爱好所建立,可以随时随地的以微信端为控制器,与终端进行交互。具体功能请点击菜单选项 '''
    return server._reply_text(fromUser, toUser, intro)
# Dispatch table: menu EventKey -> handler(server, fromUser, toUser, xml).
_weixin_click_table = dict(
    V1001_SNAPSHOT=_do_click_SNAPSHOT,
    V1001_LED_ON=_do_click_V1001_LED_ON,
    V1001_LED_OFF=_do_click_V1001_LED_OFF,
    V1001_HELP=_do_click_V1001_HELP,
    V1001_AUTOSAFE=_do_click_V1001_AUTOSAFE,
    V1001_AUTOLED=_do_click_V1001_AUTOLED,
)
- #微信的事件——按键处理 安全预警功能
def _do_change_ALARM(server, fromUser, toUser, xml):
    """Toggle the body-sensor alarm and reply with the new state.

    Switching on starts ``_auto_control_body`` in a daemon thread; switching
    off clears ``flagab``, which makes that thread leave its loop.
    """
    global flagab
    if flagab != 0:
        flagab = 0
        return server._reply_text(fromUser, toUser, u"检测报警关")

    # Raise the flag BEFORE starting the worker: the worker loops on
    # ``flagab == 1`` and must not observe a stale 0.
    flagab = 1
    try:
        worker = threading.Thread(target=_auto_control_body)
        worker.setDaemon(True)
        worker.start()
    except Exception:
        print("Error: unable to start thread")
        flagab = 0
        return server._reply_text(fromUser, toUser, u"检测报警没有成功开启")
    return server._reply_text(fromUser, toUser, u"检测报警开")
def _auto_control_body():
    """Worker loop for the body-sensor alarm.

    Collects the follower openid list once, then polls the body sensor every
    five seconds while ``flagab == 1``. When the sensor reads non-zero, every
    follower gets a warning text followed by a camera snapshot.
    """
    def _follower_ids(client):
        # Follower lists are paginated; the ids live under ``data.openid``
        # on every page (the original read ``rjson.openid`` on later pages).
        page = client.user.get._get(next_openid=None)
        ids = list(page.get('data', {}).get('openid', []))
        fetched = page.count
        while fetched < page.total and page.count:
            page = client.user.get._get(next_openid=page.next_openid)
            fetched += page.count
            ids.extend(page.get('data', {}).get('openid', []))
        return ids

    def _broadcast(client, ids, template, *extra):
        # Send one customer-service message per follower; log API errors.
        for uid in ids:
            content = template % ((uid,) + extra)
            try:
                print(client.message.custom.send.post(body=content))
            except APIError as e:
                print('%s %s' % (e, uid))

    wc = WeiXinClient(my_appid, my_secret, fc=True, path='/tmp')
    wc.request_access_token()
    id_list = _follower_ids(wc)

    while flagab == 1:
        inputValue = NanoPi2.getGPIOValue(bodySensorPin)
        print("alarm is working %d" % inputValue)
        if inputValue != 0:
            _broadcast(wc, id_list,
                       '{"touser":"%s", "msgtype":"text", "text":{ "content":"Waring! Somebody in your Room"}}')
            try:
                req = urllib2.Request('http://127.0.0.1:8001/?action=snapshot')
                resp = urllib2.urlopen(req, timeout=2)
                media = wc.media.upload.file(type='image', jpg=resp)
            except Exception as e:
                # A camera or upload failure must not kill the alarm thread.
                print('snapshot fail: %s' % e)
            else:
                _broadcast(wc, id_list,
                           '{"touser":"%s", "msgtype":"image", "image":{ "media_id":"%s"}}',
                           media.media_id)
        time.sleep(5.0)
- #微信的事件——按键处理 自动光控功能
def _do_change_LIGHT(server, fromUser, toUser, xml):
    """Toggle automatic light control and reply with the new state.

    Switching on starts ``_auto_control_light`` in a daemon thread; switching
    off clears ``flagal``, which makes that thread leave its loop.
    """
    global flagal
    if flagal != 0:
        flagal = 0
        return server._reply_text(fromUser, toUser, u"自动光控关")

    # Raise the flag BEFORE starting the worker: the worker loops on
    # ``flagal == 1`` and must not observe a stale 0.
    flagal = 1
    try:
        worker = threading.Thread(target=_auto_control_light)
        worker.setDaemon(True)
        worker.start()
    except Exception:
        print("Error: unable to start thread")
        flagal = 0
        return server._reply_text(fromUser, toUser, u"自动光控没有成功开启")
    return server._reply_text(fromUser, toUser, u"自动光控开")
def _auto_control_light():
    """Worker loop for light-sensor driven lighting.

    Collects the follower openid list once, then polls the light sensor every
    five seconds while ``flagal == 1``. Whenever the reading changes, the LED
    is switched accordingly, ``flagled`` is updated and every follower is
    notified.
    """
    # The assignments below must update the module-level flag, not a local.
    global flagled

    def _follower_ids(client):
        # Follower lists are paginated; the ids live under ``data.openid``
        # on every page (the original read ``rjson.openid`` on later pages).
        page = client.user.get._get(next_openid=None)
        ids = list(page.get('data', {}).get('openid', []))
        fetched = page.count
        while fetched < page.total and page.count:
            page = client.user.get._get(next_openid=page.next_openid)
            fetched += page.count
            ids.extend(page.get('data', {}).get('openid', []))
        return ids

    def _broadcast(client, ids, template):
        # Send one customer-service message per follower; log API errors.
        for uid in ids:
            try:
                print(client.message.custom.send.post(body=template % uid))
            except APIError as e:
                print('%s %s' % (e, uid))

    wc = WeiXinClient(my_appid, my_secret, fc=True, path='/tmp')
    wc.request_access_token()
    id_list = _follower_ids(wc)

    inputValue = NanoPi2.getGPIOValue(lightSensorPin)
    inputValueLast = inputValue + 1  # differs on purpose: forces a first update
    while flagal == 1:
        print("auto light is working")
        if inputValueLast != inputValue:
            if inputValue == 0:
                print(' KeyName 电灯关')
                NanoPi2.setGPIOValue(ledPin, NanoPi2.HIGH)
                flagled = 0
                _broadcast(wc, id_list,
                           '{"touser":"%s", "msgtype":"text", "text":{ "content":"The light already close"}}')
            else:
                print(' KeyName 电灯开')
                NanoPi2.setGPIOValue(ledPin, NanoPi2.LOW)
                flagled = 1
                _broadcast(wc, id_list,
                           '{"touser":"%s", "msgtype":"text", "text":{ "content":"The light already open"}}')
            inputValueLast = inputValue
        time.sleep(5.0)
        inputValue = NanoPi2.getGPIOValue(lightSensorPin)
- #微信的事件处理
def _do_event_subscribe(server, fromUser, toUser, xml):
    """Event handler: greet a user who has just followed the account."""
    welcome = u'''欢迎关注此微信号,这个微信是本人业余爱好所建立,具体功能请点击下方菜单'''
    return server._reply_text(fromUser, toUser, welcome)
def _do_event_unsubscribe(server, fromUser, toUser, xml):
    """Event handler: say goodbye to a user who unfollowed the account."""
    farewell = u'''我现在功能还很简单,知道满足不了您的需求,但是我会慢慢改进,欢迎您以后再来'''
    return server._reply_text(fromUser, toUser, farewell)
- def _do_event_SCAN(server, fromUser, toUser, xml):
- pass
- def _do_event_LOCATION(server, fromUser, toUser, xml):
- pass
def _do_event_CLICK(server, fromUser, toUser, xml):
    """Event handler: dispatch a menu click to its handler by EventKey.

    Unknown keys are answered with an ``Unknow click`` text reply.
    """
    node = xml.find('EventKey')
    # A missing or empty EventKey is treated as an unknown (empty) key
    # instead of crashing on string concatenation.
    key = node.text if node is not None and node.text else ''
    print('EventKey: ' + key)

    # Look the handler up first so that a KeyError raised INSIDE a handler is
    # not mistaken for an unknown menu key.
    handler = _weixin_click_table.get(key)
    if handler is None:
        print('_do_event_CLICK: %s' % repr(key))
        return server._reply_text(fromUser, toUser, u'Unknow click: ' + key)
    return handler(server, fromUser, toUser, xml)
# Dispatch table: WeChat Event name -> handler(server, fromUser, toUser, xml).
# The 'unsubscribe' key had been mangled to 'un***scribe' (forum censoring),
# which made every unfollow event fall through to the unknown-event path.
_weixin_event_table = {
    'subscribe': _do_event_subscribe,
    'unsubscribe': _do_event_unsubscribe,
    'SCAN': _do_event_SCAN,
    'LOCATION': _do_event_LOCATION,
    'CLICK': _do_event_CLICK,
}
-
class WeixinInterface:
    """web.py handler for the /weixin endpoint of the WeChat public account.

    GET answers the platform's URL verification; POST receives pushed
    messages/events as XML and returns the rendered reply template.
    """

    # Users currently in "xhj" chat mode. The original code called an
    # undefined global ``mc`` (NameError on the first 'xhj'/'bye' message);
    # an in-process set keeps the same bookkeeping without that dependency.
    _xhj_users = set()

    # The line breaks of this literal had been lost, which broke the syntax.
    _HELP_TEXT = (u'1.输入中文或者英文返回对应的英中翻译\n'
                  u'2.输入m随机听一首音乐\n'
                  u'3.输入xhj进入调戏小黄鸡模式(bye退出)')

    def __init__(self):
        # web.py templates live in the "templates" folder next to this file.
        self.app_root = os.path.dirname(__file__)
        self.templates_root = os.path.join(self.app_root, 'templates')
        self.render = web.template.render(self.templates_root)
        self.client = WeiXinClient(my_appid, my_secret, fc=True, path='/tmp')
        self.client.request_access_token()
        _hardware_init()

    def _recv_event(self, fromUser, toUser, xml):
        """Dispatch a pushed event to its handler by Event name."""
        event = xml.find('Event').text
        print('Event' + event)
        # Look up first so a KeyError raised inside a handler is not
        # misreported as an unknown event.
        handler = _weixin_event_table.get(event)
        if handler is None:
            print('_recv_event: %s' % repr(event))
            # The original referenced an undefined name ``server`` here.
            return self._reply_text(fromUser, toUser, u'Unknow event: ' + event)
        return handler(self, fromUser, toUser, xml)

    # In the three reply helpers the first two arguments are the recipient
    # and the sender OF THE REPLY, so callers pass the incoming message's
    # (fromUser, toUser) in that order.
    def _reply_text(self, toUser, fromUser, msg):
        return self.render.reply_text(toUser, fromUser, int(time.time()), msg)

    def _reply_image(self, toUser, fromUser, media_id):
        return self.render.reply_image(toUser, fromUser, int(time.time()), media_id)

    def _reply_news(self, toUser, fromUser, title, descrip, picUrl, hqUrl):
        return self.render.reply_news(toUser, fromUser, int(time.time()),
                                      title, descrip, picUrl, hqUrl)

    def GET(self):
        """URL verification: echo ``echostr`` when the signature is valid."""
        data = web.input()
        if _check_hash(data):
            return data.echostr
        return ''

    def POST(self):
        """Handle one pushed message or event and return the reply XML."""
        # NOTE(review): the request signature is not verified here.
        xml = etree.fromstring(web.data())
        msgType = xml.find("MsgType").text
        fromUser = xml.find("FromUserName").text
        toUser = xml.find("ToUserName").text

        if msgType == "event":
            return self._recv_event(fromUser, toUser, xml)

        if msgType == 'text':
            # ``.text`` is None for an empty element.
            content = xml.find("Content").text or u''
            command = content.lower()
            if command == 'help':
                return self._reply_text(fromUser, toUser, self._HELP_TEXT)
            if command == 'xhj':
                self._xhj_users.add(fromUser)
                return self._reply_text(fromUser, toUser, u'您已经进入与小黄鸡的交谈中,请尽情的蹂躏它吧!输入bye跳出与小黄鸡的交谈')
            if command == 'bye':
                self._xhj_users.discard(fromUser)
                return self._reply_text(fromUser, toUser, u'您已经跳出了和小黄鸡的交谈中,输入help来显示操作指令')
            # Any other text: translation and the chat bot are disabled, so an
            # empty reply is rendered. The statements that followed the return
            # were unreachable and used ``self.youdao``, which is never set.
            print("translation")
            return self.render.reply_null()

        # Any other message type (image, voice, ...) ends up here.
        return self._reply_text(fromUser, toUser, u"Unknow msg:" + msgType)
-
if __name__ == "__main__":
    # Route /weixin to the WeixinInterface class and start the web.py server.
    urls = ('/weixin', 'WeixinInterface')
    application = web.application(urls, globals())
    application.run()
复制代码
USB 摄像头要手动打开(请自己调试好再编写 start.sh)我的如下
#!/bin/sh
# Start the MJPG streamer for the USB camera in the background, then run the
# WeChat web service on port 80.
STREAMER=mjpg_streamer
DEVICE=/dev/video9
RESOLUTION=640x480
FRAMERATE=10
HTTP_PORT=8001
PLUGINPATH=/workspace/sda/mjpg-streamer
# Plain ASCII quotes: the typographic quotes used before were not shell
# quoting and were printed literally.
echo "正在加载摄像头驱动"
$PLUGINPATH/$STREAMER -i "$PLUGINPATH/input_uvc.so -d $DEVICE -r $RESOLUTION -f $FRAMERATE -y YUYV -n" -o "$PLUGINPATH/output_http.so -n -p $HTTP_PORT " &
echo "正在启动主程序"
sudo python index.py 80
复制代码
weixin.py 参考网友的微信SDK
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import time
- import json
- import urllib
- import urllib2
- '''
- Python client SDK for Micro Message Public Platform API.
- '''
- try:
- import memcache
- except Exception, e:
- print '\033[95mWrining: %s. use local FileCache.\033[0m' %e
- (_HTTP_GET, _HTTP_POST, _HTTP_FILE) = range(3)
- _CONTENT_TYPE_MEDIA = ('image/jpeg', 'audio/amr', 'video/mpeg4')
- _MEIDA_TYPE = ('mpeg4', 'jpeg', 'jpg', 'gif', 'png', 'bmp', 'mp3', 'wav', 'wma', 'amr')
- _CONTENT_TYPE_JSON= (
- 'application/json; encoding=utf-8',
- 'application/json',
- 'text/plain',
- )
- _API_URLS = (
- 'https://api.weixin.qq.com/',
- 'https://api.weixin.qq.com/cgi-bin/',
- )
- _OTHER_FEATURES = (
- 'semantic', #微信智能接口
- 'shackearound', #微信摇一摇周边
- 'sns', #网页授权相关
- 'datacube', #数据统计
- 'merchant', #微信小店
- )
try:
    _APIErrorBase = StandardError
except NameError:
    # StandardError does not exist on Python 3.
    _APIErrorBase = Exception


class APIError(_APIErrorBase):
    """Raised when a JSON response from the API reports a failure.

    :param error_code: the ``errcode`` value of the response.
    :param error_msg: the ``errmsg`` value of the response.
    """

    def __init__(self, error_code, error_msg):
        self.error_code = error_code
        self.error_msg = error_msg
        _APIErrorBase.__init__(self, error_msg)

    def __str__(self):
        # '%s' instead of '%d': formatting the error must never raise itself
        # when the code is not an integer.
        return '%s:%s' % (self.error_code, self.error_msg)
class AccessTokenError(APIError):
    """APIError subtype for failures related to the access token.

    Construction and string formatting are inherited unchanged from APIError.
    """
class JsonDict(dict):
    """A dict whose keys can also be read and written as attributes."""

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.
        if attr not in self:
            raise AttributeError(r"'JsonDict' object has no attribute '%s'" % (attr))
        return self[attr]

    def __setattr__(self, attr, value):
        dict.__setitem__(self, attr, value)
def _parse_json(s):
    """Parse a JSON string, turning every JSON object into a JsonDict."""
    def _to_json_dict(pairs):
        converted = JsonDict()
        for key in pairs:
            converted[str(key)] = pairs[key]
        return converted

    return json.loads(s, object_hook=_to_json_dict)
def _encode_params(**kw):
    """URL-encode keyword arguments into a query string.

    The ``body`` keyword is not encoded but returned separately; keywords
    named after a media type (see ``_MEIDA_TYPE``) are skipped because they
    carry the file object of an upload. ``None`` values produce ``key=``.

    :returns: ``(query_string, body)``, e.g. ``_encode_params(b='R&D')``
        gives ``('b=R%26D', None)``. Parameter order follows dict iteration.
    """
    args = []
    body = None
    for k, v in kw.iteritems():
        if k == 'body':
            body = v
            continue
        if k in _MEIDA_TYPE:
            continue
        if v is None:
            args.append('%s=' % (k))
            continue
        if isinstance(v, unicode):
            # urllib.quote cannot handle non-ASCII unicode objects.
            qv = v.encode('utf-8')
        elif isinstance(v, basestring):
            qv = v
        else:
            qv = str(v)
        args.append('%s=%s' % (k, urllib.quote(qv)))
    return ('&'.join(args), body)
def _encode_multipart(**kw):
    """Build a multipart/form-data body for a single media file.

    The file object is taken from the first keyword named after a media type
    (see ``_MEIDA_TYPE``); it is read completely and then closed. The boundary
    is derived from the current time.

    :returns: ``(body, boundary)``.
    """
    boundary = '----------%s' % hex(int(time.time()) * 1000)

    media_keys = [key for key in _MEIDA_TYPE if key in kw]
    media_key = media_keys[0] if media_keys else 'null'
    fd = kw.get(media_key)
    media_type = kw.get('type') if 'type' in kw else 'null'
    content = fd.read() if hasattr(fd, 'read') else 'null'
    filename = getattr(fd, 'name', None)
    if filename is None:
        filename = '/tmp/fake.%s' % (media_key)

    # The '\r\n' escapes in these literals had been turned into real line
    # breaks, which broke both the syntax and the multipart framing.
    data = [
        '--%s' % boundary,
        'Content-Disposition: form-data; name="%s"; filename="%s"' % (media_key, filename),
        'Content-Length: %d' % len(content),
        'Content-Type: %s/%s' % (media_type, media_key),
        'Content-Transfer-Encoding: binary\r\n',
        content,
        '--%s--\r\n' % boundary,
    ]
    if hasattr(fd, 'close'):
        fd.close()
    return '\r\n'.join(data), boundary
class WeiXinResponse(object):
    """Thin wrapper around an HTTP response; base class of typed responses."""

    def __init__(self, resp):
        self._resp = resp

    def read(self):
        payload = self._resp.read()
        return payload

    def close(self):
        self._resp.close()

    def __str__(self):
        # The string form is the response's Content-Type header.
        headers = self._resp.headers
        return headers['Content-Type']
class WeiXinJson(WeiXinResponse):
    """A response whose body is a JSON document."""

    # Error codes reported as AccessTokenError rather than plain APIError.
    _TOKEN_ERRCODES = (40001, 40014, 41001, 42001)

    def __init__(self, resp):
        WeiXinResponse.__init__(self, resp)

    def read(self):
        """Parse the body, close the response, and raise on API errors.

        :returns: the parsed JSON.
        :raises AccessTokenError: for token-related error codes.
        :raises APIError: for any other non-zero ``errcode``.
        """
        rjson = _parse_json(self._resp.read())
        self._resp.close()
        errcode = getattr(rjson, 'errcode', 0)
        if errcode != 0:
            error_cls = AccessTokenError if errcode in self._TOKEN_ERRCODES else APIError
            raise error_cls(errcode, rjson.errmsg)
        return rjson

    def close(self):
        # Nothing to do: the underlying response is closed in read().
        return None
class WeiXinMedia(WeiXinResponse):
    """A response carrying raw media (audio, image, video, ...).

    Construction is inherited unchanged from WeiXinResponse.
    """
def _http_call(the_url, method, token, **kw):
    """Send one API request and wrap the response by content type.

    :param the_url: endpoint URL without query string.
    :param method: one of ``_HTTP_GET``, ``_HTTP_POST``, ``_HTTP_FILE``.
    :param token: access token, or None for unauthenticated calls.
    :param kw: request parameters; ``body`` is the POST payload and a
        media-type keyword carries the file object of an upload.
    :returns: parsed JSON for JSON responses, ``WeiXinMedia`` for media
        responses, otherwise a plain ``WeiXinResponse``.
    :raises urllib2.HTTPError: for HTTP errors whose body is not JSON.
    :raises APIError: when the JSON response reports a failure.
    """
    boundary = None
    params, body = _encode_params(**kw)
    if method == _HTTP_FILE:
        # NOTE(review): uploads go to a plain-HTTP host, which exposes the
        # access token on the wire; confirm whether an HTTPS endpoint can
        # be used instead.
        the_url = the_url.replace('https://api.', 'http://file.api.')
        body, boundary = _encode_multipart(**kw)

    if token is None:
        http_url = '%s?%s' % (the_url, params)
    else:
        the_url = '%s?access_token=%s' % (the_url, token)
        if method in (_HTTP_GET, _HTTP_FILE):
            http_url = '%s&%s' % (the_url, params)
        else:
            http_url = the_url
    http_body = str(body) if method == _HTTP_POST else body

    req = urllib2.Request(http_url, data=http_body)
    if boundary:
        req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % boundary)

    try:
        resp = urllib2.urlopen(req, timeout=8)
    except urllib2.HTTPError as e:
        # HTTPError is itself a response object. The original read the
        # unbound ``resp`` here and compared the header to the whole tuple.
        if e.headers.get('Content-Type') in _CONTENT_TYPE_JSON:
            return WeiXinJson(e).read()
        raise

    content_type = resp.headers.get('Content-Type')
    if content_type is None:
        content_type = '??'
        # Stored under the real header name so WeiXinResponse.__str__ works.
        resp.headers['Content-Type'] = content_type

    if content_type in _CONTENT_TYPE_MEDIA:
        return WeiXinMedia(resp)
    if content_type in _CONTENT_TYPE_JSON:
        return WeiXinJson(resp).read()
    return WeiXinResponse(resp)
class FileCache(object):
    """Minimal key/value cache persisted as a JSON file.

    ``get``/``set``/``delete`` accept the same arguments as the memcache
    client so either can be used; the ``time`` argument is ignored here.
    Changes only reach the disk when ``save()`` is called.
    """

    def __init__(self, path):
        self.path = path
        try:
            with open(self.path, 'r') as fd:
                self.dict_data = json.loads(fd.read())
        except Exception:
            # Missing or unreadable/corrupt file: start with an empty cache.
            self.dict_data = dict()

    def get(self, key):
        return self.dict_data.get(key)

    def set(self, key, value, time=0):
        self.dict_data[key] = value

    def delete(self, key, time=0):
        self.dict_data.pop(key, None)

    def save(self):
        # Serialise with json.dumps: rewriting repr() output by swapping quote
        # characters produced invalid JSON for unicode keys (u'...') and for
        # values that contain a quote.
        data = json.dumps(self.dict_data)
        with open(self.path, 'w') as fd:
            fd.write(data)

    def remove(self):
        """Delete the cache file; a file that is already gone is ignored."""
        import os
        try:
            os.remove(self.path)
        except OSError:
            pass

    def __str__(self):
        return repr(self.dict_data)
class WeiXinClient(object):
    """Synchronous client for the WeChat public platform API.

    API calls are built by attribute chaining, e.g.
    ``client.user.get._get(next_openid=None)`` or
    ``client.message.custom.send.post(body=...)``.

    :param appID: application id.
    :param appsecret: application secret.
    :param fc: True stores the access token in a FileCache at
        ``<path>/access_token``; False stores it in memcache, with ``path``
        used as the server address.
    :param path: cache directory (``fc=True``) or memcache address.
    """

    def __init__(self, appID, appsecret, fc=True, path='/tmp'):
        self.api_url = ''
        self.app_id = appID
        self.app_secret = appsecret
        self.access_token = None
        self.expires = 0
        self.fc = fc
        # This expression was split over two lines without a continuation.
        if fc:
            self.mc = FileCache('%s/access_token' % (path))
        else:
            self.mc = memcache.Client([path], debug=0)

    def request_access_token(self):
        """Load the token from the cache, or fetch and cache a new one."""
        token_key = 'access_token_%s' % (self.app_id)
        expires_key = 'expires_%s' % (self.app_id)
        access_token = self.mc.get(token_key)
        expires = self.mc.get(expires_key)
        if not access_token or not expires or expires < int(time.time()):
            rjson = _http_call(_API_URLS[1] + 'token', _HTTP_GET,
                               None, grant_type='client_credential',
                               appid=self.app_id, secret=self.app_secret)
            self.access_token = str(rjson.access_token)
            self.expires = int(time.time()) + rjson.expires_in
            self.mc.set(token_key, self.access_token, time=rjson.expires_in)
            self.mc.set(expires_key, self.expires, time=rjson.expires_in)
            if self.fc:
                self.mc.save()
        else:
            self.access_token = str(access_token)
            self.expires = expires

    def del_access_token(self):
        """Forget the token, both in memory and in the cache object."""
        token_key = 'access_token_%s' % (self.app_id)
        expires_key = 'expires_%s' % (self.app_id)
        self.access_token = None
        self.expires = 0
        self.mc.delete(token_key)
        self.mc.delete(expires_key)

    def refurbish_access_token(self):
        """Drop the current token and request a fresh one."""
        self.del_access_token()
        self.request_access_token()

    def set_access_token(self, token, expires_in, persistence=False):
        """Install an externally obtained token, optionally caching it."""
        self.access_token = token
        self.expires = expires_in + int(time.time())
        if persistence:
            token_key = 'access_token_%s' % (self.app_id)
            expires_key = 'expires_%s' % (self.app_id)
            self.mc.set(token_key, token, time=expires_in)
            self.mc.set(expires_key, self.expires, time=expires_in)
            if self.fc:
                self.mc.save()

    def is_expires(self):
        """True when there is no token or it expires within 10 seconds."""
        return not self.access_token or int(time.time()) >= (self.expires - 10)

    def __getattr__(self, attr):
        # Only reached for names that are not real attributes. Dunder lookups
        # (copy/pickle protocols etc.) must fail normally instead of being
        # turned into API paths.
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)
        # Side effect: selects the base URL used by the call being built.
        self.api_url = _API_URLS[0] if attr in _OTHER_FEATURES else _API_URLS[1]
        return _Callable(self, attr)

    def __str__(self):
        # The '\n' escapes of this literal had become real line breaks.
        return ('url=%s\napp_id=%s\napp_secret=%s\naccess_token=%s\nexpires=%d'
                % (self.api_url, self.app_id, self.app_secret,
                   self.access_token, self.expires))
class _Executable(object):
    """Final link of an attribute chain: performs the HTTP call when invoked."""

    def __init__(self, client, method, path):
        self._client, self._method, self._path = client, method, path

    def __call__(self, **kw):
        owner = self._client
        full_url = '%s%s' % (owner.api_url, self._path)
        return _http_call(full_url, self._method, owner.access_token, **kw)

    def __repr__(self):
        return '_Executable (%s)' % (self._path)

    __str__ = __repr__
class _Callable(object):
    """Intermediate link of an attribute chain; accumulates the API path.

    Accessing ``dget``/``_get``, ``post`` or ``file`` ends the chain and
    yields an ``_Executable`` for the matching request kind; any other
    attribute extends the path by one segment.
    """

    # Terminal attribute name -> request kind.
    _VERBS = {
        'dget': _HTTP_GET,
        '_get': _HTTP_GET,
        'post': _HTTP_POST,
        'file': _HTTP_FILE,
    }

    def __init__(self, client, name):
        self._client = client
        self._name = name

    def __getattr__(self, attr):
        verb = self._VERBS.get(attr)
        if verb is not None:
            return _Executable(self._client, verb, self._name)
        return _Callable(self._client, '%s/%s' % (self._name, attr))

    def __str__(self):
        return '_Callable (%s)' % (self._name)
def test():
    """Placeholder for an API smoke test; currently does nothing."""
    return None


if __name__ == '__main__':
    test()
复制代码
启动的时候 ./start.sh
当返回
sudo: unable to resolve host nanopi2
[sudo] password for fa:
http://0.0.0.0:80/
即启动成功
由于是边学边做,很多关键性的资料都没有记录下来,有问题或者觉得不好的地方请大家拍砖
`
|