Skip to content

Instantly share code, notes, and snippets.

@kadnan
Created June 10, 2018 13:54
Show Gist options
  • Select an option

  • Save kadnan/a91554d7aedf89a7fca85956bc4da2c4 to your computer and use it in GitHub Desktop.

Select an option

Save kadnan/a91554d7aedf89a7fca85956bc4da2c4 to your computer and use it in GitHub Desktop.

Revisions

  1. kadnan created this gist Jun 10, 2018.
    100 changes: 100 additions & 0 deletions producer_consumer_parse_recipes.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,100 @@
    import json
    from time import sleep

    from bs4 import BeautifulSoup
    from kafka import KafkaConsumer, KafkaProducer


    def publish_message(producer_instance, topic_name, key, value):
        """Send a single key/value message to a Kafka topic and flush it.

        Failures are reported on stdout rather than raised, so a bad message
        does not abort the caller's publishing loop.

        Args:
            producer_instance: Object exposing ``send(topic, key=, value=)`` and
                ``flush()``. ``None`` is tolerated (``connect_kafka_producer``
                returns ``None`` when it cannot connect) and is reported as a
                publishing failure.
            topic_name: Name of the topic to publish to.
            key: Message key, as ``str`` (encoded as UTF-8) or ``bytes``.
            value: Message payload, as ``str`` (encoded as UTF-8) or ``bytes``.

        Returns:
            None.
        """
        if producer_instance is None:
            print('Exception in publishing message')
            print('Kafka producer is not connected')
            return

        try:
            # bytes(x, encoding=...) rejects values that are already bytes,
            # so pass those through unchanged.
            key_bytes = key if isinstance(key, bytes) else bytes(key, encoding='utf-8')
            value_bytes = value if isinstance(value, bytes) else bytes(value, encoding='utf-8')
            producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
            # Flush right away so the message is handed off before returning.
            producer_instance.flush()
            print('Message published successfully.')
        except Exception as ex:
            print('Exception in publishing message')
            print(str(ex))


    def connect_kafka_producer():
        """Create a KafkaProducer connected to the broker at localhost:9092.

        Returns:
            The connected ``KafkaProducer``, or ``None`` if construction raised
            an ``Exception`` (the error is printed to stdout).
        """
        # Return from the try/except branches rather than from a ``finally``
        # clause: a ``return`` inside ``finally`` would also discard
        # KeyboardInterrupt and SystemExit.
        try:
            return KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
        except Exception as ex:
            print('Exception while connecting Kafka')
            print(str(ex))
            return None


    def parse(markup):
        """Extract recipe fields from an HTML page and return them as JSON.

        The page is parsed with BeautifulSoup (``lxml`` parser) and queried with
        fixed CSS class selectors for title, submitter, description, calories
        and ingredients.

        Args:
            markup: HTML document to parse, passed straight to BeautifulSoup.

        Returns:
            A JSON string. On success it is an object with the keys ``title``,
            ``submitter``, ``description``, ``calories`` and ``ingredients``;
            fields missing from the page keep their defaults (``'-'``, ``0``
            and ``[]``). If parsing raises an ``Exception`` the error is printed
            and the JSON string ``'{}'`` is returned.
        """
        title = '-'
        submit_by = '-'
        description = '-'
        calories = 0
        ingredients = []
        rec = {}

        try:
            soup = BeautifulSoup(markup, 'lxml')

            # calories: stays the default int 0 when absent, otherwise the
            # remaining text once the 'cals' suffix is removed (a string).
            calories_section = soup.select('.calorie-count')
            if calories_section:
                calories = calories_section[0].text.replace('cals', '').strip()

            # ingredients: skip blank entries and the "add all" list control.
            for ingredient in soup.select('.recipe-ingred_txt'):
                ingredient_text = ingredient.text.strip()
                if ingredient_text and 'Add all ingredients to list' not in ingredient_text:
                    ingredients.append({'step': ingredient_text})

            # description, with double quotes removed
            description_section = soup.select('.submitter__description')
            if description_section:
                description = description_section[0].text.strip().replace('"', '')

            # submitter
            submitter_section = soup.select('.submitter__name')
            if submitter_section:
                submit_by = submitter_section[0].text.strip()

            # title
            title_section = soup.select('.recipe-summary__h1')
            if title_section:
                title = title_section[0].text

            rec = {
                'title': title,
                'submitter': submit_by,
                'description': description,
                'calories': calories,
                'ingredients': ingredients,
            }
        except Exception as ex:
            print('Exception while parsing')
            print(str(ex))

        # Return after the try block, not from ``finally``: returning inside
        # ``finally`` would also discard KeyboardInterrupt and SystemExit.
        return json.dumps(rec)


    if __name__ == '__main__':
        # Script entry point: read raw recipe pages from the 'raw_recipes'
        # topic, parse each one, then publish the parsed JSON records to
        # 'parsed_recipes'.
        print('Running Consumer..')
        parsed_records = []
        topic_name = 'raw_recipes'
        parsed_topic_name = 'parsed_recipes'

        consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                                 bootstrap_servers=['localhost:9092'], api_version=(0, 10),
                                 consumer_timeout_ms=1000)
        try:
            for msg in consumer:
                parsed_records.append(parse(msg.value))
        finally:
            # Close the consumer even if iteration or parsing raises.
            consumer.close()
        sleep(5)

        if parsed_records:
            print('Publishing records..')
            producer = connect_kafka_producer()
            # connect_kafka_producer() has already printed the error when it
            # returns None, so skip publishing instead of failing per record.
            if producer is not None:
                try:
                    for rec in parsed_records:
                        publish_message(producer, parsed_topic_name, 'parsed', rec)
                finally:
                    producer.close()