Some relevant info first:
Platform: STM32F413 (though I’m not sure this is particularly important for my question)
FreeRTOS Kernel: v10.3.1 (STM32Cube only has support up to this version)
API functions: xQueueSend, xQueueReceive
On my MCU, I need to interface with the CAN bus, and I wanted to avoid problems caused by multiple tasks or ISRs accessing the HW interface, so I implemented a gatekeeper task to manage it. The gatekeeper has a higher priority than any task that forwards to it or that it sends to; other gatekeepers may share its priority, but I only need one at the moment.
I’m using a queue to hold the messages that tasks want transmitted via the HW interface (which the gatekeeper manages) and some communication interface from the gatekeeper to each task (this will probably also be queues). I was testing the throughput by sending a very dense burst of messages on the CAN bus and found that some messages were being dropped by the firmware. I have implemented a verified workaround for the limited number of CAN HW mailboxes, so I know it’s not that. From testing, it appears that xQueueSend and xQueueReceive were the bottleneck. I know for a fact that the gatekeeper never waits for a message to be sent to or received from a queue (pdMS_TO_TICKS(0)).
I saw a post discussing how to use the FromISR variants as a lighter-weight means of communication, but I don’t think they fit my needs.
So I read through the implementations of xQueueSend and xQueueReceive and rewrote them to get rid of the checks that I think are not needed for my application. I removed any time related checks and code since no wait will be required, any checks for waking up a blocking task as that won’t happen, and all trace and configASSERT macros since they expand to nothing for me.
BaseType_t xQueueFastSend( QueueHandle_t xQueue, const void * const pvItemToQueue )
{
    Queue_t * const pxQueue = xQueue;
    BaseType_t xReturn = errQUEUE_FULL;

    taskENTER_CRITICAL();
    {
        /* Is there room on the queue now? The running task must be the
        highest priority task wanting to access the queue. */
        if( pxQueue->uxMessagesWaiting < pxQueue->uxLength )
        {
            /* prvCopyDataToQueue() also increments uxMessagesWaiting. */
            ( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, queueSEND_TO_BACK );
            xReturn = pdPASS;
        }
        /* Otherwise the queue was full, so leave now with errQUEUE_FULL. */
    }
    taskEXIT_CRITICAL();

    return xReturn;
}
BaseType_t xQueueFastReceive( QueueHandle_t xQueue, void * const pvBuffer )
{
    Queue_t * const pxQueue = xQueue;
    BaseType_t xReturn = errQUEUE_EMPTY;

    taskENTER_CRITICAL();
    {
        const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;

        /* Is there data in the queue now? To be running the calling task
        must be the highest priority task wanting to access the queue. */
        if( uxMessagesWaiting > ( UBaseType_t ) 0 )
        {
            /* Data available, remove one item. */
            prvCopyDataFromQueue( pxQueue, pvBuffer );
            traceQUEUE_RECEIVE( pxQueue );
            pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;
            xReturn = pdPASS;
        }
        /* Otherwise the queue was empty, so leave now with errQUEUE_EMPTY. */
    }
    taskEXIT_CRITICAL();

    return xReturn;
}
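One behaviour these stripped-down functions give up is readying a task that is blocked on the queue, so they are only safe while nothing ever blocks on these queues. The following is a minimal host-side sketch of that difference, not kernel code: ModelQueue, model_send and model_fast_send are hypothetical names, and the two flags merely stand in for the kernel's list of tasks waiting to receive.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MODEL_QUEUE_LENGTH 4u

/* Hypothetical host-side stand-in for a queue; this is not the kernel's Queue_t. */
typedef struct {
    int items[MODEL_QUEUE_LENGTH];
    size_t head;            /* index of the oldest item */
    size_t count;           /* plays the role of uxMessagesWaiting */
    bool receiver_blocked;  /* a task is blocked waiting to receive */
    bool receiver_readied;  /* that task has been made ready to run */
} ModelQueue;

/* Copy one item to the back of the queue; fails immediately when full. */
static bool model_copy_in(ModelQueue *q, int item)
{
    assert(q != NULL);
    if (q->count >= MODEL_QUEUE_LENGTH) {
        return false;
    }
    q->items[(q->head + q->count) % MODEL_QUEUE_LENGTH] = item;
    q->count++;
    return true;
}

/* Models the regular send path: copy, then ready a blocked receiver. */
bool model_send(ModelQueue *q, int item)
{
    if (!model_copy_in(q, item)) {
        return false;
    }
    if (q->receiver_blocked) {
        q->receiver_blocked = false;
        q->receiver_readied = true;
    }
    return true;
}

/* Models the stripped-down send path: copy only, nobody is woken. */
bool model_fast_send(ModelQueue *q, int item)
{
    return model_copy_in(q, item);
}
```

In this model, a receiver that was blocked before model_fast_send stays blocked even though the queue now holds data. In the design described here that case is avoided because both sides wait on task notifications rather than on the queue, but it would bite as soon as any task called xQueueReceive with a non-zero timeout on the same queue.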
My main questions are: did I miss a better way to do this? Is this code generally unsafe and unwise? I tested again using this code and it noticeably improved the throughput, to a level that I’m happy with.
Thank you for any discussions and recommendations
How fast is the CAN bus? (1 Mbps?) How fast is the processor? (100 MHz?) If those guesses are close, then you probably shouldn’t need any coding short-cuts, and the CAN bus itself would be the bottleneck (and therefore also the cause of queues filling and frames being dropped during your very dense burst of transmitting frames).
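To put rough numbers on that estimate, here is a small sketch of the per-frame time budget. It assumes classic CAN data frames with 8 data bytes and ignores stuff bits (which only make frames longer); the bit counts include the 3-bit interframe space. The function names are made up for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Bits on the wire for a classic CAN data frame with 8 data bytes,
 * including the 3-bit interframe space and excluding stuff bits. */
#define CAN_BITS_STD_8B 111u  /* 11-bit identifier */
#define CAN_BITS_EXT_8B 131u  /* 29-bit identifier */

/* Shortest time one frame occupies the bus, in nanoseconds. */
uint32_t can_frame_time_ns(uint32_t frame_bits, uint32_t bitrate_bps)
{
    assert(bitrate_bps > 0u);
    return (uint32_t)(((uint64_t)frame_bits * 1000000000ull) / bitrate_bps);
}

/* CPU cycles that elapse while one frame is on the bus, i.e. the budget
 * per frame when frames arrive back to back. */
uint32_t cpu_cycles_per_frame(uint32_t frame_bits, uint32_t bitrate_bps,
                              uint32_t cpu_hz)
{
    assert(bitrate_bps > 0u);
    return (uint32_t)(((uint64_t)frame_bits * cpu_hz) / bitrate_bps);
}
```

With 1 Mbps and 100 MHz, an 8-byte extended frame takes at least 131 us and leaves about 13,100 CPU cycles per frame, which is why a queue call costing a few microseconds is unlikely to be the limiting factor on its own.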
Yes, the CAN bus is set to a rate of 1 Mbps and the processor runs at 100 MHz. You are right, the CAN bus is indeed the overall bottleneck, which I can’t do anything about. Perhaps I should have included a bit more detail on the system.
There are two different ways the incoming messages get processed by the gatekeeper: when receiving from an external CAN bus, and when receiving a message from a task to be transmitted on a CAN bus. The former is quite simple and can be processed quickly (it could be a change of ID or a slight adjustment to the payload), and the message gets sent immediately. The latter requires an extra transfer via a queue, as the message needs to go to a task for processing.
I’ve completed testing with the regular API functions for the former case (forwarding) and found the performance to be what I would expect. Under the same test conditions, the latter case (message processed by a task) performed far worse when using xQueueSend and xQueueReceive. I expected some loss, since there is extra processing involved, but the performance was lower than I anticipated. With the modified functions, the performance was even better than in the former case.
I’ve taken the CAN bus performance into account and given the only code I changed was the queue related functions I’ve isolated it to those functions.
Thank you for the concern, I also share this concern that I’m possibly introducing a bug.
And as for removing the trace and configASSERT, that’s a good point that if they expand to nothing there’s no point in me removing them. I can re-add them but I also liked how removing those lines made the included code snippet shorter for the post.
No, I haven’t explicitly done that. I’ll look into how to do that, thanks for the recommendation. Though I replied to jefftenney’s reply and I think it’s probably the overhead from the supplied xQueueSend and xQueueReceive for my application
Even with this evidence, you would still need a trace to see the actual impact of your code changes. It’s possible your changes reduce rapid task switching, in which case the right fix would be to the application design and/or task priorities etc.
Your changes wouldn’t normally be associated with drastic performance increase, so best to see that as an important clue rather than proof that you found performance issues in the queue functions.
Hey @aggarg and @jefftenney,
Thanks for the suggestion of using trace. Looking at how the trace was recommended to be used, I decided to try a similar idea: I put digital write commands in the CAN gatekeeper (GK), the other task (Task), and xQueueReceive and xQueueSend, where each had its own GPIO set to its fastest speed.
For the tasks (GK and Task), I placed the GPIO writes before and after the task notification that each blocks on indefinitely, so that the GPIO line goes high when the task is awakened and low once it finishes. For xQueueReceive and xQueueSend, I placed the writes right at function entry and right before every return statement, for the same effect.
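The instrumentation pattern described above can be sketched roughly as follows. This is a host-side model so that it can be compiled anywhere: trace_port stands in for a GPIO output register, and the pin numbers, helper names and instrumented_receive are all hypothetical. On an STM32 the two helpers would typically write the port's BSRR register instead.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical pin assignments, one per instrumented block. */
enum {
    TRACE_PIN_GK = 0,
    TRACE_PIN_TASK = 1,
    TRACE_PIN_QSEND = 2,
    TRACE_PIN_QRECV = 3,
    TRACE_PIN_COUNT = 4
};

/* Host stand-ins for the GPIO port and for what a logic analyser would see. */
static volatile uint32_t trace_port;
static uint32_t trace_rising_edges[TRACE_PIN_COUNT];

static inline void trace_high(unsigned pin)
{
    assert(pin < TRACE_PIN_COUNT);
    if ((trace_port & (1u << pin)) == 0u) {
        trace_rising_edges[pin]++;   /* count each low-to-high transition */
    }
    trace_port |= (1u << pin);
}

static inline void trace_low(unsigned pin)
{
    assert(pin < TRACE_PIN_COUNT);
    trace_port &= ~(1u << pin);
}

/* A function with two exits: the pin goes high on entry and low before
 * every return, so the pulse width equals the time spent inside. */
int instrumented_receive(int messages_waiting)
{
    trace_high(TRACE_PIN_QRECV);
    if (messages_waiting > 0) {
        trace_low(TRACE_PIN_QRECV);
        return 1;
    }
    trace_low(TRACE_PIN_QRECV);
    return 0;
}
```

The key detail is that every return path drives the pin low again; a missed exit would leave the line high and make the next pulse look far longer than it really is.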
This waveform shows the process to process one CAN message:
Note: task notifications to the CAN GK are being used as an event group
A: Once the CAN message 0x00AA0008 is received, an ISR uses a callback function which sends a task notification to the CAN GK.
B: The CAN GK will attempt to read from queue while it’s not empty, the first attempt is successful so it sends a task notification to Task.
C: This is the task notification that wakes Task, but Task is a lower priority than the CAN GK. It’s starred as I wanted to reiterate that Task isn’t blocking from a queue.
D: The second attempt to read from the reception queue yields no new messages, thus the CAN GK can wait until something else happens.
E: Now that the CAN GK is not active, Task can wake up and act on its task notification
F: The Task generates a message and sends it to the CAN GK’s toSend queue.
G: The CAN message is sent to the CAN GK. Again it’s starred, since the queue isn’t what the CAN GK is blocking on; it’s a task notification sent immediately following the queue send.
H: The CAN GK receives the message generated by Task from its toSend queue.
I: With a message to send, the CAN GK sends the message onto the CAN bus.
J: same as D. The second attempt to read from the reception queue yields no new messages, thus the CAN GK can wait until something else happens.
K: Now that the CAN GK is not busy anymore, the scheduler allows Task to run again, which has nothing else to do and blocks on the next task notification.
L: ISR sends a task notification to CAN GK that a message was sent and now there’s more space in the hardware mailboxes for transmitting, but there’s no more messages to be sent.
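The drain loop in steps B, D and J can be modelled roughly like this. It is a host-side sketch under stated assumptions: RxQueue and its helpers are hypothetical stand-ins for a FreeRTOS queue read with a zero timeout, and the counter in GkStats stands in for the task notification sent to Task.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define RX_QUEUE_LENGTH 8u

/* Hypothetical host stand-in for the gatekeeper's reception queue. */
typedef struct {
    int items[RX_QUEUE_LENGTH];
    size_t head;
    size_t count;
} RxQueue;

/* Non-blocking put: returns false at once if the queue is full. */
bool rx_queue_put(RxQueue *q, int msg)
{
    assert(q != NULL);
    if (q->count >= RX_QUEUE_LENGTH) {
        return false;
    }
    q->items[(q->head + q->count) % RX_QUEUE_LENGTH] = msg;
    q->count++;
    return true;
}

/* Non-blocking get: returns false at once if the queue is empty. */
bool rx_queue_get(RxQueue *q, int *msg)
{
    assert(q != NULL && msg != NULL);
    if (q->count == 0u) {
        return false;
    }
    *msg = q->items[q->head];
    q->head = (q->head + 1u) % RX_QUEUE_LENGTH;
    q->count--;
    return true;
}

typedef struct {
    unsigned receive_attempts;    /* calls made to rx_queue_get */
    unsigned task_notifications;  /* hand-offs to Task (step B) */
} GkStats;

/* One wake-up of the gatekeeper: read until the queue reports empty. */
void gatekeeper_on_notify(RxQueue *q, GkStats *stats)
{
    int msg;
    for (;;) {
        stats->receive_attempts++;
        if (!rx_queue_get(q, &msg)) {
            break;                    /* steps D/J: empty, go back to waiting */
        }
        stats->task_notifications++;  /* step B: message handed to Task */
    }
}
```

Draining N messages therefore costs N + 1 receive calls per wake-up, which matches the extra empty read visible at D and J in the waveform.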
So, with that out of the way, how’s the performance of the “fast” variants? Well, xQueueFastSend and xQueueFastReceive save maybe 0.5 - 1 us, and, by the law of small numbers, I don’t really have enough evidence to say even that; the effect on performance was therefore negligible.
But what about the results I did see? Well, with the GPIO writes in place, the performance gains from the “fast” variants seem to have vanished. I’m not sure why, as GPIO writes should be very fast.
I will need to think about what’s actually causing the performance loss. I wondered whether interrupts are disabled somewhere, so that not all of the incoming receive interrupts can be serviced. But my earlier tests with direct forwarding work, which makes the queue handling look like the bottleneck, and yet the queue calls are so quick.
To me it looks as if your architecture needs an overhaul. Without having looked at your description in detail, I believe there is too much ping-pong of messages and notifications in the control flow. In most systems I have seen, no more than one task is needed to process a single message.
You are right that simple processing shouldn’t require this amount of overhead. That’s why, for the most part, the CAN GK does the simple processing itself for messages that are just being forwarded (a change of ID or a simple payload modification); I tested that functionality and it works well. Messages that require more complex processing are passed to a task. But your comment made me realize that my test was contrived: a CAN message passed on to a task would basically never be processed and sent back immediately, since those tasks run on a timer or wait on multiple messages, etc. So I changed my task “Task” to run every 20 ms, which is common for my system, and repeated the burst analysis. Now the throughput is similar to the previous case. I originally thought the instant ping-pong could act as a worst case, and in a way it still does, but my system is unlikely to be in that state for any significant proportion of the time. The new test is better because it matches how the system behaves: a burst of messages arrives on the CAN bus, but the tasks don’t all run immediately afterwards. I was only worried about dropping messages, and I know the gatekeeper can handle a burst because the direct forwarding case already processes all of them.
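With Task running every 20 ms, anything destined for it accumulates between runs, so the queue depth matters more than the cost of a single queue call. A rough upper bound on how many frames can arrive in one period can be sketched as below, assuming back-to-back classic CAN frames with 8 data bytes and no stuff bits (111 bits with an 11-bit ID, 131 bits with a 29-bit ID, interframe space included). max_frames_per_period is a made-up helper name.

```c
#include <assert.h>
#include <stdint.h>

/* Upper bound on the number of complete frames that fit on the bus
 * during one task period, assuming frames arrive back to back. */
uint32_t max_frames_per_period(uint32_t period_us, uint32_t frame_bits,
                               uint32_t bitrate_bps)
{
    assert(frame_bits > 0u);
    /* Bits that can be transferred during the period. */
    uint64_t bits_in_period = ((uint64_t)period_us * bitrate_bps) / 1000000u;
    return (uint32_t)(bits_in_period / frame_bits);
}
```

At 1 Mbps and a 20 ms period this gives at most 152 extended frames (or 180 standard frames) per period if every frame on the bus were addressed to the same task, which is a pessimistic bound rather than an expected load.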
So I guess the conclusion is: do not use the “fast” variants; they are hacky and probably not even that much faster. My original test was a bit contrived for what I was trying to measure, and with a test that better matches how the system actually works I got results I’m more or less happy with. Unless anyone else has any ideas or general suggestions, I’ll select this answer as the solution. There’s still some loss when a received burst coincides with transmitting, but the chance of that is low.