Friday, April 02, 2010

Multithreaded application to process files in a directory

We sometimes need to perform a defined set of operations on every file in a directory. This simple program shows one way to approach the problem using a mutex, a condition variable and a shared thread counter: it keeps starting a new thread for each new file as soon as a worker slot frees up, instead of waiting for all the worker threads to complete.

#include <dirent.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define NAME_MAX                (256)
#define SLEEP_DUR               (1)
/* main() stores the joined path "<directory>/<entry name>" in file_name, not
 * just the entry name, so NAME_MAX alone is too small and long names would be
 * silently truncated. 4096 matches PATH_MAX on Linux. */
#define FILE_PATH_MAX           (4096)

/* Work item handed from main() to one worker thread; the worker frees it. */
struct thread_data {
    char file_name[FILE_PATH_MAX];
};

/* Number of dispatched workers that have not finished yet; guarded by count_mutex. */
static int num_threads;

/* Signalled by a worker after it decrements num_threads. */
static pthread_cond_t cond;
/* Protects num_threads; the mutex paired with cond. */
static pthread_mutex_t count_mutex;

/*
 * scandir() filter: keep every directory entry except "." and "..".
 *
 * d_type is deliberately not consulted: filesystems differ in whether it is
 * usable (EXT4 vs JFS2 behaved differently here), so non-regular entries such
 * as subdirectories are accepted as well.
 *
 * The parameter is const-qualified to match the POSIX scandir() filter type
 * int (*)(const struct dirent *).
 *
 * Returns 1 to keep the entry, 0 to drop it.
 */
int select_filter( const struct dirent *entry )
{
    const char *name = entry->d_name;

    if (strcmp( name, "." ) == 0 || strcmp( name, ".." ) == 0)
        return 0;
    return 1;
}

/*
 * Worker thread: processes (here: removes) one file, then gives its slot back.
 *
 * arg: heap-allocated struct thread_data whose ownership passes to this
 *      thread; it is freed here.
 * Returns NULL. Before returning, decrements num_threads under count_mutex
 * and signals cond so main() can dispatch the next file.
 */
void *process_file( void *arg )
{
    struct thread_data *data_t = arg;
    /* pthread_t is opaque; cast once so every %lu below gets a matching type. */
    unsigned long self_id = (unsigned long)pthread_self();
    unsigned int i, j;

    /* Do some processing with data_t */
    fprintf( stdout, "%lu: Processing file %s\n", self_id, data_t->file_name );
    if (unlink( data_t->file_name ) != 0)
        fprintf( stderr, "%lu: Failed to remove %s (errno %d)\n",
                 self_id, data_t->file_name, errno );
    /* When done processing, free the resources */
    free( data_t );

    /* Simulated workload. Note: an optimising compiler may remove this empty loop. */
    for (i = 0; i < 9999; i++)
        for (j = 0; j < 9999; j++)
            ;

    pthread_mutex_lock( &count_mutex );
    /* Critical section: release this worker's slot and wake main() if it is waiting. */
    num_threads--;
    pthread_cond_signal( &cond );
    fprintf( stdout, "%lu: Exiting. num_threads: %d\n", self_id, num_threads );
    pthread_mutex_unlock( &count_mutex );

    /* Returning from the start routine is an implicit pthread_exit(NULL). */
    return NULL;
}

int main(int argc, const char *argv[])
{
    struct dirent **file_list = NULL;
    short int no_of_files = 0;
    char *dir_name = "/u/home/raddanki/Arun/Threads/data";
    short int idx = 0;
    struct thread_data *data_t = NULL;
    int MAX_THREADS = 0;

    pthread_t tid = -1;
    pthread_attr_t attr;

    /* Get the arguments */
    if (argc < 2) {
        printf( "Usage: %s \n", argv[0] );
        exit(0);
    }

    MAX_THREADS = atoi( argv[1] );
    if (MAX_THREADS < 1 && MAX_THREADS > 99) {
        printf( "Max threads should be between 1 and 99. Provided: %d\n", MAX_THREADS );
        exit(0);
    }

    /* Initialize thread attributes */
    pthread_cond_init( &cond, NULL );
    pthread_mutex_init( &count_mutex, NULL );
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );

    while (1) {

        while (no_of_files == 0) {
            /* Open the dir for reading */
            pthread_mutex_lock( &count_mutex );
            if (num_threads == 0) {
                if (file_list)
                    free( file_list );
                if ((no_of_files = scandir( dir_name, &file_list, select_filter, alphasort )) == -1) {
                    fprintf( stdout, "Failed to open dir: %s\n", dir_name );
                    /* need to wait for threads to complete and then exit */
                    exit (1);
                }
            }
            pthread_mutex_unlock( &count_mutex );
            idx = no_of_files - 1;
            if (!no_of_files) {
#ifdef DEBUG
                if (num_threads == 0) {
                    free( file_list );
                    pthread_attr_destroy( &attr );
                    pthread_mutex_destroy( &count_mutex );
                    pthread_cond_destroy( &cond );
                    pthread_exit(0);
                }
#endif
                fprintf( stdout, "Going to sleep now\n" );
                sleep(SLEEP_DUR);
            }
        }

        /* fprintf( stdout, "MAIN: Locking Mutex\n" ); */
        /* while (1) { */
        pthread_mutex_lock( &count_mutex );
        if (num_threads == MAX_THREADS) {
            /* Let the threads finish */
            /* fprintf( stdout, "MAIN: Waiting on condition\n" ); */
            pthread_cond_wait( &cond, &count_mutex );
            fprintf( stdout, "MAIN: Waking up from condition\n" );
            /* fprintf( stdout, "MAIN: Unlocking Mutex\n" ); */
        }
        pthread_mutex_unlock( &count_mutex );
        /* else */
        /* break; */
        /* } */

        /* Be sure about the boundaries in File[] */
        if (num_threads > MAX_THREADS) {
            fprintf( stdout, "Prevented severe memory error %d\n", num_threads );
            exit(2);
        }
        if (((data_t = calloc( 1, sizeof(struct thread_data) )) == NULL)) {
            fprintf( stdout, "Failed to allocate memory\n" );
            exit(1);
        }

        snprintf( data_t->file_name, sizeof(data_t->file_name), "%s/%s", dir_name, file_list[idx]->d_name );
        free(file_list[idx--]);
        no_of_files--;
        fprintf( stdout, "MAIN: No of files: %d\n", no_of_files );

        /* Manage the thread stuff */
        if (pthread_create( &tid, &attr, process_file, (void *)data_t ) != 0) {
            fprintf( stdout, "Failed to allocate memory for thread\n" );
            exit(1);
        }
        fprintf( stdout, "Created thread: %d\nMAIN: Locking Mutex: %d\n", tid, num_threads );
        pthread_mutex_lock( &count_mutex );
        num_threads++;

        /* fprintf( stdout, "MAIN: Unlocking Mutex 2 - %d\n", num_threads ); */
        pthread_mutex_unlock( &count_mutex );
    }

    return 0;
}

1 comment:

Arun said...

Fixed a deadlock between the main thread and the child threads.