
Building a Twitter Bot for Crypto Trading Signals using Python

In this article, we will develop a Twitter bot in Python that generates automated trading signals. Our bot will pull real-time price data on various cryptocurrencies (Bitcoin, Ethereum, Dogecoin, etc.) from the crypto exchange Gate.io and analyze it using a predefined set of rules. Every time the bot detects a relevant price change, it automatically posts a tweet about it.

Simple Twitter bots can proactively inform their audiences about relevant events in the market. Such an event can be a sharp rise or fall in price or a sudden spike in the trading volume. If we examine data for specific price movements, we can also store these events and use them later to train a predictive model. More advanced signal bots use predictive models to signal when it is appropriate to enter or exit the market, or they execute the buy and sell orders themselves. A well-defined signaling logic can therefore be the first step toward algorithmic trading. But one thing at a time: in this article, we will begin by developing a simple signal bot.


The rest of this article is structured as follows. First, we take a look at the different code modules of the Twitter bot and briefly introduce the APIs used to build it. After that, we’ll implement the individual modules in Python. Finally, we will integrate the modules and run some tests.

Different Modules of the Signal Bot

The architecture of the signal bot to be developed can be divided into four different modules, each of which has a clear function. Let’s look at these modules next:

  1. The Data Collection Module retrieves price data from the crypto exchange Gate.io by sending requests to the gate.io API at a regular interval. The module adds the data to separate data stores, one for each cryptocurrency pair. It then forwards the data to the Data Preprocessing Module.
  2. The Data Preprocessing Module calculates the statistical indicators such as moving averages or means, which become the basis for the signaling logic.
  3. Based on the indicator values provided, the Signaling Module searches for relevant events. If a relevant event is detected, it is reported to the communication module.
  4. The Communication Module connects to the Twitter API. As soon as it is informed about a new event, it tweets about this event on Twitter.
Components of the Relataly Crypto Signal Bot
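
The interplay of the four modules can be sketched in a few lines. This is a simplified illustration and not the implementation developed in this article: the function names, the plain-list price history, and the demo prices are assumptions made for the example, and the communication step only collects messages in a list instead of tweeting.

```python
from statistics import mean


def collect(history, pair, price):
    """Data collection: append the newest price to the pair's history."""
    history.setdefault(pair, []).append(price)
    return history


def preprocess(prices):
    """Preprocessing: derive simple indicators from a price history."""
    last, before = prices[-1], prices[-2]
    return {
        "last_price": last,
        "moving_avg_price": mean(prices),
        "price_delta_p": (last - before) / last if last else 0.0,
    }


def detect(pair, indicators, threshold=0.07):
    """Signaling: return a message if the relative change reaches the threshold."""
    change = indicators["price_delta_p"]
    if abs(change) >= threshold:
        direction = "up" if change > 0 else "down"
        return f"{pair} {direction} {change:.1%}"
    return None


def communicate(message, outbox):
    """Communication: stand-in for posting a tweet; collects messages in a list."""
    outbox.append(message)


def run_pipeline(ticks, threshold=0.07):
    """Push every (pair, price) tick through all four modules."""
    history, outbox = {}, []
    for pair, price in ticks:
        collect(history, pair, price)
        if len(history[pair]) < 2:
            continue  # not enough data for a price delta yet
        signal = detect(pair, preprocess(history[pair]), threshold)
        if signal:
            communicate(signal, outbox)
    return outbox


# Made-up prices, purely for illustration
demo_ticks = [("BTC_USDT", 100.0), ("BTC_USDT", 101.0), ("BTC_USDT", 125.0)]
print(run_pipeline(demo_ticks))
```

Keeping each module behind its own function makes it easy to replace one part, for example the communication step, without touching the others.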

About the APIs Used in this Tutorial

In this tutorial, we will be using two APIs:

  • The Gate.io API to fetch price data.
  • The Twitter API to post tweets about trading signals.

The Gate.io API

Firstly, we will be using the Gate.io API to obtain prices for various cryptocurrencies. Gate.io is one of the smaller crypto exchanges but offers a wide range of cryptocurrencies, especially those you cannot trade anywhere else. As of now, the gate.io market endpoint does not require authentication. To learn more about pulling data via the gate.io API in Python, check out our recent tutorial.

The Twitter API

The second API that our bot will use is the Twitter API. We will use this API via the Python package Tweepy to post crypto price signals. If you are looking for a simple code example of submitting tweets via the Twitter API, check out this article. In case you don’t want to use Twitter, you can disable its use in the code.

Posting tweets via the API requires authentication with a valid developer account. You can apply for a developer account for free by following this link. Just be aware that the confirmation can sometimes take several days.

Applying for a developer account for the Twitter API can take some time

Implementing a Twitter Signal Bot using Python

Let’s begin with the implementation. If you want to use the Twitter functionality, you will require a Twitter developer account. Without an account, you can still print out trading signals, but you will not be able to post them via the Twitter API.

As always, you can clone the code from the relataly Github repository.

Python Prerequisites

Before we start the coding part, make sure that you have set up your Python 3 environment and required packages. If you don’t have an environment set up yet, you can follow this tutorial to set up the Anaconda environment.

Also, make sure you install all required packages. In this tutorial, we will be working with the following standard packages:

  • pandas
  • NumPy

In addition, we will use the following two packages:

  • tweepy
  • gate-api (imported as gate_api)

You can install packages using console commands:

  • pip install <package name>
  • conda install <package name> (if you are using the Anaconda package manager)

Step #1: Regular Retrieval of Price Data

First, we will define a “Prices” class to handle the incoming flow of data. The class contains a method “get_latest_prices” that retrieves price information from gate.io by calling the list_tickers market endpoint. A background thread calls this method at a regular interval.

The list_tickers endpoint returns a list of data fields for cryptocurrency pairs. Examples of price pairs are BTC_USDT, ETH_USDT, DOGE_USDT, and so on. We can limit the response to a single price pair by passing a single pair as a variable in the API call. However, it is not possible to restrict the response to multiple pairs. This means that we either get data for a single pair or all pairs. The response contains a list with the following data fields:

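Overview of the data fields in the response. The code below reads the following fields: currency_pair, last, lowest_ask, highest_bid, change_percentage, base_volume, quote_volume, high_24h, low_24h, etf_net_value, etf_pre_net_value, etf_pre_timestamp, and etf_leverage.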
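
Because the endpoint returns either one pair or all pairs, a watchlist of several pairs has to be filtered on the client side. The following sketch shows one way to do this. The helper name filter_tickers is an assumption, and the SimpleNamespace objects with made-up values only stand in for the ticker objects of the API response, which expose the pair name as currency_pair.

```python
from types import SimpleNamespace


def filter_tickers(tickers, wanted_pairs):
    """Keep only the tickers whose currency_pair is on the watchlist."""
    wanted = set(wanted_pairs)
    return [ticker for ticker in tickers if ticker.currency_pair in wanted]


# Stand-ins for the objects returned by list_tickers(); the values are made up
all_tickers = [
    SimpleNamespace(currency_pair="BTC_USDT", last="100.0"),
    SimpleNamespace(currency_pair="ETH_USDT", last="10.0"),
    SimpleNamespace(currency_pair="DOGE_USDT", last="0.1"),
]

selected = filter_tickers(all_tickers, ["BTC_USDT", "DOGE_USDT"])
print([ticker.currency_pair for ticker in selected])
```

Requesting all pairs once and filtering locally keeps the number of API calls at one per update cycle, regardless of how long the watchlist is.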

The following code maintains a dictionary that maps the name of each cryptocurrency pair to a DataFrame holding the price history of that pair. Each time the crypto bot receives a new response from the API, it goes through the response, extracts the price data (price, volume, etc.), and appends this data to the DataFrame of the respective cryptocurrency pair. Then the information is passed to the preprocessing module.

import pandas as pd
import numpy as np
import json
import requests
import datetime as dt
import logging
import threading
import time

import tweepy 
import gate_api
from gate_api.exceptions import ApiException, GateApiException
from twitter_secrets import twitter_secrets as ts # place the twitter_secrets file under <User>/anaconda3/Lib

class Prices:
    """Class that uses the gate api to retrieve currency data."""

    def __init__(self, config):
        self._config = config
        self._logger = logging.getLogger(__name__)
        configuration = gate_api.Configuration(host="https://api.gateio.ws/api/v4")
        api_client = gate_api.ApiClient(configuration)
        self._api_instance = gate_api.SpotApi(api_client)
        self._price_history = {}
        self._cont_update_thread = None
        self._stop_cont_update_thread = None
        self._price_history_lock = threading.Lock()

    def get_price_history(self):
        """Returns a dictionary with the price histories for the currencies."""
        return self._price_history, self._price_history_lock

    def get_latest_prices(self):
        """Gets new price data and adds the values to a DataFrame.

        Returns the DataFrame in a dictionary with the currencies as keys."""
        timestamp = dt.datetime.now()
        try:
            api_response = self._api_instance.list_tickers()
        except GateApiException as e:
            logging.warning(
                "Gate api exception, label: %s, message: %s\n" % (e.label, e.message)
            )
            return {}
        except ApiException as e:
            logging.warning("Exception when calling SpotApi->list_tickers: %s\n" % e)
            return {}
        latest_prices = {}
        for response in api_response:
            currency = response.currency_pair
            if "USDT" not in currency or "BEAR" in currency:
                continue
            value_dict = {
                "base_volume": pd.to_numeric(response.base_volume),
                "change_percentage": pd.to_numeric(response.change_percentage),
                "etf_leverage": pd.to_numeric(response.etf_leverage),
                "etf_net_value": pd.to_numeric(response.etf_net_value),
                "etf_pre_net_value": pd.to_numeric(response.etf_pre_net_value),
                "etf_pre_timestamp": response.etf_pre_timestamp,
                "high_24h": pd.to_numeric(response.high_24h),
                "highest_bid": pd.to_numeric(response.highest_bid),
                "high_bid": pd.to_numeric(response.highest_bid),
                "last": pd.to_numeric(response.last),
                "low_24h": pd.to_numeric(response.low_24h),
                "lowest_ask": pd.to_numeric(response.lowest_ask),
                "quote_volume": pd.to_numeric(response.quote_volume),
                "timestamp": timestamp,
            }
            latest_prices[currency] = pd.DataFrame(value_dict, index=[1])
        return latest_prices

    def start_cont_update(self):
        self._stop_cont_update_thread = threading.Event()
        self._stop_cont_update_thread.clear()
        self._cont_update_thread = threading.Thread(
            target=self._cont_update,
            args=(
                self._stop_cont_update_thread,
                self._price_history_lock,
            ),
            daemon=True,  # do not keep the process alive once the main loop has finished
        )
        self._cont_update_thread.start()
        self._logger.info("Started continuous price logging")

    def _cont_update(self, stop_event, lock):
        """Continuously adds new prices to the price history."""
        while not stop_event.is_set():
            start_time = time.time()
            # Fetch before taking the lock so that readers are not blocked by the API call
            latest_prices = self.get_latest_prices()
            with lock:
                for currency, df in latest_prices.items():
                    if currency in self._price_history:
                        # DataFrame.append was removed in pandas 2.0, so we use pd.concat
                        self._price_history[currency] = pd.concat(
                            [self._price_history[currency], df], ignore_index=True
                        )
                    else:
                        self._price_history[currency] = df
            self._logger.debug("Currency_dfs updated")
            self._wait_before_update(start_time)

    def _wait_before_update(self, start_time):
        elapsed_time = time.time() - start_time
        self._logger.debug(f"Elapsed time: {elapsed_time}")
        if elapsed_time > self._config["price_update_delay"]:
            delay = 0
            self._logger.warning(
                "It took longer to retrieve the price data than the update_delay!"
            )
        else:
            delay = self._config["price_update_delay"] - elapsed_time
        self._logger.debug(f"Waiting {delay}s until next update")
        time.sleep(delay)
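
The class above creates a stop event for the update thread, but the listing does not show a method that sets it. The following sketch illustrates one common pattern for a background loop that can be stopped cleanly. The class name PeriodicWorker and its methods are assumptions made for this example and are not part of the bot's code.

```python
import threading
import time


class PeriodicWorker:
    """Runs a callback at a fixed interval until stop() is called."""

    def __init__(self, callback, interval):
        self._callback = callback
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        """Signal the loop to end and wait until the thread has finished."""
        self._stop_event.set()
        self._thread.join()

    def _run(self):
        while not self._stop_event.is_set():
            start_time = time.monotonic()
            self._callback()
            elapsed = time.monotonic() - start_time
            # Unlike time.sleep, Event.wait returns early when stop() is called
            self._stop_event.wait(max(0.0, self._interval - elapsed))


calls = []
worker = PeriodicWorker(lambda: calls.append(time.monotonic()), interval=0.05)
worker.start()
time.sleep(0.2)
worker.stop()
print(f"callback ran {len(calls)} times")
```

Waiting on the event instead of sleeping means a stop request does not have to wait for the remainder of the current delay.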

Step #2: Calculate Indicator Values

Next, we will define a few functions that process the regular data inflow from gate.io and calculate indicator values for the different cryptocurrencies.

Absolute price differences tell the bot whether a price is moving up or down. However, our signaling logic will primarily work with thresholds on percentage values. In the code below, these are the indicators whose names contain “_p”, such as price_delta_p.

In addition, we will guard against misleading signals by also incorporating average values into the indicators. For this, we need to pass the price history when we call the “calc_indicators” function. Note that the code computes the mean over the entire collected price history rather than over a fixed window. From the data frame, we also derive the indicators of the preceding price point (price_before, price_delta_before, and price_delta_p_before), which tell us how the price moved one step earlier. Other indicators are low_24h and high_24h.

All indicators are calculated separately for each crypto pair and stored in a dictionary, which we pass to the signaling logic in the next step.
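
To make the percentage indicators concrete, here is a small worked example with a made-up price history of three points. It follows the same conventions as the function below: the price delta is divided by the latest price, and the average is taken over the whole history.

```python
from statistics import mean


def div(dividend, divisor, alt_value=0.0):
    """Division that falls back to alt_value when the divisor is zero."""
    return dividend / divisor if divisor != 0 else alt_value


# Made-up price history for one pair, oldest value first
prices = [100.0, 100.0, 125.0]

last_price = prices[-1]
price_before = prices[-2]
price_delta = last_price - price_before          # 125 - 100 = 25
price_delta_p = div(price_delta, last_price)     # 25 / 125 = 0.2
avg_price = mean(prices)                         # 325 / 3
ma_deviation = round(div(last_price, avg_price) - 1, 2)

print(price_delta, price_delta_p, ma_deviation)
```

With a threshold of 7%, a price_delta_p of 0.2 would count as a relevant move, while the deviation from the average shows how far the latest price sits above the collected history.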

def calc_indicators(price_history):
    indicators = {}
    indicators_over_all = calc_indicators_over_all(price_history)
    for currency, df in price_history.items():
        if len(df) <= 2:
            logging.getLogger().debug(
                f"Skipped '{currency}' when calculating indicators due to a lack of information"
            )
            continue
        volume = df["base_volume"].iloc[-1]
        last_price = df["last"].iloc[-1]
        moving_avg_price = df["last"].mean()
        moving_average_volume = df["base_volume"].mean()
        moving_average_deviation_percent = np.round(
            div(last_price, moving_avg_price) - 1, 2
        )

        price_before = df["last"].iloc[-2]
        price_delta = last_price - price_before
        price_delta_p = div(price_delta, last_price)
        price_delta_before = price_before - df["last"].iloc[-3]
        price_delta_p_before = div((price_before - df["last"].iloc[-3]), price_before)
        low_24h = df["low_24h"].iloc[-1]
        high_24h = df["high_24h"].iloc[-1]
        low_high_diff_p = div(high_24h - low_24h, low_24h)
        change_percentage = df["change_percentage"].iloc[-1]

        indicator_values = {
            "last_price": last_price,
            "price_before": price_before,
            "volume": volume,
            "moving_avg_price": moving_avg_price,
            "moving_average_volume": moving_average_volume,
            "moving_average_deviation_percent": moving_average_deviation_percent,
            "price_delta_p": price_delta_p,
            "price_delta": price_delta,
            "price_delta_before": price_delta_before,
            "price_delta_p_before": price_delta_p_before,
            "high_24h": high_24h,
            "low_24h": low_24h,
            "low_high_diff_p": low_high_diff_p,
            "change_percentage": change_percentage,
        }
        indicator_values.update(indicators_over_all)
        indicators[currency] = indicator_values
    return indicators


def calc_indicators_over_all(price_history):
    avg_change_p = 0
    for currency, df in price_history.items():
        avg_change_p += df["change_percentage"].iloc[-1]
    nr_of_currencies = len(price_history)
    avg_change_p = div(avg_change_p, nr_of_currencies)
    values = {
        "avg_change_p": avg_change_p,
    }
    return values


def div(dividend, divisor, alt_value=0.0):
    return dividend / divisor if divisor != 0 else alt_value

Step #3: Define the Signaling Logic of the Twitter Bot

Our bot will use a signaling logic that differentiates between the following price signals:

  • A simple uptick: price_delta_p must be at least as high as the threshold delta_threshold_p (7% in the configuration used below) to trigger.
  • A simple downtick: price_delta_p must be at or below the negative threshold (-7%) to trigger.
  • The bot also reports new 24-hour lows and highs.
  • Another event the bot reports is when an upward or downward price trend accelerates or slows down.
  • The bot reports when a price performs a trend reversal (pullback or recovery).

Overview of the different trading signals generated by the signaling logic

Be aware that price_delta_p measures the percentage change relative to the previous price point. Thus, the signaling logic depends heavily on the interval at which the bot requests new price data. With shorter intervals, signals trigger less often because larger price changes typically need more time to build up. For more details regarding the signaling logic, please view the code below.
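
The effect of the sampling interval can be illustrated with a made-up price series that rises by 2% at every step. The helper count_signals is an assumption for this example. It uses the same delta convention as calc_indicators and simply counts how often the threshold is reached.

```python
def count_signals(prices, stride, threshold=0.07):
    """Count samples whose relative change to the previous sample reaches the threshold."""
    sampled = prices[::stride]
    signals = 0
    for before, last in zip(sampled, sampled[1:]):
        price_delta_p = (last - before) / last  # same convention as calc_indicators
        if abs(price_delta_p) >= threshold:
            signals += 1
    return signals


# Made-up series: the price rises by 2% at every step
prices = [100.0 * 1.02 ** k for k in range(11)]

print(count_signals(prices, stride=1))  # every step is sampled
print(count_signals(prices, stride=5))  # only every fifth step is sampled
```

Sampled at every step, no single change reaches 7%. Sampled at every fifth step, the accumulated change of roughly 9.4% per sample exceeds the threshold both times.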

def check_signal(currency, indicators, cs_config):
    ind = indicators[currency]
    signal = ''
    if (ind['moving_avg_price'] > 0
            and ind['last_price'] > 0.0
            and abs(ind['price_delta']) > 0.0
            and abs(ind['price_delta_p']) >= cs_config["delta_threshold_p"]
            and ind['volume'] > 0
    ):
        # up
        if ind['price_delta'] > 0:
            movement_type = 'up +'
            if abs(ind['price_delta_p_before']) > cs_config["delta_threshold_p"]:
                if ind['price_delta_before'] <= 0:
                    movement_type = 'recovery from ' + str(ind['price_before']) + ' to ' + str(ind['last_price'])
                else:
                    if ind['price_delta_p'] * (1-cs_config["delta_threshold_p"]) > ind['price_delta_p_before']:
                        movement_type = 'upward trend accelerates +'
                    elif ind['price_delta_p'] < ind['price_delta_p_before'] * (1-cs_config["delta_threshold_p"]):
                        movement_type = 'upward trend slows down +'
                    elif ind['price_delta_p'] * (1+cs_config["delta_threshold_p"]) >= ind['price_delta_p_before'] >= ind['price_delta_p'] * (1-cs_config["delta_threshold_p"]):
                        movement_type = 'upward trend continues +'
        # down
        elif ind['price_delta'] < 0:
            movement_type = 'down '
            if abs(ind['price_delta_p_before']) > cs_config["delta_threshold_p"]:
                if ind['price_delta_before'] > 0:
                    movement_type = 'pullback from ' + str(ind['price_before']) + ' to ' + str(ind['last_price'])
                else:
                    # Both deltas are negative here, so we compare their magnitudes
                    drop = abs(ind['price_delta_p'])
                    drop_before = abs(ind['price_delta_p_before'])
                    if drop * (1-cs_config["delta_threshold_p"]) > drop_before:
                        movement_type = 'downward trend accelerates '
                    elif drop < drop_before * (1-cs_config["delta_threshold_p"]):
                        movement_type = 'downward trend slows down '
                    elif drop * (1+cs_config["delta_threshold_p"]) >= drop_before >= drop * (1-cs_config["delta_threshold_p"]):
                        movement_type = 'downward trend continues '

        signal = get_signal_log(movement_type, currency, ind['price_delta_p'], ind['last_price'],
                                ind['moving_avg_price'], ind['volume'], ind['price_delta'], ind['change_percentage'],
                                ind['high_24h'], ind['low_24h'], ind['low_high_diff_p'])

        check_24h_peak(currency, ind['last_price'], ind['low_24h'], ind['high_24h'])

    return signal
    # trade_signal


def check_24h_peak(currency, last_price, low_24h, high_24h):
    if last_price < low_24h:
        print(currency + ' new 24h low $' + str(last_price))
    elif last_price > high_24h:
        print(currency + ' new 24h high $' + str(last_price))


def get_signal_log(movement_type, currency, price_delta_p, last_price, moving_avg_price, volume, price_delta,
                   daily_up_p, high_24h, low_24h, low_high_diff_p):
    signal = f'{currency} {movement_type} ' \
             f'{np.round(price_delta_p * 100, 5)}% ' \
             f'MA:${np.round(moving_avg_price, 6)} ' \
             f'last_price:${np.round(last_price, 6)} ' \
             f'price delta:{np.round(price_delta, 6)} ' \
             f'volume:${np.round(volume, 1)} ' \
             f'daily_change:{np.round(daily_up_p, 2)}% ' \
             f'high_24h:${high_24h} ' \
             f'low_24h:${low_24h} ' \
             f'low_high_diff_p:{np.round(low_high_diff_p * 100, 2)}%'
    return signal

Step #4: Send Tweets via Twitter

Next, we define a simple function that calls the Twitter API and tweets our price signal. Because the Twitter API requires authentication, you will need to provide the API authentication credentials from a valid Twitter developer account.

It is best practice not to store API credentials directly in code. A slightly better, though still not perfect, option is to keep them in a separate Python file (for example, called “twitter_secrets”) that you put into your package folder (for example, under /anaconda3/Lib), from where you can import it directly into your code.

twitter_secrets.py

# Twitter Consumer API keys
CONSUMER_KEY    = "api123"
CONSUMER_SECRET = "api123"

# Twitter Access token & access token secret
ACCESS_TOKEN    = "api123"
ACCESS_SECRET   = "api123"

BEARER_TOKEN = "api123"

class TwitterSecrets:
    """Class that holds Twitter Secrets"""

    def __init__(self):
        self.CONSUMER_KEY    = CONSUMER_KEY
        self.CONSUMER_SECRET = CONSUMER_SECRET
        self.ACCESS_TOKEN    = ACCESS_TOKEN
        self.ACCESS_SECRET   = ACCESS_SECRET
        self.BEARER_TOKEN   = BEARER_TOKEN
        
        # Tests if keys are present
        for key, secret in self.__dict__.items():
            assert secret != "", f"Please provide a valid secret for: {key}"

twitter_secrets = TwitterSecrets()
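
As an alternative to a secrets file, the credentials can also be read from environment variables, which keeps them out of any Python file. The following is a minimal sketch under that assumption. The variable names and the function load_twitter_credentials are hypothetical and not part of the bot's code.

```python
import os

# Hypothetical variable names; choose whatever fits your setup
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_SECRET",
)


def load_twitter_credentials(environ=os.environ):
    """Read the credentials from the environment and fail early if one is missing."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}


# Demo with a plain dict standing in for os.environ
demo_env = {name: "dummy-value" for name in REQUIRED_VARS}
credentials = load_twitter_credentials(demo_env)
print(sorted(credentials))
```

Failing early with a clear message serves the same purpose as the assertion in the TwitterSecrets class: the bot does not start with incomplete credentials.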

Once you have imported the file, you can then load the API credentials from the file in the following way:

consumer_key = ts.CONSUMER_KEY
consumer_secret = ts.CONSUMER_SECRET
access_token = ts.ACCESS_TOKEN
access_secret = ts.ACCESS_SECRET

### Print API Auth Data (leave disabled for security reasons)
# print(f'consumer_key: {consumer_key}')
# print(f'consumer_secret: {consumer_secret}')
# print(f'access_token: {access_token}')
# print(f'access_secret: {access_secret}')

#authenticating to access the twitter API
auth=tweepy.OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
api=tweepy.API(auth)

# Placeholder: set this to the link that should be appended to every tweet
relataly_url = ""

def send_pricechange_tweet(signal):
    api.update_status(f"{signal} \n {relataly_url}")
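
The signal strings built in Step #3 contain many fields, so a tweet can come close to Twitter's limit of 280 characters. The sketch below shortens the signal before posting. The helper fit_tweet is an assumption for this example, and it uses a plain character count, whereas Twitter weighs links and some characters differently, so treat the result as an approximation.

```python
TWEET_LIMIT = 280  # character limit for standard tweets


def fit_tweet(signal, url, limit=TWEET_LIMIT):
    """Shorten the signal so that the signal plus the link stays within the limit."""
    suffix = f"\n{url}" if url else ""
    room = limit - len(suffix)
    if len(signal) > room:
        # Reserve three characters for the ellipsis
        signal = signal[: room - 3].rstrip() + "..."
    return signal + suffix


# Made-up, overly long signal text for illustration
long_signal = "BTC_USDT up + " + "x" * 300
tweet = fit_tweet(long_signal, "https://example.com")
print(len(tweet))
```

The returned text could then be passed to the function that posts the tweet instead of the raw signal.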

Step #5: Starting the Crypto Signal Bot

Finally, we can hit the start button of our crypto signal bot. But before we do this, take a look at some configuration options of the bot.

  • CYCLE_DELAY is the interval in seconds at which the bot evaluates the collected prices for signals. The interval at which the Prices class calls the gate.io API is set separately via the price_update_delay entry in PRICES_CONFIG.
  • CURRENCY_PAIR is another API parameter, which can be used to limit the cryptocurrency pairs to a specific pair. In the standard setting, the bot will scan the whole market including all USDT pairs. Note that the code shown in this article defines the variable but calls list_tickers without it.
  • TWITTER_ACTIVE defines whether signals are posted on Twitter. Be aware that if you enable it, your bot may instantly report any signal on your Twitter account.
  • RUNS defines the maximum number of evaluation cycles before the bot stops.
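
These settings also determine how long a test run takes and when the first signals can appear. The short calculation below uses the values from this article's configuration. The two helper functions are assumptions for this example, and both ignore the time spent on API requests and calculations.

```python
RUNS = 50
CYCLE_DELAY = 20         # seconds between two evaluation cycles
EVAL_PRICES_DELAY = 10   # initial waiting time in seconds
PRICE_UPDATE_DELAY = 20  # seconds between two API calls


def estimated_runtime(runs, cycle_delay, warmup):
    """Lower bound for the total runtime in seconds."""
    return warmup + runs * cycle_delay


def earliest_indicators(price_update_delay, min_points=3):
    """Seconds until a pair has enough price points for calc_indicators."""
    # calc_indicators skips pairs with fewer than three rows
    return (min_points - 1) * price_update_delay


total = estimated_runtime(RUNS, CYCLE_DELAY, EVAL_PRICES_DELAY)
print(f"{total} s, about {total / 60:.1f} min")
print(f"first indicators after about {earliest_indicators(PRICE_UPDATE_DELAY)} s")
```

In other words, the first evaluation cycles cannot produce signals yet because fewer than three price points have been collected per pair.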

Now, let’s test the bot:

RUNS = 50 # the bot will stop after 50 price points
CYCLE_DELAY = 20 # the interval for checking the data and retrieving another price point
EVAL_PRICES_DELAY = 10
CURRENCY_PAIR = "" # the bot will retrieve data for all currency pairs listed on gate.io
PRICES_CONFIG = {"price_update_delay": 20}
TWITTER_ACTIVE = False

CHECK_SIGNAL_CONFIG = {
    "moving_avg_threshold_down_p": 0.10,
    "moving_avg_threshold_up_p": 0.10,
    "delta_threshold_p": 0.07,
    'enable_twitter': TWITTER_ACTIVE,
}

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO, format="\033[02m%(asctime)s %(levelname)s: %(message)s"
    )
    logger = logging.getLogger(__name__)
    prices = Prices(PRICES_CONFIG)
    prices.start_cont_update()
    currency_dfs = {}
    logging.info(f"Crypto bot is starting - please wait")
    logger.info(f"Collecting crypto data from gate.io for {EVAL_PRICES_DELAY}s")
    time.sleep(EVAL_PRICES_DELAY)
    logger.info(f"\n<< Crypto signal bot started :-) >>")
    logger.info(f"<< Checking prices every {CYCLE_DELAY} seconds >>")
    logger.info(f"Now checking for signals - please wait\n")
    for i in range(RUNS):
        price_history, lock = prices.get_price_history()
        with lock:
            indicators = calc_indicators(price_history)
        for currency in indicators.keys():
            if not indicators[currency]:
                continue
            signal = check_signal(
                currency,
                indicators,
                CHECK_SIGNAL_CONFIG,
            )
            if signal:
                logger.info(signal)
                if CHECK_SIGNAL_CONFIG['enable_twitter']:
                    send_pricechange_tweet(signal)
                    print('send via twitter')
        time.sleep(CYCLE_DELAY)

output of our twitter bot

Summary

Congratulations, you have reached the end of this tutorial. In this article, you have learned to develop a Twitter crypto signal bot in Python. When you run the Twitter bot, it regularly fetches cryptocurrency quotes from the Gate.io exchange. Every time the signaling logic triggers, the bot publishes a tweet about the price movement on Twitter.

In this article, we have deliberately kept the signaling logic of our Twitter bot simple and primarily created the basic framework of the bot. Based on this foundation, you can now further develop and improve the logic. One possibility, for example, would be to define signals based on changes in volume or price volatility. Have fun trying it out!
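
As a starting point for such an extension, a volume-based signal could compare the latest volume with the average volume that calc_indicators already provides. This is only a sketch: the function volume_spike and the factor of 3 are assumptions for this example, and the indicator values are made up.

```python
def volume_spike(volume, moving_average_volume, factor=3.0):
    """Return True if the latest volume is at least `factor` times the average volume."""
    if moving_average_volume <= 0:
        return False  # no meaningful average available yet
    return volume >= factor * moving_average_volume


# Keys match the indicator dictionary built by calc_indicators; values are made up
ind = {"volume": 4500.0, "moving_average_volume": 1200.0}
print(volume_spike(ind["volume"], ind["moving_average_volume"]))
```

A check like this could be called from check_signal alongside the price conditions, with the factor added to the signal configuration.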

If you found this article helpful, please show your appreciation by leaving a comment or like. Cheers

Authors

  • Hi, I am Florian, a Zurich-based consultant for AI and Data. Since the completion of my Ph.D. in 2017, I have been working on the design and implementation of ML use cases in the Swiss financial sector. I started this blog in 2020 with the goal in mind to share my experiences and create a place where you can find key concepts of machine learning and materials that will allow you to kick-start your own Python projects.

  • Hi, I am a student at the Technical University of Munich and currently pursuing a Master's degree in Electrical Engineering and Information Technology. I am very passionate about Machine Learning, Software Development, and Signal Processing.
