Skip to content

Instantly share code, notes, and snippets.

@Jackpakistan
Forked from ipconfiger/__init__.py
Created September 13, 2012 03:46
Show Gist options
  • Save Jackpakistan/3711710 to your computer and use it in GitHub Desktop.
Save Jackpakistan/3711710 to your computer and use it in GitHub Desktop.

Revisions

  1. @ipconfiger ipconfiger created this gist Jan 5, 2012.
    339 changes: 339 additions & 0 deletions __init__.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,339 @@
    # -*- Encoding: utf-8 -*-
    import base64
    import binascii
    import cgi
    import hashlib
    import hmac
    import logging
    import time
    import urllib
    import urlparse
    import uuid

    from tornado.auth import OAuthMixin, _oauth10a_signature

    from tornado import httpclient
    from tornado import escape
    from tornado.ioloop import IOLoop

    class DictObject(dict):

    def __getattr__(self, k):
    try:
    return self[k]
    except KeyError:
    raise AttributeError('Unknown attribute \'%s\'' % (k,))

    def __setattr__(self, k, v):
    self[k] = v

    def __repr__(self):
    return dict.__repr__(self)

    def __hash__(self):
    return id(self)

    class User(DictObject):
    pass

    class BaseOAuthMixin(OAuthMixin):

    _OAUTH_NO_CALLBACKS = False

    def authenticate_redirect(self):
    http = httpclient.AsyncHTTPClient()
    http.fetch(self._oauth_request_token_url(), self.async_callback(
    self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None))

    def api_request(self, path, callback, access_token=None, post_args=None, **args):

    url = self._OAUTH_API_URL % path

    if access_token:

    all_args = {}
    all_args.update(args)
    all_args.update(post_args or {})

    method = 'POST' if post_args is not None else 'GET'
    oauth = self._oauth_request_parameters(
    url, access_token, all_args, method=method)
    args.update(oauth)

    if args:
    url += '?' + urllib.urlencode(args)

    callback = self.async_callback(self._on_api_request, callback)

    http = httpclient.AsyncHTTPClient()

    if post_args is not None:
    http.fetch(url, method='POST', body=urllib.urlencode(post_args), callback=callback)
    else:
    http.fetch(url, callback=callback)

    def _on_api_request(self, callback, response):

    if response.error:
    logging.warning('Error response %s fetching %s', response.error,
    response.request.url)
    callback(None)
    return

    callback(escape.json_decode(response.body))

    def _oauth_consumer_token(self):
    return dict(key=self._OAUTH_CONSUMER_KEY, secret=self._OAUTH_CONSUMER_SECRET)

    class SinaMixin(BaseOAuthMixin):

    _OAUTH_CONSUMER_KEY = 'XXXXXXXXXXX'
    _OAUTH_CONSUMER_SECRET = 'XXXXXXXXXXXXXXXXXXXXX'

    _OAUTH_REQUEST_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/request_token'
    _OAUTH_ACCESS_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/access_token'
    _OAUTH_AUTHORIZE_URL = 'http://api.t.sina.com.cn/oauth/authorize'

    _OAUTH_AUTHENTICATE_URL = 'http://api.t.sina.com.cn/account/verify_credentials.json'
    _OAUTH_API_URL = 'http://api.t.sina.com.cn/%s.json'

    def _oauth_get_user(self, access_token, callback):

    callback = self.async_callback(self._parse_user_response, callback)

    self.api_request('users/show/' + access_token['user_id'],
    access_token=access_token, callback=callback)

    def _parse_user_response(self, callback, user):
    parse_user = User()
    parse_user.id = user['id']
    parse_user.name = user['name']
    parse_user.portrait = user['profile_image_url']
    parse_user.email = ''
    callback(parse_user)

    class QQMixin(BaseOAuthMixin):

    _OAUTH_CONSUMER_KEY = 'YYYYYYYYYYYYYYYYYYY'
    _OAUTH_CONSUMER_SECRET = 'YYYYYYYYYYYYYYYYYYYYY'

    _OAUTH_REQUEST_TOKEN_URL = 'http://open.t.qq.com/cgi-bin/request_token'
    _OAUTH_ACCESS_TOKEN_URL = 'http://open.t.qq.com/cgi-bin/access_token'
    _OAUTH_AUTHORIZE_URL = 'http://open.t.qq.com/cgi-bin/authorize'

    _OAUTH_AUTHENTICATE_URL = 'http://open.t.qq.com/api/user/verify?format=json'
    _OAUTH_API_URL = 'http://open.t.qq.com/api/%s'

    _OAUTH_VERSION = '1.0'

    def authorize_redirect(self, callback_uri=None, extra_params=None):
    if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
    raise Exception("This service does not support oauth_callback")

    http = httpclient.AsyncHTTPClient()
    http.fetch(self._oauth_request_token_url(callback_uri=callback_uri,
    extra_params=extra_params),
    self.async_callback(
    self._on_request_token,
    self._OAUTH_AUTHORIZE_URL,
    callback_uri))

    def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
    consumer_token = self._oauth_consumer_token()
    url = self._OAUTH_REQUEST_TOKEN_URL
    args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )

    if callback_uri:
    args["oauth_callback"] = urlparse.urljoin(
    self.request.full_url(), callback_uri)
    if extra_params: args.update(extra_params)
    signature = _oauth10a_signature(consumer_token, "GET", url, args)

    args["oauth_signature"] = signature
    return url + "?" + urllib.urlencode(args)

    def _oauth_access_token_url(self, request_token):
    consumer_token = self._oauth_consumer_token()
    url = self._OAUTH_ACCESS_TOKEN_URL
    args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_token=request_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )
    if "verifier" in request_token:
    args["oauth_verifier"]=request_token["verifier"]

    signature = _oauth10a_signature(consumer_token, "GET", url, args,
    request_token)

    args["oauth_signature"] = signature
    return url + "?" + urllib.urlencode(args)

    def _oauth_request_parameters(self, url, access_token, parameters={}, method="GET"):
    consumer_token = self._oauth_consumer_token()
    base_args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_token=access_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )
    args = {}
    args.update(base_args)
    args.update(parameters)
    signature = _oauth10a_signature(consumer_token, method, url, args,
    access_token)
    base_args["oauth_signature"] = signature
    return base_args

    def _oauth_get_user(self, access_token, callback):

    callback = self.async_callback(self._parse_user_response, callback)

    self.api_request('user/info', access_token=access_token, callback=callback, format='json')

    def _parse_user_response(self, callback, user):
    parse_user = User()
    parse_user.id = self.get_argument('openid', None)
    parse_user.name = user['data']['name']
    parse_user.portrait = user['data']['head'] + '/100'
    parse_user.email = user['data']['email']
    callback(parse_user)

    class NeteaseMixin(BaseOAuthMixin):

    _OAUTH_CONSUMER_KEY = 'ZZZZZZZZZZZZZZ'
    _OAUTH_CONSUMER_SECRET = 'ZZZZZZZZZZZZZZZZZZZZZZ'

    _OAUTH_REQUEST_TOKEN_URL = 'http://api.t.163.com/oauth/request_token'
    _OAUTH_ACCESS_TOKEN_URL = 'http://api.t.163.com/oauth/access_token'
    _OAUTH_AUTHORIZE_URL = 'http://api.t.163.com/oauth/authenticate'

    _OAUTH_AUTHENTICATE_URL = 'http://api.t.163.com/account/verify_credentials.json'
    _OAUTH_API_URL = 'http://api.t.163.com/%s.json'

    _OAUTH_VERSION = '1.0'

    def _oauth_get_user(self, access_token, callback):

    callback = self.async_callback(self._parse_user_response, callback)

    self.api_request('users/show', access_token=access_token, callback=callback)

    def _parse_user_response(self, callback, user):
    parse_user = User()
    parse_user.id = user['id']
    parse_user.name = user['name']
    parse_user.portrait = user['profile_image_url']
    parse_user.email = ''
    callback(parse_user)

    class SohuMixin(BaseOAuthMixin):

    _OAUTH_CONSUMER_KEY = '1111111111111111'
    _OAUTH_CONSUMER_SECRET = '111111111111111111111111111111'

    _OAUTH_REQUEST_TOKEN_URL = 'http://api.t.sohu.com/oauth/request_token'
    _OAUTH_ACCESS_TOKEN_URL = 'http://api.t.sohu.com/oauth/access_token'
    _OAUTH_AUTHORIZE_URL = 'http://api.t.sohu.com/oauth/authorize'

    _OAUTH_AUTHENTICATE_URL = 'http://api.t.sohu.com/account/verify_credentials.json'
    _OAUTH_API_URL = 'http://api.t.sohu.com/%s.json'

    _OAUTH_VERSION = '1.0'

    def authorize_redirect(self, callback_uri=None, extra_params=None):
    if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
    raise Exception("This service does not support oauth_callback")

    http = httpclient.AsyncHTTPClient()
    http.fetch(self._oauth_request_token_url(callback_uri=callback_uri,
    extra_params=extra_params),
    self.async_callback(
    self._on_request_token,
    self._OAUTH_AUTHORIZE_URL,
    callback_uri))

    def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
    consumer_token = self._oauth_consumer_token()
    url = self._OAUTH_REQUEST_TOKEN_URL
    args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )

    if callback_uri:
    args["oauth_callback"] = urlparse.urljoin(
    self.request.full_url(), callback_uri)
    if extra_params: args.update(extra_params)
    signature = _oauth10a_signature(consumer_token, "GET", url, args)

    args["oauth_signature"] = signature
    return url + "?" + urllib.urlencode(args)

    def _oauth_access_token_url(self, request_token):
    consumer_token = self._oauth_consumer_token()
    url = self._OAUTH_ACCESS_TOKEN_URL
    args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_token=request_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )
    if "verifier" in request_token:
    args["oauth_verifier"]=request_token["verifier"]

    signature = _oauth10a_signature(consumer_token, "GET", url, args,
    request_token)

    args["oauth_signature"] = signature
    return url + "?" + urllib.urlencode(args)

    def _oauth_request_parameters(self, url, access_token, parameters={}, method="GET"):

    consumer_token = self._oauth_consumer_token()
    base_args = dict(
    oauth_consumer_key=consumer_token["key"],
    oauth_token=access_token["key"],
    oauth_signature_method="HMAC-SHA1",
    oauth_timestamp=str(int(time.time())),
    oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
    oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
    )
    args = {}
    args.update(base_args)
    args.update(parameters)
    signature = _oauth10a_signature(consumer_token, method, url, args,
    access_token)
    base_args["oauth_signature"] = signature
    return base_args

    def _oauth_get_user(self, access_token, callback):

    callback = self.async_callback(self._parse_user_response, callback)

    self.api_request('users/show',
    access_token=access_token, callback=callback)

    def _parse_user_response(self, callback, user):
    parse_user = User()
    parse_user.id = user['id']
    parse_user.name = user['screen_name']
    parse_user.portrait = user['profile_image_url']
    parse_user.email = ''
    callback(parse_user)
    70 changes: 70 additions & 0 deletions handlers.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,70 @@
    '''
    Created on Nov 19, 2011
    @author: zhuhua
    '''
    import app
    import oauth
    import tornado
    from common import flash
    from oauth.funcs import check_login

    @app.route('^/oauth/sina')
    class SinaOAuthHandler(app.BaseHandler, oauth.SinaMixin):

    @tornado.web.asynchronous
    def get(self):
    if self.get_argument("oauth_token", None):
    self.get_authenticated_user(self.async_callback(self._on_auth))
    return
    self.authorize_redirect(callback_uri='/oauth/sina')

    def _on_auth(self, user):
    if not user:
    raise tornado.web.HTTPError(500, "Sina auth failed")
    check_login(self, user, 'sina')

    @app.route('^/oauth/qq')
    class QQOAuthHandler(app.BaseHandler, oauth.QQMixin):

    @tornado.web.asynchronous
    def get(self):
    if self.get_argument("oauth_token", None):
    self.get_authenticated_user(self.async_callback(self._on_auth))
    return
    self.authorize_redirect(callback_uri='/oauth/qq')

    def _on_auth(self, user):
    if not user:
    raise tornado.web.HTTPError(500, "QQ auth failed")
    check_login(self, user, 'qq')

    @app.route('^/oauth/163')
    class NeteaseOAuthHandler(app.BaseHandler, oauth.NeteaseMixin):

    @tornado.web.asynchronous
    def get(self):
    if self.get_argument("oauth_token", None):
    self.get_authenticated_user(self.async_callback(self._on_auth))
    return
    self.authorize_redirect(callback_uri='/oauth/163')

    def _on_auth(self, user):
    if not user:
    raise tornado.web.HTTPError(500, "163 auth failed")
    check_login(self, user, '163')

    @app.route('^/oauth/sohu')
    class SohuOAuthHandler(app.BaseHandler, oauth.SohuMixin):

    @tornado.web.asynchronous
    def get(self):
    if self.get_argument("oauth_token", None):
    self.get_authenticated_user(self.async_callback(self._on_auth))
    return
    self.authorize_redirect(callback_uri='/oauth/sohu')

    def _on_auth(self, user):
    if not user:
    raise tornado.web.HTTPError(500, "Sohu auth failed")
    check_login(self, user, 'sohu')