/*
 * Multithreaded directory processor.
 *
 * The main thread repeatedly scans a directory and hands every entry to a
 * detached worker thread. At most <max_threads> workers run at a time; the
 * count of live workers is kept in num_threads, guarded by count_mutex, and
 * workers signal cond when they finish so that main can start the next one.
 *
 * NOTE(review): the header names of the original #include lines were lost
 * (only the bare "#include" tokens survived). The list below is reconstructed
 * from the functions this file calls - confirm against the original source.
 */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L /* scandir, alphasort, unlink, sleep */
#endif

#include <sys/types.h>

#include <dirent.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Fallback for platforms whose <limits.h> does not provide PATH_MAX. */
#ifndef PATH_MAX
#define PATH_MAX 4096
#endif

/* Seconds main sleeps between polls when there is nothing to hand out. */
#define SLEEP_DUR (1)

/* Accepted range for the <max_threads> command line argument. */
enum { MIN_WORKERS = 1, MAX_WORKERS = 99 };

/*
 * Work item passed to a worker thread. Allocated by main, freed by the
 * worker. Sized for a full path ("dir/name"), not just a file name.
 */
struct thread_data {
    char file_name[PATH_MAX];
};

static int num_threads;             /* live workers; guarded by count_mutex */
static pthread_cond_t cond;         /* signalled by a worker when it finishes */
static pthread_mutex_t count_mutex; /* protects num_threads */

/*
 * scandir() filter: keep every entry except "." and "..".
 *
 * Filesystem differences with EXT4 and JFS2 - unable to use the d_type
 * member (DT_REG) to check for the file type, so only names are examined.
 *
 * Returns non-zero to keep the entry, 0 to drop it.
 */
int select_filter(const struct dirent *entry)
{
    return strcmp(entry->d_name, ".") != 0 &&
           strcmp(entry->d_name, "..") != 0;
}

/*
 * Worker thread entry point.
 *
 * arg is a heap-allocated struct thread_data whose ownership passes to this
 * thread. The named file is removed, the work item is freed, and the shared
 * worker count is decremented under count_mutex before main is signalled.
 *
 * Always returns NULL.
 */
void *process_file(void *arg)
{
    struct thread_data *data_t = arg;
    unsigned long self_id = (unsigned long)pthread_self();
    unsigned int i, j;

    /* Do some processing with data_t */
    fprintf(stdout, "%lu: Processing file %s\n", self_id, data_t->file_name);
    if (unlink(data_t->file_name) != 0) {
        fprintf(stdout, "%lu: Failed to remove %s (errno %d)\n",
                self_id, data_t->file_name, errno);
    }

    /* When done processing, free the resources */
    free(data_t);

    /* Empty delay loop kept from the original as a stand-in for real work. */
    for (i = 0; i < 9999; i++) {
        for (j = 0; j < 9999; j++) {
        }
    }

    /* Critical section: release this worker's slot and wake main. */
    pthread_mutex_lock(&count_mutex);
    num_threads--;
    pthread_cond_signal(&cond);
    fprintf(stdout, "%lu: Exiting. num_threads: %d\n", self_id, num_threads);
    pthread_mutex_unlock(&count_mutex);

    return NULL;
}

/*
 * Parse the <max_threads> argument.
 *
 * Stores the value in *limit and returns 0 when text is a complete decimal
 * integer in [MIN_WORKERS, MAX_WORKERS]; returns -1 otherwise and leaves
 * *limit untouched.
 */
static int parse_thread_limit(const char *text, int *limit)
{
    char *end = NULL;
    long value;

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return -1;
    }
    if (value < MIN_WORKERS || value > MAX_WORKERS) {
        return -1;
    }

    *limit = (int)value;
    return 0;
}

/*
 * Usage: <program> <max_threads>
 *
 * Loops forever: scans the directory once no workers are running, then hands
 * each entry (last to first in alphasort order) to a detached worker while
 * keeping at most max_threads workers alive. When built with DEBUG defined,
 * the program exits once a scan finds the directory empty.
 */
