使用Google App Engine、Tweepy和RIOT API来创建Twitter机器人服务!(后篇)
故事梗概
继前面的内容之后,后面我们将实际参考各种API来运行BOT服务。源代码在GitBucket上,完成品在这里进行运维。
在任务队列中逐个进行处理
为了监视并在各个用户的战绩有更新时发布到Twitter,需要考虑API的使用限制,因此需要进行以秒为单位的间隔控制来执行处理。由于后面提到的Cron只能以最小1分钟的间隔进行调用,所以每个用户的处理将使用GAE提供的任务队列(Task Queue)机制。任务队列的概念类似于将任务放入一个名为Bucket的容器中,按照放入的顺序并根据设置逐个进行处理。通过增加Bucket的大小可以实现并行处理。配置文件名为queue.yaml。
queue:
- name: tweet #なんでもよい
rate: 0.8/s #一秒間に0.8回のペースで処理
bucket_size: 1 #並列で処理する数
retry_parameters:
task_retry_limit: 0 #処理失敗時のリトライ数
可以使用”add()”方法将Python任务投放到在queue.yaml文件中配置的存储桶中。在queue_name=中指定在queue.yaml文件中配置的名称。在url=中指定要调用的处理地址。此外,还可以传递参数。
from google.appengine.api.taskqueue import add
import webapp2
class modelTask(db.Model): #キューに入れるタスク
resion = db.StringProperty()
summoner_name = db.StringProperty()
summoner_id = db.IntegerProperty()
latest_game = db.IntegerProperty()
access_key = db.StringProperty()
access_secret = db.StringProperty()
date_success = db.DateTimeProperty(auto_now_add=True)
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
qs = modelTask.all().order('-date_success')
for q in qs: #タスクを全てQueueに追加する
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
app = webapp2.WSGIApplication([ ('/launcher', mainHandler) ])
接下来需要在指定的url处实现被调用的处理程序,该处理程序在任务队列中按顺序被调用。任务队列会通过POST方法进行调用。
#! -*- coding: utf-8 -*-
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads
import webapp2, tweepy
from datetime import datetime
from laucher import modelTask
CONSUMER_KEY = '********************'
CONSUMER_SECRET = '**************************************'
RIOT_KEY = '***********************************'
class mainHandler(webapp2.RequestHandler):
def post(self):
getGame(long(self.request.get('qid')))
def getGame(qid):
q = modelTask().get_by_id(qid, parent=None)
#RIOT APIを呼び出す
result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
if result.status_code == 200:
#APIから取得した各種値をセット
j = loads(result.content)['games'][0]
if j['stats']['win'] == True:
win = '勝利'
else:
win = '敗北'
try:
kill = str(j['stats']['championsKilled'])
except:
kill = '0'
try:
death = str(j['stats']['numDeaths'])
except:
death = '0'
try:
assist = str(j['stats']['assists'])
except:
assist = '0'
game_type = j['subType']
#最終戦闘時間に更新があればTwitterに投稿
if j['createDate'] > q.latest_game:
q.latest_game = j['createDate']
q.put()
if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
q.date_success = datetime.now()
q.put()
#Twitter投稿処理
def tweet(message, access_key, access_secret):
try:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth_handler=auth, api_root='/1.1', secure=True)
api.update_status(status=message)
return True
except:
return False
app = webapp2.WSGIApplication([ ('/tweet', mainHandler) ])
定期处理通过Cron
由於我們希望定期地將上述任務添加到佇列中,因此我們將使用Cron。設定文件為cron.yaml。本次我們將設定每隔10分鐘在上午1點到下午6點之間執行一次,其他時間則每隔5分鐘執行一次。
cron:
- description: tweet job
url: /launcher
schedule: every 10 minutes from 1:00 to 17:59
timezone: Asia/Tokyo #時間を指定する場合はタイムゾーンを忘れずに
- description: tweet job
url: /launcher
schedule: every 5 minutes from 18:00 to 0:59
timezone: Asia/Tokyo
在后端进行资源消耗的分配
好的,使用此Cron(每5~10分钟)设置进行运行时,由于实例的启动时间计算是以15分钟为间隔的,所以Frontend实例的启动时间将确保消耗超过24小时。除此之外,还有注册页面处理等额外的工作,因此在免费配额的28小时(截至2014年3月)内可能会存在不安全因素。因此,我们决定利用Backend。Backend原本是用于执行批量处理和异步处理等后台工作的机制,但这次我们只是简单地利用它来获得(9小时)的额外实例运行时间。配置文件是backend.yaml。
backends:
- name: tweet #なんでも良い
class: B1 #処理リソースは最低限にする
options: dynamic #インスタンスが常駐しない様にする
如果要使用Cron将处理任务分配给后端,请指定目标。本次我们将尝试在下午6点到凌晨1点之间在后端进行处理。
cron:
- description: tweet job
url: /launcher
schedule: every 10 minutes from 1:00 to 17:59
timezone: Asia/Tokyo
- description: tweet job
url: /launcher
schedule: every 5 minutes from 18:00 to 0:59
timezone: Asia/Tokyo
target: tweet #backend.yamlのname
您可以使用TaskQueue的add(target=)来指定后端处理任务。要检查自己是否在后端启动,可以使用get_backend()。此外,启动后端时将强制调用/_ah/start。因为他是首先被调用的,所以可以在启动时编写相应的处理代码,但这次我们将保留空处理代码。
from google.appengine.api.taskqueue import add
from google.appengine.api.backends import get_backend
import webapp2
class modelTask(db.Model):
resion = db.StringProperty()
summoner_name = db.StringProperty()
summoner_id = db.IntegerProperty()
latest_game = db.IntegerProperty()
access_key = db.StringProperty()
access_secret = db.StringProperty()
date_success = db.DateTimeProperty(auto_now_add=True)
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
qs = modelTask.all().order('-date_success')
target = get_backend()
if target is None: #Backendで起動されてればTaskQueueも同様に
for q in qs:
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
else:
for q in qs:
add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()}, target='tweet')
class startHandler(webapp2.RequestHandler): #Backend起動時に強制的に呼び出される
def get(self):
return
app = webapp2.WSGIApplication([ ('/launcher', mainHandler), ('/_ah/start', startHandler) ])
通过使用Memcache来减少资源消耗。
频繁地查询数据存储但几乎不进行更新的操作可以使用Memcache来大幅减少资源消耗。在RIOT API的game-v1.3中,冠军是通过ID返回的,因此需要根据champion-v1.1的信息将ID转换为名称。由于无法频繁调用API,所以将从champion-v1.1获取的信息存储到数据存储中。然后,将存储在Memcache中的信息进行复制。要向Memcache添加数据,可以使用memcache.add(key, value)函数。
from google.appengine.api import memcache
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads
import webapp2
RIOT_KEY = '***********************************'
class modelChampion(db.Model): #チャンピョン情報格納モデル
name = db.StringProperty()
date = db.DateTimeProperty(auto_now_add=True)
class mainHandler(webapp2.RequestHandler):
def get(self):
#チャンピョン情報取得
result = fetch('https://prod.api.pvp.net/api/lol/na/v1.1/champion?api_key='+RIOT_KEY)
if result.status_code == 200:
js = loads(result.content)['champions']
for j in js:
#チャンピョン情報格納
modelchampion = modelChampion().get_or_insert(str(j['id']))
modelchampion.name = j['name']
modelchampion.put()
#Memcacheにチャンピョン情報をコピー
memcache.add("champion_"+str(j['id']), j['name'], 86399)
app = webapp2.WSGIApplication([ ('/champion', mainHandler) ], debug=True)
要查看添加到Memcache中的数据,需要使用memcache.get(key)函数。由于添加到Memcache中的数据可能会消失,因此需要编写相应的处理代码。接下来,我们将在先前的tweet.py代码中添加冠军名称作为帖子内容。
from google.appengine.api import memcache
from champion import modelChampion
def getGame(qid):
q = modelQueue().get_by_id(qid, parent=None)
result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
if result.status_code == 200:
j = loads(result.content)['games'][0]
if j['stats']['win'] == True:
win = '勝利'
else:
win = '敗北'
try:
kill = str(j['stats']['championsKilled'])
except:
kill = '0'
try:
death = str(j['stats']['numDeaths'])
except:
death = '0'
try:
assist = str(j['stats']['assists'])
except:
assist = '0'
#Memcacheからチャンピョン情報を取得
champion = memcache.get("champion_"+str(j['championId']))
if champion is None: #Memcacheからチャンピョン情報を取得できなければデータストアから
champion = modelChampion.get_by_key_name(str(j['championId'])).name
game_type = j['subType']
if j['createDate'] > q.latest_game:
q.latest_game = j['createDate']
q.put()
if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+champion+'で'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
q.date_success = datetime.now()
q.put()
总结。
以上是我第一次使用GAE创建应用程序的备忘录。一旦掌握了窍门,以后就可以非常简单地创建应用程序,所以我推荐它!