synchronization barrier implementation

wella wrote on Friday, January 29, 2010:

Hello,
below is a simple inter-task synchronization barrier implementation. Because it is not part of the kernel, it relies on several assumptions. The master task must create the barrier first (tasks cannot wait on a barrier that has not been created). After that, tasks can wait on the barrier with a timeout. If a timeout expires, all waiting tasks are woken with an error. The return value of the function barrierState tells whether there are still tasks that have not reached the barrier.

Martin

// State of one synchronization barrier shared by the participating tasks.
// barrierCreate() treats a NULL xQueue as "queue not created yet", so the
// structure has to be zero-initialized before its first barrierCreate() call.
typedef struct
{
	// Number of barrierTake() calls since the last barrierCreate().
	unsigned portSHORT counter;
    // Number of arrivals at which the barrier is considered reached.
    unsigned portSHORT max;
    // One-slot queue holding the barrier result; waiters peek it so the
    // message stays in the queue for the other tasks.
    xQueueHandle xQueue;
} barrier;
// Status values returned by barrierTake() and stored in the barrier queue:
// BARRIER_SUCCESS when the barrier was reached, BARRIER_FAILED after a timeout.
#define BARRIER_SUCCESS pdTRUE
#define BARRIER_FAILED	pdFAIL
unsigned portBASE_TYPE barrierTake(barrier * b, portTickType xTicksToWait)
{
    unsigned portBASE_TYPE mess = BARRIER_SUCCESS;
    
	// Enter to shared memory region
	portENTER_CRITICAL();    
	// Increment common counter
    b->counter++;    
	// Exit critical section
	portEXIT_CRITICAL();
	
	// Barrier reached?
    if(b->counter < b->max)
    {
		// Not all tasks finished yet.	
        
		// Wait on the other tasks with timeout. Due to "peek" message will remain
		// in message queue.
		if(pdTRUE != xQueuePeek( b->xQueue, &( mess ),  xTicksToWait))
		{
			// Timeout. Inform other tasks about missed timeout.
			mess = BARRIER_FAILED;
			xQueueSend(  b->xQueue, ( void * ) &mess, ( portTickType ) 0 );			
		}		
    }
    else
    {		
		
		// If there is stored a message do not send.
		if(uxQueueMessagesWaiting( b->xQueue) > 0)
		{
			// Receive message(peek).
			xQueuePeek( b->xQueue, &( mess ), ( portTickType )0);
		}
		else
		{
			// All tasks finished.
			// Inform other tasks.
			xQueueSend(  b->xQueue, ( void * ) &mess, ( portTickType ) 0 );		
		}
    }
	
	// Return status. 
	return mess;
}
// Initialize (or re-arm) barrier `b` for `max` participating tasks.
//
// On first use (b->xQueue == NULL) the one-slot result queue is created;
// on later calls any stale result left in the queue is discarded. The
// barrier structure must therefore be zero-initialized before the first call.
// If queue creation fails, b->xQueue stays NULL; this function cannot report it.
void barrierCreate(barrier * b, unsigned portSHORT max)
{
	// Scratch buffer for discarded messages. It has the same type as the
	// status values exchanged by barrierTake().
	unsigned portBASE_TYPE mess;

	// Set number of tasks to synchronize and restart the arrival count.
	b->max = max;
	b->counter = 0;

	if(NULL == b->xQueue)
	{
		// The item size must match the buffers used for send/peek/receive
		// (unsigned portBASE_TYPE). Sizing it as unsigned long would make the
		// queue copy past those buffers on ports where portBASE_TYPE is narrower.
		b->xQueue = xQueueCreate( 1, sizeof( mess ) );
	}
	else
	{
		// Queue already exists: drain whatever result is still stored.
		while(pdTRUE == xQueueReceive( b->xQueue, &( mess ), ( portTickType ) 0 ))
		{
			// Nothing to do, the message is simply dropped.
		}
	}
}
// Report how many arrivals are still missing at barrier `b`.
// The result is max minus the current arrival count; when more tasks than
// `max` have entered, the difference drops below zero (e.g. -1 for one
// surplus arrival, assuming the usual integer promotions to int).
portBASE_TYPE barrierState(barrier * b)
{
	portBASE_TYPE missing = b->max - b->counter;

	return missing;
}
// Barrier shared by the demo tasks below. As a file-scope object it is
// zero-initialized, so bar.xQueue starts out NULL as barrierCreate() expects.
barrier bar;
// Demo master task: creates the shared barrier for 4 participants, waits on
// it with a 100-tick timeout, then prints the barrier state after a further
// 100-tick delay and suspends itself.
void start(void * par)
{
	(void) par; // Task parameter is not used.

	barrierCreate(&bar, 4);
	// Cast to int: the return type is unsigned portBASE_TYPE, which does not
	// match the %d conversion on ports where portBASE_TYPE is long.
	printf("start:%d\n", (int) barrierTake(&bar,100));
	vTaskDelay(100);
	printf("start: state %i\n", (int) barrierState(&bar));
	vTaskSuspend(NULL);
}
// Demo task A: takes the shared barrier twice without a timeout, printing
// each result, then suspends itself.
void A(void * par)
{
	(void) par; // Task parameter is not used.

	// Cast to int so the argument matches the %d conversion regardless of
	// how wide portBASE_TYPE is on the target port.
	printf("A:%d\n", (int) barrierTake(&bar,portMAX_DELAY));
	printf("A1:%d\n", (int) barrierTake(&bar,portMAX_DELAY));
	vTaskSuspend(NULL);
}
// Demo task B: delays one tick, then takes the shared barrier without a
// timeout, prints the result and suspends itself.
void B(void * par)
{
	(void) par; // Task parameter is not used.

	vTaskDelay(1);
	// Cast to int so the argument matches the %d conversion regardless of
	// how wide portBASE_TYPE is on the target port.
	printf("B:%d\n", (int) barrierTake(&bar,portMAX_DELAY));
	vTaskSuspend(NULL);
}
// Demo task C: takes the shared barrier without a timeout, prints the
// result and suspends itself.
void C(void * par)
{
	(void) par; // Task parameter is not used.

	// Cast to int so the argument matches the %d conversion regardless of
	// how wide portBASE_TYPE is on the target port.
	printf("C:%d\n", (int) barrierTake(&bar,portMAX_DELAY));

	vTaskSuspend(NULL);
}
int main()
{	
	xTaskCreate( start, "start", 100,NULL,4, NULL );	
	xTaskCreate( A, "A", 100,NULL, 3, NULL );	
	xTaskCreate( B, "B,", 100,NULL, 2, NULL );	
	xTaskCreate( C, "C", 100,NULL, 1, NULL );	
	/* Start the scheduler, this function should not return as it causes the execution
	context to change from main() to one of the created tasks. */
	vTaskStartScheduler();
	/* Should never get here! */
	return 0;
}

