使用Google App Engine、Tweepy和RIOT API来创建Twitter机器人服务!(后篇)

故事梗概

继前面的内容之后,后面我们将实际参考各种API来运行BOT服务。源代码在GitBucket上,完成品在这里进行运维。

在任务队列中逐个进行处理

为了监视并在各个用户的战绩有更新时发布到Twitter,需要考虑API的使用限制,因此需要进行以秒为单位的间隔控制来执行处理。由于后面提到的Cron只能以最小1分钟的间隔进行调用,所以每个用户的处理将使用GAE提供的任务队列(Task Queue)机制。任务队列的概念类似于将任务放入一个名为Bucket的容器中,按照放入的顺序并根据设置逐个进行处理。通过增加Bucket的大小可以实现并行处理。配置文件名为queue.yaml。

queue:
- name: tweet #なんでもよい
  rate: 0.8/s #一秒間に0.8回のペースで処理
  bucket_size: 1 #並列で処理する数
  retry_parameters:
    task_retry_limit: 0 #処理失敗時のリトライ数

可以使用”add()”方法将Python任务投放到在queue.yaml文件中配置的存储桶中。在queue_name=中指定在queue.yaml文件中配置的名称。在url=中指定要调用的处理地址。此外,还可以传递参数。

from google.appengine.api.taskqueue import add

import webapp2

class modelTask(db.Model): #キューに入れるタスク
    resion = db.StringProperty()
    summoner_name = db.StringProperty()
    summoner_id = db.IntegerProperty()
    latest_game = db.IntegerProperty()
    access_key = db.StringProperty()
    access_secret = db.StringProperty()
    date_success = db.DateTimeProperty(auto_now_add=True)
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        qs = modelTask.all().order('-date_success')
        for q in qs: #タスクを全てQueueに追加する
            add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})

app = webapp2.WSGIApplication([ ('/launcher', mainHandler) ])

接下来需要在指定的url处实现被调用的处理程序,该处理程序在任务队列中按顺序被调用。任务队列会通过POST方法进行调用。

#! -*- coding: utf-8 -*-
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads

import webapp2, tweepy
from datetime import datetime

from laucher import modelTask

CONSUMER_KEY = '********************'
CONSUMER_SECRET = '**************************************'
RIOT_KEY = '***********************************'

class mainHandler(webapp2.RequestHandler):
    def post(self):
        getGame(long(self.request.get('qid')))

def getGame(qid):
    q = modelTask().get_by_id(qid, parent=None)
    #RIOT APIを呼び出す
    result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
    if result.status_code == 200:
        #APIから取得した各種値をセット
        j = loads(result.content)['games'][0]
        if j['stats']['win'] == True:
            win = '勝利'
        else:
            win = '敗北'
        try:
            kill = str(j['stats']['championsKilled'])
        except:
            kill = '0'
        try:
            death = str(j['stats']['numDeaths'])
        except:
            death = '0'
        try:
            assist = str(j['stats']['assists'])
        except:
            assist = '0'

        game_type = j['subType']

        #最終戦闘時間に更新があればTwitterに投稿
        if j['createDate'] > q.latest_game:
            q.latest_game = j['createDate']
            q.put()
            if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
                q.date_success = datetime.now()
                q.put()

#Twitter投稿処理
def tweet(message, access_key, access_secret):
    try:
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(access_key, access_secret)
        api = tweepy.API(auth_handler=auth, api_root='/1.1', secure=True)
        api.update_status(status=message)

        return True
    except:
        return False

app = webapp2.WSGIApplication([ ('/tweet', mainHandler) ])

定期处理通过Cron

由於我們希望定期地將上述任務添加到佇列中,因此我們將使用Cron。設定文件為cron.yaml。本次我們將設定每隔10分鐘在上午1點到下午6點之間執行一次,其他時間則每隔5分鐘執行一次。

cron:
- description: tweet job
  url: /launcher
  schedule: every 10 minutes from 1:00 to 17:59
  timezone: Asia/Tokyo #時間を指定する場合はタイムゾーンを忘れずに

- description: tweet job
  url: /launcher
  schedule: every 5 minutes from 18:00 to 0:59
  timezone: Asia/Tokyo

在后端进行资源消耗的分配

好的,使用此Cron(每5~10分钟)设置进行运行时,由于实例的启动时间计算是以15分钟为间隔的,所以Frontend实例的启动时间将确保消耗超过24小时。除此之外,还有注册页面处理等额外的工作,因此在免费配额的28小时(截至2014年3月)内可能会存在不安全因素。因此,我们决定利用Backend。Backend原本是用于执行批量处理和异步处理等后台工作的机制,但这次我们只是简单地利用它来获得(9小时)的额外实例运行时间。配置文件是backend.yaml。

backends:
- name: tweet #なんでも良い
  class: B1 #処理リソースは最低限にする
  options: dynamic #インスタンスが常駐しない様にする

如果要使用Cron将处理任务分配给后端,请指定目标。本次我们将尝试在下午6点到凌晨1点之间在后端进行处理。

