realms-wiki/realms/modules/auth/oauth/models.py

172 lines
5.5 KiB
Python
Raw Normal View History

2015-10-16 01:36:47 +03:00
from flask_login import login_user
2015-10-14 06:52:30 +03:00
from flask_oauthlib.client import OAuth
2015-10-16 01:36:47 +03:00
2015-10-14 06:52:30 +03:00
from realms import config
from ..models import BaseUser
# Module-wide flask_oauthlib client registry; User.get_app() looks up and
# registers the per-provider remote apps on this object.
oauth = OAuth()
2015-10-15 07:08:56 +03:00
# Module-level store of OAuth users, keyed by User.auth_id.
# Filled by User.auth() and read by User.get_by_id().
users = {}
2015-10-14 06:52:30 +03:00
2015-10-15 07:08:56 +03:00
# Static description of every supported OAuth provider, keyed by provider name.
#
# Each entry carries:
#   'oauth'      - keyword arguments forwarded to ``oauth.remote_app`` by
#                  ``User.get_app``.
#   'button'     - HTML for the provider's login link, emitted by
#                  ``User.login_form``.
#   'profile'    - not read in this module; presumably the API path used to
#                  fetch the user's profile (None for twitter) -- TODO confirm
#                  against the caller.
#   'field_map'  - maps a ``User`` field name to either a key in the profile
#                  data or a list of keys forming a path into nested data
#                  (consumed by ``User.auth``).
#   'token_name' - key of the token inside the provider's authorization
#                  response (read by ``User.auth`` / ``User.get_token``).
providers = {
    'twitter': {
        'oauth': dict(
            base_url='https://api.twitter.com/1.1/',
            request_token_url='https://api.twitter.com/oauth/request_token',
            access_token_url='https://api.twitter.com/oauth/access_token',
            authorize_url='https://api.twitter.com/oauth/authenticate',
            access_token_method='GET'),
        'button': '<a href="/login/oauth/twitter" class="btn btn-default"><i class="fa fa-twitter"></i> Twitter</a>',
        'profile': None,
        'field_map': {
            'id': 'user_id',
            'username': 'screen_name'
        },
        'token_name': 'oauth_token'
    },
    'github': {
        'oauth': dict(
            request_token_params={'scope': 'user:email'},
            base_url='https://api.github.com/',
            request_token_url=None,
            access_token_method='POST',
            access_token_url='https://github.com/login/oauth/access_token',
            authorize_url='https://github.com/login/oauth/authorize'),
        'button': '<a href="/login/oauth/github" class="btn btn-default"><i class="fa fa-github"></i> Github</a>',
        'profile': 'user',
        'field_map': {
            'id': ['user', 'id'],
            'username': ['user', 'login'],
            'email': ['user', 'email']
        },
        'token_name': 'access_token'
    },
    'facebook': {
        'oauth': dict(
            request_token_params={'scope': 'email'},
            base_url='https://graph.facebook.com',
            request_token_url=None,
            access_token_url='/oauth/access_token',
            access_token_method='GET',
            authorize_url='https://www.facebook.com/dialog/oauth'
        ),
        'button': '<a href="/login/oauth/facebook" class="btn btn-default"><i class="fa fa-facebook"></i> Facebook</a>',
        'profile': '/me',
        'field_map': {
            'id': 'id',
            'username': 'name',
            'email': 'email'
        },
        # Was 'access_name', a typo: the token would never be found in the
        # response, so facebook logins always saw a None token.
        'token_name': 'access_token'
    },
    'google': {
        'oauth': dict(
            request_token_params={
                'scope': 'https://www.googleapis.com/auth/userinfo.email'
            },
            base_url='https://www.googleapis.com/oauth2/v1/',
            request_token_url=None,
            access_token_method='POST',
            access_token_url='https://accounts.google.com/o/oauth2/token',
            authorize_url='https://accounts.google.com/o/oauth2/auth',
        ),
        'button': '<a href="/login/oauth/google" class="btn btn-default"><i class="fa fa-google"></i> Google</a>',
        'profile': 'userinfo',
        'field_map': {
            'id': 'id',
            'username': 'name',
            'email': 'email'
        },
        'token_name': 'access_token'
    }
}
2015-10-15 01:36:22 +03:00
2015-10-15 07:08:56 +03:00
class User(BaseUser):
    """A user authenticated through one of the OAuth ``providers``.

    Instances are kept in the module-level ``users`` dict, keyed by
    ``auth_id``.
    """

    type = 'oauth'
    provider = None

    def __init__(self, provider, user_id, username=None, token=None, email=None):
        """Build a user from the values extracted from a provider profile.

        :param provider: provider name (a key of ``providers``).
        :param user_id: the provider-side user id, stored as ``self.id``.
        :param username: provider-side user name.
        :param token: value exposed through ``auth_token_id``.
        :param email: e-mail address, if the provider supplied one.
        """
        self.provider = provider
        self.username = username
        self.email = email
        self.id = user_id
        self.token = token
        # NOTE(review): the storage key is built from the username, not the
        # provider id, so users without a username share "<provider>-None".
        self.auth_id = "%s-%s" % (provider, username)

    @property
    def auth_token_id(self):
        """Return the token this user was created with."""
        return self.token

    @staticmethod
    def load_user(*args, **kwargs):
        """Look up a stored user; the first positional argument is its key."""
        return User.get_by_id(args[0])

    @staticmethod
    def get_by_id(user_id):
        """Return the user stored under ``user_id`` in ``users``, or None."""
        return users.get(user_id)

    @staticmethod
    def auth(provider, data, resp):
        """Create, store and log in a user from a provider's response.

        :param provider: provider name (a key of ``providers``).
        :param data: mapping with the profile data; read through the
            provider's ``field_map``.
        :param resp: mapping with the authorization response; the token is
            read from the provider's ``token_name`` key.
        :returns: True when the user was logged in, False otherwise.
        :raises NotImplementedError: when no ``field_map`` is configured for
            ``provider`` (including an unknown provider).
        """
        oauth_token = resp.get(User.get_provider_value(provider, 'token_name'))
        field_map = User.get_provider_value(provider, 'field_map')
        if not field_map:
            raise NotImplementedError

        def get_value(d, key):
            # A plain key is a direct lookup; a list/tuple is a path of keys
            # into nested mappings.
            if not isinstance(key, (list, tuple)):
                return d.get(key)
            # Walk the path WITHOUT mutating it: ``key`` is the very list
            # stored in ``providers``, and popping from it (as the previous
            # implementation did) emptied the configuration after the first
            # login, breaking every later one.
            val = d
            for part in key:
                val = val.get(part)
            return val

        fields = {}
        for k, v in field_map.items():
            fields[k] = get_value(data, v)

        user = User(provider, fields['id'], username=fields.get('username'), email=fields.get('email'),
                    token=User.hash_password(oauth_token))
        users[user.auth_id] = user

        if user:
            login_user(user, remember=True)
            return True
        else:
            return False

    @classmethod
    def get_app(cls, provider):
        """Return the remote app for ``provider``, registering it if needed.

        An app already present in ``oauth.remote_apps`` is reused; otherwise
        one is created from the provider's ``'oauth'`` settings and the
        key/secret found in ``config.OAUTH``.
        """
        existing = oauth.remote_apps.get(provider)
        if existing:
            return existing

        credentials = config.OAUTH.get(provider, {})
        return oauth.remote_app(
            provider,
            consumer_key=credentials.get('key'),
            consumer_secret=credentials.get('secret'),
            **providers[provider]['oauth'])

    @classmethod
    def get_provider_value(cls, provider, key):
        """Return ``providers[provider][key]``, or None if either is missing."""
        return providers.get(provider, {}).get(key)

    @classmethod
    def get_token(cls, provider, resp):
        """Return the token stored in ``resp`` under the provider's token name."""
        return resp.get(cls.get_provider_value(provider, 'token_name'))

    def get_id(self):
        """Return the text identifier ``"<type>/<auth_id>"`` for this user."""
        # A u"" literal yields unicode on Python 2 and str on Python 3,
        # replacing the Python-2-only ``unicode()`` call.
        return u"%s/%s" % (self.type, self.auth_id)

    @staticmethod
    def login_form():
        """Return the HTML login buttons for every configured provider.

        A provider is skipped unless both its ``key`` and ``secret`` are set
        in ``config.OAUTH``.
        """
        buttons = []
        for name, val in providers.items():
            credentials = config.OAUTH.get(name, {})
            if not credentials.get('key') or not credentials.get('secret'):
                continue
            buttons.append(val.get('button'))
        return "<h4>Social Login</h4>" + " ".join(buttons)