Task Scheduler in C using Struct Pie

Using Struct Pie C Shared Libraries to Develop a Priority Queue Scheduler

Suppose you have a list of scripts or tasks that you want your computer to do in a certain order based on the priority of each task. You have a list so big that you don’t have the time to sort. Here comes the importance of the priority queue data structure.

A priority queue is a data structure where elements are served based on their priorities, not on their insertion order like normal queues. The most relative example to a priority queue, is hospitals’ emergency where the doctors check the patients according to their medical status not to when they came to the hospital.

Back to our example,  the task scheduler, we will use the priority queue from Struct Pie open source data structures shared libraries in our C program.

What is Struct Pie?

Struct Pie is a free open source project under MIT license. The project is both a python library and a set of C shared libraries.

For a quick start on python library, you can have a look at https://minimatech.org/struct-pie-quick-start/

The C shared libraries project is available on sourceforge.net and contains the data structures as C shared libraries that can readily be used in C projects. Download the libraries from here.

The Tasks

As we mentioned, we are aiming at creating a program that takes a list of tasks along with their priorities and executes them based on the priorities. Suppose we have 3 scripts in 3 different languages that we want to run on a certain file. In our case, the task is doing some calculations on a list of numbers from 1 to 10 saved in a file called “numbers.txt”.

The 3 scripts that we have are:

1. python script: “pysum.py”

import sys

file = sys.argv[1]

with open(file, "r") as f:
    numbers = f.read().splitlines()

numbers = [int(i) for i in numbers]
print("\n\tSum of number list in python is: %d" % sum(numbers))

2. R script: “rmean.R”

args <- commandArgs(trailingOnly = TRUE)
file <- args[1]
numbers <- as.numeric(read.table(file, header = FALSE, stringsAsFactors = FALSE)[, 1])
print(paste0("\n\t", "Average of number list in R is: ", mean(numbers), "\n"))

3. C script: “cprint.c”

#include <stdio.h>
#include <stdlib.h>

void print_array(int * array, size_t len) {
  puts("\n\tPrinting list in C");
  printf("\n\tLength of list is: %i\n", len);
  puts("\tList elements are: ");
  int i;
  for (i = 0; i < len; i++)
    printf("\t%i", array[i]);
  printf("\n");
}

void main(int argc, char * argv[]) {
  FILE * input = fopen(argv[1], "r");
  int number, val;
  int iter = 0;
  int b = 10; // initial capacity
  int * number_array = (int * ) malloc(b * sizeof(int));

  while (fscanf(input, "%i", & number) == 1) {

    if (iter >= b) {
      b = iter + 1; // new capacity to reallocate
      number_array = (int * ) realloc(number_array, sizeof(int) * b);
    }
    number_array[iter] = number;
    iter++;
  }
  fclose(input);

  print_array(number_array, b);
}

Each script does a different simple task. Now let’s have a look at the list which has them and their priorities: “tasks.txt”

cprint,2
pysum.py,4
rmean.R,1

N.B. we are having the compiled executable from the cprint.c script not the script itself.

gcc cprint.c -o cprint

Struct Pie Priority Queue Library

Now, as we have an idea about the task let’s get into developing the solution. We will first need to download Struct Pie source code from the SourceForge link shared above. Then we build the shared library for the “priority_queue” with the help of the “Makefile” in the directory and make it available for our program to include.

mkdir -p ./StructPie/priority_queue/libs)
gcc -fPIC -c ./StructPie/priority_queue/pq.c -o ./StructPie/priority_queue/pq.o
gcc -shared ./StructPie/priority_queue/pq.o -o ./StructPie/priority_queue/libs/libpq.so
        
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./StructPie/priority_queue/libs

Last link and the .so extension is specific for Linux.

The Scheduler

Here we will not only look on how to simply use the priority_queue library. But also we will focus on some great C features, like piping and interprocess communication along with redirecting file descriptor and function pointers.

Let’s first include the needed libraries. Include all the following code in “scheduler.c” file.

N.B. the comments in the code are so important as they have deeper explanation.

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include "./StructPie/priority_queue/pq.h"

Function Pointer

Sometimes you like to run a function from inside another function. This makes the wrapper function more dynamic as it can accept other functions as arguments and executes them within it.  Function pointers allow this in C.

In our case, we have scripts in different languages. That means different commands to be executed. So to dynamically execute them, we need some helper functions that assign the suitable values based on the task at hand.

// all definitions before main function could be wrapped into a header file if needed

/* helper functions to get the commands 
in shape based on their type */
void is_python(char * script, char * lang) {
  if (strstr(script, ".py")) {
    strcpy(lang, "python3");
  }
}

void is_r(char * script, char * lang) {
  if (strstr(script, ".R")) {
    strcpy(lang, "Rscript");
  }
}

void is_exe(char * script, char * lang) {
  int i, count = 0;
  /* count how many dots
   * the one in './' and whether it has the dot of extension */
  for (i = 0; script[i] != '\0'; i++) {
    if (script[i] == '.')
      count++;
  }
  if (count == 1)
    strcpy(lang, "exe");
}

I believe by now, you got the intention. Instead of having a function that checks the input and accordingly changes “lang” value, We have smaller functions, one for each language, and then we will wrap all of them inside a function that will call them. The advantage of this approach is that we will not need change the wrapper function whenever we have a new task in a new language. Instead, we will only define a small function for it and pass the function to the wrapper function.

Now let’s look at how the wrapper function that takes a function pointer will look like

// define a function that takes a function as its input (function pointer)
void assign_language(void( * assign)(char * , char * ), char * process, char * language) { 
  assign(process, language);
}

Return type (* pointer variable_name)(Args types)” is how you create a pointer to a function.

As you can see, the function only calls the smaller ones and accordingly updates the “language” variable for the command later.