int main(int argc, const char *argv[])
{
    const char *dir_name = "/u/home/raddanki/Arun/Threads/data";
    struct dirent **file_list = NULL;
    struct thread_data *data_t = NULL;
    int no_of_files = 0; /* int, not short: scandir() returns int */
    int idx = 0;
    int max_threads = 0;
    int active_before = 0;
    int idle = 0;
    int path_len = 0;
    pthread_t tid;
    pthread_attr_t attr;

    /* Get the arguments */
    if (argc < 2) {
        printf("Usage: %s <max_threads>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    if (parse_thread_limit(argv[1], &max_threads) != 0) {
        printf("Max threads should be between 1 and 99. Provided: %s\n",
               argv[1]);
        exit(EXIT_FAILURE);
    }

    /* Initialize synchronization objects and thread attributes */
    if (pthread_cond_init(&cond, NULL) != 0 ||
        pthread_mutex_init(&count_mutex, NULL) != 0 ||
        pthread_attr_init(&attr) != 0 ||
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) {
        fprintf(stdout, "Failed to initialize thread primitives\n");
        exit(1);
    }

    while (1) {
        while (no_of_files == 0) {
            /*
             * Only rescan when no worker is running, so that files which are
             * still being processed are not handed out a second time.
             */
            pthread_mutex_lock(&count_mutex);
            idle = (num_threads == 0);
            pthread_mutex_unlock(&count_mutex);

            if (idle) {
                /* Entries of the previous list were freed one by one. */
                free(file_list);
                file_list = NULL;

                no_of_files = scandir(dir_name, &file_list, select_filter,
                                      alphasort);
                if (no_of_files == -1) {
                    fprintf(stdout, "Failed to open dir: %s\n", dir_name);
                    exit(1);
                }
            }

            idx = no_of_files - 1;

            if (no_of_files == 0) {
#ifdef DEBUG
                if (idle) {
                    free(file_list);
                    pthread_attr_destroy(&attr);
                    pthread_mutex_destroy(&count_mutex);
                    pthread_cond_destroy(&cond);
                    pthread_exit(0);
                }
#endif
                fprintf(stdout, "Going to sleep now\n");
                sleep(SLEEP_DUR);
            }
        }

        /*
         * Reserve a worker slot. The predicate is re-checked in a loop
         * because pthread_cond_wait() may wake up spuriously, and the slot
         * is counted here, before the thread exists, so a fast worker can
         * never decrement the counter ahead of this increment.
         */
        pthread_mutex_lock(&count_mutex);
        while (num_threads >= max_threads) {
            /* Let the threads finish */
            pthread_cond_wait(&cond, &count_mutex);
            fprintf(stdout, "MAIN: Waking up from condition\n");
        }
        active_before = num_threads;
        num_threads++;
        pthread_mutex_unlock(&count_mutex);

        data_t = calloc(1, sizeof *data_t);
        if (data_t == NULL) {
            fprintf(stdout, "Failed to allocate memory\n");
            exit(1);
        }

        path_len = snprintf(data_t->file_name, sizeof data_t->file_name,
                            "%s/%s", dir_name, file_list[idx]->d_name);
        free(file_list[idx--]);
        no_of_files--;
        fprintf(stdout, "MAIN: No of files: %d\n", no_of_files);

        if (path_len < 0 || (size_t)path_len >= sizeof data_t->file_name) {
            /* Never hand a truncated path to a worker that will unlink it. */
            fprintf(stdout, "Skipping entry: path too long\n");
            free(data_t);
            pthread_mutex_lock(&count_mutex);
            num_threads--; /* give the reserved slot back */
            pthread_mutex_unlock(&count_mutex);
            continue;
        }

        /* Manage the thread stuff */
        if (pthread_create(&tid, &attr, process_file, data_t) != 0) {
            fprintf(stdout, "Failed to allocate memory for thread\n");
            exit(1);
        }

        fprintf(stdout, "Created thread: %lu\nMAIN: Locking Mutex: %d\n",
                (unsigned long)tid, active_before);
    }

    return 0;
}
Friday, April 02, 2010
Multithreaded application to process files in a directory
We may encounter scenarios where we need to perform a defined set of operations on all the files in a directory. This very simple program is a typical example of how the problem can be approached with a mutex and a shared resource, by constantly creating new threads for new files instead of waiting for all the worker threads to complete.
Subscribe to:
Post Comments (Atom)
1 comments:
Fixed a deadlock issue with the main thread and child threads.
Post a Comment