2 years ago
#68888

Aakanksha D
How to solve the duplicate subscription (Subscriber.onSubscribe()) error in listener code in Azure Event Hubs with Ktor (Kotlin server)?
The error is:
Duplicate Subscription has been detected. java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality). This is the error that I'm getting on the listener side in Azure Event Hubs with a Ktor server. The error occurs in the PARTITION_PROCESSOR part. How do I solve this error?
package com.example.eventhub
import com.azure.messaging.eventhubs.EventHubClientBuilder
import com.azure.messaging.eventhubs.EventProcessorClientBuilder
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore
import com.azure.messaging.eventhubs.models.PartitionContext
import com.azure.storage.blob.BlobContainerClientBuilder
import java.util.function.Consumer
import com.azure.messaging.eventhubs.models.EventContext
import com.azure.messaging.eventhubs.models.ErrorContext
import com.azure.messaging.eventhubs.EventData
/**
 * Listens to an Azure Event Hub through an event processor client whose
 * checkpoints are persisted via a [BlobCheckpointStore].
 */
class AzureEventHubClient {
    /**
     * Builds an event processor client, starts it, keeps the calling thread
     * blocked for [RUN_DURATION_MILLIS] milliseconds, and then stops the processor.
     *
     * Each received event is printed to standard output. The checkpoint is
     * updated whenever the event's sequence number is a multiple of
     * [CHECKPOINT_INTERVAL].
     *
     * @throws InterruptedException if the calling thread is interrupted while
     * waiting; the processor is still stopped before the exception propagates.
     */
    fun listenEvent() {
        // NOTE(review): the storage and Event Hubs credentials are hard-coded below.
        // Consider loading them from configuration or the environment instead.
        val blobContainerAsyncClient = BlobContainerClientBuilder()
            .connectionString("DefaultEndpointsProtocol=https;AccountName=[[ REDACTED ]];AccountKey=[[ REDACTED ]];EndpointSuffix=core.windows.net")
            .containerName("[[ REDACTED ]]")
            .buildAsyncClient()

        val PARTITION_PROCESSOR: Consumer<EventContext> = Consumer<EventContext> { eventContext ->
            val partitionContext: PartitionContext = eventContext.partitionContext
            val eventData: EventData = eventContext.eventData
            System.out.printf(
                "Processing event from partition %s with sequence number %d with body: %s%n",
                partitionContext.partitionId, eventData.sequenceNumber, eventData.bodyAsString
            )
            // Update the checkpoint whenever the sequence number is a multiple of
            // CHECKPOINT_INTERVAL. The comparison is done on the Long value with
            // structural equality, so large sequence numbers are not truncated.
            if (eventData.sequenceNumber % CHECKPOINT_INTERVAL == 0L) {
                eventContext.updateCheckpoint()
            }
        }

        val ERROR_HANDLER: Consumer<ErrorContext> = Consumer<ErrorContext> { errorContext ->
            // Include the throwable so the cause of the failure is not lost.
            println(
                "Error occurred in partition processor for partition ::" +
                    errorContext.partitionContext.partitionId + ", " + errorContext.throwable
            )
        }

        val clientConnectionString: String =
            "Endpoint=sb://[[ REDACTED ]].servicebus.windows.net/;SharedAccessKeyName=[[ REDACTED ]];SharedAccessKey=[[ REDACTED ]];EntityPath=[[ REDACTED ]]"

        val eventProcessorClientBuilder = EventProcessorClientBuilder()
            .connectionString(clientConnectionString, "[[ REDACTED ]]")
            .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
            .processEvent(PARTITION_PROCESSOR)
            .processError(ERROR_HANDLER)
            .checkpointStore(BlobCheckpointStore(blobContainerAsyncClient))

        // Use the builder object to create an event processor client
        val eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient()

        println("Starting event processor")
        eventProcessorClient.start()
        try {
            // NOTE(review): despite the message, no input is read here; the
            // processor simply runs for a fixed duration.
            println("Press enter to stop.")
            Thread.sleep(RUN_DURATION_MILLIS)
        } finally {
            // Stop the processor even if the wait above is interrupted.
            println("Stopping event processor")
            eventProcessorClient.stop()
            println("Event processor stopped.")
        }
        println("Exiting process")
    }

    private companion object {
        /** A checkpoint is written when an event's sequence number is a multiple of this value. */
        const val CHECKPOINT_INTERVAL = 5L

        /** How long, in milliseconds, the processor is left running before it is stopped. */
        const val RUN_DURATION_MILLIS = 150_000L
    }
}
azure
kotlin
azure-eventhub
ktor
0 Answers
Your Answer