Source code for pulsar.client.test.test_relay_transport

"""
Tests for the relay transport implementation.

Tests retry logic and message ID tracking functionality.
"""
from unittest import TestCase
from unittest.mock import Mock, patch

import requests

from pulsar.client.transport.relay import RelayTransport, RelayTransportError


[docs] class TestRetryLogic(TestCase): """Test retry logic with exponential backoff."""
[docs] @patch('pulsar.client.transport.relay.time.sleep') def test_post_message_retries_on_connection_error(self, mock_sleep): """Test that post_message retries indefinitely on connection errors.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') # Mock the auth manager to return a token transport.auth_manager.get_token = Mock(return_value='test-token') # Mock session.post to fail twice with ConnectionError, then succeed mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'message_id': 'msg_123', 'topic': 'test-topic', 'timestamp': '2025-10-27T10:00:00Z' } transport.session.post = Mock( side_effect=[ requests.ConnectionError("Connection refused"), requests.ConnectionError("Connection refused"), mock_response ] ) result = transport.post_message('test-topic', {'data': 'test'}) # Verify it succeeded after retries assert result['message_id'] == 'msg_123' assert transport.session.post.call_count == 3 # Verify exponential backoff was used assert mock_sleep.call_count == 2 # First delay should be 1.0, second should be 2.0 assert mock_sleep.call_args_list[0][0][0] == 1.0 assert mock_sleep.call_args_list[1][0][0] == 2.0
[docs] @patch('pulsar.client.transport.relay.time.sleep') def test_post_message_retries_on_500_error(self, mock_sleep): """Test that post_message retries on 5xx server errors.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Mock responses: 500, 503, then 200 mock_500 = Mock() mock_500.status_code = 500 mock_503 = Mock() mock_503.status_code = 503 mock_200 = Mock() mock_200.status_code = 200 mock_200.json.return_value = { 'message_id': 'msg_456', 'topic': 'test-topic', 'timestamp': '2025-10-27T10:00:00Z' } transport.session.post = Mock(side_effect=[mock_500, mock_503, mock_200]) result = transport.post_message('test-topic', {'data': 'test'}) assert result['message_id'] == 'msg_456' assert transport.session.post.call_count == 3 assert mock_sleep.call_count == 2
[docs] def test_post_message_does_not_retry_on_400_error(self): """Test that post_message does NOT retry on 4xx client errors.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Mock response with 400 error mock_400 = Mock() mock_400.status_code = 400 # Create HTTPError with response attached error = requests.HTTPError("400 Bad Request") error.response = mock_400 mock_400.raise_for_status.side_effect = error transport.session.post = Mock(return_value=mock_400) with self.assertRaises(RelayTransportError): transport.post_message('test-topic', {'data': 'test'}) # Should only be called once (no retries for 4xx) assert transport.session.post.call_count == 1
[docs] @patch('pulsar.client.transport.relay.time.sleep') def test_post_message_retries_on_timeout(self, mock_sleep): """Test that post_message retries on timeout.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'message_id': 'msg_789', 'topic': 'test-topic', 'timestamp': '2025-10-27T10:00:00Z' } transport.session.post = Mock( side_effect=[ requests.Timeout("Request timed out"), mock_response ] ) result = transport.post_message('test-topic', {'data': 'test'}) assert result['message_id'] == 'msg_789' assert transport.session.post.call_count == 2 assert mock_sleep.call_count == 1
[docs] @patch('pulsar.client.transport.relay.time.sleep') def test_retry_backoff_caps_at_max_delay(self, mock_sleep): """Test that exponential backoff caps at max_delay.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Create many connection errors to test max delay errors = [requests.ConnectionError("Connection refused")] * 10 mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'message_id': 'msg_999', 'topic': 'test-topic', 'timestamp': '2025-10-27T10:00:00Z' } transport.session.post = Mock(side_effect=errors + [mock_response]) result = transport.post_message('test-topic', {'data': 'test'}) assert result['message_id'] == 'msg_999' assert mock_sleep.call_count == 10 # Check that delay caps at 60 seconds delays = [call[0][0] for call in mock_sleep.call_args_list] # Expected: 1, 2, 4, 8, 16, 32, 60, 60, 60, 60 assert delays[0] == 1.0 assert delays[1] == 2.0 assert delays[2] == 4.0 assert delays[3] == 8.0 assert delays[4] == 16.0 assert delays[5] == 32.0 # After this, should cap at 60 assert all(d == 60.0 for d in delays[6:])
[docs] class TestMessageIDTracking: """Test message ID tracking functionality."""
[docs] def test_long_poll_tracks_message_ids(self): """Test that long_poll tracks message IDs per topic.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Mock response with messages from different topics mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'messages': [ {'topic': 'topic1', 'message_id': 'msg_001', 'payload': {'data': 'a'}}, {'topic': 'topic2', 'message_id': 'msg_002', 'payload': {'data': 'b'}}, {'topic': 'topic1', 'message_id': 'msg_003', 'payload': {'data': 'c'}}, ], 'has_more': False } transport.session.post = Mock(return_value=mock_response) messages = transport.long_poll(['topic1', 'topic2']) # Verify message IDs are tracked (last message ID per topic) assert transport.get_last_message_id('topic1') == 'msg_003' assert transport.get_last_message_id('topic2') == 'msg_002' assert len(messages) == 3
[docs] def test_long_poll_uses_tracked_message_ids_in_since(self): """Test that long_poll includes tracked message IDs in the since parameter.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Set some tracked message IDs transport.set_last_message_id('topic1', 'msg_100') transport.set_last_message_id('topic2', 'msg_200') mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { 'messages': [], 'has_more': False } transport.session.post = Mock(return_value=mock_response) # Call long_poll transport.long_poll(['topic1', 'topic2']) # Verify the 'since' parameter was included in the request call_args = transport.session.post.call_args request_json = call_args[1]['json'] assert 'since' in request_json assert request_json['since']['topic1'] == 'msg_100' assert request_json['since']['topic2'] == 'msg_200'
[docs] def test_long_poll_only_includes_since_for_requested_topics(self): """Test that since only includes tracked IDs for topics in the request.""" transport = RelayTransport('http://localhost:8000', 'user', 'pass') transport.auth_manager.get_token = Mock(return_value='test-token') # Set tracked message IDs for multiple topics transport.set_last_message_id('topic1', 'msg_100') transport.set_last_message_id('topic2', 'msg_200') transport.set_last_message_id('topic3', 'msg_300') mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {'messages': [], 'has_more': False} transport.session.post = Mock(return_value=mock_response) # Only poll for topic1 and topic2 transport.long_poll(['topic1', 'topic2']) call_args = transport.session.post.call_args request_json = call_args[1]['json'] # Should only include topic1 and topic2 in since assert 'since' in request_json assert 'topic1' in request_json['since'] assert 'topic2' in request_json['since'] assert 'topic3' not in request_json['since']