Skip to content

Instantly share code, notes, and snippets.

@umrysh
Created July 26, 2019 15:39
Show Gist options
  • Select an option

  • Save umrysh/483f37e4bb8d0f573e10879df5da84b3 to your computer and use it in GitHub Desktop.

Select an option

Save umrysh/483f37e4bb8d0f573e10879df5da84b3 to your computer and use it in GitHub Desktop.

Revisions

  1. umrysh created this gist Jul 26, 2019.
    180 changes: 180 additions & 0 deletions masto_search.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,180 @@
    import urllib.request
    import sys
    import os
    import re
    import sqlite3 as lite
    from bs4 import BeautifulSoup, Comment


    # Page that getData() downloads to obtain the list of Mastodon instances
    # (https://fediverse.network); getData() reads the rows of its
    # `table-hover` table.
    url = 'https://fediverse.network/mastodon'

    # Instance hostnames to leave out: getData() skips any scraped hostname
    # that appears in this list.
    dontCare = ['gab.com']

    def searchData(con,cur):
        """Interactively search the stored categories and list a category's users.

        Prompts for a search term, prints every category whose name contains
        it, then asks what to do next: `m` returns to the menu, `s` starts a
        new search, and anything else is treated as a category ID whose member
        URLs are printed before returning to the menu.

        Args:
            con: Open database connection; only passed on to menu()/searchData().
            cur: Cursor on that connection. Rows are read by column name, so
                the connection must use a name-indexable row factory (main()
                installs sqlite3.Row).
        """
        # ask them to enter a term to search
        term = input('\nPlease enter a term you would like to search for: ').strip()

        print('\nID\t| Category Name')
        print('-------------------------------')

        # The term is bound as a parameter rather than pasted into the SQL text,
        # so quotes in the input can neither break nor alter the statement.
        cur.execute(
            'SELECT _id,name from category where name LIKE ? order by name',
            ('%' + term + '%',),
        )
        for category in cur.fetchall():
            print('%s\t| %s' % (category['_id'], category['name']))

        answer = input('\nPlease enter either an ID to display the users in that category, s to search again, or m to go back to the menu: ')
        if answer == 'm':
            menu(con,cur)
        elif answer == 's':
            searchData(con,cur)
        else:
            print('\n-------------------------------')
            # Binding the answer also stops a reply such as `cat_id` from being
            # read as a column name (a double-quoted token is an identifier
            # first in SQLite), which used to match every membership row.
            cur.execute('SELECT user_id from members where cat_id = ?', (answer,))
            for member in cur.fetchall():
                cur.execute('SELECT url FROM users WHERE _id = ?', (member['user_id'],))
                user = cur.fetchone()
                # A membership whose user row is missing is skipped instead of
                # crashing on `None['url']`.
                if user is not None:
                    print('%s' % user['url'])

            print('-------------------------------\n')
            menu(con,cur)



    def _fetch_soup(address):
        """Download `address` and return the page parsed with BeautifulSoup."""
        # The context manager closes the HTTP response even if reading fails.
        with urllib.request.urlopen(address) as page:
            content = page.read().decode('utf-8')
        return BeautifulSoup(content, 'html.parser')


    def _get_or_create_id(con, cur, select_sql, insert_sql, value):
        """Return the `_id` of the row holding `value`, inserting it if absent.

        `select_sql` and `insert_sql` each take `value` as their single bound
        parameter. Rows are read by column name, so the connection must use a
        name-indexable row factory.
        """
        cur.execute(select_sql, (value,))
        existing = cur.fetchone()
        if existing is not None:
            return existing['_id']
        cur.execute(insert_sql, (value,))
        con.commit()
        return cur.lastrowid


    def _crawl_instance(con, cur, host):
        """Store the categories and member URLs listed on one instance's /explore page."""
        soup = _fetch_soup('https://' + host + '/explore')

        for directory in soup.findAll('div', {"class": "directory__tag"}):
            for link in directory.findAll('a', href=True):
                # The category is taken from the third `/`-separated piece of
                # the link (presumably hrefs look like `/explore/<category>`);
                # shorter links are skipped rather than aborting the instance.
                parts = link['href'].split('/')
                if len(parts) < 3:
                    continue
                category = parts[2]
                print('Found category `%s` on `%s`' % (category, host))

                cat_id = _get_or_create_id(
                    con, cur,
                    'SELECT _id FROM category WHERE name = ?',
                    'INSERT INTO category(name) VALUES (?)',
                    category,
                )

                # Let's find the users who belong to this category
                listing = _fetch_soup('https://' + host + '/explore/' + category)
                for account in listing.findAll('a', {"class": "account__display-name"}, href=True):
                    user_id = _get_or_create_id(
                        con, cur,
                        'SELECT _id FROM users WHERE url = ?',
                        'INSERT INTO users(url) VALUES (?)',
                        account['href'],
                    )

                    # Add user as member of category
                    cur.execute(
                        'INSERT INTO members(cat_id,user_id) VALUES (?,?)',
                        (cat_id, user_id),
                    )
                    con.commit()

                    print('Adding user `%s` to the `%s` category' % (account['href'], category))


    def getData(con,cur):
        """Replace the stored data with a fresh crawl of the Mastodon instances.

        Downloads the instance list from `url`, drops hostnames listed in
        `dontCare`, clears the four tables, stores the instances, and then
        crawls each instance's /explore page for categories and their users.
        A failure on one instance is reported and the crawl continues.

        Args:
            con: Open database connection, used to commit.
            cur: Cursor on that connection.

        Raises:
            RuntimeError: If the instance-list page has no `table-hover` table.
                Any download error for the instance list also propagates. In
                both cases the existing data has not been touched yet.
        """
        print('Getting the list of Mastodon instances...')

        # Fetch the new list BEFORE clearing anything, so a failed download
        # does not leave the database empty.
        table = _fetch_soup(url).find('table', {"class": "table-hover"})
        if table is None:
            raise RuntimeError('No instance table found at `%s`' % url)

        hosts = []
        for table_row in table.findAll('tr'):
            cols = table_row.findAll('td')
            if len(cols) > 2:
                # Keep only letters, digits and dots from the second column.
                host = re.sub(r'[^a-zA-Z0-9.]', '', cols[1].get_text()).strip()
                if host not in dontCare:
                    hosts.append(host)

        # Clear the old data
        cur.execute('DELETE FROM category')
        cur.execute('DELETE FROM instance')
        cur.execute('DELETE FROM users')
        cur.execute('DELETE FROM members')
        con.commit()

        for host in hosts:
            print('Adding `%s`' % host)
            cur.execute('INSERT INTO instance(url) VALUES(?)', (host,))
        con.commit()

        # For each instance let's check if there is any information on their /explore endpoint
        for host in hosts:
            try:
                _crawl_instance(con, cur, host)
            except Exception as exc:
                # Best effort per instance. `Exception` (not a bare except)
                # lets Ctrl-C still interrupt the crawl.
                print('Error with `%s`: %s' % (host, exc))

    def menu(con,cur):
        """Show the main menu once and dispatch on the user's choice.

        `2` or `s` starts a search. `1` asks for confirmation: `y` runs
        getData(), after which this function simply returns, and any other
        reply shows the menu again. Every other choice prints a quit message
        and returns.

        Args:
            con: Open database connection, passed through to the chosen action.
            cur: Cursor on that connection, passed through likewise.
        """
        choice = input('\nWhat would you like to do?\n(1) Refresh all data\n(2) Search current data\n(3) Quit: ')

        if choice in ('2', 's'):
            searchData(con,cur)
            return

        if choice != '1':
            print('Quitting...')
            return

        # Refreshing deletes the stored data first, hence the confirmation.
        confirmation = input('Are you sure? [y/N] ')
        if confirmation == 'y':
            getData(con,cur)
        else:
            menu(con,cur)


    # main program:
    def main():
        """Open the MastoData.sqlite database and run the interactive menu.

        If the database file did not exist before connecting, the four tables
        (category, instance, users, members) are created first. The connection
        is always closed on the way out.
        """
        # Set up the database
        firstTime = not os.path.isfile('MastoData.sqlite')
        con = lite.connect('MastoData.sqlite')

        try:
            # `with con` only commits or rolls back the open transaction; it
            # does not close the connection, hence the surrounding try/finally.
            with con:
                # Let rows be indexed by column name (row['_id'], row['url'], ...).
                con.row_factory = lite.Row
                cur = con.cursor()
                if firstTime:
                    # Create all the tables
                    print ('Creating the tables...')
                    cur.execute('DROP TABLE IF EXISTS category')
                    cur.execute('CREATE TABLE IF NOT EXISTS category(_id INTEGER PRIMARY KEY AUTOINCREMENT, name text)')

                    cur.execute('DROP TABLE IF EXISTS instance')
                    cur.execute('CREATE TABLE IF NOT EXISTS instance(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')

                    cur.execute('DROP TABLE IF EXISTS users')
                    cur.execute('CREATE TABLE IF NOT EXISTS users(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')

                    cur.execute('DROP TABLE IF EXISTS members')
                    cur.execute('CREATE TABLE IF NOT EXISTS members(cat_id INTEGER, user_id INTEGER)')

                    con.commit()

                menu(con,cur)
        finally:
            con.close()

    # Start the interactive tool only when this file is run as a script,
    # not when it is imported as a module.
    if __name__ == '__main__':
    main()