Calling C POSIX Threads from Python Through Cython

Call a Parallel C Function from Python

The C language is fast, and parallel C is faster still. It can be a great speed boost, especially for heavy operations.

Here we will see how we can use our parallel C program in Python with the help of the Cython language. Of course, Python has its own libraries, like multiprocessing and threading, to apply concurrency and thread-based parallelism, which can speed up Python applications greatly. But maybe you love C as much as I do, and you have existing C code that you want to use in Python quickly without writing a full wrapper for it, or you simply like to drop some C code into your Python applications. In this blog, we will see how to do so easily using Cython.
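If you stuck with pure Python, the thread-based version of this idea could look roughly like the sketch below, using concurrent.futures from the standard library. This snippet is only for comparison and is not part of the application; fetch_weather is a hypothetical stand-in for the request logic that we will write in C instead.

# a minimal pure-Python sketch, for comparison only (not part of the application)
from concurrent.futures import ThreadPoolExecutor

def fetch_weather(city):
    # hypothetical stand-in for the real request logic,
    # which in this blog lives in the C code
    return (city, "temperature", "description")

def weather_query_py(cities):
    # one worker thread per city, similar in spirit to the pthreads version
    with ThreadPoolExecutor(max_workers=max(1, len(cities))) as pool:
        return list(pool.map(fetch_weather, cities))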

For more about how to import C code into Python, have a look at these two blogs: part 1 and part 2.

In this blog, we will build on the weather application from a previous blog and make it callable from Python. The final application will consist of C code, command line commands, and Cython wrapper functions, all driven from Python. Together with the earlier posts, it demonstrates how to combine several tools in one application to achieve better performance.

For a full example of this weather application using parallelism only at the command line, you can refer to this blog.

We will also have a look at topics like memoization and Python decorators. Let’s get started!

C Header

We define the constants that we will use and declare the functions of our C program. The header is important because it is what Cython will read the declarations from. Here’s our “cthreads.h” file:

// length of strings
#define STRLEN 256
// construct first command with the city
#define CMD1 "curl -s 'http://api.openweathermap.org/data/2.5/weather?q=%s"
// append api key and extract only needed info and convert to csv
#define CMD2 "&appid=<your-api-key-here>' | jq -r '{name: .name, temperature: .main.temp, desc: .weather[].description} | [.name, .temperature, .desc] | @csv'"

// error handling
void error(char * msg);
// the function to be parallelized
void * request(void * inp);
// the function to be wrapped
char ** weather_work(char ** cities, int NUM_CITIES);

C Main Code

In our main application, we define the function that we will pass to the threads to submit the requests to the API. We also define the function that will create the threads and join the results from them. The result will be a C array that the Cython code will convert into a Python list.

N.B. The comments in the code are important; they explain each step.

The following is our “cthreads.c” file.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "cthreads.h"

// error handling
void error(char * msg) {
  printf("%s: %s\n", msg, strerror(errno));
  exit(1);
}

void * request(void * inp) {
  // the final command to be called
  char cmd[STRLEN];
  // void * into char *
  char * city = inp;
  // the result string from a thread
  char * res = (char * ) malloc(STRLEN * sizeof(char));
  // print which thread handles which city, so we can later see when memoization skips a request
  printf(" Thread %ld is getting weather data for %s\n", pthread_self(), city);
  // construct the command
  sprintf(cmd, CMD1, city);
  strcat(cmd, CMD2);
  // save the command output into a variable
  FILE * fp;
  fp = popen(cmd, "r");
  // error handling
  if (fp == NULL) {
    error("Failed to run command\n");
  }
  // read command output including spaces and commas
  // until new line
  fscanf(fp, "%255[^\n]", res);
  pclose(fp);
  // return the pointer to res as void *
  return (void * ) res;
}

// the function to be called from Cython
char ** weather_work(char ** cities, int NUM_CITIES) {
  /* the array containing the gathered responses from the threads;
     it will be freed from the Cython code */
  char ** response = (char ** ) malloc(NUM_CITIES * sizeof(char * ));
  int m = 0;
  // allocate memory for individual strings in the array
  for (; m < NUM_CITIES; ++m)
    response[m] = (char * ) malloc(STRLEN * sizeof(char));
  // variable to collect responses in  
  void * result;
  // i variable will be used to loop in threads creating and joining
  int i = NUM_CITIES;
  // create threads (backwards) to go in parallel hitting the api
  pthread_t threads[NUM_CITIES];
  while (i-- > 0) {
    // create the threads and pass the request function
    // passing arg pointer as void*
    // pthread_create returns 0 on success and an error number on failure
    if (pthread_create(&threads[i], NULL, request, (void *) cities[i]) != 0)
      error("Can't create thread\n");
  }
  // collecting results from threads and fill the response array
  while (++i < NUM_CITIES) {
    // pthread_join also returns 0 on success and an error number on failure
    if (pthread_join(threads[i], &result) != 0)
      error("Can't join thread\n");
    // void * to char *
    strcpy(response[i], (char *) result);
    // free the per-thread buffer allocated in request()
    free(result);
  }
  return response;
}

The comments, along with the previous blogs, explain the code thoroughly.

This is assumed to be our existing working C application. We now need to see how we are going to wrap it in Cython code.

Cython Wrapper

In Cython, we will wrap the “weather_work” function in a Cython function that’ll work as an interface to be called from Python.

First, we import the needed functions from the header file, and we also write a function that converts Python lists into C arrays. Here is the “pyxthreads.pyx” file:

cimport numpy as np
import numpy as np
from functools import wraps
from libc.stdlib cimport malloc, free

# import the interface function declaration from the header file
cdef extern from "cthreads.h":
    char ** weather_work(char ** cities, int NUM_CITIES)
    
cdef extern from "Python.h":
    char* PyUnicode_AsUTF8(object unicode)

# converting python lists into C arrays to pass it to C code
cdef char ** to_c_string_list(str_list):
    cdef int l = len(str_list)
    cdef char **c_string = <char **>malloc(l * sizeof(char *))
    for i in range(l):
        c_string[i] = PyUnicode_AsUTF8(str_list[i])
    return c_string

# a function that splits the message into a tuple of values instead of a single string
cdef np.ndarray fill_array(np.ndarray arr, char ** carr, int l):
    cdef int i = 0
    for i in range(l):
        arr[i] = tuple(carr[i].decode().replace('"', "").split(","))
    
    # free the memory for the response array allocated in the C code
    for s in range(l):
        free(carr[s])
    free(carr)
    return arr
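
To see what the string-to-tuple step in fill_array does, here is the same transformation in plain Python on a sample line in the form jq emits (the sample values are illustrative):

# the raw csv line as the C code returns it (sample values)
raw = b'"Cairo",291.41,"few clouds"'
# the same steps as in fill_array: decode the bytes, drop the quotes, split on commas
print(tuple(raw.decode().replace('"', "").split(",")))
# ('Cairo', '291.41', 'few clouds')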

This is the first part of the file. Straightforward. Now, we define a Python decorator that applies memoization in our application.

So what is memoization and what are the Python decorators?

Python Decorators

Simply put, a decorator is a callable Python object, typically a function, that takes another function (or another callable object), wraps it, and extends its behavior without changing the wrapped function itself. In the decorator’s definition, we receive the decorated function as an argument. Inside the decorator body, we define a wrapper function that takes the decorated function’s arguments, adds the extra functionality, and returns the desired result. The decorator then returns this wrapper function as its return value.

## decorator syntax
import functools

def decorator(decorated_func):
    # wraps is a decorator that copies the attributes
    # of the decorated func to the wrapper func
    @functools.wraps(decorated_func)
    def wrapper_func(*args, **kwargs):
        ## do the stuff here, usually around a call to the decorated func
        modified_output = decorated_func(*args, **kwargs)
        return modified_output
    return wrapper_func

## decorating a function definition
@decorator
def decorated_func(decorated_func_args):
    ...

## calling the decorated function
decorated_func(decorated_func_args)

## this call is equal to
decorator(decorated_func)(decorated_func_args)
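
To make this concrete, here is a small, self-contained example that has nothing to do with the weather application: a toy decorator that prints each call before delegating to the decorated function.

import functools

def log_calls(func):
    # copy the decorated function's attributes onto the wrapper
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # added behavior: report the call, then delegate
        print(f"calling {func.__name__} with {args}")
        return func(*args, **kwargs)
    return wrapper

@log_calls
def add(a, b):
    return a + b

print(add(2, 3))
# calling add with (2, 3)
# 5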

Memoization

Wikipedia definition: In computing, memoization or memoisation is an optimization technique used primarily to speed up computer programs by storing the results of expensive function calls and returning the cached result when the same inputs occur again.

So memoization makes the code remember previous queries, so that when the same call is made again it returns the stored answer without redoing the work. This saves time, especially when the work is heavy, which is exactly our case. For example, imagine we have already requested the weather data for the city of Cairo and now we want to do it again. Instead of hitting the API for Cairo’s weather a second time, we make the code record Cairo’s data and return it immediately once it sees Cairo again.

Of course, in a weather application you may well want fresh data several times during the day, but the example here is just to demonstrate the concept.
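
As a minimal sketch of the idea, and much simpler than the decorator we are about to write, memoization can be as small as a dict sitting in front of an expensive function (expensive_api_call here is hypothetical):

# a minimal memoization sketch (illustrative only)
cache = {}

def get_weather(city):
    if city in cache:                    # already queried: return the stored result
        return cache[city]
    result = expensive_api_call(city)    # hypothetical expensive call
    cache[city] = result                 # remember it for next time
    return result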

So we will define a decorator that modifies the weather_query function so that it remembers previous calls, applying memoization:

def memoize(weather_query):
    queried = {}
    @wraps(weather_query)
    def memorize(cities):
        response = []
        # if a single string, convert it to list
        # C code takes an array
        if isinstance(cities, str):
            cities = [cities]
        # check which cities are already queried before
        for c in cities:
            if c in queried:
                # add the already queried data to the output 
                response.append(queried.get(c))
                # remove the city from the cities list to be queried
                # cities.remove(c) would mutate the list while iterating
                # over it and skip elements, so we don't remove here
        # instead, filter out the already-queried cities in one pass
        cities = [c for c in cities if c not in queried]

        if len(cities) > 0:
            # call the C function on the shortlisted cities list
            weather = weather_query(cities)
            # add the new responses to the queried data
            for i in range(len(weather)):
                queried[weather[i][0]] = weather[i]
                response.append(weather[i])
        # return the desired output
        return response
    # return the wrapper function
    return memorize

## decorate the weather_query function
@memoize
def weather_query(cities):
    # convert python list to C array
    cdef char ** ccities = to_c_string_list(cities)
    cdef int ln = len(cities)
    # get responses
    cdef char ** msg = weather_work(ccities, ln)
    # define an empty numpy array to be filled with data
    cdef np.ndarray[dtype = object, ndim = 1] arr = np.empty(ln, dtype = object)
    # free the C array memory    
    free(ccities)
    # return the clean version of the output
    # the msg array will be freed in the fill_array function defined above
    return fill_array(arr, msg, ln)
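
A quick note on the design choice: Python’s standard library also offers functools.lru_cache for memoization, but it caches per call signature and needs hashable arguments, while the dict in memoize above is keyed by individual city, so a new list that mixes old and new cities still reuses the stored results. Since our C function deliberately takes a whole array of cities at once (so the threads can hit the API in parallel), the hand-rolled dict fits better here. Just for illustration, an lru_cache-style version for a hypothetical one-city function would look like this:

# illustrative only, not part of the .pyx file
from functools import lru_cache

@lru_cache(maxsize=None)
def query_one(city):
    # hypothetical single-city query; lru_cache keeps one entry per city,
    # playing the role of the queried dict in memoize above
    return fetch_weather_for(city)   # hypothetical API call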

Now that we have our Cython wrapper script ready, it is time to create the “setup.py” file and compile everything.

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy

# first arg in Extension is the name of the produced module
# second arg, the list, is the cython file and the c file
# third is the numpy dir as it will be used inside the cython code
ext_modules = [Extension("weather_threads", ["pyxthreads.pyx", "cthreads.c"],
                         include_dirs = [numpy.get_include()])]

setup(
  cmdclass = {"build_ext": build_ext},
  ext_modules = ext_modules
)

Setup & Run

Let’s now produce the Python module to use it in a Python session:

python3 setup.py build_ext --inplace

This will produce the “weather_threads” module. Let’s open a Python session and test everything:

# import everything from the module
from weather_threads import *
# to prettily print output
from pprint import pprint
cities = ["Barcelona", "Cairo", "Dubai", "Berlin", "Amsterdam", "Kuala Lumpur"]
pprint(weather_query(cities))

# Thread 140359502006016 is getting weather data for Kuala Lumpur
# Thread 140359493551872 is getting weather data for Amsterdam
# Thread 140359485097728 is getting weather data for Berlin
# Thread 140359476643584 is getting weather data for Dubai
# Thread 140359262734080 is getting weather data for Cairo
# Thread 140359254279936 is getting weather data for Barcelona
#[('Barcelona', '287.69', 'few clouds'),
# ('Cairo', '291.41', 'few clouds'),
# ('Dubai', '294.3', 'dust'),
# ('Berlin', '276.67', 'clear sky'),
# ('Amsterdam', '278.14', 'few clouds'),
# ('Kuala Lumpur', '298.82', 'few clouds')]

The code is working fine. A different thread is created to get data for each city in parallel.

Let’s now test the memoization:

pprint(weather_query(cities))

#[('Barcelona', '287.69', 'few clouds'),
# ('Cairo', '291.41', 'few clouds'),
# ('Dubai', '294.3', 'dust'),
# ('Berlin', '276.67', 'clear sky'),
# ('Amsterdam', '278.14', 'few clouds'),
# ('Kuala Lumpur', '298.82', 'few clouds')]

No threads were created and no API requests were made. The function quickly returned the stored responses. Let’s test it again:

# adding one new city
cities2 = ["Barcelona", "Cairo", "Rome"]
pprint(weather_query(cities2))

# Thread 140359502006016 is getting weather data for Rome
#[('Barcelona', '287.69', 'few clouds'),
# ('Cairo', '291.41', 'few clouds'),
# ('Rome', '278.98', 'clear sky')]

pprint(weather_query("Barcelona"))

#[('Barcelona', '287.69', 'few clouds')]

pprint(weather_query("Madrid"))

# Thread 140359502006016 is getting weather data for Madrid
#[('Madrid', '281.93', 'broken clouds')]

Great! Everything is working well. Now we have an application that runs in Python and calls C and command line code through Cython 😉.