Main Function

As all C programs, we will define a main function and we will explain the code inside it part by part

int main() {
  // linked queue from pq library
  struct linked_queue * front = NULL, * rear = NULL;
  int queue_size = 0, priority;
  // pipe to pipe parent and child processes
  int pipefds[2];
  // process id
  pid_t pid;
  // temp process string
  char proc[STRLEN];
  /* output file for the child to output result to
  we are defining it here so that the child appends to it not to overwrite */
  FILE * out;
  out = fopen("output.txt", "w");

  // read the tasks from the stdin which will be redirected upon starting
  while (scanf("%[^,],%d\n", proc, & priority) == 2) {
    // pushing tasks and priorities to the queue
    enqueue( & front, & rear, proc, priority);
    queue_size++;
  }

  puts("The queue now looks like:\n");
  display(front, rear);
  puts("\nNow executing the tasks based on priorities\n");
  // start piping
  if (pipe(pipefds) == -1) {
    printf("Unable to create pipe\n");
    exit(0);
  }

N.B. the last “}” will be in the last block of code, as we will take the code step by step.

Again! The comments are very important. As you see, the code is now so explainable so no need to repeat. But worth noting how we open a pipe between the processes using the pipe() function and the int pipefds[2].

Inter-Process Communication

We are now the most critical part. We will have two processes doing the job. A parent process which will dequeue the queue and forms the variables for the command to be executed and pass it through the pipe. The other process is the child process which reads the message from the pipe and executes the commands outputing the results in the output.txt file.

Let’s look at how we can accomplish that. We first fork() the running process

  pid = fork();

Now we will have two processes with two pids. Parent with pid > 0 and a child with pid == 0. So usually you will see the code structured in that way

int pid = fork();
if(pid == -1){
  puts("forking failed");
  exit(0);
} else if(pid == 0){
  // child's stuff to do
} else { // for pid > 0 i.e. parent
  // parent's stuff to do
}

Upon forking, all that we defined and declared above is shared between the two processes while inside each if (pid == x) block, variables and code will be execlusive to the process. That’s why we are using pipe to send variables over between the parent and child process.

Let’s see what the child is supposed to do

// looping until queue is empty
while (queue_size > 0) {
  if (pid == -1) { // error handling
    printf("\n\t Can't fork process: %s\n", strerror(errno));
    return 1;
  }

  // pid == 0 // child process
  else if (pid == 0) {
    // close the writing end of the pipe as it is not needed here
    close(pipefds[1]);
    // define child related variables and the input file on which the calcs will be done
    char message[STRLEN], message2[STRLEN], cmd[STRLEN * 3], input[STRLEN] = "./numbers.txt";
    /* redirect the standard output to a file so that 
    the output from parent and child don't overlap */
    dup2(fileno(out), STDOUT_FILENO);
    // read messages from parent
    // it is better to while loop until EOF if reading multiple messages
    read(pipefds[0], message, sizeof(message));
    read(pipefds[0], message2, sizeof(message2));
    printf("Process %s to be processed using %s ...\n", message, message2);
    // construct the command to be executed based on the type of the script
    if (strstr(message2, "exe")) {
      sprintf(cmd, "%s %s", message, input);
    } else {
      sprintf(cmd, "%s %s %s", message2, message, input);
    }
    system(cmd);
    puts("");

The child will read from the read end of the pipe the task name and the language, forms the command string and calls it. The output of the child process is redirected into “output.txt” file using dup2(fileno(out), STDOUT_FILENO).

N.B. It is very important to always handle errors!

Now the parent process

    } else { // pid > 0 // parent process
      // close the reading end of the pipe as it will not be needed
      close(pipefds[0]);
      // do the work in the parent
      char process[STRLEN], language[STRLEN];
      strcpy(proc, dequeue( & front, & rear)); // gets name of process
      strcpy(process, "./");
      strcat(process, proc); // concats everything in arg1
      // function pointer to finalize how the script should look like
      assign_language(is_python, process, language);
      assign_language(is_r, process, language);
      assign_language(is_exe, process, language);
      // send the messages to the child
      write(pipefds[1], process, sizeof(process));
      write(pipefds[1], language, sizeof(language));
      printf("Process %s to be processed using %s ...\n\n", process, language);
      // N.B. no need to use waitpid() as the child process doesn't exit
    }
    queue_size--;
  }
  return 0;
}

The parent process dequeues the queue and shapes the process name and the language needed, then writes it into the pipe for the child to receive. Note that the output of the parent is the same open terminal.

Now it is time to compile and test

Compile and Run

We need to compile the scheduler program with the pq library.

gcc scheduler.c -L./StructPie/priority_queue/libs -lpq -o scheduler

Now we can run the program with the tasks file

./scheduler < tasks.txt

The queue now looks like:

(
 {pysum.py, 4}
 {cprint, 2}
 {rmean.R, 1}
)

Now executing the tasks based on priorities

(pysum.py) with priority (4) has been removed
Process ./pysum.py to be processed using python3 ...

(cprint) with priority (2) has been removed
Process ./cprint to be processed using exe ...

(rmean.R) with priority (1) has been removed
Process ./rmean.R to be processed using Rscript ...

Remember, this is the parent’s output only. To check that the child made its work, we look at “output.txt” file:

cat output.txt

Process ./pysum.py to be processed using python3 ...

        Sum of number list in python is: 55

Process ./cprint to be processed using exe ...

        Printing list in C

        Length of list is: 10
        List elements are:
        1       2       3       4       5       6       7       8       9       10

Process ./rmean.R to be processed using Rscript ...
[1] "\n\tAverage of number list in R is: 5.5\n"

Perfect! Our scheduler is working like a charm.

Share

Leave a Reply

Your email address will not be published. Required fields are marked *