wella wrote on Friday, January 29, 2010:

As I said, this is not part of the kernel, and you should avoid using timeouts. Here is an example of an inconsistency.
Task A times out and sets “mess” to BARRIER_FAILED. Right after the assignment mess = BARRIER_FAILED, a context switch occurs and task A is preempted by task B. Task B is the last task to arrive and sends success to the queue. After the next context switch, task A cannot send its message (the queue is full) and returns BARRIER_FAILED, while the other tasks return BARRIER_SUCCESS. To avoid this situation, replace

// Timeout. Inform other tasks about missed timeout. 
mess = BARRIER_FAILED;
 xQueueSend( b->xQueue, ( void * ) &mess, ( portTickType ) 0 );

with

// Timeout. Inform other tasks about missed timeout.
			mess = BARRIER_FAILED;
			// If we cannot send message, some task preemted us here.
			if(pdFALSE == xQueueSend(  b->xQueue, ( void * ) &mess, ( portTickType ) 0 ))
			{
				// Receive message(peek).
				xQueuePeek( b->xQueue, &( mess ), ( portTickType )0);
			}

There is a drawback: if a wait times out, a take performed a short time afterwards can still succeed. This is not a real RTOS primitive. Use it carefully.
Martin

wella wrote on Friday, January 29, 2010:

Correction: the condition should be if(pdTRUE != xQueueSend( b->xQueue, ( void * ) &mess, ( portTickType ) 0 ))