Skip to content

Instantly share code, notes, and snippets.

@countMonteCristo
Created January 13, 2015 21:14
Show Gist options
  • Select an option

  • Save countMonteCristo/5da3fbfc76082b6f92a0 to your computer and use it in GitHub Desktop.

Select an option

Save countMonteCristo/5da3fbfc76082b6f92a0 to your computer and use it in GitHub Desktop.
'''NyashStatistics
Authorization request:
https://oauth.vk.com/authorize?client_id=APP_ID&scope=messages,photos&
redirect_uri=https://oauth.vk.com/blank.html&display=page&v=5.27&
response_type=token
Response after the application has been granted its permissions:
https://oauth.vk.com/blank.html#access_token=TOKEN&expires_in=86400&user_id=UID
Take the access_token value (TOKEN) from that response and put it into ACCESS_TOKEN.
'''
import json
from time import sleep
from datetime import datetime as dt
from datetime import timedelta
from urllib.parse import urlencode
from urllib.request import urlopen
from collections import Counter
# OAuth access token passed to every API call; the module docstring describes
# how to obtain one.
# NOTE(review): this is a credential committed in source — consider revoking
# it and supplying the token from outside the file.
ACCESS_TOKEN = 'fcdff1aed8eef78246a2d45fbaf9e7602a57ce520354a80fd39fecff8267d0aa55aa446f858147056002a'
# User id sent as the "user_id" parameter when searching for the chat.
USER_ID = '2269553'
class ChatUser:
    """A chat participant plus the counters accumulated for that person.

    Attributes are plain and public:
      uid        -- identifier given at construction
      firstname  -- first name given at construction
      lastname   -- last name given at construction
      num        -- integer counter, starts at 0
      stat       -- ``collections.Counter``, starts empty
    """

    def __init__(self, user_id, fname, lname):
        self.uid = user_id
        self.firstname, self.lastname = fname, lname
        # Both counters start empty; they are filled in from outside the class.
        self.num = 0
        self.stat = Counter()

    def __str__(self):
        """Return "<first name> <last name>"."""
        name_parts = (self.firstname, self.lastname)
        return '{} {}'.format(*name_parts)

    def get_stat(self):
        """Return a one-line textual dump of ``num`` and ``stat``."""
        template = 'num={} stat={}'
        return template.format(self.num, self.stat)
def _create_users(d):
    """Build a ``{uid: ChatUser}`` mapping from an iterable of user records.

    Every record must be a mapping with the keys 'uid', 'first_name' and
    'last_name'.
    """
    users_by_id = {}
    for record in d:
        uid = record['uid']
        users_by_id[uid] = ChatUser(uid, record['first_name'],
                                    record['last_name'])
    return users_by_id
def m_expired(start, curr, n):
    """Return True when *curr* lies before the cut-off derived from *start*.

    The cut-off is *start* moved back to the Monday of its week, then *n*
    further weeks, with hours and minutes removed and all but one second
    removed (so the clock part becomes 00:00:01 plus any microseconds
    *start* carried).

    start -- reference ``datetime``
    curr  -- ``datetime`` being tested
    n     -- number of additional whole weeks to go back
    """
    rewind = timedelta(
        weeks=n,
        days=start.weekday(),
        hours=start.hour,
        minutes=start.minute,
        seconds=start.second - 1,
    )
    return curr < start - rewind
def call_api(method, params, token=None):
    """Call an API method over HTTPS and return the decoded JSON result.

    Args:
        method: API method name, inserted into the request URL.
        params: query parameters as a list of ``(key, value)`` pairs, a
            dict, or a single ``(key, value)`` pair.
        token: optional access token; when given it is appended to the
            query as ``access_token``.

    Returns:
        The value stored under 'response' if the decoded JSON object has
        that key, otherwise the whole decoded object (for example an error
        description).

    Side effects:
        Appends one line with the method name and full URL to 'Nya.log'.
    """
    base = "https://api.vk.com/method/{}?{}"
    if isinstance(params, dict):
        # dict.items() is a view in Python 3 and has no .append(); copy it
        # into a list so the token can be added below.
        params_list = list(params.items())
    elif isinstance(params, list):
        # Copy so the caller's list is never mutated.
        params_list = list(params)
    else:
        params_list = [params]
    if token is not None:
        params_list.append(("access_token", token))
    url = base.format(method, urlencode(params_list))
    # Close the HTTP response deterministically instead of leaking it.
    with urlopen(url, timeout=5) as response:
        data = response.read()
    # NOTE(review): the logged URL contains the access token in clear text.
    with open('Nya.log', 'a') as f:
        print(method, url, file=f)
    r = json.loads(data.decode())
    if 'response' in r:
        return r['response']
    return r
