Modulo 5.0.0
Loading...
Searching...
No Matches
SubscriptionHandler.hpp
1#pragma once
2
3#include "modulo_core/communication/SubscriptionInterface.hpp"
4
5#include "modulo_core/concepts.hpp"
6
8
14template<typename MsgT>
15class SubscriptionHandler : public SubscriptionInterface {
16public:
22 explicit SubscriptionHandler(
23 std::shared_ptr<MessagePairInterface> message_pair = nullptr,
24 const rclcpp::Logger& logger = rclcpp::get_logger("SubscriptionHandler"));
25
29 ~SubscriptionHandler() override;
30
34 [[nodiscard]] std::shared_ptr<rclcpp::Subscription<MsgT>> get_subscription() const;
35
41 void set_subscription(const std::shared_ptr<rclcpp::Subscription<MsgT>>& subscription);
42
47 void set_user_callback(const std::function<void()>& user_callback);
48
52 std::function<void(const std::shared_ptr<MsgT>)> get_callback();
53
59 std::function<void(const std::shared_ptr<MsgT>)> get_callback(const std::function<void()>& user_callback);
60
69 std::shared_ptr<SubscriptionInterface>
70 create_subscription_interface(const std::shared_ptr<rclcpp::Subscription<MsgT>>& subscription);
71
72private:
76 void handle_callback_exceptions();
77
84 std::function<void(const std::shared_ptr<MsgT>)> get_translated_callback();
85
92 std::function<void(const std::shared_ptr<MsgT>)> get_raw_callback();
93
94 std::shared_ptr<rclcpp::Subscription<MsgT>> subscription_;
95 rclcpp::Logger logger_;
96 std::shared_ptr<rclcpp::Clock> clock_;
97 std::function<void()> user_callback_ = [] {
98 };
99};
100
101template<typename MsgT>
103 std::shared_ptr<MessagePairInterface> message_pair, const rclcpp::Logger& logger)
104 : SubscriptionInterface(std::move(message_pair)), logger_(logger), clock_(std::make_shared<rclcpp::Clock>()) {}
105
106template<typename MsgT>
108 this->subscription_.reset();
109}
110
111template<typename MsgT>
112std::shared_ptr<rclcpp::Subscription<MsgT>> SubscriptionHandler<MsgT>::get_subscription() const {
113 return this->subscription_;
114}
115
116template<typename MsgT>
117void SubscriptionHandler<MsgT>::set_subscription(const std::shared_ptr<rclcpp::Subscription<MsgT>>& subscription) {
118 if (subscription == nullptr) {
119 throw exceptions::NullPointerException("Provide a valid pointer");
120 }
121 this->subscription_ = subscription;
122}
123
124template<typename MsgT>
125inline std::function<void(const std::shared_ptr<MsgT>)> SubscriptionHandler<MsgT>::get_callback() {
126 if (this->message_pair_ == nullptr) {
127 throw exceptions::NullPointerException("Message pair is not set, can't get subscription callback");
128 }
129 if constexpr (concepts::TranslatedMsgT<MsgT>) {
130 if (this->message_pair_->get_type() == MessageType::CUSTOM_MESSAGE) {
131 if constexpr (concepts::CustomT<MsgT>) {
132 return get_raw_callback();
133 }
134 __builtin_unreachable();
135 } else {
136 return get_translated_callback();
137 }
138 } else {
139 return get_raw_callback();
140 }
141}
142
143template<typename MsgT>
144std::function<void(const std::shared_ptr<MsgT>)> SubscriptionHandler<MsgT>::get_raw_callback() {
145 return [this](const std::shared_ptr<MsgT> message) {
146 try {
147 this->get_message_pair()->template read<MsgT, MsgT>(*message);
148 this->user_callback_();
149 } catch (...) {
150 this->handle_callback_exceptions();
151 }
152 };
153}
154
155template<typename MsgT>
156void SubscriptionHandler<MsgT>::set_user_callback(const std::function<void()>& user_callback) {
157 this->user_callback_ = user_callback;
158}
159
160template<typename MsgT>
161std::function<void(const std::shared_ptr<MsgT>)>
162SubscriptionHandler<MsgT>::get_callback(const std::function<void()>& user_callback) {
163 this->set_user_callback(user_callback);
164 return this->get_callback();
165}
166
167template<typename MsgT>
168std::shared_ptr<SubscriptionInterface> SubscriptionHandler<MsgT>::create_subscription_interface(
169 const std::shared_ptr<rclcpp::Subscription<MsgT>>& subscription) {
170 this->set_subscription(subscription);
171 return std::shared_ptr<SubscriptionInterface>(this->shared_from_this());
172}
173
174template<typename MsgT>
176 try {
177 // re-throw the original exception
178 throw;
179 } catch (const exceptions::CoreException& ex) {
180 RCLCPP_WARN_STREAM_THROTTLE(
181 this->logger_, *this->clock_, 1000, "Exception in subscription callback: " << ex.what());
182 } catch (const std::exception& ex) {
183 RCLCPP_WARN_STREAM_THROTTLE(
184 this->logger_, *this->clock_, 1000, "Unhandled exception in subscription user callback: " << ex.what());
185 }
186}
187
188}// namespace modulo_core::communication
The SubscriptionHandler handles different types of ROS subscriptions to receive data from those subsc...
std::function< void(const std::shared_ptr< MsgT >)> get_callback()
Get a callback function that will be associated with the ROS subscription to receive and translate me...
SubscriptionHandler(std::shared_ptr< MessagePairInterface > message_pair=nullptr, const rclcpp::Logger &logger=rclcpp::get_logger("SubscriptionHandler"))
Constructor with the message pair.
~SubscriptionHandler() override
Destructor to explicitly reset the subscription pointer.
std::shared_ptr< SubscriptionInterface > create_subscription_interface(const std::shared_ptr< rclcpp::Subscription< MsgT > > &subscription)
Create a SubscriptionInterface pointer through an instance of a SubscriptionHandler by providing a RO...
std::shared_ptr< rclcpp::Subscription< MsgT > > get_subscription() const
Getter of the ROS subscription.
void set_subscription(const std::shared_ptr< rclcpp::Subscription< MsgT > > &subscription)
Setter of the ROS subscription.
void set_user_callback(const std::function< void()> &user_callback)
Setter of a user callback function to be executed after the subscription callback.
Interface class to enable non-templated subscriptions with ROS subscriptions from derived Subscriptio...
A base class for all core exceptions.
An exception class to notify that a certain pointer is null.
Modulo Core communication module for handling messages on publication and subscription interfaces.