cron:
- description: tweet job
  url: /launcher
  schedule: every 10 minutes from 1:00 to 17:59
  timezone: Asia/Tokyo

- description: tweet job
  url: /launcher
  schedule: every 5 minutes from 18:00 to 0:59
  timezone: Asia/Tokyo
  target: tweet #backend.yamlのname

您可以使用TaskQueue的add(target=)来指定后端处理任务。要检查自己是否在后端启动,可以使用get_backend()。此外,启动后端时将强制调用/_ah/start。因为他是首先被调用的,所以可以在启动时编写相应的处理代码,但这次我们将保留空处理代码。

from google.appengine.api.taskqueue import add
from google.appengine.api.backends import get_backend

import webapp2

class modelTask(db.Model):
    resion = db.StringProperty()
    summoner_name = db.StringProperty()
    summoner_id = db.IntegerProperty()
    latest_game = db.IntegerProperty()
    access_key = db.StringProperty()
    access_secret = db.StringProperty()
    date_success = db.DateTimeProperty(auto_now_add=True)
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        qs = modelTask.all().order('-date_success')
        target = get_backend()
        if target is None: #Backendで起動されてればTaskQueueも同様に
            for q in qs:
                add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()})
        else:
            for q in qs:
                add(queue_name='tweet', url='/tweet', params={'qid': q.key().id()}, target='tweet')

class startHandler(webapp2.RequestHandler): #Backend起動時に強制的に呼び出される
    def get(self):
        return

app = webapp2.WSGIApplication([ ('/launcher', mainHandler), ('/_ah/start', startHandler) ])

通过使用Memcache来减少资源消耗。

频繁地查询数据存储但几乎不进行更新的操作可以使用Memcache来大幅减少资源消耗。在RIOT API的game-v1.3中,冠军是通过ID返回的,因此需要根据champion-v1.1的信息将ID转换为名称。由于无法频繁调用API,所以将从champion-v1.1获取的信息存储到数据存储中。然后,将存储在Memcache中的信息进行复制。要向Memcache添加数据,可以使用memcache.add(key, value)函数。

from google.appengine.api import memcache
from google.appengine.ext import db
from google.appengine.api.urlfetch import fetch
from django.utils.simplejson import loads

import webapp2

RIOT_KEY = '***********************************'

class modelChampion(db.Model): #チャンピョン情報格納モデル
    name = db.StringProperty()
    date = db.DateTimeProperty(auto_now_add=True)

class mainHandler(webapp2.RequestHandler):
    def get(self):
        #チャンピョン情報取得
        result = fetch('https://prod.api.pvp.net/api/lol/na/v1.1/champion?api_key='+RIOT_KEY)
        if result.status_code == 200:
            js = loads(result.content)['champions']
            for j in js:
                #チャンピョン情報格納
                modelchampion = modelChampion().get_or_insert(str(j['id']))
                modelchampion.name = j['name']
                modelchampion.put()

                #Memcacheにチャンピョン情報をコピー
                memcache.add("champion_"+str(j['id']), j['name'], 86399)

app = webapp2.WSGIApplication([ ('/champion', mainHandler) ], debug=True)

要查看添加到Memcache中的数据,需要使用memcache.get(key)函数。由于添加到Memcache中的数据可能会消失,因此需要编写相应的处理代码。接下来,我们将在先前的tweet.py代码中添加冠军名称作为帖子内容。

from google.appengine.api import memcache
from champion import modelChampion

def getGame(qid):
    q = modelQueue().get_by_id(qid, parent=None)
    result = fetch('https://prod.api.pvp.net/api/lol/'+q.resion+'/v1.3/game/by-summoner/'+str(q.summoner_id)+'/recent?api_key='+RIOT_KEY)
    if result.status_code == 200:
        j = loads(result.content)['games'][0]
        if j['stats']['win'] == True:
            win = '勝利'
        else:
            win = '敗北'
        try:
            kill = str(j['stats']['championsKilled'])
        except:
            kill = '0'
        try:
            death = str(j['stats']['numDeaths'])
        except:
            death = '0'
        try:
            assist = str(j['stats']['assists'])
        except:
            assist = '0'

        #Memcacheからチャンピョン情報を取得
        champion = memcache.get("champion_"+str(j['championId']))
        if champion is None: #Memcacheからチャンピョン情報を取得できなければデータストアから
            champion = modelChampion.get_by_key_name(str(j['championId'])).name

        game_type = j['subType']

        if j['createDate'] > q.latest_game:
            q.latest_game = j['createDate']
            q.put()
            if tweet(q.summoner_name+'さん最新の'+game_type+'戦績は'+champion+'で'+kill+'キル'+death+'デス'+assist+'アシストの'+win+'です 。 http://tol.orfx.jp #Tweet_of_Legends', q.access_key, q.access_secret):
                q.date_success = datetime.now()
                q.put()

总结。

以上是我第一次使用GAE创建应用程序的备忘录。一旦掌握了窍门,以后就可以非常简单地创建应用程序,所以我推荐它!

bannerAds