def publish(client, pubsub_topic, data_lines):
    """Publish a batch of records to the given Pub/Sub topic as one message.

    All records are wrapped into a single JSON document of the form
    ``{"messages": [{"data": record}, ...]}``, which is URL-safe base64-encoded
    and passed as the ``data`` payload of a single ``client.publish`` call.

    Args:
        client: Publisher client exposing ``publish(topic=..., data=...)``.
        pubsub_topic: Topic path to publish to.
        data_lines: Iterable of JSON-serialisable records (here: tweet dicts).
    """
    messages = [{'data': line} for line in data_lines]
    body = {'messages': messages}
    str_body = json.dumps(body)
    # NOTE(review): the JSON is base64-encoded before publishing, so consumers
    # must base64-decode the payload before parsing it. Kept for compatibility.
    data = base64.urlsafe_b64encode(bytearray(str_body, 'utf8'))
    client.publish(topic=pubsub_topic, data=data)


class TweetStreamListener(StreamListener):
    """Stream listener that forwards received tweets to a Pub/Sub topic.

    Each incoming status is reduced to a small dict of fields, buffered, and
    published via ``publish`` once ``batch_size`` tweets have accumulated.
    ``on_status`` returns False (stopping the stream) once ``total_tweets``
    tweets have been published.
    """

    # Publisher client and topic path are created once, when the class is
    # defined, and shared by all instances.
    client = pubsub.PublisherClient()
    pubsub_topic = client.topic_path(GCP_PROJECT_NAME, PUBSUB_TOPIC_NAME)

    # Class-level defaults; __init__ gives every instance its own copies.
    count = 0
    tweets = []
    batch_size = 1
    total_tweets = TOTAL_TWEETS

    def __init__(self, *args, **kwargs):
        super(TweetStreamListener, self).__init__(*args, **kwargs)
        # Per-instance buffer/counter: appending to the class-level ``tweets``
        # list would leak buffered tweets between listener instances.
        self.tweets = []
        self.count = 0

    def write_to_pubsub(self, tweets):
        """Publish ``tweets`` to this listener's Pub/Sub topic."""
        publish(self.client, self.pubsub_topic, tweets)

    def on_status(self, status):
        """Buffer one tweet and publish the buffer once it is full.

        Returns:
            False once ``total_tweets`` tweets have been published (telling
            the stream to stop), True otherwise.
        """
        tw = dict(
            text=status.text,
            bio=status.user.description,
            # isoformat() turns the datetime into a JSON-serialisable string.
            created_at=status.created_at.isoformat(),
            tweet_id=status.id_str,
            location=status.user.location,
            user_name=status.user.name,
            user_screen_name=status.user.screen_name,
            source=status.source,
        )
        self.tweets.append(tw)

        if len(self.tweets) >= self.batch_size:
            self.write_to_pubsub(self.tweets)
            print(self.tweets)
            # Count published tweets (not batches) so ``total_tweets`` keeps
            # its meaning for any ``batch_size``.
            self.count += len(self.tweets)
            self.tweets = []
            if self.count >= self.total_tweets:
                return False
            if (self.count % 5) == 0:
                print("count is: {} at {}".format(self.count, datetime.datetime.now()))
        return True

    def on_error(self, status_code):
        """Print the stream error code; disconnect on HTTP 420 (rate limited).

        Per tweepy's StreamListener documentation, returning False from
        ``on_error`` closes the stream, while other return values let tweepy
        reconnect. Reconnecting after a 420 extends the rate-limit penalty,
        so stop instead (confirm against the installed tweepy version).
        """
        print(status_code)
        if status_code == 420:
            return False


if __name__ == '__main__':
    # Call form of print works under both Python 2 and Python 3.
    print('....')
    auth = OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
    auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
    stream_listener = TweetStreamListener()
    stream = Stream(auth, stream_listener)
    stream.filter(
        track=['Royal Wedding', '#RoyalWedding', 'Prince Harry', 'Meghan Markle']
    )