Gaurav Mantri's Personal Blog.

Building a Simple Task Scheduler in Windows Azure

Oftentimes we need to execute certain tasks repeatedly. In this blog post, we will build a simple task scheduler which runs in a Windows Azure Worker Role. We’ll also discuss some other alternatives available to us within Windows Azure.

The Project

For the purpose of demonstration, we’ll try and build a simple service which pings some public websites (e.g. www.microsoft.com etc.) and stores the result in Windows Azure Table Storage. This is very similar to the service offered by Pingdom. For the sake of argument, let’s call this service “Poor Man’s Pingdom” :).

We’ll store the sites which we need to ping in a table in Windows Azure Table Storage and every minute we’ll fetch this list from there, ping them and then store the result back in Windows Azure Table Storage (in another table of course). We’ll run this service in 2 X-Small instances of the worker role just to show how we can handle concurrency issues so that each instance processes a unique set of websites. We’ll assume that in all we’re pinging 10 websites and each worker role instance will ping 5 websites every minute so that the load is evenly distributed across the instances.

Implementing the Scheduler

At the core of the task scheduler is the scheduling engine. There are many options available to you. You could use the .NET Framework’s built-in Timer objects or you could use one of the third-party libraries available. In my opinion, one should not try and build this on their own but use what’s already available out there. For the purpose of this project, we’re going to use the Quartz Scheduler Engine (http://quartznet.sourceforge.net/). It’s extremely robust, widely used and open source. In my experience, it is extremely flexible and easy to use.

Design Considerations

In a multi-instance environment, there’re a few things we would need to consider:

Only one Instance Fetches Master Data

We want to ensure that only one instance fetches the master data, i.e. the data the scheduler needs in order to do its work. For this we’ll rely on blob leasing functionality. A lease is an exclusive lock on a blob which prevents other clients from modifying that blob. In our application, each instance will try and acquire a lease on the blob and only one instance will be successful. The instance which acquires the lease (let’s call it “Master Instance”) will fetch the master data. All other instances (let’s call them “Slave Instances”) will just wait until the master instance is done with that data. Please note that the master instance will not actually execute the task (in our case, ping the sites) just yet. It will just read the data from the source and push it to a place from where both master and slave instances will pick it up and process it.
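
To make the “only one instance wins” idea concrete, here is a minimal in-process sketch. It does not talk to blob storage at all: the `FakeLease` class and its `TryAcquire` method are hypothetical stand-ins which mimic an exclusive lock using `Interlocked.CompareExchange`, and the “instances” are just parallel loop iterations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a blob lease: at most one caller can hold it.
public class FakeLease
{
    private int held; // 0 = free, 1 = taken

    // Returns true for exactly one caller until Release() is called.
    public bool TryAcquire()
    {
        return Interlocked.CompareExchange(ref held, 1, 0) == 0;
    }

    public void Release()
    {
        Interlocked.Exchange(ref held, 0);
    }
}

public static class LeaseDemo
{
    // Lets several simulated instances race for the lease and counts the winners.
    public static int CountWinners(int instanceCount)
    {
        var lease = new FakeLease();
        int winners = 0;
        Parallel.For(0, instanceCount, i =>
        {
            if (lease.TryAcquire())
            {
                Interlocked.Increment(ref winners);
            }
        });
        return winners;
    }

    public static void Main()
    {
        Console.WriteLine("Winners: " + CountWinners(2)); // prints "Winners: 1"
    }
}
```

Whichever iteration wins plays the role of the master instance; every other one sees `false` and would simply wait.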

Division of Labor

It’s important that we make full use of all the instances in which our application is running (in our case 2). So the master instance will fetch the data from the source and put it in a queue which is polled by all instances. For the sake of simplicity, the message will simply be the URL that we need to ping (prefixed with the item’s row key, as we’ll see in the code). Since we know that there’re two instances and we need to process ten websites, each instance will “GET” 5 messages. Each instance will then read the message contents, ping those URLs and record the result.
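
The “5 messages per instance” figure is just the number of websites divided by the number of instances. A small sketch of that arithmetic follows; the `LaborDivision` class and `MessagesPerInstance` method are names made up for this illustration, and rounding up is my assumption for the case where the total does not divide evenly.

```csharp
using System;

public static class LaborDivision
{
    // Ceiling division, so every message is covered even when the
    // total does not divide evenly across the instances.
    public static int MessagesPerInstance(int totalMessages, int instanceCount)
    {
        if (instanceCount <= 0)
        {
            throw new ArgumentOutOfRangeException("instanceCount");
        }
        return (totalMessages + instanceCount - 1) / instanceCount;
    }

    public static void Main()
    {
        Console.WriteLine(MessagesPerInstance(10, 2)); // prints 5
        Console.WriteLine(MessagesPerInstance(11, 2)); // prints 6
    }
}
```

In this post the value is simply hard-coded as 5 to keep things simple.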

Trigger Mechanism

In normal worker role implementations, the worker role is in an endless loop mostly sleeping. It wakes up from the sleep, does some processing and goes back to sleep. Since we’re relying on Quartz for scheduling, we’ll only rely on Quartz for triggering the tasks instead of worker role. That would give us the flexibility of introducing more kinds of scheduled tasks without worrying about implementing them in our worker role. To explain, let’s assume that we’ve to process 2 scheduled tasks – one executed every minute and other executed every hour. If we were to implement it in the worker role sleep logic, it would become somewhat complicated. When you start adding more and more scheduled tasks, the level of complexity increases considerably. With Quartz, it’s really simple.
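
To give a feel for the bookkeeping the sleep-loop approach needs, here is a rough sketch of a hand-rolled version with one per-minute and one hourly task. The `RecurringTask` and `SleepLoopScheduler` types are hypothetical and exist only for this illustration; they are not part of the worker role we build in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical description of a recurring task in a hand-rolled scheduler.
public class RecurringTask
{
    public string Name;
    public TimeSpan Interval;
    public DateTime LastRunUtc = DateTime.MinValue;
}

public static class SleepLoopScheduler
{
    // Returns the names of the tasks due at 'nowUtc' and stamps them as run.
    public static List<string> RunDueTasks(List<RecurringTask> tasks, DateTime nowUtc)
    {
        var due = new List<string>();
        foreach (var task in tasks)
        {
            if (nowUtc - task.LastRunUtc >= task.Interval)
            {
                task.LastRunUtc = nowUtc;
                due.Add(task.Name);
            }
        }
        return due;
    }

    public static void Main()
    {
        var tasks = new List<RecurringTask>
        {
            new RecurringTask { Name = "every-minute", Interval = TimeSpan.FromMinutes(1) },
            new RecurringTask { Name = "every-hour", Interval = TimeSpan.FromHours(1) },
        };
        var start = new DateTime(2013, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        // Simulate three wake-ups of the worker loop, one minute apart.
        for (int minute = 0; minute < 3; minute++)
        {
            var due = RunDueTasks(tasks, start.AddMinutes(minute));
            Console.WriteLine(minute + ": " + string.Join(", ", due));
        }
    }
}
```

Even this toy version has to track the last run time per task, and it knows nothing about calendars, missed runs or overlapping executions. That is the part we hand over to Quartz.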

Keeping Things Simple

For the purpose of this blog post, to keep things simple, we will not worry about handling various error conditions. We’ll just assume that everything’s hunky-dory and we’ll not have to worry about transient errors from storage. In an actual application, one would need to take those things into consideration as well.

High Level Architecture

With these design considerations, this is what the application architecture and flow look like:

image

So every minute, Quartz will trigger the task. Once the task is triggered, this is what will happen:

  1. Each instance will try and acquire the lease on a specific blob.
  2. As we know, only one instance will succeed. We’ll assume that the master instance would need about 15 seconds to read the data from the source and put that in queue. The slave instances will wait for 15 seconds while master instance does this bit.
  3. Master instance will fetch the data from master data source (Windows Azure Table Storage in our case). Slave instances are still waiting.
  4. Master instance will push the data in a queue. Slave instances are still waiting.
  5. All instances will now “GET” messages from the queue. By implementing “GET” semantics (instead of “PEEK”), we’re making sure that a message is fetched only by a single instance. Once the message is fetched, it will be immediately deleted.
  6. Each worker role instance will get the URI to be pinged from the message content and start pinging it. Pinging will be done by creating a “GET” web request for that URI and reading the response.
  7. Once the ping result is returned, we’ll store the results in table storage and then wait for the next time Quartz will trigger the task.
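
The queue part of the steps above (4 and 5) can be simulated in-process. The sketch below is only an approximation under stated assumptions: a `ConcurrentQueue<string>` stands in for the Windows Azure queue, two parallel loop iterations stand in for the two role instances, and the `FlowDemo` class and the `example` URLs are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FlowDemo
{
    // Dequeues up to 'max' items; each item is handed to exactly one caller.
    public static List<string> GetMessages(ConcurrentQueue<string> queue, int max)
    {
        var batch = new List<string>();
        string url;
        while (batch.Count < max && queue.TryDequeue(out url))
        {
            batch.Add(url);
        }
        return batch;
    }

    // The "master" fills the queue, then all instances pull their batches in parallel.
    public static List<string>[] Run(IEnumerable<string> urls, int instances, int perInstance)
    {
        var queue = new ConcurrentQueue<string>(urls);
        var batches = new List<string>[instances];
        Parallel.For(0, instances, i =>
        {
            batches[i] = GetMessages(queue, perInstance);
        });
        return batches;
    }

    public static void Main()
    {
        var urls = Enumerable.Range(1, 10).Select(n => "http://site" + n + ".example");
        var batches = Run(urls, 2, 5);
        Console.WriteLine(batches.Sum(b => b.Count));                      // prints 10
        Console.WriteLine(batches.SelectMany(b => b).Distinct().Count()); // prints 10
    }
}
```

Because a dequeued item is gone for everybody else, no URL ends up in two batches, which is the property we rely on when we “GET” rather than “PEEK”.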

The Code

Enough talking! Let’s look at some code :).

Entities

Since we’re storing the master data as well as the results in Windows Azure Table Storage, let’s create two classes which will hold that data. Both of these derive from the TableEntity class.

PingItem.cs

This entity will represent the items to be pinged. We’ll keep things simple and have only one property which contains the URL to be pinged. This is what the code looks like:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

namespace PoorMansPingdomWorkerRole
{
    public class PingItem : TableEntity
    {
        public PingItem()
        {
            PartitionKey = "PingItem";
            RowKey = Guid.NewGuid().ToString();
        }

        /// <summary>
        /// Gets or sets the URL to be pinged.
        /// </summary>
        public string Url
        {
            get;
            set;
        }

        public override string ToString()
        {
            return this.RowKey + "|" + Url;
        }

        public static PingItem ParseFromString(string s)
        {
            string[] splitter = {"|"};
            string[] rowKeyAndUrl = s.Split(splitter, StringSplitOptions.RemoveEmptyEntries);
            return new PingItem()
            {
                PartitionKey = "PingItem",
                RowKey = rowKeyAndUrl[0],
                Url = rowKeyAndUrl[1],
            };
        }
    }
}
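
One thing to be aware of in `ParseFromString`: because it splits on every “|”, a URL which itself contains a “|” would be cut short. Below is a small sketch of a more forgiving parse that splits on the first “|” only. To stay self-contained it works on plain strings rather than on `PingItem`, and the `PingItemMessage` class is a hypothetical helper, not part of the project.

```csharp
using System;

public static class PingItemMessage
{
    // Same "RowKey|Url" layout that PingItem.ToString() produces.
    public static string Format(string rowKey, string url)
    {
        return rowKey + "|" + url;
    }

    // Splits on the first '|' only, so a '|' inside the URL survives.
    public static string[] Parse(string message)
    {
        string[] parts = message.Split(new[] { '|' }, 2);
        if (parts.Length != 2)
        {
            throw new FormatException("Expected 'RowKey|Url' but got: " + message);
        }
        return parts;
    }

    public static void Main()
    {
        string message = Format("42", "http://example.com/?q=a|b");
        string[] parts = Parse(message);
        Console.WriteLine(parts[0]); // prints 42
        Console.WriteLine(parts[1]); // prints http://example.com/?q=a|b
    }
}
```

For the handful of plain URLs used in this post the original implementation works just as well.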

PingResult.cs

This entity will store the result of the ping.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

namespace PoorMansPingdomWorkerRole
{
    public class PingResult : TableEntity
    {
        /// <summary>
        /// Gets or sets the URL pinged.
        /// </summary>
        public string Url
        {
            get;
            set;
        }

        /// <summary>
        /// Gets or sets the HTTP Status code.
        /// </summary>
        public string StatusCode
        {
            get;
            set;
        }

        /// <summary>
        /// Gets or sets the time taken to process the ping in milliseconds.
        /// </summary>
        public double TimeTaken
        {
            get;
            set;
        }

        public long ContentLength
        {
            get;
            set;
        }
    }
}

Application Code

Worker Role Initialization – Master Settings

Since our implementation depends on certain assumptions, we’ll ensure that those assumptions are in place by implementing them in the worker role’s initialization phase. The things we would do are:

  • Ensuring that the table in which we’ll store the results is already present.
  • Ensuring that the blob on which we’ll acquire the lease is already present.
  • Ensuring that the queue into which the items to be pinged will be pushed is already present.

To keep things flexible, we’ll define a number of settings in the configuration file. This is what our configuration file looks like for these things:

<?xml version="1.0" encoding="utf-8"?>
<ServiceConfiguration serviceName="PoorMansPingdom" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration" osFamily="3" osVersion="*" schemaVersion="2012-10.1.8">
  <Role name="PoorMansPingdomWorkerRole">
    <Instances count="2" />
    <ConfigurationSettings>
      <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" value="UseDevelopmentStorage=true" />
      <!-- Storage account where our data will be stored. -->
      <Setting name="StorageAccount" value="UseDevelopmentStorage=true" />
      <!-- Name of the table where master data will be stored. -->
      <Setting name="PingItemsTableName" value="PingItems"/>
      <!-- Name of the table where we'll store the results. -->
      <Setting name="PingResultsTableName" value="PingResults" />
      <!-- Blob container name where we'll store the blob which will be leased. -->
      <Setting name="BlobContainer" value="lease-blob-container" />
      <!-- Name of the blob on which each instance will try and acquire the lease. -->
      <Setting name="BlobToBeLeased" value="lease-blob.txt" />
      <!-- Name of the queue from which messages will be read. -->
      <Setting name="ProcessQueueName" value="ping-items-queue"/>
    </ConfigurationSettings>
  </Role>
</ServiceConfiguration>

We’ll write a function which will be called during the initialization process for setting master settings:

        private void Init()
        {
            // Get the cloud storage account.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount"));
            // Get the name of the blob container
            string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer");
            CloudBlobContainer blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName);
            // Create the blob container.
            blobContainer.CreateIfNotExists();
            // Get the blob name
            string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased");
            CloudBlockBlob blob = blobContainer.GetBlockBlobReference(blobName);
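            // Write some dummy data in the blob, but only if it doesn't exist yet:
            // uploading without a lease ID fails while another instance holds the lease.
            if (!blob.Exists())
            {
                string blobContent = "This is dummy data";
                using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(blobContent)))
                {
                    blob.UploadFromStream(ms);
                }
            }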
            // Get the table name for storing results.
            string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName");
            // Create the table.
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName);
            table.CreateIfNotExists();
            // Get the queue name where ping items will be stored.
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            // Create the queue.
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            queue.CreateIfNotExists();
        }

And this is how we’ll call it:

        public override void Run()
        {
            // This is a sample worker implementation. Replace with your logic.
            Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information");
            // Call the initialization routine.
            Init();
            while (true)
            {
                Thread.Sleep(10000);
                Trace.WriteLine("Working", "Information");
            }
        }

Now if we run this thing, we will see the following in our storage account.

image

Creating a Scheduled Job and Scheduling the Task

Now let’s create a job and schedule it. To start with, the job won’t be doing any work. We’ll just create a class called PingJob and have it implement the IInterruptableJob interface from the Quartz library.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.WindowsAzure.Storage.Queue;
using Quartz;

namespace PoorMansPingdomWorkerRole
{
    public class PingJob : IInterruptableJob
    {
        public void Execute(IJobExecutionContext context)
        {
            Trace.WriteLine(string.Format("[{0}] - Executing ping job", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")));
        }

        public void Interrupt()
        {
            throw new NotImplementedException();
        }
    }
}

Now let’s schedule this job. To do so we would need to define the CRON job schedule which we will do in our application configuration file so that we can change it on the fly if need be:

      <!-- Ping Job Cron Schedule. Executes every minute -->
      <Setting name="PingJobCronSchedule" value="0 0/1 * * * ?"/>
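
In case the cron expression looks cryptic: Quartz cron expressions have six fields (plus an optional seventh for the year), starting with seconds. The little sketch below just labels each field of our schedule. It does not use Quartz at all, and the `CronFields` class is a hypothetical helper written for this illustration.

```csharp
using System;

public static class CronFields
{
    // Field names in the order Quartz expects them; the seventh (year) is optional.
    private static readonly string[] Names =
    {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Pairs each whitespace-separated field with the name of its position.
    public static string[] Describe(string cron)
    {
        string[] fields = cron.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
        if (fields.Length < 6 || fields.Length > 7)
        {
            throw new FormatException("Expected 6 or 7 fields but got " + fields.Length);
        }
        string[] described = new string[fields.Length];
        for (int i = 0; i < fields.Length; i++)
        {
            described[i] = Names[i] + " = " + fields[i];
        }
        return described;
    }

    public static void Main()
    {
        foreach (string line in Describe("0 0/1 * * * ?"))
        {
            Console.WriteLine(line);
        }
    }
}
```

So our schedule reads: at second 0, every 1 minute starting from minute 0, every hour, every day of the month, every month, with no specific day of the week (“?”).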

And then in our WorkerRole.cs we will schedule this job:

        private void ScheduleJob()
        {
            DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now);

            // construct a scheduler factory
            ISchedulerFactory schedFact = new StdSchedulerFactory();

            // get a scheduler
            IScheduler sched = schedFact.GetScheduler();
            sched.Start();

            JobDataMap jobDataMap = new JobDataMap();

            IJobDetail websitePingJobDetail = JobBuilder.Create<PingJob>()
                    .WithIdentity("WebsitePingJob", "group1")
                    .WithDescription("Website Ping Job")
                    .UsingJobData(jobDataMap)
                    .Build();

            ITrigger websitePingJobTrigger = TriggerBuilder.Create()
                .WithIdentity("WebsitePingJob", "group1")
                // Start at the next even minute; a later StartNow() call would override this.
                .StartAt(runTime)
                .WithCronSchedule(RoleEnvironment.GetConfigurationSettingValue("PingJobCronSchedule"))
                .Build();

            sched.ScheduleJob(websitePingJobDetail, websitePingJobTrigger);
        }

We’ll just call this function in our role’s Run() method as shown below and our job is now scheduled. It will fire off every minute. It’s that simple!

        public override void Run()
        {
            // This is a sample worker implementation. Replace with your logic.
            Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information");
            // Call the initialization routine.
            Init();
            // Call the job scheduling routine.
            ScheduleJob();
            while (true)
            {
                Thread.Sleep(10000);
                Trace.WriteLine("Working", "Information");
            }
        }

Just to ensure the job is executing properly, here’s the output in the compute emulator for both role instances:

image

image

Now all we’re left to do is implement the job functionality. So let’s do that.

Acquiring Lease

As mentioned above, the first thing we want to do is try and acquire a lease on the blob.

        private bool AcquireLease()
        {
            try
            {
                blob.AcquireLease(TimeSpan.FromSeconds(15), null);
                return true;
            }
            catch (Exception) // Any failure is treated as "another instance holds the lease".
            {
                return false;
            }
        }

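We’ll keep things simple: if there’s any exception, we’ll just assume that another instance acquired the lease on the blob. In a real-world scenario, you would need to handle the specific exceptions properly. Note that the snippet above takes a 15-second lease, while the finished code later in this post uses a 45-second lease.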

            // Try and acquire the lease.
            if (AcquireLease())
            {
                Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // If successful then read the data.
            }
            else
            {
                Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // Else just sleep for 15 seconds.
                Thread.Sleep(15 * 1000);
            }

image

image

Reading Master Data

Next step would be reading master data. Again keeping things simple, we’ll not worry about the exceptions. We’ll just ensure that the data is there in our “PingItems” table. For this blog post, I just entered the data in this table manually.

image

        private List<PingItem> ReadMasterData()
        {
            string pingItemTableName = RoleEnvironment.GetConfigurationSettingValue("PingItemsTableName");
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(pingItemTableName);
            TableQuery<PingItem> query = new TableQuery<PingItem>();
            var queryResult = table.ExecuteQuery<PingItem>(query);
            return queryResult.ToList();
        }

Saving Data in Process Queue

Now we’ll save the data in process queue.

        private void SaveMessages(List<PingItem> pingItems)
        {
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            foreach (var pingItem in pingItems)
            {
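                CloudQueueMessage msg = new CloudQueueMessage(pingItem.ToString());
                // The second argument is the message's time-to-live, so a message
                // which nobody picks up expires after 45 seconds.
                queue.AddMessage(msg, TimeSpan.FromSeconds(45));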
            }
        }

image

Fetch Data from Process Queue

Next we’ll fetch data from the process queue and process those records. Each instance will fetch 5 messages from the queue. Again for the sake of simplicity, once a message is fetched we’ll delete it immediately. In a real-world scenario, one would need to hold on to the message until it has been processed properly.

        private List<PingItem> FetchMessages(int maximumMessagesToFetch)
        {
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            var messages = queue.GetMessages(maximumMessagesToFetch);
            List<PingItem> itemsToBeProcessed = new List<PingItem>();
            foreach (var message in messages)
            {
                itemsToBeProcessed.Add(PingItem.ParseFromString(message.AsString));
                queue.DeleteMessage(message);
            }
            return itemsToBeProcessed;
        }
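
As noted above, a more robust version would delete a message only after it has been processed. Here is a minimal sketch of that ordering. It is deliberately independent of the storage library: the `ReliableProcessing` class is hypothetical, and the `process` and `delete` delegates are placeholders for whatever pings the URL and removes the message from the queue.

```csharp
using System;
using System.Collections.Generic;

public static class ReliableProcessing
{
    // Calls 'delete' only for messages whose 'process' call succeeded.
    // Returns the number of messages left undeleted for a later retry.
    public static int ProcessThenDelete<T>(IEnumerable<T> messages, Action<T> process, Action<T> delete)
    {
        int failed = 0;
        foreach (T message in messages)
        {
            try
            {
                process(message);
            }
            catch (Exception)
            {
                failed++;
                continue; // Leave the message in the queue.
            }
            delete(message);
        }
        return failed;
    }

    public static void Main()
    {
        var deleted = new List<string>();
        int failed = ProcessThenDelete(
            new[] { "a", "bad", "c" },
            m => { if (m == "bad") throw new InvalidOperationException(); },
            m => deleted.Add(m));
        Console.WriteLine(failed);                     // prints 1
        Console.WriteLine(string.Join(",", deleted)); // prints a,c
    }
}
```

With a real queue, a message which is fetched but never deleted becomes visible again once its visibility timeout expires, so another instance (or the next run) can pick it up.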

Process Items

This is the final stage of our task. We’ll write a function which fetches the URL and returns a PingResult object, and another one which persists that object in table storage.


        private PingResult FetchUrl(PingItem item)
        {
            DateTime startDateTime = DateTime.UtcNow;
            TimeSpan elapsedTime = TimeSpan.FromSeconds(0);
            string statusCode = "";
            long contentLength = 0;
            try
            {
                HttpWebRequest req = (HttpWebRequest)WebRequest.Create(item.Url);
                req.Timeout = 30 * 1000;//Let's timeout the request in 30 seconds.
                req.Method = "GET";
                using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse())
                {
                    DateTime endDateTime = DateTime.UtcNow;
                    elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks);
                    statusCode = resp.StatusCode.ToString();
                    contentLength = resp.ContentLength;
                }
            }
            catch (WebException webEx)
            {
                DateTime endDateTime = DateTime.UtcNow;
                elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks);
                statusCode = webEx.Status.ToString();
            }
            return new PingResult()
            {
                PartitionKey = DateTime.UtcNow.Ticks.ToString("d19"),
                RowKey = item.RowKey,
                Url = item.Url,
                StatusCode = statusCode,
                ContentLength = contentLength,
                TimeTaken = elapsedTime.TotalMilliseconds,
            };
        }

        private void SaveResult(PingResult result)
        {
            string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName");
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName);
            TableOperation addOperation = TableOperation.Insert(result);
            table.Execute(addOperation);
        }

        public void Execute(IJobExecutionContext context)
        {
            // Introduce a random delay between 100 and 200 ms to reduce the chance of a race condition.
            Thread.Sleep((new Random()).Next(100, 200));
            Trace.WriteLine(string.Format("[{0}] - Executing ping job. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
            Init();
            // Try and acquire the lease.
            if (AcquireLease())
            {
                Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // If successful then read the data.
                var itemsToBeProcessed = ReadMasterData();
                //Now save this data as messages in process queue.
                SaveMessages(itemsToBeProcessed);
            }
            else
            {
                Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // Else just sleep for 15 seconds.
                Thread.Sleep(15 * 1000);
            }
            // Now we'll fetch 5 messages from top of queue
            var itemsToBeProcessedByThisInstance = FetchMessages(5);
            if (itemsToBeProcessedByThisInstance.Count > 0)
            {
                int numTasks = itemsToBeProcessedByThisInstance.Count;
                List<Task> tasks = new List<Task>();
                for (int i = 0; i < numTasks; i++)
                {
                    var pingItem = itemsToBeProcessedByThisInstance[i];
                    var task = Task.Factory.StartNew(() =>
                        {
                            var pingResult = FetchUrl(pingItem);
                            SaveResult(pingResult);
                        });
                    tasks.Add(task);
                }
                Task.WaitAll(tasks.ToArray());
            }
        }
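
`FetchUrl` above measures the elapsed time by subtracting two `DateTime.UtcNow` readings. An alternative, shown here purely as a sketch and not as what this post’s code does, is the `Stopwatch` class from `System.Diagnostics`, which is meant for measuring intervals. The `Timing` class and `MeasureMilliseconds` method are names I made up for this illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class Timing
{
    // Runs 'action' and returns the elapsed wall-clock time in milliseconds.
    public static double MeasureMilliseconds(Action action)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        action();
        stopwatch.Stop();
        return stopwatch.Elapsed.TotalMilliseconds;
    }

    public static void Main()
    {
        // Thread.Sleep stands in for the web request we would normally time.
        double elapsed = MeasureMilliseconds(() => Thread.Sleep(50));
        Console.WriteLine("Elapsed (ms): " + elapsed);
    }
}
```

The returned value could be assigned to `TimeTaken` in the same way `elapsedTime.TotalMilliseconds` is above.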

image

Pretty simple huh!!!

Finished Code

Here’s the complete code for pinging job in one place :).

Configuration File

<?xml version="1.0" encoding="utf-8"?>
<ServiceConfiguration serviceName="PoorMansPingdom" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceConfiguration" osFamily="3" osVersion="*" schemaVersion="2012-10.1.8">
  <Role name="PoorMansPingdomWorkerRole">
    <Instances count="2" />
    <ConfigurationSettings>
      <Setting name="Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString" value="UseDevelopmentStorage=true" />
      <!-- Storage account where our data will be stored. -->
      <Setting name="StorageAccount" value="UseDevelopmentStorage=true" />
      <!-- Name of the table where master data will be stored. -->
      <Setting name="PingItemsTableName" value="PingItems" />
      <!-- Name of the table where we'll store the results. -->
      <Setting name="PingResultsTableName" value="PingResults" />
      <!-- Blob container name where we'll store the blob which will be leased. -->
      <Setting name="BlobContainer" value="lease-blob-container" />
      <!-- Name of the blob on which each instance will try and acquire the lease. -->
      <Setting name="BlobToBeLeased" value="lease-blob.txt" />
      <!-- Name of the queue from which messages will be read. -->
      <Setting name="ProcessQueueName" value="ping-items-queue" />
      <!-- Ping Job Cron Schedule -->
      <Setting name="PingJobCronSchedule" value="0 0/1 * * * ?" />
    </ConfigurationSettings>
  </Role>
</ServiceConfiguration>

WorkerRole.cs

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.WindowsAzure.Storage.Queue;
using Quartz;
using Quartz.Impl;

namespace PoorMansPingdomWorkerRole
{
    public class WorkerRole : RoleEntryPoint
    {
        public override void Run()
        {
            // This is a sample worker implementation. Replace with your logic.
            Trace.WriteLine("PoorMansPingdomWorkerRole entry point called", "Information");
            // Call the initialization routine.
            Init();
            // Call the job scheduling routine.
            ScheduleJob();
            while (true)
            {
                Thread.Sleep(10000);
                //Trace.WriteLine("Working", "Information");
            }
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // For information on handling configuration changes
            // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

            return base.OnStart();
        }

        private void Init()
        {
            // Get the cloud storage account.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount"));
            // Get the name of the blob container
            string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer");
            CloudBlobContainer blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName);
            // Create the blob container.
            blobContainer.CreateIfNotExists();
            // Get the blob name
            string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased");
            CloudBlockBlob blob = blobContainer.GetBlockBlobReference(blobName);
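            // Write some dummy data in the blob, but only if it doesn't exist yet:
            // uploading without a lease ID fails while another instance holds the lease.
            if (!blob.Exists())
            {
                string blobContent = "This is dummy data";
                using (MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes(blobContent)))
                {
                    blob.UploadFromStream(ms);
                }
            }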
            // Get the table name for storing results.
            string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName");
            // Create the table.
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName);
            table.CreateIfNotExists();
            // Get the queue name where ping items will be stored.
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            // Create the queue.
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            queue.CreateIfNotExists();
        }

        private void ScheduleJob()
        {
            DateTimeOffset runTime = DateBuilder.EvenMinuteDate(DateTime.Now);

            // construct a scheduler factory
            ISchedulerFactory schedFact = new StdSchedulerFactory();

            // get a scheduler
            IScheduler sched = schedFact.GetScheduler();
            sched.Start();

            JobDataMap jobDataMap = new JobDataMap();

            IJobDetail websitePingJobDetail = JobBuilder.Create<PingJob>()
                    .WithIdentity("WebsitePingJob", "group1")
                    .WithDescription("Website Ping Job")
                    .UsingJobData(jobDataMap)
                    .Build();

            ITrigger websitePingJobTrigger = TriggerBuilder.Create()
                .WithIdentity("WebsitePingJob", "group1")
                // Start at the next even minute; a later StartNow() call would override this.
                .StartAt(runTime)
                .WithCronSchedule(RoleEnvironment.GetConfigurationSettingValue("PingJobCronSchedule"))
                .Build();

            sched.ScheduleJob(websitePingJobDetail, websitePingJobTrigger);
        }
    }
}

PingJob.cs

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.WindowsAzure.Storage.Queue;
using Quartz;
using System.Threading.Tasks;

namespace PoorMansPingdomWorkerRole
{
    public class PingJob : IInterruptableJob
    {
        CloudStorageAccount storageAccount;
        CloudBlobContainer blobContainer;
        CloudBlockBlob blob;

        public void Execute(IJobExecutionContext context)
        {
            // Introduce a random delay between 100 and 200 ms to reduce the chance of a race condition.
            Thread.Sleep((new Random()).Next(100, 200));
            Trace.WriteLine(string.Format("[{0}] - Executing ping job. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
            Init();
            // Try and acquire the lease.
            if (AcquireLease())
            {
                Trace.WriteLine(string.Format("[{0}] - Lease acquired. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // If successful then read the data.
                var itemsToBeProcessed = ReadMasterData();
                //Now save this data as messages in process queue.
                SaveMessages(itemsToBeProcessed);
            }
            else
            {
                Trace.WriteLine(string.Format("[{0}] - Failed to acquire lease. Role instance: {1}.", DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss"), RoleEnvironment.CurrentRoleInstance.Id));
                // Otherwise wait 15 seconds so that the instance holding the lease has time to populate the queue.
                Thread.Sleep(15 * 1000);
            }
            // Now fetch up to 5 messages from the front of the queue.
            var itemsToBeProcessedByThisInstance = FetchMessages(5);
            if (itemsToBeProcessedByThisInstance.Count > 0)
            {
                int numTasks = itemsToBeProcessedByThisInstance.Count;
                List<Task> tasks = new List<Task>();
                for (int i = 0; i < numTasks; i++)
                {
                    var pingItem = itemsToBeProcessedByThisInstance[i];
                    var task = Task.Factory.StartNew(() =>
                        {
                            var pingResult = FetchUrl(pingItem);
                            SaveResult(pingResult);
                        });
                    tasks.Add(task);
                }
                Task.WaitAll(tasks.ToArray());
            }
        }

        public void Interrupt()
        {
            throw new NotImplementedException();
        }

        private void Init()
        {
            storageAccount = CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageAccount"));
            string blobContainerName = RoleEnvironment.GetConfigurationSettingValue("BlobContainer");
            blobContainer = storageAccount.CreateCloudBlobClient().GetContainerReference(blobContainerName);
            string blobName = RoleEnvironment.GetConfigurationSettingValue("BlobToBeLeased");
            blob = blobContainer.GetBlockBlobReference(blobName);
        }

        private bool AcquireLease()
        {
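            try
            {
                // A blob lease must last between 15 and 60 seconds (or be infinite); we use 45 seconds.
                blob.AcquireLease(TimeSpan.FromSeconds(45), null);
                return true;
            }
            catch (Exception)
            {
                // Typically a 409 (Conflict) because the other instance already holds the lease.
                return false;
            }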
        }

        private List<PingItem> ReadMasterData()
        {
            string pingItemTableName = RoleEnvironment.GetConfigurationSettingValue("PingItemsTableName");
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(pingItemTableName);
            TableQuery<PingItem> query = new TableQuery<PingItem>();
            var queryResult = table.ExecuteQuery<PingItem>(query);
            return queryResult.ToList();
        }

        private void SaveMessages(List<PingItem> pingItems)
        {
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            foreach (var pingItem in pingItems)
            {
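                CloudQueueMessage msg = new CloudQueueMessage(pingItem.ToString());
                // The second argument is the message's time-to-live: a message not picked up within 45 seconds expires.
                queue.AddMessage(msg, TimeSpan.FromSeconds(45));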
            }
        }

        private List<PingItem> FetchMessages(int maximumMessagesToFetch)
        {
            string queueName = RoleEnvironment.GetConfigurationSettingValue("ProcessQueueName");
            CloudQueue queue = storageAccount.CreateCloudQueueClient().GetQueueReference(queueName);
            var messages = queue.GetMessages(maximumMessagesToFetch);
            List<PingItem> itemsToBeProcessed = new List<PingItem>();
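            // Note: each message is deleted as soon as it is read, so a ping that fails later on is not retried.
            foreach (var message in messages)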
            {
                itemsToBeProcessed.Add(PingItem.ParseFromString(message.AsString));
                queue.DeleteMessage(message);
            }
            return itemsToBeProcessed;
        }

        private PingResult FetchUrl(PingItem item)
        {
            DateTime startDateTime = DateTime.UtcNow;
            TimeSpan elapsedTime = TimeSpan.FromSeconds(0);
            string statusCode = "";
            long contentLength = 0;
            try
            {
                HttpWebRequest req = (HttpWebRequest)WebRequest.Create(item.Url);
                req.Timeout = 30 * 1000; // Time out the request after 30 seconds.
                req.Method = "GET";
                using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse())
                {
                    DateTime endDateTime = DateTime.UtcNow;
                    elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks);
                    statusCode = resp.StatusCode.ToString();
                    contentLength = resp.ContentLength;
                }
            }
            catch (WebException webEx)
            {
                DateTime endDateTime = DateTime.UtcNow;
                elapsedTime = new TimeSpan(endDateTime.Ticks - startDateTime.Ticks);
                statusCode = webEx.Status.ToString();
            }
            return new PingResult()
            {
                PartitionKey = DateTime.UtcNow.Ticks.ToString("d19"),
                RowKey = item.RowKey,
                Url = item.Url,
                StatusCode = statusCode,
                ContentLength = contentLength,
                TimeTaken = elapsedTime.TotalMilliseconds,
            };
        }

        private void SaveResult(PingResult result)
        {
            string tableName = RoleEnvironment.GetConfigurationSettingValue("PingResultsTableName");
            CloudTable table = storageAccount.CreateCloudTableClient().GetTableReference(tableName);
            TableOperation addOperation = TableOperation.Insert(result);
            table.Execute(addOperation);
        }
    }
}
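
PingJob implements IInterruptableJob, but Interrupt() in the listing above simply throws NotImplementedException. If you want the job to stop cleanly when the scheduler interrupts it, one option is cooperative cancellation with a CancellationTokenSource. What follows is only a minimal sketch of that idea and not code from the project: CancellableBatch, Run and Cancel are hypothetical names made up for illustration, and the sketch deliberately has no dependency on Quartz or the storage library so that you can try it on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original project): runs one unit of
// work per item in parallel and stops starting new work once Cancel() is called.
public class CancellableBatch
{
    private readonly CancellationTokenSource cancellation = new CancellationTokenSource();

    // Assumption: this would be called from PingJob.Interrupt() instead of throwing.
    public void Cancel()
    {
        cancellation.Cancel();
    }

    // Assumption: this would replace the Task loop in PingJob.Execute().
    // Returns the number of items that were actually processed.
    public int Run<T>(IList<T> items, Action<T> work)
    {
        CancellationToken token = cancellation.Token;
        int processed = 0;
        var tasks = new List<Task>();
        foreach (T item in items)
        {
            if (token.IsCancellationRequested)
            {
                break; // Interrupted: do not start any more work.
            }
            T current = item; // Copy for the closure.
            tasks.Add(Task.Factory.StartNew(() =>
            {
                if (token.IsCancellationRequested)
                {
                    return; // Interrupted before this task got to run.
                }
                work(current);
                Interlocked.Increment(ref processed);
            }));
        }
        Task.WaitAll(tasks.ToArray());
        return processed;
    }
}
```

In PingJob, Execute() could then call batch.Run(itemsToBeProcessedByThisInstance, item => SaveResult(FetchUrl(item))) on a CancellableBatch field, and Interrupt() could call batch.Cancel(). A ping that is already in flight still runs to completion; only pings that have not started yet are skipped.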

Complete Source Code on GitHub!

I’ve been putting this exercise on the back burner for a long, long time. Not anymore :) I’ve taken the plunge and started using GitHub. The complete solution is now available there for you to take a look at: https://github.com/gmantri/windowsazure-task-scheduler-example.

Other Alternatives

You don’t really have to go all out and build this thing on your own. Luckily, a few options are already available that will help you achieve the same thing. Some of them are outside Windows Azure and some are inside it. We’ll only talk about the options available to you today in Windows Azure:

Windows Azure Mobile Services Task Scheduler

Windows Azure recently announced job scheduler functionality in Windows Azure Mobile Services. You write the job in node.js and Mobile Services takes care of executing it on schedule. For more information, please visit: http://www.windowsazure.com/en-us/develop/mobile/tutorials/schedule-backend-tasks/.

Aditi Cloud Services

Aditi (www.aditi.com), a big Microsoft partner, recently announced the availability of a “Scheduler” service which allows you to execute any CRON job in the cloud. This service is also available through the Windows Azure Marketplace and can be added as an add-on to your subscription. For more information, please visit: http://www.aditicloud.com/.

Summary

As demonstrated above, it is quite simple to build a task scheduler in Windows Azure. Obviously I took a rather simple example and made certain assumptions. When you build a service like this for production use, you will need to address a number of other concerns to make it robust. I hope you’ve found this blog post useful. Do share your thoughts by providing comments. If you find any issues, please let me know and I’ll fix them ASAP.

Happy Coding!!!

