diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile index cbc0de2f52ad..2e8611438f09 100644 --- a/net-p2p/pulsar-client-cpp/Makefile +++ b/net-p2p/pulsar-client-cpp/Makefile @@ -1,36 +1,35 @@ PORTNAME= pulsar-client-cpp # this port requires instruction sets crc32, pclmul above the default sse2 DISTVERSIONPREFIX= v -DISTVERSION= 3.7.0 -PORTREVISION= 3 +DISTVERSION= 4.1.0 CATEGORIES= net-p2p MAINTAINER= yuri@FreeBSD.org COMMENT= Apache Pulsar C++ client library WWW= https://pulsar.apache.org/ \ https://github.com/apache/pulsar-client-cpp LICENSE= APACHE20 LICENSE_FILE= ${WRKSRC}/LICENSE ONLY_FOR_ARCHS= amd64 i386 # due to requirement of instruction sets crc32, pclmul BROKEN_i386= compilation fails due to overflow, see https://github.com/apache/pulsar-client-cpp/issues/449 BUILD_DEPENDS= ${LOCALBASE}/include/boost/algorithm/string.hpp:devel/boost-libs LIB_DEPENDS= libcurl.so:ftp/curl \ libprotobuf.so:devel/protobuf \ libsnappy.so:archivers/snappy \ libzstd.so:archivers/zstd USES= cmake:testing ssl USE_GITHUB= yes GH_ACCOUNT= apache CMAKE_OFF= BUILD_TESTS CMAKE_ARGS= -DCMAKE_CXX_STANDARD=17 \ -DOPENSSL_ROOT_DIR=/usr CMAKE_TESTING_ON= BUILD_TESTS # tests fail to compile, see https://github.com/apache/pulsar-client-cpp/issues/472 CXXFLAGS+= -mcrc32 -mpclmul .include diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo index 3d7c80b515e0..0648f4df6ce5 100644 --- a/net-p2p/pulsar-client-cpp/distinfo +++ b/net-p2p/pulsar-client-cpp/distinfo @@ -1,3 +1,3 @@ -TIMESTAMP = 1743152964 -SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671 -SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627 +TIMESTAMP = 1776406245 +SHA256 (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 172c9697caf62551c336e0bf64a136ccdff0674e9ea66f94f3f60962c5d41958 +SIZE (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 1635369 diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 deleted file mode 100644 index ca6cb6a02135..000000000000 --- a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 +++ /dev/null @@ -1,1111 +0,0 @@ -- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+ - -diff --git CMakeLists.txt CMakeLists.txt -index b0046534..2efeec89 100644 ---- CMakeLists.txt -+++ CMakeLists.txt -@@ -19,15 +19,16 @@ - - cmake_minimum_required(VERSION 3.13) - --option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) -- - option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) - if (INTEGRATE_VCPKG) -- set(USE_ASIO ON) -+ option(USE_ASIO "Use Asio instead of Boost.Asio" ON) - if (NOT CMAKE_TOOLCHAIN_FILE) - set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") - endif () -+else () -+ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) - endif () -+message(STATUS "USE_ASIO: ${USE_ASIO}") - - option(BUILD_TESTS "Build tests" ON) - message(STATUS "BUILD_TESTS: " ${BUILD_TESTS}) -diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc -index 7233b2c9..bc8da970 100644 ---- lib/AckGroupingTrackerEnabled.cc -+++ lib/AckGroupingTrackerEnabled.cc -@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() { - this->flush(); - std::lock_guard lock(this->mutexTimer_); - if (this->timer_) { -- ASIO_ERROR ec; -- this->timer_->cancel(ec); -+ this->timer_->cancel(); - } - } - -@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() { - - std::lock_guard lock(this->mutexTimer_); - this->timer_ = this->executor_->createDeadlineTimer(); -- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); -+ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); - auto self = shared_from_this(); - this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { - if (!ec) { -diff --git lib/ClientConnection.cc lib/ClientConnection.cc -index 2037722f..de226a85 100644 ---- lib/ClientConnection.cc -+++ lib/ClientConnection.cc -@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: - if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { - LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); - std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); -- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); -+ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost)); - } - - LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); -@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC - // Only send keep-alive probes if the broker supports it - keepAliveTimer_ = executor_->createDeadlineTimer(); - if (keepAliveTimer_) { -- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); -+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); - auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { - auto self = weakSelf.lock(); -@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta - // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero - // Check if we have a timer still before we set the request timer to pop again. - if (consumerStatsRequestTimer_) { -- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); -+ consumerStatsRequestTimer_->expires_after(operationsTimeout_); - auto weakSelf = weak_from_this(); - consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { - auto self = weakSelf.lock(); -@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer tcp_kee - typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; - #endif - --/* -- * TCP Connect handler -- * -- * if async_connect without any error, connected_ would be set to true -- * at this point the connection is deemed valid to be used by clients of this class -- */ --void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { -- if (!err) { -- std::stringstream cnxStringStream; -- try { -- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() -- << "] "; -- cnxString_ = cnxStringStream.str(); -- } catch (const ASIO_SYSTEM_ERROR& e) { -- LOG_ERROR("Failed to get endpoint: " << e.what()); -- close(ResultRetryable); -- return; -- } -- if (logicalAddress_ == physicalAddress_) { -- LOG_INFO(cnxString_ << "Connected to broker"); -- } else { -- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ -- << ", proxy: " << proxyServiceUrl_ -- << ", physical address:" << physicalAddress_); -- } -+void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) { -+ std::stringstream cnxStringStream; -+ try { -+ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; -+ cnxString_ = cnxStringStream.str(); -+ } catch (const ASIO_SYSTEM_ERROR& e) { -+ LOG_ERROR("Failed to get endpoint: " << e.what()); -+ close(ResultRetryable); -+ return; -+ } -+ if (logicalAddress_ == physicalAddress_) { -+ LOG_INFO(cnxString_ << "Connected to broker"); -+ } else { -+ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ -+ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_); -+ } - -- Lock lock(mutex_); -- if (isClosed()) { -- LOG_INFO(cnxString_ << "Connection already closed"); -- return; -- } -- state_ = TcpConnected; -- lock.unlock(); -+ Lock lock(mutex_); -+ if (isClosed()) { -+ LOG_INFO(cnxString_ << "Connection already closed"); -+ return; -+ } -+ state_ = TcpConnected; -+ lock.unlock(); - -- ASIO_ERROR error; -- socket_->set_option(tcp::no_delay(true), error); -- if (error) { -- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); -- } -+ ASIO_ERROR error; -+ socket_->set_option(tcp::no_delay(true), error); -+ if (error) { -+ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); -+ } - -- socket_->set_option(tcp::socket::keep_alive(true), error); -- if (error) { -- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); -- } -+ socket_->set_option(tcp::socket::keep_alive(true), error); -+ if (error) { -+ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); -+ } - -- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this -- // should never happen, given that we're sending our own keep-alive probes (within the TCP -- // connection) every 30 seconds -- socket_->set_option(tcp_keep_alive_idle(1 * 60), error); -- if (error) { -- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); -- } -+ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this -+ // should never happen, given that we're sending our own keep-alive probes (within the TCP -+ // connection) every 30 seconds -+ socket_->set_option(tcp_keep_alive_idle(1 * 60), error); -+ if (error) { -+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); -+ } - -- // Send up to 10 probes before declaring the connection broken -- socket_->set_option(tcp_keep_alive_count(10), error); -- if (error) { -- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); -- } -+ // Send up to 10 probes before declaring the connection broken -+ socket_->set_option(tcp_keep_alive_count(10), error); -+ if (error) { -+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); -+ } - -- // Interval between probes: 6 seconds -- socket_->set_option(tcp_keep_alive_interval(6), error); -- if (error) { -- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); -- } -+ // Interval between probes: 6 seconds -+ socket_->set_option(tcp_keep_alive_interval(6), error); -+ if (error) { -+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); -+ } - -- if (tlsSocket_) { -- if (!isTlsAllowInsecureConnection_) { -- ASIO_ERROR err; -- Url service_url; -- if (!Url::parse(physicalAddress_, service_url)) { -- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); -- close(); -- return; -- } -- } -- auto weakSelf = weak_from_this(); -- auto socket = socket_; -- auto tlsSocket = tlsSocket_; -- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation -- // fault might happen -- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { -- auto self = weakSelf.lock(); -- if (self) { -- self->handleHandshake(err); -- } -- }; -- tlsSocket_->async_handshake(ASIO::ssl::stream::client, -- ASIO::bind_executor(strand_, callback)); -- } else { -- handleHandshake(ASIO_SUCCESS); -- } -- } else if (endpointIterator != tcp::resolver::iterator()) { -- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); -- // The connection failed. Try the next endpoint in the list. -- ASIO_ERROR closeError; -- socket_->close(closeError); // ignore the error of close -- if (closeError) { -- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); -- } -- connectTimeoutTask_->stop(); -- ++endpointIterator; -- if (endpointIterator != tcp::resolver::iterator()) { -- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); -- connectTimeoutTask_->start(); -- tcp::endpoint endpoint = *endpointIterator; -- auto weakSelf = weak_from_this(); -- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { -- auto self = weakSelf.lock(); -- if (self) { -- self->handleTcpConnected(err, endpointIterator); -- } -- }); -- } else { -- if (err == ASIO::error::operation_aborted) { -- // TCP connect timeout, which is not retryable -+ if (tlsSocket_) { -+ if (!isTlsAllowInsecureConnection_) { -+ ASIO_ERROR err; -+ Url service_url; -+ if (!Url::parse(physicalAddress_, service_url)) { -+ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); - close(); -- } else { -- close(ResultRetryable); -+ return; - } - } -+ auto weakSelf = weak_from_this(); -+ auto socket = socket_; -+ auto tlsSocket = tlsSocket_; -+ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation -+ // fault might happen -+ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { -+ auto self = weakSelf.lock(); -+ if (self) { -+ self->handleHandshake(err); -+ } -+ }; -+ tlsSocket_->async_handshake(ASIO::ssl::stream::client, -+ ASIO::bind_executor(strand_, callback)); - } else { -- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); -- close(ResultRetryable); -+ handleHandshake(ASIO_SUCCESS); - } - } - -@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() { - } - - LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); -- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); -+ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port()); - auto weakSelf = weak_from_this(); -- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { -- auto self = weakSelf.lock(); -- if (self) { -- self->handleResolve(err, iterator); -- } -- }); -+ resolver_->async_resolve( -+ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) { -+ auto self = weakSelf.lock(); -+ if (!self) { -+ return; -+ } -+ if (err) { -+ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; -+ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); -+ close(); -+ return; -+ } -+ if (results.empty()) { -+ LOG_ERROR(cnxString_ << "No IP address found"); -+ close(); -+ return; -+ } -+ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { -+ ClientConnectionPtr ptr = weakSelf.lock(); -+ if (!ptr) { -+ // Connection was already destroyed -+ return; -+ } -+ -+ if (ptr->state_ != Ready) { -+ LOG_ERROR(ptr->cnxString_ << "Connection was not established in " -+ << ptr->connectTimeoutTask_->getPeriodMs() -+ << " ms, close the socket"); -+ PeriodicTask::ErrorCode err; -+ ptr->socket_->close(err); -+ if (err) { -+ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); -+ } -+ } -+ ptr->connectTimeoutTask_->stop(); -+ }); -+ connectTimeoutTask_->start(); -+ std::vector endpoints; -+ for (const auto& result : results) { -+ endpoints.emplace_back(result.endpoint()); -+ } -+ asyncConnect(endpoints, 0); -+ }); - } - --void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { -- if (err) { -- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; -- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); -- close(); -+void ClientConnection::asyncConnect(const std::vector& endpoints, size_t index) { -+ if (index >= endpoints.size()) { -+ close(ResultRetryable); - return; - } -- - auto weakSelf = weak_from_this(); -- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { -- ClientConnectionPtr ptr = weakSelf.lock(); -- if (!ptr) { -- // Connection was already destroyed -+ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) { -+ auto self = weakSelf.lock(); -+ if (!self) { - return; - } -- -- if (ptr->state_ != Ready) { -- LOG_ERROR(ptr->cnxString_ << "Connection was not established in " -- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); -- PeriodicTask::ErrorCode err; -- ptr->socket_->close(err); -- if (err) { -- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); -- } -+ if (err) { -+ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); -+ asyncConnect(endpoints, index + 1); -+ return; - } -- ptr->connectTimeoutTask_->stop(); -+ completeConnect(endpoints[index]); - }); -- -- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); -- connectTimeoutTask_->start(); -- if (endpointIterator != tcp::resolver::iterator()) { -- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // -- << " to " << endpointIterator->endpoint()); -- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { -- auto self = weakSelf.lock(); -- if (self) { -- self->handleTcpConnected(err, endpointIterator); -- } -- }); -- } else { -- LOG_WARN(cnxString_ << "No IP address found"); -- close(); -- return; -- } - } - - void ClientConnection::readNextCommand() { -@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request - LookupRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); -- requestData.timer->expires_from_now(operationsTimeout_); -+ requestData.timer->expires_after(operationsTimeout_); - auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() { - PairSharedBuffer buffer = - Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); - -- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the -- // callback is called, an invalid buffer range might be passed to the underlying socket send. -+ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before -+ // the callback is called, an invalid buffer range might be passed to the underlying socket -+ // send. - asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { - handleSendPair(err); - })); -@@ -1198,7 +1168,7 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm - - PendingRequestData requestData; - requestData.timer = executor_->createDeadlineTimer(); -- requestData.timer->expires_from_now(operationsTimeout_); -+ requestData.timer->expires_after(operationsTimeout_); - auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() { - // be zero And we do not attempt to dereference the pointer. - Lock lock(mutex_); - if (keepAliveTimer_) { -- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); -+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); - auto weakSelf = weak_from_this(); - keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { - auto self = weakSelf.lock(); -@@ -1430,7 +1400,7 @@ Future ClientConnection::newGetLastMessageId(u - LastMessageIdRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); -- requestData.timer->expires_from_now(operationsTimeout_); -+ requestData.timer->expires_after(operationsTimeout_); - auto weakSelf = weak_from_this(); - requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -@@ -1478,7 +1448,7 @@ Future ClientConnection::newGetSchema(const std::string& top - lock.unlock(); - - auto weakSelf = weak_from_this(); -- timer->expires_from_now(operationsTimeout_); -+ timer->expires_after(operationsTimeout_); - timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); - if (!self) { -@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) { - auto it = pendingRequests_.find(requestId); - if (it != pendingRequests_.end()) { - it->second.promise.setFailed(ResultDisconnected); -- ASIO_ERROR ec; -- it->second.timer->cancel(ec); -+ it->second.timer->cancel(); - pendingRequests_.erase(it); - } - } -diff --git lib/ClientConnection.h lib/ClientConnection.h -index 7646f85e..14e07652 100644 ---- lib/ClientConnection.h -+++ lib/ClientConnection.h -@@ -25,13 +25,13 @@ - #include - #ifdef USE_ASIO - #include --#include -+#include - #include - #include - #include - #else - #include --#include -+#include - #include - #include - #include -@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& endpoints, size_t index); -+ void completeConnect(ASIO::ip::tcp::endpoint endpoint); - - void handleHandshake(const ASIO_ERROR& err); - -@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this strand_; -+ ASIO::strand strand_; - - const std::string logicalAddress_; - /* -diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc -index 250845b3..cfdb0b2d 100644 ---- lib/ConsumerImpl.cc -+++ lib/ConsumerImpl.cc -@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b - } - - void ConsumerImpl::triggerCheckExpiredChunkedTimer() { -- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); -+ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); - std::weak_ptr weakSelf{shared_from_this()}; - checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { - auto self = weakSelf.lock(); -@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time - } - remainTime -= next; - -- timer->expires_from_now(next); -+ timer->expires_after(next); - - auto self = shared_from_this(); - timer->async_wait([this, backoff, remainTime, timer, next, callback, -@@ -1791,9 +1791,8 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { - } - - void ConsumerImpl::cancelTimers() noexcept { -- ASIO_ERROR ec; -- batchReceiveTimer_->cancel(ec); -- checkExpiredChunkedTimer_->cancel(ec); -+ batchReceiveTimer_->cancel(); -+ checkExpiredChunkedTimer_->cancel(); - unAckedMessageTrackerPtr_->stop(); - consumerStatsBasePtr_->stop(); - } -diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc -index 098f2d5b..76d99370 100644 ---- lib/ConsumerImplBase.cc -+++ lib/ConsumerImplBase.cc -@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi - - void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { - if (timeoutMs > 0) { -- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); -+ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs)); - std::weak_ptr weakSelf{shared_from_this()}; - batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -diff --git lib/ExecutorService.cc lib/ExecutorService.cc -index 794e3619..7f2a2c14 100644 ---- lib/ExecutorService.cc -+++ lib/ExecutorService.cc -@@ -18,6 +18,12 @@ - */ - #include "ExecutorService.h" - -+#ifdef USE_ASIO -+#include -+#else -+#include -+#endif -+ - #include "LogUtils.h" - #include "TimeUtils.h" - DECLARE_LOG_OBJECT() -@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); } - void ExecutorService::start() { - auto self = shared_from_this(); - std::thread t{[this, self] { -- LOG_DEBUG("Run io_service in a single thread"); -- ASIO_ERROR ec; -+ LOG_DEBUG("Run io_context in a single thread"); - while (!closed_) { -- io_service_.restart(); -- IOService::work work{getIOService()}; -- io_service_.run(ec); -- } -- if (ec) { -- LOG_ERROR("Failed to run io_service: " << ec.message()); -- } else { -- LOG_DEBUG("Event loop of ExecutorService exits successfully"); -+ io_context_.restart(); -+ auto work{ASIO::make_work_guard(io_context_)}; -+ io_context_.run(); - } -+ LOG_DEBUG("Event loop of ExecutorService exits successfully"); - { - std::lock_guard lock{mutex_}; - ioServiceDone_ = true; -@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() { - } - - /* -- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance -+ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance - * @ returns shared_ptr to this socket - */ - SocketPtr ExecutorService::createSocket() { - try { -- return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); -+ return SocketPtr(new ASIO::ip::tcp::socket(io_context_)); - } catch (const ASIO_SYSTEM_ERROR &e) { - restart(); - auto error = std::string("Failed to create socket: ") + e.what(); -@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont - } - - /* -- * factory method of Resolver object associated with io_service_ instance -+ * factory method of Resolver object associated with io_context_ instance - * @returns shraed_ptr to resolver object - */ - TcpResolverPtr ExecutorService::createTcpResolver() { - try { -- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); -+ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_)); - } catch (const ASIO_SYSTEM_ERROR &e) { - restart(); - auto error = std::string("Failed to create resolver: ") + e.what(); -@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() { - - DeadlineTimerPtr ExecutorService::createDeadlineTimer() { - try { -- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); -+ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_)); - } catch (const ASIO_SYSTEM_ERROR &e) { - restart(); - auto error = std::string("Failed to create steady_timer: ") + e.what(); -@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { - } - } - --void ExecutorService::restart() { io_service_.stop(); } -+void ExecutorService::restart() { io_context_.stop(); } - - void ExecutorService::close(long timeoutMs) { - bool expectedState = false; -@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) { - return; - } - if (timeoutMs == 0) { // non-blocking -- io_service_.stop(); -+ io_context_.stop(); - return; - } - - std::unique_lock lock{mutex_}; -- io_service_.stop(); -+ io_context_.stop(); - if (timeoutMs > 0) { - cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; }); - } else { // < 0 -@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) { - } - } - --void ExecutorService::postWork(std::function task) { io_service_.post(task); } -+void ExecutorService::postWork(std::function task) { ASIO::post(io_context_, task); } - - ///////////////////// - -diff --git lib/ExecutorService.h lib/ExecutorService.h -index 89d06d30..626cb203 100644 ---- lib/ExecutorService.h -+++ lib/ExecutorService.h -@@ -23,11 +23,11 @@ - - #include - #ifdef USE_ASIO --#include -+#include - #include - #include - #else --#include -+#include - #include - #include - #endif -@@ -46,7 +46,7 @@ typedef std::shared_ptr > TlsSocketPt - typedef std::shared_ptr TcpResolverPtr; - class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this { - public: -- using IOService = ASIO::io_service; -+ using IOService = ASIO::io_context; - using SharedPtr = std::shared_ptr; - - static SharedPtr create(); -@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_thiscancel(ignored); -- creationTimer_->cancel(ignored); -+ timer_->cancel(); -+ creationTimer_->cancel(); - } - - void HandlerBase::start() { -@@ -61,15 +60,14 @@ void HandlerBase::start() { - if (state_.compare_exchange_strong(state, Pending)) { - grabCnx(); - } -- creationTimer_->expires_from_now(operationTimeut_); -+ creationTimer_->expires_after(operationTimeut_); - std::weak_ptr weakSelf{shared_from_this()}; - creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { - auto self = weakSelf.lock(); - if (self && !error) { - LOG_WARN("Cancel the pending reconnection due to the start timeout"); - connectionFailed(ResultTimeout); -- ASIO_ERROR ignored; -- timer_->cancel(ignored); -+ timer_->cancel(); - } - }); - } -@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional& assignedBrokerUrl) - connectionTimeMs_ = - duration_cast(high_resolution_clock::now() - before).count(); - // Prevent the creationTimer_ from cancelling the timer_ in future -- ASIO_ERROR ignored; -- creationTimer_->cancel(ignored); -+ creationTimer_->cancel(); - LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms") - } else if (isResultRetryable(result)) { - scheduleReconnection(); -@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional& assig - TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next(); - - LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); -- timer_->expires_from_now(delay); -+ timer_->expires_after(delay); - // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled - // so we will not run into the case where grabCnx is invoked on out of scope handler - auto name = getName(); -diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc -index dddade5c..61fbf7b8 100644 ---- lib/MultiTopicsConsumerImpl.cc -+++ lib/MultiTopicsConsumerImpl.cc -@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { - return numberOfConnectedConsumer; - } - void MultiTopicsConsumerImpl::runPartitionUpdateTask() { -- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); -+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); - auto weakSelf = weak_from_this(); - partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it -@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { - - void MultiTopicsConsumerImpl::cancelTimers() noexcept { - if (partitionsUpdateTimer_) { -- ASIO_ERROR ec; -- partitionsUpdateTimer_->cancel(ec); -+ partitionsUpdateTimer_->cancel(); - } - } - -diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc -index e443496d..e50b4ca2 100644 ---- lib/NegativeAcksTracker.cc -+++ lib/NegativeAcksTracker.cc -@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() { - return; - } - std::weak_ptr weakSelf{shared_from_this()}; -- timer_->expires_from_now(timerInterval_); -+ timer_->expires_after(timerInterval_); - timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { - if (auto self = weakSelf.lock()) { - self->handleTimer(ec); -@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { - - void NegativeAcksTracker::close() { - closed_ = true; -- ASIO_ERROR ec; -- timer_->cancel(ec); -+ timer_->cancel(); - std::lock_guard lock(mutex_); - nackedMessages_.clear(); - } -diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc -index 4178096c..923c038b 100644 ---- lib/PartitionedProducerImpl.cc -+++ lib/PartitionedProducerImpl.cc -@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { - - void PartitionedProducerImpl::runPartitionUpdateTask() { - auto weakSelf = weak_from_this(); -- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); -+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); - partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); - if (self) { -@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { - - void PartitionedProducerImpl::cancelTimers() noexcept { - if (partitionsUpdateTimer_) { -- ASIO_ERROR ec; -- partitionsUpdateTimer_->cancel(ec); -+ partitionsUpdateTimer_->cancel(); - } - } - -diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc -index 4fc7bb61..07d9a7bc 100644 ---- lib/PatternMultiTopicsConsumerImpl.cc -+++ lib/PatternMultiTopicsConsumerImpl.cc -@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern() - - void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { - autoDiscoveryRunning_ = false; -- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); -+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); - - auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { -@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() { - LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); - - if (conf_.getPatternAutoDiscoveryPeriod() > 0) { -- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); -+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); - auto weakSelf = weak_from_this(); - autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { - if (auto self = weakSelf.lock()) { -@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { - MultiTopicsConsumerImpl::closeAsync(callback); - } - --void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { -- ASIO_ERROR ec; -- autoDiscoveryTimer_->cancel(ec); --} -+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); } -diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc -index 9fde012a..4b5f9621 100644 ---- lib/PeriodicTask.cc -+++ lib/PeriodicTask.cc -@@ -29,7 +29,7 @@ void PeriodicTask::start() { - state_ = Ready; - if (periodMs_ >= 0) { - std::weak_ptr weakSelf{shared_from_this()}; -- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); -+ timer_->expires_after(std::chrono::milliseconds(periodMs_)); - timer_->async_wait([weakSelf](const ErrorCode& ec) { - auto self = weakSelf.lock(); - if (self) { -@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept { - if (!state_.compare_exchange_strong(state, Closing)) { - return; - } -- ErrorCode ec; -- timer_->cancel(ec); -+ timer_->cancel(); - state_ = Pending; - } - -@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { - // state_ may be changed in handleTimeout, so we check state_ again - if (state_ == Ready) { - auto self = shared_from_this(); -- timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); -+ timer_->expires_after(std::chrono::milliseconds(periodMs_)); - timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); - } - } -diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc -index 4399ce5f..8b112bf1 100644 ---- lib/ProducerImpl.cc -+++ lib/ProducerImpl.cc -@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c - bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); - bool isFull = batchMessageContainer_->add(msg, callback); - if (isFirstMessage) { -- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); -+ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs())); - auto weakSelf = weak_from_this(); - batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() { - - void ProducerImpl::cancelTimers() noexcept { - dataKeyRefreshTask_.stop(); -- ASIO_ERROR ec; -- batchTimer_->cancel(ec); -- sendTimer_->cancel(ec); -+ batchTimer_->cancel(); -+ sendTimer_->cancel(); - } - - bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const { -@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() { - } - - void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { -- sendTimer_->expires_from_now(expiryTime); -+ sendTimer_->expires_after(expiryTime); - - auto weakSelf = weak_from_this(); - sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { -diff --git lib/RetryableOperation.h lib/RetryableOperation.h -index dba190f4..8a235d3a 100644 ---- lib/RetryableOperation.h -+++ lib/RetryableOperation.h -@@ -26,8 +26,8 @@ - #include - #include - -+#include "AsioTimer.h" - #include "Backoff.h" --#include "ExecutorService.h" - #include "Future.h" - #include "LogUtils.h" - #include "ResultUtils.h" -@@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_thiscancel(ec); -+ timer_->cancel(); - } - - private: -@@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_thisexpires_from_now(delay); -+ timer_->expires_after(delay); - - auto nextRemainingTime = remainingTime - delay; - LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) -diff --git lib/RetryableOperationCache.h lib/RetryableOperationCache.h -index e42460dd..5030c94e 100644 ---- lib/RetryableOperationCache.h -+++ lib/RetryableOperationCache.h -@@ -18,7 +18,6 @@ - */ - #pragma once - --#include - #include - #include - -diff --git lib/SharedBuffer.h lib/SharedBuffer.h -index 26fc59ed..a6ced186 100644 ---- lib/SharedBuffer.h -+++ lib/SharedBuffer.h -@@ -151,11 +151,11 @@ class SharedBuffer { - - inline bool writable() const { return writableBytes() > 0; } - -- ASIO::const_buffers_1 const_asio_buffer() const { -- return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes()); -+ ASIO::const_buffer const_asio_buffer() const { -+ return ASIO::const_buffer(ptr_ + readIdx_, readableBytes()); - } - -- ASIO::mutable_buffers_1 asio_buffer() { -+ ASIO::mutable_buffer asio_buffer() { - assert(data_); - return ASIO::buffer(ptr_ + writeIdx_, writableBytes()); - } -diff --git lib/UnAckedMessageTrackerEnabled.cc lib/UnAckedMessageTrackerEnabled.cc -index e371af99..3b959d8a 100644 ---- lib/UnAckedMessageTrackerEnabled.cc -+++ lib/UnAckedMessageTrackerEnabled.cc -@@ -34,7 +34,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { - timeoutHandlerHelper(); - ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); - timer_ = executorService->createDeadlineTimer(); -- timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_)); -+ timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_)); - std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -@@ -173,9 +173,8 @@ void UnAckedMessageTrackerEnabled::clear() { - } - - void UnAckedMessageTrackerEnabled::stop() { -- ASIO_ERROR ec; - if (timer_) { -- timer_->cancel(ec); -+ timer_->cancel(); - } - } - } /* namespace pulsar */ -diff --git lib/stats/ConsumerStatsImpl.cc lib/stats/ConsumerStatsImpl.cc -index 0eefabdc..e8bd919a 100644 ---- lib/stats/ConsumerStatsImpl.cc -+++ lib/stats/ConsumerStatsImpl.cc -@@ -85,7 +85,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy - } - - void ConsumerStatsImpl::scheduleTimer() { -- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); -+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); - std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -diff --git lib/stats/ConsumerStatsImpl.h lib/stats/ConsumerStatsImpl.h -index 3333ea85..35fda9b4 100644 ---- lib/stats/ConsumerStatsImpl.h -+++ lib/stats/ConsumerStatsImpl.h -@@ -59,10 +59,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this - ConsumerStatsImpl(const ConsumerStatsImpl& stats); - void flushAndReset(const ASIO_ERROR&); - void start() override; -- void stop() override { -- ASIO_ERROR error; -- timer_->cancel(error); -- } -+ void stop() override { timer_->cancel(); } - void receivedMessage(Message&, Result) override; - void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; - virtual ~ConsumerStatsImpl(); -diff --git lib/stats/ProducerStatsImpl.cc lib/stats/ProducerStatsImpl.cc -index 15e9e67e..b5e00794 100644 ---- lib/stats/ProducerStatsImpl.cc -+++ lib/stats/ProducerStatsImpl.cc -@@ -109,7 +109,7 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) { - ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } - - void ProducerStatsImpl::scheduleTimer() { -- timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); -+ timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); - std::weak_ptr weakSelf{shared_from_this()}; - timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { - auto self = weakSelf.lock(); -diff --git tests/AuthPluginTest.cc tests/AuthPluginTest.cc -index 24549d7f..ed0511ea 100644 ---- tests/AuthPluginTest.cc -+++ tests/AuthPluginTest.cc -@@ -309,16 +309,17 @@ namespace testAthenz { - std::string principalToken; - void mockZTS(Latch& latch, int port) { - LOG_INFO("-- MockZTS started"); -- ASIO::io_service io; -- ASIO::ip::tcp::iostream stream; -+ ASIO::io_context io; -+ ASIO::ip::tcp::socket socket(io); - ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port)); - - LOG_INFO("-- MockZTS waiting for connnection"); - latch.countdown(); -- acceptor.accept(*stream.rdbuf()); -+ acceptor.accept(socket); - LOG_INFO("-- MockZTS got connection"); - - std::string headerLine; -+ ASIO::ip::tcp::iostream stream(std::move(socket)); - while (getline(stream, headerLine)) { - std::vector kv; - boost::algorithm::split(kv, headerLine, boost::is_any_of(" ")); -diff --git tests/ConsumerTest.h tests/ConsumerTest.h -index 82482875..9d190c10 100644 ---- tests/ConsumerTest.h -+++ tests/ConsumerTest.h -@@ -46,8 +46,8 @@ class ConsumerTest { - return nullptr; - } - auto timer = cnx->executor_->createDeadlineTimer(); -- timer->expires_from_now(delaySinceStartGrabCnx - -- std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); -+ timer->expires_after(delaySinceStartGrabCnx - -+ std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); - timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); }); - return timer; - } diff --git a/net-p2p/pulsar-client-cpp/files/patch-lib_AutoClusterFailover.cc b/net-p2p/pulsar-client-cpp/files/patch-lib_AutoClusterFailover.cc new file mode 100644 index 000000000000..c506fdab783e --- /dev/null +++ b/net-p2p/pulsar-client-cpp/files/patch-lib_AutoClusterFailover.cc @@ -0,0 +1,11 @@ +--- lib/AutoClusterFailover.cc.orig 2026-04-17 06:16:08 UTC ++++ lib/AutoClusterFailover.cc +@@ -173,7 +173,7 @@ class AutoClusterFailoverImpl : public std::enable_sha + ASIO_ERROR ignored; + context->resolver.cancel(); + context->socket.close(ignored); +- context->timer.cancel(ignored); ++ context->timer.cancel(); + + context->callback(success); + } diff --git a/net-p2p/pulsar-client-cpp/pkg-plist b/net-p2p/pulsar-client-cpp/pkg-plist index 660018914678..e31cfb7ef0a2 100644 --- a/net-p2p/pulsar-client-cpp/pkg-plist +++ b/net-p2p/pulsar-client-cpp/pkg-plist @@ -1,67 +1,71 @@ include/pulsar/Authentication.h +include/pulsar/AutoClusterFailover.h include/pulsar/BatchReceivePolicy.h include/pulsar/BrokerConsumerStats.h include/pulsar/Client.h include/pulsar/ClientConfiguration.h include/pulsar/CompressionType.h include/pulsar/ConsoleLoggerFactory.h include/pulsar/Consumer.h include/pulsar/ConsumerConfiguration.h include/pulsar/ConsumerCryptoFailureAction.h include/pulsar/ConsumerEventListener.h include/pulsar/ConsumerInterceptor.h include/pulsar/ConsumerType.h include/pulsar/CryptoKeyReader.h include/pulsar/DeadLetterPolicy.h include/pulsar/DeadLetterPolicyBuilder.h include/pulsar/DeprecatedException.h +include/pulsar/EncryptionContext.h include/pulsar/EncryptionKeyInfo.h include/pulsar/FileLoggerFactory.h include/pulsar/InitialPosition.h include/pulsar/KeySharedPolicy.h include/pulsar/KeyValue.h include/pulsar/Logger.h include/pulsar/Message.h include/pulsar/MessageBatch.h include/pulsar/MessageBuilder.h include/pulsar/MessageId.h include/pulsar/MessageIdBuilder.h include/pulsar/MessageRoutingPolicy.h include/pulsar/Producer.h include/pulsar/ProducerConfiguration.h include/pulsar/ProducerCryptoFailureAction.h include/pulsar/ProducerInterceptor.h include/pulsar/ProtobufNativeSchema.h include/pulsar/Reader.h include/pulsar/ReaderConfiguration.h include/pulsar/RegexSubscriptionMode.h include/pulsar/Result.h include/pulsar/Schema.h +include/pulsar/ServiceInfo.h +include/pulsar/ServiceInfoProvider.h include/pulsar/TableView.h include/pulsar/TableViewConfiguration.h include/pulsar/TopicMetadata.h include/pulsar/TypedMessage.h include/pulsar/TypedMessageBuilder.h include/pulsar/Version.h include/pulsar/c/authentication.h include/pulsar/c/client.h include/pulsar/c/client_configuration.h include/pulsar/c/consumer.h include/pulsar/c/consumer_configuration.h include/pulsar/c/message.h include/pulsar/c/message_id.h include/pulsar/c/message_router.h include/pulsar/c/messages.h include/pulsar/c/producer.h include/pulsar/c/producer_configuration.h include/pulsar/c/reader.h include/pulsar/c/reader_configuration.h include/pulsar/c/result.h include/pulsar/c/string_list.h include/pulsar/c/string_map.h include/pulsar/c/table_view.h include/pulsar/c/table_view_configuration.h include/pulsar/c/version.h include/pulsar/defines.h lib/libpulsar.a lib/libpulsar.so