Inter Process Communication Developer Guide

Communication between processes / applications (e.g. sending sensor values from the sensors app to the attitude filter app) is a key part of the PX4 software architecture. Processes (often called nodes in this context) exchange messages over named buses called topics. In PX4, a topic carries only one message type; for example, the vehicle_attitude topic transports a message containing the attitude struct (roll, pitch and yaw estimates). Nodes can publish a message on a bus/topic (“send” data) or subscribe to a bus/topic (“receive” data). They are not aware of who they are communicating with. There can be multiple publishers and multiple subscribers to a topic. This design pattern avoids locking issues and is very common in robotics. To keep it efficient, a topic holds only the most recent message; no queue is kept.

The publish/subscribe mechanism is implemented by the micro object request broker (uORB). The broker implements a simple version of the publish-subscribe pattern.

Existing topics are automatically documented using Doxygen in the PX4 uORB Topics List API Documentation.

Quick Start

Before diving into the details, here is a simple but complete publisher / subscriber pair. The publisher advertises a topic called random_integer and updates the topic with random integers. The subscriber checks for and prints these updates.

topic.h
/* declare the topic */
ORB_DECLARE(random_integer);
 
/* define the data structure that will be published where subscribers can see it */
struct random_integer_data {
	int r;
};
publisher.c
#include <stdlib.h>	/* random() */
#include <topic.h>
 
/* create topic metadata */
ORB_DEFINE(random_integer, struct random_integer_data);
 
/* file handle that will be used for publishing */
static int topic_handle;
 
int
init()
{
	/* generate the initial data for first publication */
	struct random_integer_data rd = { .r = random(), };
 
	/* advertise the topic and make the initial publication */
	topic_handle = orb_advertise(ORB_ID(random_integer), &rd);

	/* orb_advertise returns ERROR on failure */
	return (topic_handle < 0) ? ERROR : OK;
}
 
int
update_topic()
{
	/* generate a new random number for publication */
	struct random_integer_data rd = { .r = random(), };
 
	/* publish the new data structure and report the result to the caller */
	return orb_publish(ORB_ID(random_integer), topic_handle, &rd);
}
subscriber.c
#include <stdbool.h>	/* bool */
#include <stdio.h>	/* printf() */
#include <topic.h>
 
/* file handle that will be used for subscribing */
static int topic_handle;
 
int
init()
{
	/* subscribe to the topic */
	topic_handle = orb_subscribe(ORB_ID(random_integer));

	/* orb_subscribe returns ERROR on failure */
	return (topic_handle < 0) ? ERROR : OK;
}
 
void
check_topic()
{
	bool updated;
	struct random_integer_data rd;
 
	/* check to see whether the topic has updated since the last time we read it */
	orb_check(topic_handle, &updated);
 
	if (updated) {
		/* make a local copy of the updated data structure */
		orb_copy(ORB_ID(random_integer), topic_handle, &rd);
		printf("Random integer is now %d\n", rd.r);
	}
}

Publishing

Publishing involves three separate but related activities: defining the topic, advertising the topic and publishing updates.

Defining a Topic

The system defines many standard topics as a way of providing common interfaces between components. If a publisher wants to use one of the standard topics and related data structures, no more work needs to be done.

Custom Topics

To define a custom topic, the publisher needs to make a header file available to subscribers (see the topic.h example above). In this header file there must be:

  • An instance of the ORB_DECLARE() macro taking the name of the topic as an argument.
  • A structure definition that describes the data structure that will be published.

Topic names should be descriptive; the PX4 convention is to separate parts of a topic name with underscores, and for the components of the name to be ordered with the more general terms first.

For example, raw sensor data is published in the sensors_raw topic.

In addition to the header file, the publisher must have an instance of the ORB_DEFINE() macro in a source file that will be compiled and linked when the firmware is built (see the publisher.c example above). This macro constructs a data structure used by the ORB to uniquely identify the topic.

Optional Topics

If a topic is published by a software component that is optional and may not be present in the firmware, the header file may instead use the ORB_DECLARE_OPTIONAL() macro. Topics declared in this fashion require no special handling by the publisher, but there are additional considerations discussed below that the subscriber must pay attention to when handling optional topics.

Advertising a Topic

Before data can be published to a topic, it must be advertised. The publisher uses the following API to advertise a new topic:

/**
 * Advertise as the publisher of a topic.
 *
 * This performs the initial advertisement of a topic; it creates the topic
 * node in /obj if required and writes the initial data.
 *
 * @param meta		The uORB metadata (usually from the ORB_ID() macro)
 *			for the topic.
 * @param data		A pointer to the initial data to be published.
 *			For topics published by interrupt handlers, the advertisement
 *			must be performed from non-interrupt context.
 * @return		ERROR on error, otherwise returns a handle
 *			that can be used to publish to the topic.
 *			If the topic in question is not known (due to an
 *			ORB_DECLARE_OPTIONAL with no corresponding ORB_DEFINE)
 *			this function will return -1 and set errno to ENOENT.
 */
extern int	orb_advertise(const struct orb_metadata *meta, const void *data);

Advertising also publishes initial data to the topic.

The meta argument to the API is a pointer to the data generated by the ORB_DEFINE() macro. Normally this is provided using the ORB_ID() macro, which performs the conversion from topic name to topic metadata structure name.
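To make the name-to-metadata conversion concrete, here is a minimal sketch of how macros of this kind could be built with preprocessor token pasting. The DEMO_* macros, struct demo_metadata and demo_is_topic() are hypothetical names invented for this illustration; they show the general technique under that assumption and are not the actual uORB definitions.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical metadata record; the real uORB structure may differ. */
struct demo_metadata {
	const char *name;	/* topic name as a string */
	size_t size;		/* size of the published structure */
};

/* Declare a topic in a header shared by publisher and subscribers. */
#define DEMO_DECLARE(_name) \
	extern const struct demo_metadata demo_meta_##_name

/* Define the metadata object once, in exactly one source file. */
#define DEMO_DEFINE(_name, _struct) \
	const struct demo_metadata demo_meta_##_name = { #_name, sizeof(_struct) }

/* Convert a topic name into a pointer to its metadata object. */
#define DEMO_ID(_name) (&demo_meta_##_name)

struct random_integer_data {
	int r;
};

DEMO_DECLARE(random_integer);
DEMO_DEFINE(random_integer, struct random_integer_data);

/* Return nonzero if the metadata describes the named topic. */
int demo_is_topic(const struct demo_metadata *meta, const char *name)
{
	assert(meta != NULL && name != NULL);
	return strcmp(meta->name, name) == 0;
}
```

Because the identifier is pasted together at compile time, a misspelled topic name in DEMO_ID() fails at build time rather than at run time.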

Note that whilst updates can be published from interrupt handlers, advertising a topic must be performed from regular thread context.

Multiple Publishers

Only one publisher can have a topic advertised at a time. However, one publisher may close its topic handle (it is a file descriptor and can simply be passed to the close() function), after which another publisher can advertise the topic.

Publishing Updates

Once a topic has been advertised, the handle returned from advertising can be used to publish updates to the topic using the following API:

/**
 * Publish new data to a topic.
 *
 * The data is atomically published to the topic and any waiting subscribers
 * will be notified.  Subscribers that are not waiting can check the topic
 * for updates using orb_check and/or orb_stat.
 *
 * @param meta		The uORB metadata (usually from the ORB_ID() macro)
 *			for the topic.
 * @param handle	The handle returned from orb_advertise.
 * @param data		A pointer to the data to be published.
 * @return		OK on success, ERROR otherwise with errno set accordingly.
 */
extern int	orb_publish(const struct orb_metadata *meta, int handle, const void *data);

Note that the ORB does not buffer multiple updates; when a subscriber examines a topic they will see only the most recent update.
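The "no buffering" behaviour can be pictured as a single storage slot that every publication overwrites. The sketch below is a simplified model under that assumption; struct slot_topic, slot_publish() and slot_latest() are hypothetical names for this illustration and do not reflect how uORB stores data internally.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical single-slot topic: one stored message, no queue. */
struct slot_topic {
	int value;		/* the only stored message */
	unsigned generation;	/* number of publications so far */
};

/* Overwrite the stored message; any unread earlier value is lost. */
void slot_publish(struct slot_topic *t, int value)
{
	assert(t != NULL);
	t->value = value;
	t->generation++;
}

/* Return the most recently published value. */
int slot_latest(const struct slot_topic *t)
{
	assert(t != NULL);
	return t->value;
}
```

If a publisher calls slot_publish() three times before a subscriber looks, the subscriber only ever observes the third value. A subscriber that needs every sample therefore has to read at least as fast as the publisher writes.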

Subscribing

Subscribing to a topic requires the following:

  • An ORB_DECLARE() or ORB_DECLARE_OPTIONAL() macro (e.g. in a header file included by the subscriber).
  • A definition of the data structure that is published to the topic (usually from the same header file).

Provided these conditions are met, a subscriber uses the following API to subscribe to a topic:

/**
 * Subscribe to a topic.
 *
 * The returned value is a file descriptor that can be passed to poll()
 * in order to wait for updates to a topic, as well as orb_copy,
 * orb_check and orb_stat.
 *
 * Subscription will succeed even if the topic has not been advertised;
 * in this case the topic will have a timestamp of zero, it will never
 * signal a poll() event, checking will always return false and it cannot
 * be copied. When the topic is subsequently advertised, poll, check,
 * stat and copy calls will react to the initial publication that is
 * performed as part of the advertisement.
 *
 * Subscription will fail if the topic is not known to the system, i.e.
 * there is nothing in the system that has defined the topic and thus it
 * can never be published.
 *
 * @param meta		The uORB metadata (usually from the ORB_ID() macro)
 *			for the topic.
 * @return		ERROR on error, otherwise returns a handle
 *			that can be used to read and check the topic for updates.
 *			If the topic in question is not known (due to an
 *			ORB_DECLARE_OPTIONAL with no corresponding ORB_DEFINE)
 *			this function will return -1 and set errno to ENOENT.
 */
extern int	orb_subscribe(const struct orb_metadata *meta);

Subscription to an optional topic will fail if the topic is not present. All other subscriptions will succeed and create the topic even if it has not yet been advertised by the publisher. This greatly reduces the need to carefully arrange system startup order.
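A subscriber that uses optional topics may want to tell "topic not built into this firmware" apart from other failures. The helper below is a small sketch of that decision; demo_classify_subscribe() and the enum are hypothetical names, and the sketch assumes that a successful handle is a non-negative file descriptor and that a missing topic is reported as -1 with errno set to ENOENT, as the API comment above describes.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical outcome codes for a subscription attempt. */
enum demo_sub_result {
	DEMO_SUB_OK,		/* handle is usable */
	DEMO_SUB_TOPIC_ABSENT,	/* optional topic not present in this build */
	DEMO_SUB_FAILED		/* any other error */
};

/*
 * Classify the result of a subscribe call.
 * handle:      the value returned by the subscribe call
 * saved_errno: errno captured immediately after the call
 */
enum demo_sub_result demo_classify_subscribe(int handle, int saved_errno)
{
	if (handle >= 0) {
		return DEMO_SUB_OK;
	}

	if (saved_errno == ENOENT) {
		return DEMO_SUB_TOPIC_ABSENT;
	}

	return DEMO_SUB_FAILED;
}
```

In use, the caller would pass the return value of orb_subscribe() together with errno, and simply skip the optional feature when the result is DEMO_SUB_TOPIC_ABSENT.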

There is no specific limit to the number of subscriptions that a task may perform.

To unsubscribe from a topic, the following API is used:

/**
 * Unsubscribe from a topic.
 *
 * @param handle	A handle returned from orb_subscribe.
 * @return		OK on success, ERROR otherwise with errno set accordingly.
 */
extern int	orb_unsubscribe(int handle);

Copying Data from a Topic

Subscribers don't reference data stored by the ORB or shared with other subscribers; instead the data is copied from the ORB into a per-subscriber temporary buffer at the subscriber's request. This copy avoids locking issues and keeps both publisher and subscriber APIs simple. It also allows the subscriber to modify the data in-place if necessary for its own use.

When a subscriber wants a fresh copy of the most recent data published to a topic, the following API is used:

/**
 * Fetch data from a topic.
 *
 * @param meta		The uORB metadata (usually from the ORB_ID() macro)
 *			for the topic.
 * @param handle	A handle returned from orb_subscribe.
 * @param buffer	Pointer to the buffer receiving the data.
 * @return		OK on success, ERROR otherwise with errno set accordingly.
 */
extern int	orb_copy(const struct orb_metadata *meta, int handle, void *buffer);

Copies are atomic, so the buffer always holds one complete publication (the most recent at the time of the copy), never a mix of two.
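The effect of copying into a per-subscriber buffer can be sketched as follows. The names struct demo_attitude, demo_copy_out() and demo_scale_local() are hypothetical and exist only for this illustration; the sketch does not model the locking that makes the real copy atomic.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical message type for the illustration. */
struct demo_attitude {
	float roll;
	float pitch;
	float yaw;
};

/* Copy the stored message into the subscriber's private buffer. */
void demo_copy_out(const void *stored, void *buffer, size_t size)
{
	assert(stored != NULL && buffer != NULL);
	memcpy(buffer, stored, size);
}

/* Modify the private copy in place; the stored message is not touched. */
void demo_scale_local(struct demo_attitude *local, float factor)
{
	assert(local != NULL);
	local->roll *= factor;
	local->pitch *= factor;
	local->yaw *= factor;
}
```

Since each subscriber works on its own copy, one subscriber scaling or clamping its values cannot affect what another subscriber reads.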

Checking for Updates

A subscriber can check whether a topic has received a publication since the last time the subscriber called orb_copy by using the following API:

/**
 * Check whether a topic has been published to since the last orb_copy.
 *
 * This check can be used to determine whether to copy from the topic when
 * not using poll(), or to avoid the overhead of calling poll() when the
 * topic is likely to have updated.
 *
 * Updates are tracked on a per-handle basis; this call will continue to
 * return true until orb_copy is called using the same handle. This interface
 * should be preferred over calling orb_stat due to the race window between
 * stat and copy that can lead to missed updates.
 *
 * @param handle	A handle returned from orb_subscribe.
 * @param updated	Set to true if the topic has been published since the
 *			last time it was copied using this handle.
 * @return		OK if the check was successful, ERROR otherwise with
 *			errno set accordingly.
 */
extern int	orb_check(int handle, bool *updated);

When a topic has been subscribed before it has been advertised, this API will return not-updated until the topic is advertised.
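One way to picture per-handle update tracking is a publication counter on the topic and a "last seen" counter in each subscription. This is a simplified model, not the uORB implementation; struct gen_topic, struct gen_sub and the gen_*() functions are hypothetical names used only here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical topic with a publication counter. */
struct gen_topic {
	int value;
	unsigned generation;	/* 0 means never published */
};

/* Hypothetical subscription: remembers the generation it last copied. */
struct gen_sub {
	const struct gen_topic *topic;
	unsigned seen_generation;
};

/* Store a new value and bump the publication counter. */
void gen_publish(struct gen_topic *t, int value)
{
	assert(t != NULL);
	t->value = value;
	t->generation++;
}

/* True while there is a publication this handle has not copied yet. */
bool gen_check(const struct gen_sub *s)
{
	assert(s != NULL && s->topic != NULL);
	return s->topic->generation != s->seen_generation;
}

/* Copy the value and mark it as seen for this handle only. */
int gen_copy(struct gen_sub *s)
{
	assert(s != NULL && s->topic != NULL);
	s->seen_generation = s->topic->generation;
	return s->topic->value;
}
```

In this model a check before the first publication reports not-updated, repeated checks keep reporting updated until the copy, and a copy through one handle does not clear the flag for another handle.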

Publication Timestamps

A subscriber can check the time of the most recent publication to a topic with the following API:

/**
 * Return the last time that the topic was published.
 *
 * @param handle	A handle returned from orb_subscribe.
 * @param time		Returns the time that the topic was published, or zero if it has
 *			never been published/advertised.
 * @return		OK on success, ERROR otherwise with errno set accordingly.
 */
extern int	orb_stat(int handle, uint64_t *time);

Note that care should be exercised when using this call, as there is no guarantee that the topic will not be published shortly after the call returns.
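A typical use of the publication time is a staleness check. The helper below is a sketch under stated assumptions: topic_is_fresh() is a hypothetical name, all three arguments are assumed to use the same time unit as the value returned by orb_stat, and a timestamp of zero is treated as "never published" as described above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Decide whether a publication is recent enough to be used.
 * published_at: time of the last publication, 0 if never published
 * now:          current time, same unit and time base as published_at
 * max_age:      largest acceptable age, same unit
 */
bool topic_is_fresh(uint64_t published_at, uint64_t now, uint64_t max_age)
{
	if (published_at == 0) {
		return false;	/* never published or advertised */
	}

	if (now < published_at) {
		return true;	/* published after 'now' was sampled */
	}

	return (now - published_at) <= max_age;
}
```

The result only describes the moment the timestamp was read; a new publication may arrive immediately afterwards, which is why orb_check is the better tool for deciding whether to copy.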

Waiting for Updates

A subscriber that depends on publications as a source of data can wait for publications to any number of subscriptions simultaneously. This is done using the poll() function in the same fashion as waiting for data on a file descriptor. This works because subscriptions are actually file descriptors themselves.

The following example shows a subscriber with three separate subscriptions waiting for publications to each, and responding when these publications are received. If a second passes without an update to any of the subscriptions, a timeout counter is updated and published for other subscribers to follow.

color_counter.h
ORB_DECLARE(color_red);
ORB_DECLARE(color_green);
ORB_DECLARE(color_blue);
ORB_DECLARE(color_timeouts);
 
/* structure published to color_red, color_green, color_blue and color_timeouts */
struct color_update
{
	int number;
};
color_counter.c
#include <poll.h>
#include <stdio.h>	/* puts(), printf() */
#include "color_counter.h"
 
ORB_DEFINE(color_timeouts, struct color_update);
 
void
subscriber(void)
{
	int	sub_red, sub_green, sub_blue;
	int	pub_timeouts;
	int	timeouts = 0;
	struct color_update cu;
 
	/* subscribe to color topics */
	sub_red = orb_subscribe(ORB_ID(color_red));
	sub_green = orb_subscribe(ORB_ID(color_green));
	sub_blue = orb_subscribe(ORB_ID(color_blue));
 
	/* advertise the timeout topic */
	cu.number = 0;
	pub_timeouts = orb_advertise(ORB_ID(color_timeouts), &cu);
 
	/* loop waiting for updates */
	for (;;) {
 
		/* wait for updates or a 1-second timeout */
		struct pollfd fds[3] = {
			{ .fd = sub_red,   .events = POLLIN },
			{ .fd = sub_green, .events = POLLIN },
			{ .fd = sub_blue,  .events = POLLIN }
		};
		int ret = poll(fds, 3, 1000);
 
 
		/* check for a timeout */
		if (ret == 0) {
			puts("timeout");
			cu.number = ++timeouts;
			orb_publish(ORB_ID(color_timeouts), pub_timeouts, &cu);
 
		/* check for color updates */
		} else {
 
			if (fds[0].revents & POLLIN) {
				orb_copy(ORB_ID(color_red), sub_red, &cu);
				printf("red is now %d\n", cu.number);
			}
			if (fds[1].revents & POLLIN) {
				orb_copy(ORB_ID(color_green), sub_green, &cu);
				printf("green is now %d\n", cu.number);
			}
			if (fds[2].revents & POLLIN) {
				orb_copy(ORB_ID(color_blue), sub_blue, &cu);
				printf("blue is now %d\n", cu.number);
			}
		}
	}
}

Limiting the Rate of Updates

A subscriber may wish to limit the rate at which it receives updates on a topic. This can be achieved with the following API:

/**
 * Set the minimum interval between which updates are seen for a subscription.
 *
 * If this interval is set, the subscriber will not see more than one update 
 * within the period.
 *
 * Specifically, the first time an update is reported to the subscriber a timer
 * is started. The update will continue to be reported via poll and orb_check, but 
 * once fetched via orb_copy another update will not be reported until the timer
 * expires.
 *
 * This feature can be used to pace a subscriber that is watching a topic that
 * would otherwise update too quickly.
 *
 * @param handle	A handle returned from orb_subscribe.
 * @param interval	An interval period in milliseconds.
 * @return		OK on success, ERROR otherwise with errno set accordingly.
 */
extern int	orb_set_interval(int handle, unsigned interval);

Rate limits are specific to a subscription; a single topic may have more than one subscriber with different (or no) rate limits.
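The pacing rule described in the API comment can be modelled with a small state machine. This is a sketch of the described behaviour only, not the uORB implementation; struct paced_sub and the paced_*() functions are hypothetical names, and time is passed in explicitly (in milliseconds) so the logic can be followed without a real clock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical rate-limited subscription state. */
struct paced_sub {
	unsigned interval_ms;	/* minimum interval, 0 disables pacing */
	bool pending;		/* a publication has not been copied yet */
	bool reporting;		/* the pending update is already being reported */
	uint64_t not_before_ms;	/* earliest time the next update may be reported */
};

/* Called when the topic is published. */
void paced_publish(struct paced_sub *s)
{
	assert(s != NULL);
	s->pending = true;
}

/* Report whether an update is visible to this subscriber at now_ms. */
bool paced_check(struct paced_sub *s, uint64_t now_ms)
{
	assert(s != NULL);

	if (!s->pending) {
		return false;
	}

	if (!s->reporting) {
		if (now_ms < s->not_before_ms) {
			return false;	/* timer from the previous report still running */
		}

		/* first report of this update: start the timer */
		s->reporting = true;
		s->not_before_ms = now_ms + s->interval_ms;
	}

	return true;
}

/* Called when the subscriber fetches the data. */
void paced_copy(struct paced_sub *s)
{
	assert(s != NULL);
	s->pending = false;
	s->reporting = false;
}
```

With a 100 ms interval, an update first reported at t = 0 stays visible until it is copied; a publication arriving afterwards is hidden from this subscriber until t = 100, while other subscriptions to the same topic are unaffected.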