def get_chat_id(user_id, token, query):
    """Find a chat by title and load its participants.

    Runs a dialog search for *query* and takes the first hit, then fetches
    that chat's user list.

    Returns:
        ``(users, chat_id)`` where ``users`` maps user id to ``ChatUser``;
        or ``None`` (after printing a message) when the search result is
        empty.
    """
    found = call_api("messages.searchDialogs",
                     [("user_id", user_id), ("q", query)],
                     token)
    if len(found) == 0:
        print('Can\'t find chat with title "{}", exiting'.format(query))
        return None
    # Only the first search hit is considered.
    chat_id = found[0]['chat_id']
    chat = call_api('messages.getChat',
                    [('chat_id', chat_id), ('fields', 'nickname')],
                    token)
    return _create_users(chat['users']), chat_id
def get_chat_history(user_id, token, chat_id, nw):
    """Collect the chat's messages that are not older than the week window.

    Pages through "messages.getHistory" 200 messages at a time and stops at
    the first message for which ``m_expired`` is true (this relies on the
    API returning newest messages first — presumably what ``rev=0``
    requests; verify against the API documentation).

    Args:
        user_id: unused; kept so existing callers keep working.
        token: access token forwarded to ``call_api``.
        chat_id: identifier of the chat to read.
        nw: number of extra weeks to look back, forwarded to ``m_expired``.

    Returns:
        List of message dicts, in the order the API returned them.

    Raises:
        RuntimeError: if the API answers with an error object instead of a
            message list.
    """
    offset = 0
    page_size = 200
    started = dt.now()
    history = []
    while True:
        params = [("offset", offset), ("count", page_size),
                  ("chat_id", chat_id), ("rev", 0)]
        page = call_api("messages.getHistory", params, token)
        if isinstance(page, dict):
            # call_api hands back the raw object when there is no
            # 'response' key; slicing it would fail with an obscure error.
            raise RuntimeError(
                'messages.getHistory failed: {}'.format(page))
        # Element 0 is not a message (presumably the total count), so the
        # messages start at index 1.
        messages = page[1:]
        if not messages:
            # History exhausted before reaching an expired message; without
            # this check the loop would keep requesting empty pages forever.
            break
        for pos, message in enumerate(messages):
            if m_expired(started, dt.fromtimestamp(message['date']), nw):
                history.extend(messages[:pos])
                return history
        history.extend(messages)
        offset += page_size
    return history
def attachment_filter(history):
    """Return the messages whose 'attachment' entry is a photo.

    A message is kept when all of the following hold, checked in this order:
      * it has an 'attachments' key,
      * its 'attachment' entry (singular) has type 'photo',
      * it has no 'action' key.

    NOTE(review): the singular 'attachment' key is read without checking
    that it exists — presumably the API always supplies it alongside
    'attachments'; confirm.
    """
    selected = []
    for message in history:
        if 'attachments' not in message:
            continue
        if message['attachment']['type'] != 'photo':
            continue
        if 'action' in message:
            continue
        selected.append(message)
    return selected
