To get a better understanding of the Cosmos DB Change Feed, Event Grid, and retry and failover logic, I decided to build a system that would respond to changes in a Cosmos DB database and send events to an Azure Event Grid Topic, which in turn would forward the events to subscribers.
I broke this article into three parts.
In Part 1, we will create a Cosmos DB Trigger Azure Function to respond to document adds and updates from a Cosmos DB database, which will then forward the documents to an Event Grid Topic.
We will create a single Event Grid Subscription on the Event Grid Topic, which will forward events to an Azure Storage Queue.
If events cannot be published to the Event Grid Topic, for example because of latency or throttling with Event Grid, the Cosmos DB Trigger Azure Function will use retry logic to attempt to publish the events again, until the retries are exhausted.
In Part 2, we will add an Event Grid Trigger Azure Function to subscribe to Event Grid messages.
In Part 3, we will add failover logic to the Cosmos DB Trigger Azure Function: if it is unable to publish events to the Event Grid Topic (e.g. Event Grid is down), it will instead push the events to Azure Blob Storage.
Since Azure Functions do not support output bindings for Azure Event Grid, we will have to publish the events to Azure Event Grid in code, using the Azure Event Grid SDK.
This article assumes some familiarity with Azure and creating various Azure Resources.
Part 1
Setup Azure Environment
Create the following Azure resources:
- Create a Resource Group
- Create a Storage Account
- Create a Function App
  - Create with an Application Insights resource
- Create an Event Grid Topic
- Create a Storage Queue for the Storage Account
  - Name the storage queue the same as the Event Grid Topic created in the previous step
- Create an Event Grid Subscription
  - Name the subscription the same as the Storage Queue created in the previous step
  - Create with a Storage Queue endpoint
- Create a Cosmos DB account
- Create a database for the Cosmos DB account
- Create a container for the Cosmos DB database, called persons
Create the Function App Project
Open up Visual Studio, or VS Code.
Create an Azure Function App project; I named mine FuncApp1.
Add a folder named Helpers.
Add a class named Person.cs.
Paste into your class the code snippet as follows:

    using System;
    using Newtonsoft.Json;

    public class Person
    {
        [JsonProperty("id")]
        public Guid Id { get; set; }

        // Mirrors Id so the document also carries an explicit person_id property.
        [JsonProperty("person_id")]
        public Guid PersonId => this.Id;

        [JsonProperty("birth_date")]
        public DateTime? BirthDate { get; set; }

        [JsonProperty("first_name")]
        public string FirstName { get; set; }

        [JsonProperty("last_name")]
        public string LastName { get; set; }

        [JsonProperty("full_name")]
        public string FullName => $"{this.FirstName} {this.LastName}";

        [JsonProperty("phone")]
        public string Phone { get; set; }

        [JsonProperty("user_name")]
        public string UserName { get; set; }

        [JsonProperty("email")]
        public string Email { get; set; }

        [JsonProperty("avatar")]
        public string Avatar { get; set; }
    }
These are the documents that we will write to the Cosmos DB database and send as events to Azure Event Grid.
Edit the file local.settings.json as follows:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "AzureCosmosDb:ConnectionString": "YOUR_COSMOS_DB_CONNECTION_STRING",
        "AzureCosmosDb:DatabaseName": "YOUR_COSMOS_DB_DATABASE_NAME",
        "AzureCosmosDb:CollectionName": "persons",
        "AzureCosmosDb:LeaseCollectionName": "persons-leases",
        "AzureEventGrid:TopicKey": "YOUR_EVENT_GRID_TOPIC_KEY",
        "AzureEventGrid:TopicEndpoint": "YOUR_EVENT_GRID_TOPIC_ENDPOINT",
        "AzureStorage:ConnectionString": "YOUR_AZURE_STORAGE_CONNECTION_STRING"
      }
    }

Replace the values that start with YOUR with the values from your Azure resources.
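The functions in this article read some of these settings with Environment.GetEnvironmentVariable, which returns null when a setting is missing. As a minimal sketch, a small helper could fail fast with a clearer message instead. The Settings class and GetRequired method are hypothetical names and are not part of the original project.

```csharp
using System;

public static class Settings
{
    // Hypothetical helper: reads a required application setting and throws
    // a descriptive exception when it is missing or empty.
    public static string GetRequired(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Application setting '{name}' is missing or empty.");
        }

        return value;
    }
}
```

It could then be used as Settings.GetRequired("AzureEventGrid:TopicKey") wherever the code currently calls Environment.GetEnvironmentVariable directly.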
Create the Http Trigger to Populate Cosmos DB Database
Add an Http Trigger called HttpTrigger1.cs.
Paste into your class the code snippet as follows:

    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Threading.Tasks;
    using Bogus;
    using Microsoft.AspNetCore.Http;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.Extensions.Logging;

    public static class HttpTrigger1
    {
        public const int MAXIMUM_RECORDS = 10;

        [FunctionName("HttpTrigger1")]
        [SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "Nothing needs to be passed")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
            ILogger logger,
            [CosmosDB(
                databaseName: "%AzureCosmosDb:DatabaseName%",
                collectionName: "%AzureCosmosDb:CollectionName%",
                ConnectionStringSetting = "AzureCosmosDb:ConnectionString")] IAsyncCollector<Person> persons)
        {
            logger.LogInformation("HttpTrigger1 function processed a request.");

            // Generate fake person documents and write them to Cosmos DB through the output binding.
            for (var i = 0; i < MAXIMUM_RECORDS; i++)
            {
                await persons.AddAsync(
                    new Faker<Person>()
                        .RuleFor(o => o.Id, f => Guid.NewGuid())
                        .RuleFor(o => o.BirthDate, f => f.Person.DateOfBirth)
                        .RuleFor(o => o.Avatar, f => f.Person.Avatar)
                        .RuleFor(o => o.FirstName, f => f.Person.FirstName)
                        .RuleFor(o => o.LastName, f => f.Person.LastName)
                        .RuleFor(o => o.Phone, f => f.Person.Phone)
                        .RuleFor(o => o.UserName, f => f.Person.UserName)
                        .RuleFor(o => o.Email, f => f.Person.Email)
                        .Generate());
            }

            return new OkResult();
        }
    }
Install the NuGet package Bogus.
This function will insert 10 person documents into the Cosmos DB database that will trigger the Cosmos DB Trigger, which we will create in the next step.
Create the Cosmos DB Trigger to Read from Cosmos DB Change Feed
Add a Cosmos DB Trigger called CosmosDBTrigger1.cs and paste the code snippet into the class as follows:

    using System;
    using System.Collections.Generic;
    using System.Net.Http;
    using System.Threading.Tasks;
    using FuncApp1.Helpers; // assumes the default namespace for the Helpers folder
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.EventGrid;
    using Microsoft.Azure.EventGrid.Models;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class CosmosDBTrigger1
    {
        // Shared across invocations so connections are reused.
        private static HttpClient _httpClient;

        [FunctionName("CosmosDBTrigger1")]
        public static async Task Run(
            [CosmosDBTrigger(
                databaseName: "%AzureCosmosDb:DatabaseName%",
                collectionName: "%AzureCosmosDb:CollectionName%",
                MaxItemsPerInvocation = 10,
                ConnectionStringSetting = "AzureCosmosDb:ConnectionString",
                CreateLeaseCollectionIfNotExists = true,
                LeaseCollectionName = "%AzureCosmosDb:LeaseCollectionName%")] IReadOnlyList<Document> documents,
            ILogger logger)
        {
            logger.LogInformation("CosmosDBTrigger1 trigger fired.");

            if (_httpClient == null)
            {
                _httpClient = new HttpClient(new HttpRetryMessageHandler(logger, new HttpClientHandler()));
            }

            if (documents != null)
            {
                logger.LogInformation($"Received {documents.Count} document(s) from Cosmos DB.");

                var eventGridEventList = new List<EventGridEvent>();
                var topicCredentials = new TopicCredentials(
                    Environment.GetEnvironmentVariable("AzureEventGrid:TopicKey"));
                var eventGridClient = new EventGridClient(topicCredentials, _httpClient, false);

                // Wrap each changed document in an Event Grid event.
                foreach (var document in documents)
                {
                    var eventGridEvent = new EventGridEvent()
                    {
                        Id = Guid.NewGuid().ToString(),
                        Subject = $"/persons/{document.GetPropertyValue<Guid>("person_id")}",
                        EventType = "Person.Added",
                        Data = document,
                        EventTime = DateTime.UtcNow, // UTC avoids time zone ambiguity
                        DataVersion = "1.0"
                    };

                    eventGridEventList.Add(eventGridEvent);
                }

                logger.LogInformation($"Sending a batch of {eventGridEventList.Count} event(s) to Azure Event Grid for processing.");

                // The publish call expects the topic host name, not the full endpoint URL.
                await eventGridClient.PublishEventsAsync(
                    new Uri(Environment.GetEnvironmentVariable("AzureEventGrid:TopicEndpoint")).Host,
                    eventGridEventList);
            }
        }
    }

This Azure Function will respond to any adds or updates from the persons container in Cosmos DB.
Install the NuGet package Microsoft.Azure.EventGrid.
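One detail in the trigger that is easy to miss: PublishEventsAsync is passed only the host name of the topic, which is why the code wraps the AzureEventGrid:TopicEndpoint setting in a Uri and reads its Host property. The sketch below isolates that step. The TopicEndpointExample class and the endpoint value are made up for illustration.

```csharp
using System;

public static class TopicEndpointExample
{
    // Extracts the host name from a full topic endpoint URL,
    // mirroring what the Cosmos DB trigger does before publishing.
    public static string GetTopicHostname(string topicEndpoint) =>
        new Uri(topicEndpoint).Host;
}
```

For a hypothetical endpoint of https://my-topic.westus2-1.eventgrid.azure.net/api/events, GetTopicHostname returns my-topic.westus2-1.eventgrid.azure.net. An endpoint setting that is not a valid absolute URL will make the Uri constructor throw a UriFormatException before any event is sent.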
We will want to provide our own HttpClient as opposed to letting the EventGridClient manage it for us. If we don’t, we could run into Socket Exceptions.
See Managing Connections for more information.
Since publishing to Event Grid with our own HttpClient gives us no retry logic out of the box, we add support ourselves using Polly.
Install the NuGet package Polly.
Add another class, HttpRetryMessageHandler.cs, to the Helpers folder and paste the code snippet into the class as follows:

    using System;
    using System.Net.Http;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Polly;

    public class HttpRetryMessageHandler : DelegatingHandler
    {
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:Naming Styles", Justification = "<Pending>")]
        private const int MAXIMUM_RETRY_ATTEMPT = 3;

        private readonly ILogger _logger;

        public HttpRetryMessageHandler(
            ILogger logger,
            HttpClientHandler handler) : base(handler)
        {
            _logger = logger;
        }

        protected override Task<HttpResponseMessage> SendAsync(
            HttpRequestMessage request,
            CancellationToken cancellationToken) =>
            // Waits 3, 9 and 27 seconds before the first, second and third retry.
            // Once the retries are exhausted, the last failure is passed back to the caller.
            Policy
                .Handle<HttpRequestException>()
                .Or<TaskCanceledException>()
                .OrResult<HttpResponseMessage>(x => !x.IsSuccessStatusCode)
                .WaitAndRetryAsync(
                    MAXIMUM_RETRY_ATTEMPT,
                    retryAttempt => TimeSpan.FromSeconds(Math.Pow(MAXIMUM_RETRY_ATTEMPT, retryAttempt)),
                    onRetry: (outcome, nextRetry, context) =>
                    {
                        _logger.LogInformation($"An error was encountered, will retry again in {nextRetry.TotalSeconds} second(s)...");
                    })
                .ExecuteAsync(() => base.SendAsync(request, cancellationToken));
    }
This class creates a retry policy with an exponential back-off.
- After the first failure the code will wait 3 seconds before it tries to execute again.
- After the second failure the code will wait 9 seconds.
- After the third failure it will wait 27 seconds.
- After the fourth failure it will throw an Exception.
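The wait times above come from the expression Math.Pow(MAXIMUM_RETRY_ATTEMPT, retryAttempt) in the handler, where the retry attempt starts at 1. The following sketch reproduces that arithmetic on its own, without Polly, so the schedule is easy to check. The BackoffSchedule class and GetDelays method are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSchedule
{
    // Computes the wait before each retry using the same formula as the handler:
    // delay = maximumRetryAttempts ^ retryAttempt seconds, for retryAttempt = 1..maximumRetryAttempts.
    public static IReadOnlyList<TimeSpan> GetDelays(int maximumRetryAttempts)
    {
        var delays = new List<TimeSpan>();
        for (var retryAttempt = 1; retryAttempt <= maximumRetryAttempts; retryAttempt++)
        {
            delays.Add(TimeSpan.FromSeconds(Math.Pow(maximumRetryAttempts, retryAttempt)));
        }

        return delays;
    }
}
```

With a value of 3 this yields waits of 3, 9 and 27 seconds, so a publish that never succeeds spends 39 seconds waiting in total. Note that the constant is used both as the retry count and as the base of the exponent, so raising it also makes every wait longer.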
Run the Azure Function project.
Call the Http Trigger Azure Function, using Postman, with a POST request to http://localhost:7071/api/HttpTrigger1 to insert 10 person documents into Cosmos DB.
Let’s create the Event Grid Subscription.
Navigate to the Event Grid Topic and click Add Subscription.
Enter a name.
Select Storage Queues for Endpoint Type.
Click Select an endpoint.
Select the Storage Account and Storage Queue that was created earlier.
Click Create.
Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.
Open up Azure Storage Explorer.
Navigate to the Storage Account we created earlier, expand Queues and click on the Storage Queue.
You should see a list of the Cosmos DB documents that were sent to Event Grid and forwarded to the Storage Queue subscription.
One thing to note, our Cosmos DB Trigger Azure Function is sending events to Azure Event Grid in batches, as Azure Event Grid expects an array of events.
When our events reach Azure Event Grid, they are debatched and sent to our endpoints individually.
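To make the batch shape concrete, here is a rough sketch of the kind of JSON array that a batch of events amounts to. It uses System.Text.Json and anonymous objects instead of the SDK's EventGridEvent class so that it runs without extra packages. The property names follow the Event Grid event schema, while the EventBatchExample class and all values are made up for illustration.

```csharp
using System;
using System.Text.Json;

public static class EventBatchExample
{
    // Builds a JSON array with one event per person id.
    // This only illustrates the batch shape; the article's code lets the SDK do the serialization.
    public static string BuildBatchJson(params string[] personIds)
    {
        var events = new object[personIds.Length];
        for (var i = 0; i < personIds.Length; i++)
        {
            events[i] = new
            {
                id = Guid.NewGuid().ToString(),
                subject = $"/persons/{personIds[i]}",
                eventType = "Person.Added",
                eventTime = DateTime.UtcNow,
                data = new { person_id = personIds[i] },
                dataVersion = "1.0"
            };
        }

        return JsonSerializer.Serialize(events);
    }
}
```

Calling BuildBatchJson with two ids produces one array containing two event objects. Each subscriber endpoint, such as our Storage Queue, then receives those events as separate messages.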
Part 2
Now let’s add an Azure Function to receive events from our Event Grid Topic.
Add an Event Grid Trigger called EventGridTrigger1.cs.
Paste the code snippet below into the EventGridTrigger1.cs class as follows:

    using Microsoft.Azure.EventGrid.Models;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.EventGrid;
    using Microsoft.Extensions.Logging;

    public static class EventGridTrigger1
    {
        [FunctionName("EventGridTrigger1")]
        public static void Run(
            [EventGridTrigger] EventGridEvent eventGridEvent,
            ILogger logger)
        {
            logger.LogInformation("EventGridTrigger1 function triggered.");

            // Log the event payload, which is the person document.
            logger.LogInformation(eventGridEvent.Data.ToString());
        }
    }
Since it is difficult to test the Event Grid Azure Function locally, let’s publish the Function App to Azure.
Make sure to include your Application Settings by clicking Edit Azure App Service Settings.
In the Azure portal, navigate to the Function App and select the EventGridTrigger1 function.
Click the Add Event Grid subscription link.
Provide a name for the subscription.
Select Event Grid Topic for the Topic Type.
Select the subscription, resource group, and finally the Azure Event Grid Topic.
Click Create.
If you navigate to your Event Grid Topic you will now see two Event Grid Subscriptions, the first to our Azure Storage Queue and the second to our Event Grid Trigger Azure Function.
Before we test this locally, we will need to disable the CosmosDBTrigger1 function from the portal. If we don’t, there is a chance it will respond to our Cosmos DB changes before our local environment can.
Run the Azure Function project.
Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.
Navigate back to the Azure Function App, select the EventGridTrigger1 function and then select the Monitor node.
You should see a list of the events received from Azure Event Grid.
Part 3
So what happens if we are unable to send an event to Azure Event Grid? Remember, our retry policy will retry three times, and when the final retry also fails, the publish call will throw an Exception.
If we cannot send an event to Azure Event Grid, we will write the event to Azure Blob Storage so it can be processed later, maybe by a Timer Trigger Azure Function.
We will need to update the CosmosDBTrigger1 function with a couple of code changes.
Update the Run method to include a parameter for the Blob output binding; I added mine right after ILogger logger.

    [Blob("cosmosdbtrigger1-errors", FileAccess.Write, Connection = "AzureStorage:ConnectionString")] CloudBlobContainer blobContainer
We will also want to wrap the eventGridClient.PublishEventsAsync() call in a try...catch, and in the catch add the code to write the events to Azure Blob Storage.

    try
    {
        logger.LogInformation(
            $"Sending a batch of {documents.Count} event(s) to Azure Event Grid for processing.");

        await eventGridClient.PublishEventsAsync(
            new Uri(Environment.GetEnvironmentVariable("AzureEventGrid:TopicEndpoint")).Host,
            eventGridEventList);
    }
    catch (Exception ex)
    {
        // Log the failure together with the exception details.
        logger.LogError(
            ex,
            $"Logging a batch of {documents.Count} event(s) to Azure Blob Storage that were not able to be sent to Azure Event Grid.");

        // Store the whole batch as one JSON blob so it can be processed later.
        var cloudBlockBlob = blobContainer.GetBlockBlobReference($"{Guid.NewGuid()}.json");
        cloudBlockBlob.Properties.ContentType = "application/json";

        await cloudBlockBlob.UploadTextAsync(
            JsonConvert.SerializeObject(eventGridEventList, Formatting.Indented));

        // Rethrow so the invocation is recorded as failed.
        throw;
    }
We will throw the Exception again so that Application Insights tracks the exception.
To simulate a failure to call Azure Event Grid, update the value for AzureEventGrid:TopicEndpoint in local.settings.json to a bad value; I just added an additional character.
Create a Blob container called cosmosdbtrigger1-errors, matching the name used in the Blob binding, to capture the events.
Run the Azure Function project.
Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.
Open Azure Storage Explorer.
Navigate to the Blob storage container cosmosdbtrigger1-errors.
You should see a list of the events sitting in Azure Blob Storage.
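As a starting point for processing these blobs later, the sketch below reads the subjects back out of one stored batch. It assumes the blob holds a JSON array whose items carry a subject property, which is how I would expect the SDK's EventGridEvent to serialize, so check it against one of your own blobs first. The FailedBatchReader class is a hypothetical name, and System.Text.Json is used only to keep the example free of extra packages.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public static class FailedBatchReader
{
    // Returns the subject of every event found in a stored batch.
    // Assumes the blob content is a JSON array of event objects.
    public static List<string> ReadSubjects(string blobJson)
    {
        var subjects = new List<string>();
        using (var document = JsonDocument.Parse(blobJson))
        {
            foreach (var element in document.RootElement.EnumerateArray())
            {
                subjects.Add(element.GetProperty("subject").GetString());
            }
        }

        return subjects;
    }
}
```

A later replay function could download each blob's text, pass it through something like this to decide what to resend, publish the events again and delete the blob once the publish succeeds.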
Apologies for the long article, but I felt it best to pack it all together, as I have been working on better practices for retry and failover logic when sending to Event Grid from an Azure Function.
Feedback is much appreciated!
Great article – thanks Matt!
Glad it helped!
Interesting article on how to deal with failure, and definitely the way I wanted to implement failure handling in a Cosmos DB trigger function.
But my finding is that if you throw in the try...catch block, nothing is written to Application Insights. You won’t find the requests or the log statements there.
My understanding is that with the throw, the function app itself crashes and nothing can be sent to Application Insights anymore.