Transactional Outbox Pattern.
Transactional Outbox, dağıtık sistemlerde ve mikroservis mimarilerinde veri tutarlılığını sağlamak için kullanılan bir desendir. Bu desen, veritabanı işlemleri ve mesajlaşma sistemleri arasında olabilecek tutarsızlıkları önler. Özellikle mikroservislerin birbirleriyle iletişim kurarken mesajlaşma (örn. RabbitMQ, Kafka) sistemleri kullandığı durumlarda kritik öneme sahiptir.
Normalde, veritabanı işlemleri ve mesaj iletimi birbirinden bağımsızdır, bu da veri tutarsızlığına yol açabilir. Transactional Outbox, her iki işlemi bir arada gerçekleştirir. Yani, bir veritabanı işlemi yapılırken, bu işlemle ilişkili mesajlar bir “outbox” tablosuna kaydedilir. Ardından, bu mesajlar asenkron bir şekilde dış bir servis tarafından kuyruklara iletilir.
Temel Çalışma Prensibi
- Veritabanı Güncellemesi ve Mesajın Kaydedilmesi:
- Bir işlem başlatıldığında (örneğin bir sipariş oluşturulduğunda), bu işlemle ilişkili bir mesaj aynı zamanda veritabanındaki bir
outbox
tablosuna kaydedilir. Böylece, veritabanı işlemi ve mesaj iletme işlemi bir transaction içerisinde gerçekleşir.
- Outbox’dan Mesajların Kuyruğa Gönderilmesi:
- Arka planda çalışan bir servis, periyodik olarak
outbox
tablosunu kontrol eder vePENDING
durumundaki mesajları alarak, bunları ilgili mesaj kuyruğuna (örneğin RabbitMQ, Kafka) gönderir.
- Başarılı Gönderim Sonrası Güncelleme:
- Mesaj başarıyla kuyruğa iletildiyse, mesajın durumu
SENT
olarak güncellenir. Eğer mesaj iletimi başarısız olursa, mesajın durumuFAILED
olarak işaretlenir.
Bu desen, veritabanı ve mesaj sistemleri arasındaki tutarsızlıkları engeller ve sistemin tutarlı bir şekilde çalışmasını sağlar.
Java ile Transactional Outbox Uygulaması
Şimdi, Transactional Outbox desenini bir Java Spring Boot uygulaması ile nasıl implement edebileceğimizi adım adım görelim.
1. Veritabanı Yapısı
Öncelikle, outbox tablosu veritabanında şöyle görünür:
CREATE TABLE outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id BIGINT NOT NULL,
message_type VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
- aggregate_id: Mesajın ilişkili olduğu ana iş öğesi (örneğin, sipariş ID’si).
- message_type: Mesaj türü (örneğin,
OrderCreated
). - payload: Mesajın içeriği, genellikle JSON formatında.
- status: Mesajın durumu (başlangıçta
PENDING
, daha sonraSENT
veyaFAILED
olabilir). - created_at: Mesajın oluşturulma zamanı.
- updated_at: Mesajın güncellenme zamanı.
2. Spring Boot Yapısı
a. Maven Bağımlılıkları
pom.xml
dosyanıza gerekli bağımlılıkları ekleyin:
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Data JPA (Hibernate) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- H2 Database (veya gerçek bir veritabanı kullanabilirsiniz) -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Boot Starter AMQP (RabbitMQ için) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Jackson (JSON işleme için) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
b. JPA Entity: Outbox Mesaj Tablosu
OutboxMessage
JPA entity'si, mesajların outbox
tablosuna kaydedilmesini sağlar:
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
public class OutboxMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private Long aggregateId;
@Column(nullable = false)
private String messageType;
@Column(nullable = false, columnDefinition = "TEXT")
private String payload;
@Column(nullable = false)
private String status = "PENDING"; // 'PENDING', 'SENT', 'FAILED'
@Column(nullable = false)
private LocalDateTime createdAt = LocalDateTime.now();
@Column(nullable = false)
private LocalDateTime updatedAt = LocalDateTime.now();
// Getters and setters...
}
c. JPA Repository: Outbox Mesajları Repository
Mesajları almak ve güncellemek için bir repository sınıfı oluşturuyoruz:
import org.springframework.data.jpa.repository.JpaRepository;
public interface OutboxMessageRepository extends JpaRepository<OutboxMessage, Long> {
// PENDING durumundaki mesajları alalım
List<OutboxMessage> findByStatus(String status);
}
d. Mesaj Gönderme Servisi
Mesajları outbox
tablosuna kaydederken aynı zamanda mesajları dışa gönderen servis:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class OutboxService {
@Autowired
private OutboxMessageRepository outboxMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "order-events";
// Mesajları veritabanına kaydeder
public void saveMessage(Long aggregateId, String messageType, String payload) {
OutboxMessage message = new OutboxMessage();
message.setAggregateId(aggregateId);
message.setMessageType(messageType);
message.setPayload(payload);
outboxMessageRepository.save(message);
}
// Outbox'tan mesajları alıp, kuyruğa gönderir
public void processPendingMessages() {
List<OutboxMessage> pendingMessages = outboxMessageRepository.findByStatus("PENDING");
for (OutboxMessage message : pendingMessages) {
try {
// RabbitMQ kuyruğuna gönderiyoruz
rabbitTemplate.convertAndSend(QUEUE_NAME, message.getPayload());
// Mesajı 'SENT' olarak güncelliyoruz
message.setStatus("SENT");
outboxMessageRepository.save(message);
} catch (Exception e) {
// Mesaj gönderme başarısız olursa 'FAILED' olarak işaretliyoruz
message.setStatus("FAILED");
outboxMessageRepository.save(message);
}
}
}
}
e. Mesaj Gönderme ve İşlem Servisi
Bir sipariş oluşturma ve outbox’a mesaj ekleme işlemi:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OutboxService outboxService;
public void createOrder(Long orderId, String orderDetails) {
// Siparişi veritabanına kaydeder (bu örnekte basitçe simüle ettik)
// Sipariş oluşturulduğunda, outbox tablosuna bir mesaj ekleyelim
String payload = "{ \"orderId\": " + orderId + ", \"orderDetails\": \"" + orderDetails + "\" }";
outboxService.saveMessage(orderId, "OrderCreated", payload);
}
}
f. Uygulama Başlatma ve Scheduled Task
Mesajların dışa gönderilmesini otomatik hale getirebilmek için bir scheduled task:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MessageProcessor {
@Autowired
private OutboxService outboxService;
// Her 10 saniyede bir Outbox'taki mesajları kontrol et ve gönder
@Scheduled(fixedRate = 10000)
public void processMessages() {
outboxService.processPendingMessages();
}
}
g. Konfigürasyon
RabbitMQ için basit bir Spring Boot konfigürasyonu:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
// RabbitMQ konfigürasyon ayarları buraya eklenebilir. Nasıl yogurt yemek istiyorsan.
}
Retry Mekanizması Tasarımı
Bir retry mekanizması tasarlandığında, mesajların tekrar gönderim işlemleri güvenli ve verimli bir şekilde yapılmalıdır. Aşağıda, retry mekanizmasının nasıl tasarlanabileceğine dair bir açıklama ve Java kodu örneği sunulmuştur.
Retry Mekanizmasının Temel Prensipleri
- Max Retry Sayısı:
- Bir mesajın yeniden gönderilme sayısını sınırlamak gerekir. Aksi takdirde, sonsuz döngüler oluşabilir. Genellikle, bir mesajın yeniden gönderilme sayısı belirli bir sınırla (
maxRetries
) sınırlanır.
- Backoff Stratejisi (Geriye Dönüş):
- Her yeniden denemede, önceki başarısız denemelere bağlı olarak bekleme süresi arttırılabilir. Bu, mesajların sürekli olarak denendiği ama çok fazla kaynak tüketmediği anlamına gelir. Exponential Backoff (Üssel Geriye Dönüş) yaygın bir yöntemdir, burada her başarısız denemeden sonra bekleme süresi çifte artar.
- Durum Güncelleme:
- Her yeniden denemede, mesajın durumunun güncellenmesi gerekir. Eğer maksimum yeniden deneme sayısına ulaşıldıysa, mesajın durumu
FAILED
olarak işaretlenebilir. Bunun dışında, her başarılı denemede durumSENT
olarak değiştirilir.
- Loglama ve İzleme:
- Retry işlemlerinde hata oluştuğunda, hata mesajlarının loglanması ve izlenmesi önemlidir. Bu sayede hangi mesajların neden başarısız olduğu daha kolay analiz edilebilir.
- Yedekleme ve Olay Yönetimi:
- Eğer retry işlemleri sürekli başarısız oluyorsa, bu mesajları başka bir kuyruğa yönlendirebilir (örneğin, bir “dead-letter” kuyruğu), böylece sorunlu mesajlar daha ileri bir işlem için gözden geçirilebilir.
Java ile Retry Mekanizması
Şimdi, retry mekanizmasını uygulamak için Spring Boot ve RabbitMQ kullandığımız örneği genişleterek retry mekanizmasını ekleyelim.
1. OutboxService Güncellemesi
Mesaj gönderme işlemi sırasında retry mekanizması ekleyeceğiz. Ayrıca her yeniden denemede, belirli bir bekleme süresi (backoff) ekleyeceğiz.
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class OutboxService {
@Autowired
private OutboxMessageRepository outboxMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "order-events";
private static final int MAX_RETRIES = 3; // Maksimum yeniden deneme sayısı
private static final int INITIAL_BACKOFF = 1000; // Başlangıç bekleme süresi (ms)
private static final int BACKOFF_MULTIPLIER = 2; // Üssel artış
// Mesajları veritabanına kaydeder
public void saveMessage(Long aggregateId, String messageType, String payload) {
OutboxMessage message = new OutboxMessage();
message.setAggregateId(aggregateId);
message.setMessageType(messageType);
message.setPayload(payload);
outboxMessageRepository.save(message);
}
// Outbox'tan mesajları alıp, kuyruğa gönderir
public void processPendingMessages() {
List<OutboxMessage> pendingMessages = outboxMessageRepository.findByStatus("PENDING");
for (OutboxMessage message : pendingMessages) {
boolean sent = false;
int retryCount = 0;
while (!sent && retryCount <= MAX_RETRIES) {
try {
// RabbitMQ kuyruğuna gönderiyoruz
rabbitTemplate.convertAndSend(QUEUE_NAME, message.getPayload());
// Mesaj başarıyla gönderildi, durumunu güncelle
message.setStatus("SENT");
outboxMessageRepository.save(message);
sent = true;
} catch (Exception e) {
// Hata oluştu, retry yapıyoruz
retryCount++;
if (retryCount > MAX_RETRIES) {
// Maksimum deneme sayısına ulaşıldıysa, durumu FAILED olarak güncelle
message.setStatus("FAILED");
outboxMessageRepository.save(message);
} else {
// Bekleme süresi arttırarak yeniden deniyoruz
long backoffTime = INITIAL_BACKOFF * (long) Math.pow(BACKOFF_MULTIPLIER, retryCount);
try {
TimeUnit.MILLISECONDS.sleep(backoffTime);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
}
}
}
Açıklamalar:
- MAX_RETRIES: Maksimum yeniden deneme sayısını belirler. Bir mesaj, bu sayıya kadar yeniden denenebilir. Bu sayı aşılırsa, mesaj
FAILED
olarak işaretlenir. - INITIAL_BACKOFF: İlk başarısız denemeden sonra bekleme süresi. Başlangıçta bu değeri milisaniye cinsinden belirleriz.
- BACKOFF_MULTIPLIER: Üssel artış katsayısı. Her başarısız denemede bekleme süresi bu kat sayısı ile arttırılır.
- TimeUnit.MILLISECONDS.sleep(backoffTime): Her başarısız denemeden sonra belirli bir süre bekleriz. Bu süre üssel bir şekilde artar.
- Exception Handling: Mesaj iletimi sırasında bir hata meydana gelirse, retry yapılır. Eğer belirlenen maksimum sayıda deneme yapılmışsa, mesajın durumu
FAILED
olarak değiştirilir.
2. Scheduled Task Güncellemesi
Mesajların dışa iletilmesini düzenli aralıklarla yapan Scheduled
task'ı, retry mekanizması eklenmiş versiyonla aynı şekilde çalışacak. Yalnızca önceki işlevde retry işlemi eklendi.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MessageProcessor {
@Autowired
private OutboxService outboxService;
// Her 10 saniyede bir Outbox'taki mesajları kontrol et ve gönder
@Scheduled(fixedRate = 10000)
public void processMessages() {
outboxService.processPendingMessages();
}
}
3. RabbitMQ Konfigürasyonu
RabbitMQ’da retry ile ilgili ek bir yapılandırma gerekmez, ancak retry yapılmadığı durumlarda dead-letter queue (DLQ) kullanmak faydalı olabilir. Yani, retry işlemi belirli bir sınırda başarısız olursa, mesajlar başka bir kuyruğa (dead-letter queue) yönlendirilip daha sonra manuel veya otomatik olarak işlenebilir.
Örnek olarak, application.yml
dosyasına DLQ konfigürasyonu ekleyebilirsiniz:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
max-attempts: 3
initial-interval: 1000ms
multiplier: 2.0
max-interval: 10000ms
4. Dead-letter Queue (Opsiyonel)
Dead-letter queue, başarısız mesajları takip etmek ve manuel müdahale etmek için iyi bir yöntemdir. Eğer retry işlemi başarısız olursa, mesajları başka bir kuyrukta depolayabilirsiniz.
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("deadLetterQueue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
public Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("deadLetterRoutingKey");
}
5. Loglama ve İzleme
Her başarısız denemede ve başarılı mesaj gönderiminde loglama yapmak, retry işlemlerinin izlenmesi açısından önemlidir.
private static final Logger logger = LoggerFactory.getLogger(OutboxService.class);
// Hata olduğunda
logger.error("Message sending failed. Retrying... attempt {}/{}", retryCount, MAX_RETRIES);
// Başarıyla gönderildiğinde
logger.info("Message successfully sent to the queue: {}", message.getId());
Sonuç
Bu örnekte Transactional Outbox desenini Java Spring Boot ile başarılı bir şekilde uyguladık. Mikroservislerin arasında güvenli ve tutarlı bir mesaj iletimi sağladık. Veritabanı ve mesajlaşma sistemleri arasındaki tutarsızlıkları önlemek için mesajlar outbox tablosunda saklanırken, arka planda çalışan bir işlem bunları dışa gönderdi.
Uygulama şu şekilde çalışır:
- Bir sipariş oluşturulduğunda,
OrderService
outbox tablosuna bir mesaj ekler. MessageProcessor
sınıfı, her 10 saniyede bir bu mesajları kontrol eder ve RabbitMQ kuyruğuna gönderir.- Eğer mesaj başarılı bir şekilde gönderildiyse, durumu
SENT
olarak güncellenir, aksi haldeFAILED
olarak işaretlenir. - Retry Mekanizması: Mesaj kuyruğuna iletimi başarısız olan mesajlar, belirli bir süre bekledikten sonra yeniden denenir. Başarısız denemeler bir sınır koyularak durdurulur ve mesaj
FAILED
olarak işaretlenir. - Backoff Stratejisi: Her başarısız denemeden sonra bekleme süresi arttırılır (exponential backoff).
- Loglama ve İzleme: Hataların izlenebilirliği sağlanır ve gerektiğinde mesajlar
dead-letter queue
'ya yönlendirilir.
Bu yaklaşım, mesajların kaybolmamasını ve sistemin tutarlı bir şekilde çalışmasını sağlar.
"Microservices Patterns: With examples in Java” by Chris Richardson:
- Kitap, mikroservisler için yaygın desenleri ele alır ve Transactional Outbox desenini detaylı bir şekilde açıklar. Ayrıca, desenin nasıl uygulanacağına dair örnekler ve Java kodları sunar. Ek olarak bu konu özelinde 98. sayfaya bakmak yeterldir.
- Link: Microservices Patterns on Amazon