def process_history(users, history, token, max_retries=10):
    """Count photos per user and build each user's likes histogram.

    First pass: walks every attachment of every message, increments
    ``users[owner].num`` for each photo and remembers its id. Second pass:
    requests the remembered photos from "photos.getById" in batches of 20
    and, for each returned photo, increments
    ``users[owner].stat[<like count>]``.

    Args:
        users: mapping of user id to an object with ``num`` and ``stat``
            attributes; updated in place.
        history: message dicts, each with an 'attachments' list.
        token: access token forwarded to ``call_api``.
        max_retries: how many consecutive failed requests for one batch are
            tolerated before giving up.

    Returns:
        None.

    Raises:
        RuntimeError: if one batch fails more than ``max_retries`` times in
            a row.
    """
    err = 'Can\'t load photo from {} at {} with body "{}"'
    ph_ids = []
    delta = 20
    for m in history:
        for att in m['attachments']:
            # attachment_filter only looks at the first attachment, so the
            # rest of the list may hold non-photo items.
            if 'photo' not in att:
                continue
            photo = att['photo']
            oid = photo['owner_id']
            # A missing owner, or one that is not a known chat user, cannot
            # be attributed to anybody: report it and move on.
            if not oid or oid not in users:
                t = dt.fromtimestamp(m['date']).strftime("%H:%M:%S %d.%m.%Y")
                sender = users.get(m['from_id'], m['from_id'])
                print(err.format(sender, t, m['body']))
                continue
            pid = photo['pid']
            users[oid].num += 1
            if 'access_key' in photo:
                ph_ids.append(
                    '{}_{}_{}'.format(oid, pid, photo['access_key']))
            else:
                ph_ids.append('{}_{}'.format(oid, pid))
    ind = 0
    failures = 0
    while ind < len(ph_ids):
        phs = ','.join(ph_ids[ind:ind + delta])
        pr = [('photos', phs), ('extended', 1)]
        d = call_api('photos.getById', pr, token)
        if 'error' in d:
            # Retry the same batch after a pause, but not forever: a
            # permanent error would otherwise hang the script.
            failures += 1
            if failures > max_retries:
                raise RuntimeError(
                    'photos.getById keeps failing: {}'.format(d))
            sleep(1)
            continue
        failures = 0
        ind += delta
        for p in d:
            oid = p['owner_id']
            if oid and oid in users:
                users[oid].stat[p['likes']['count']] += 1
# Number of weeks, counting back from the current one, whose messages are
# examined (0 = only the current week).
NWEEK = 0
found = get_chat_id(USER_ID, ACCESS_TOKEN, 'Няши')
if found is None:
    # get_chat_id has already printed why; unpacking None would raise a
    # confusing TypeError instead of exiting.
    raise SystemExit(1)
users, cid = found
h = get_chat_history(USER_ID, ACCESS_TOKEN, cid, NWEEK)
print('Messages per last {} week(s): {}'.format(NWEEK, len(h)))
h = attachment_filter(h)
print('Message with photos per last {} week(s): {}'.format(NWEEK, len(h)))
print()
process_history(users, h, ACCESS_TOKEN)
print()
# The table has one column per like count 0 .. len(users)-1; photos with
# more likes than that are not shown.
# NOTE(review): presumably only chat members like these photos — confirm.
ld = '|'.join(str(nl).ljust(4) for nl in range(len(users)))
print('{:23}|{}|{}|{:4}'.format('User', ld, 'Sum', 'Mid'))
print('-'*(32+5*len(users)))
# Column totals across all users.
likes = [0 for nl in range(len(users))]
for i in users:
    # num[k] = number of this user's photos that received exactly k likes.
    num = [users[i].stat[nl] for nl in range(len(users))]
    likes = [l+n for (l, n) in zip(likes, num)]
    s = sum(num)
    # Average likes per photo; stays 0 when the user has no photos.
    mid = 0
    if s:
        mid = sum(n*l for (n, l) in enumerate(num))/s
    ld = '|'.join(str(nl).ljust(4) for nl in num)
    # str() is required: ChatUser defines no __format__, so applying the
    # width spec to the object itself raises TypeError on Python 3.4+.
    print('{:23}|{}|{:3}|{:.2f}'.format(str(users[i]), ld, s, mid))
print('-'*(32+5*len(users)))
ld = '|'.join(str(nl).ljust(4) for nl in likes)
print('{:23}|{}|{:3}|{:4}'.format('', ld, '', ''))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment