Streaming Tweets and Images via the Twitter API v2

Social media is an exciting playground for data science, and one of the most interesting sources is certainly Twitter. Every minute, thousands of tweets are posted. Twitter discussions revolve around a wide variety of topics, from everyday life experiences, to opinions on products, music, sports, and movies, to political debates. There is hardly any topic that is not tweeted about. Furthermore, tweets are often linked to hashtags and images, or relate to other tweets, which makes them even more valuable for data science. Best of all, Twitter provides easy access to this data via a free REST API, and in this article I'll briefly show how to use it. We will use Python and a few basic packages to fetch tweets from Twitter directly into your Python notebook.

Twitter data has a variety of applications. For example, tweets can be analyzed to discover trends or to evaluate the sentiment on a topic. Furthermore, the images embedded in tweets, together with their hashtags, can be used to train or validate image recognition models. So if you are doing data science, knowing how to obtain data via the Twitter API can be very useful.

The rest of this article is structured as follows: We'll start by signing up for the free Twitter API and obtaining an authentication token. We will then use this token in our requests to the Twitter API v2. I'll demonstrate two use cases: (A) streaming tweets and storing their text in a pandas data frame, and (B) retrieving images from tweets and saving them to a local directory.

If you haven’t worked with APIs before, it might be a good idea to first take a look at one of my previous posts in which I describe the basics of working with APIs in Python.

Twitter API Basics

API Versions

Currently, Twitter provides two different API versions: v1.1 and v2. The two versions have their own documentation and are not compatible. While API v1.1 is still more widely used, the Twitter API v2 provides more options for what data to fetch from Twitter and how. For example, it allows you to tailor the fields that are returned in the response, which can be useful if the goal is to minimize traffic. The Twitter API v2 is currently in early access, but it will sooner or later become the new standard API. Therefore, I decided to base this tutorial on the newer v2 version.

Overview of Twitter APIs (Source: Twitter)

API Documentation

When working with the Twitter API v2, it is important to understand the Twitter object model. According to the API documentation, the basic building block of Twitter is the tweet object. The tweet object has various fields attached, such as the tweet text, created_at, and the tweet id. A full list of these root-level fields is provided in the documentation. Most fields are not included in the standard response; if we want to retrieve them, we need to specify them explicitly in the request. The tweet object acts as the parent of four other objects: the user, media, place, and poll objects.

Each of these objects, in turn, has multiple fields, and just as with the tweet object, we specify in the request which of them should be returned. In this tutorial we will work with the basic tweet object and the media object, which contains the media (e.g., images or videos) that a tweet may have attached.
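To make this more tangible, here is an abbreviated and purely illustrative example of what a single response object for a tweet with an attached photo can look like, assuming the tweet fields and media expansion used later in this tutorial are requested (all values are made up):

# Illustrative example of one streaming response object (values are invented)
sample_response = {
    "data": {
        "id": "1346889436626259968",
        "author_id": "2244994945",
        "created_at": "2021-01-06T18:40:40.000Z",
        "text": "Look at this dog! https://t.co/xyz",
        "public_metrics": {"retweet_count": 0, "reply_count": 0, "like_count": 3, "quote_count": 0},
        "attachments": {"media_keys": ["3_1346889430000000001"]},
    },
    "includes": {
        "media": [
            {
                "media_key": "3_1346889430000000001",
                "type": "photo",
                "url": "https://pbs.twimg.com/media/example.jpg",
            }
        ]
    },
    "matching_rules": [{"id": "1346889440000000000", "tag": "dog pictures"}],
}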

Functioning of the Filtered Stream Endpoint

In this tutorial we will be working with the Twitter filtered stream endpoint (/2/tweets/search/stream). There are other API endpoints as well, but covering all of them would go beyond the scope of this article. One special feature of the filtered stream endpoint is that we can't simply pass our query along with a GET request. Instead, we first have to register filter rules via a POST request that tell the API which tweets to deliver. If we want to change these rules later, we first have to delete the existing ones with a POST request and then add the new rules with another POST request. This may sound a bit complicated, but it actually gives the user more control over the API.
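As a minimal sketch of this rule lifecycle (the full, reusable functions follow in use case A below), and assuming a bearer_token variable that holds your personal token, the three requests look roughly like this:

import requests

RULES_URL = "https://api.twitter.com/2/tweets/search/stream/rules"
headers = {"Authorization": "Bearer {}".format(bearer_token)}

# 1. Read the rules that are currently active for your project
current_rules = requests.get(RULES_URL, headers=headers).json()

# 2. Delete the existing rules (the endpoint expects the ids of the rules to delete)
if "data" in current_rules:
    ids = [rule["id"] for rule in current_rules["data"]]
    requests.post(RULES_URL, headers=headers, json={"delete": {"ids": ids}})

# 3. Register the new rules
new_rules = [{"value": "dog has:images", "tag": "dog pictures"}]
requests.post(RULES_URL, headers=headers, json={"add": new_rules})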

Different API Models

The Twitter API v2 offers both a batch and a streaming model. The recent search endpoint returns a one-off batch of matching tweets per request, while the filtered stream endpoint keeps the connection open and delivers individual tweets continuously as they are published to Twitter. This allows streaming and processing of tweets in (almost) real time. In this tutorial, we will work with the streaming option (a small batch-style sketch follows after the figure below for comparison).

Stream Mode vs Batch Mode
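For comparison, a single batch-style request against the recent search endpoint could look like the following sketch; the query syntax is the same as in the stream rules, and a bearer_token variable is again assumed to hold your token:

import requests

SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
headers = {"Authorization": "Bearer {}".format(bearer_token)}

# a single GET request returns one batch of recent tweets that match the query
params = {"query": "dog has:images", "max_results": 10}
batch = requests.get(SEARCH_URL, headers=headers, params=params).json()
for tweet in batch.get("data", []):
    print(tweet["id"], tweet["text"][:80])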

Filters

By specifying parameters, we can limit which tweets and fields are returned in the response. For example, we can let the API know that we want to retrieve tweets with certain keywords or in a certain period, or only those tweets that have images attached. A list of all filter parameters can be found in the API documentation.
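For illustration, the following hypothetical rules combine keywords with some of these operators; the same syntax is used in the search_rules lists later in this tutorial:

# each rule consists of a "value" (the filter expression) and an optional "tag"
example_rules = [
    # English tweets that mention python and contain a photo, excluding retweets
    {"value": "python lang:en has:images -is:retweet", "tag": "python images"},
    # tweets that mention either dog or puppy
    {"value": "dog OR puppy", "tag": "dogs"},
]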

Prerequisites

Setup the Environment

It is assumed that you have already set up your Python environment. I recommend using Anaconda, but any other environment will do as well. If you don't have an environment set up yet, follow this tutorial first.

In this tutorial, we will work with pandas, requests, and the standard Python packages. If they are not already installed, you can install them from the console with pip install pandas requests or, if you are using the Anaconda package manager, with conda install pandas requests.

Setup a Twitter Developer Account

Using the Twitter API requires you to have your own Twitter developer account. If you don't have an account yet, you need to create one on Twitter's developer page. As of January 2021, the standard developer account is free and comes with a limit of 500,000 tweets that you can fetch per month.

When you are logged into your developer account, go to the developer dashboard page and create a new project with a name of your choice. Once you have created your project, it will be shown in the projects dashboard along with an overview of your monthly tweet usage. In the next section, you will retrieve your personal API key from the project.

Twitter Projects Dashboard

Obtaining your Twitter API Security Key

The Twitter API accepts our requests only if we provide a personal Bearer token for authentication. Each project has its own Bearer token, which you can find in the Developer Portal under the Authentication Tokens section. Copy the token for now; in the next step, we will store it in a secure location.

API Authentication Tokens

Storing and Loading API Tokens

The Twitter API requires the user to authenticate by providing a secret token. The token enables the API to identify and authenticate its users. It is best practice not to store these keys inside your project, but to keep them separately in a safe place. In a production environment, you would of course want to keep the keys encrypted. However, for our test case, it is sufficient to store the key in a separate Python file.

Create a new Python file called "twitter_secrets.py" and fill in the following code. Then replace the BEARER_TOKEN placeholder with your own Bearer token that you retrieved from the Twitter Developer Portal in the previous step.


"""Replace the values below with your own Twitter API Tokens"""

# Twitter Bearer Token
BEARER_TOKEN = "your own BEARER TOKEN"


class TwitterSecrets:
    """Class that holds Twitter Secrets"""

    def __init__(self):
        self.BEARER_TOKEN = BEARER_TOKEN
        
        # Tests if keys are present
        for key, secret in self.__dict__.items():
            assert secret != "", f"Please provide a valid secret for: {key}"

twitter_secrets = TwitterSecrets()


The twitter_secrets.py file has to go into the package library of your Python environment. If you are using Anaconda under Windows, this is typically the path: <user>\anaconda3\Lib. Alternatively, you can simply place the file in the same directory as your notebook or script. Once Python can find the file, you can import it into your project and use the bearer token from the import, as shown below:

# imports the twitter_secrets python file in which we store the twitter API keys
from twitter_secrets import twitter_secrets as ts

bearer_token = ts.BEARER_TOKEN

A) Streaming Tweets via the Twitter Filtered Stream Endpoint

In the first use case, we will first define some simple filter rules and then request tweets from the API based on these rules. As a response, the API returns a stream of tweets which we will process further. We store the text from the tweets in a dataframe along with further tweet information.

I won't go into detail about every code component, but the most important functions are explained with inline comments. The complete code for this use case is shown below:

import requests 
import json 
import pandas as pd

# imports the twitter_secrets python file in which we store the twitter API keys
from twitter_secrets import twitter_secrets as ts

def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers
        
def set_rules(headers, delete, bearer_token, rules):
    payload = {"add": rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    
def get_rules(headers, bearer_token):
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", headers=headers
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()

def delete_all_rules(headers, bearer_token, rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))

def get_stream(headers, set, bearer_token, expansions, fields, save_to_disk, save_path):
    # open a streaming connection; the API sends one JSON object per line
    # for every tweet that matches the active rules
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream" + expansions + fields, headers=headers, stream=True,
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    i = 0
    for response_line in response.iter_lines():
        # the stream periodically sends empty keep-alive lines; skip them
        if not response_line:
            continue
        i += 1
        if i == max_results:
            break
        try:
            json_response = json.loads(response_line)
            #print(json.dumps(json_response, indent=4, sort_keys=True))
            save_tweets(json_response)
            if save_to_disk == True:
                save_media_to_disk(json_response, save_path)
        except (json.JSONDecodeError, KeyError) as err:
            # In case the JSON fails to decode or a field is missing, we skip this line
            print(f"{i}/{max_results}: ERROR: encountered a problem with a line of data... \n")
            continue
                
def save_tweets(tweet):
    print(json.dumps(tweet, indent=4, sort_keys=True))
    data = tweet['data']
    public_metrics = data['public_metrics']
    tweet_list.append([data['id'], data['author_id'], data['created_at'], data['text'], public_metrics['like_count']])




# the max number of tweets that will be returned
max_results = 20

# saving media to disk is only used in use case B; keep it disabled here
save_to_disk = False
save_path = ""

# You can adjust the rules if needed (language filtering is done with the lang: operator inside the rule value)
search_rules = [
    {"value": "dog has:images lang:en", "tag": "dog pictures"},
    {"value": "cat has:images -grumpy lang:en", "tag": "cat pictures"},
]
tweet_fields = "?tweet.fields=attachments,author_id,created_at,public_metrics"
expansions = ""
tweet_list = []


bearer_token = ts.BEARER_TOKEN
headers = create_headers(bearer_token)
rules = get_rules(headers, bearer_token)
delete = delete_all_rules(headers, bearer_token, rules)
set = set_rules(headers, delete, bearer_token, search_rules)
get_stream(headers, set, bearer_token, expansions, tweet_fields, save_to_disk, save_path)

df = pd.DataFrame(tweet_list, columns=['tweetid', 'author_id', 'created_at', 'text', 'like_count'])
df
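If you want to keep the collected tweets for a later analysis, you can, for example, persist the DataFrame to disk; a small optional sketch (the filename is arbitrary):

# persist the collected tweets so they survive the notebook session
df.to_csv("streamed_tweets.csv", index=False)

# ...and reload them later
df = pd.read_csv("streamed_tweets.csv")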

B) Streaming images from Twitter and Saving them to Disk

As mentioned earlier, image recognition models can also be trained with image data from Twitter. In the second use case, we therefore look at how image data can be streamed from Twitter. In order to be able to use the data later, we save it directly to our local drive. To do this, we use most of the functions from the first use case. New functions are added for creating the folder structure in which we then store the images. As always, you can also find the code for this example on Github.

import requests 
import json 
import pandas as pd
import urllib.request
import os
from os import path
from datetime import datetime as dt

# imports the twitter_secrets python file in which we store the twitter API keys
from twitter_secrets import twitter_secrets as ts

def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers
        
def set_rules(headers, delete, bearer_token, rules):
    payload = {"add": rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    
def get_rules(headers, bearer_token):
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", headers=headers
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()

def delete_all_rules(headers, bearer_token, rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        headers=headers,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))

def get_stream(headers, set, bearer_token, expansions, fields, save_to_disk, save_path):
    # open a streaming connection; the API sends one JSON object per line
    # for every tweet that matches the active rules
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream" + expansions + fields, headers=headers, stream=True,
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    i = 0
    for response_line in response.iter_lines():
        # the stream periodically sends empty keep-alive lines; skip them
        if not response_line:
            continue
        i += 1
        if i == max_results:
            break
        try:
            json_response = json.loads(response_line)
            #print(json.dumps(json_response, indent=4, sort_keys=True))
            save_tweets(json_response)
            if save_to_disk == True:
                save_media_to_disk(json_response, save_path)
        except (json.JSONDecodeError, KeyError) as err:
            # In case the JSON fails to decode or a field is missing, we skip this line
            print(f"{i}/{max_results}: ERROR: encountered a problem with a line of data... \n")
            continue
                
def save_tweets(tweet):
    #print(json.dumps(tweet, indent=4, sort_keys=True))
    data = tweet['data']
    includes = tweet['includes']
    media = includes['media']
    for line in media:
        tweet_list.append([data['id'], line['url']])  
        
def save_media_to_disk(tweet, save_path):
    data = tweet['data']
    #print(json.dumps(data, indent=4, sort_keys=True))
    includes = tweet['includes']
    media = includes['media']
    for line in media:
        media_url = line['url']
        media_key = line['media_key']
        pic = urllib.request.urlopen(media_url)
        file_path = save_path + "/" + media_key + ".jpg"
        try:
            # write the downloaded image to disk, named after its media key
            # (the tweet id / media url pairs are already collected in save_tweets)
            with open(file_path, 'wb') as localFile:
                localFile.write(pic.read())
        except Exception as e:
            print('exception when saving media url ' + media_url + ' to path: ' + file_path)
            if path.exists(file_path):
                print("path exists")
    
def createDir(save_path):
    try:
        os.makedirs(save_path)
    except OSError:
        print("Creation of the directory %s failed" % save_path)
        if path.exists(save_path):
            print("directory already exists")
    else:
        print("Successfully created the directory %s " % save_path)
        
# save to disk
save_to_disk = True
 
if save_to_disk == True: 
    # detect the current working directory and print it
    base_path = os.getcwd()
    print ("The current working directory is %s" % base_path)
    img_dir = '/twitter/downloaded_media/'
    # the write path in which the data will be stored. If it does not yet exist, it will be created
    now = dt.now()
    dt_string = now.strftime("%d%m%Y-%H%M%S")# ddmmYY-HMS
    save_path = base_path + img_dir + dt_string
    createDir(save_path)
    
# the max number of tweets that will be returned
max_results = 10

# You can adjust the rules if needed (language filtering is done with the lang: operator inside the rule value)
search_rules = [
    {"value": "dog has:images lang:en", "tag": "dog pictures"},
]

media_fields = "&media.fields=duration_ms,height,media_key,preview_image_url,public_metrics,type,url,width"
expansions = "?expansions=attachments.media_keys"
tweet_list = []

bearer_token = ts.BEARER_TOKEN
headers = create_headers(bearer_token)
rules = get_rules(headers, bearer_token)
delete = delete_all_rules(headers, bearer_token, rules)
set = set_rules(headers, delete, bearer_token, search_rules)
get_stream(headers, set, bearer_token, expansions, media_fields, save_to_disk, save_path)

df = pd.DataFrame(tweet_list, columns=['tweetid', 'media_url'])
df
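As a quick sanity check, you can, for example, list the files that were written to the download directory; a small sketch using the save_path created above:

# count and list the image files in the download directory
downloaded = [f for f in os.listdir(save_path) if f.endswith(".jpg")]
print("Downloaded {} images to {}".format(len(downloaded), save_path))
for filename in downloaded[:5]:
    print(" -", filename)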

Summary

In this tutorial, you learned how to stream and process Twitter data in near real time using the Twitter API v2, based on two use cases. The first use case demonstrated how to retrieve the text of tweets and store it in a dataframe. In the second, we streamed images and saved them to a local directory. There are many more ways to interact with the Twitter API, but these two cases already make it possible to implement some exciting projects. Have fun with it, and if you liked this post, leave a comment. Cheers!

Sources: Some of the presented code stems from the Twitter API documentation and has been modified for the examples presented in this article.

