cloudnet-compose/worker/src/Worker/Program.cs

115 lines
3.7 KiB
C#
Raw Normal View History

2016-06-11 19:21:26 +02:00
using System;
using System.Data.Common;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Newtonsoft.Json;
using Npgsql;
using StackExchange.Redis;
namespace Worker
{
public class Program
{
public static int Main(string[] args)
{
try
{
var pgsql = OpenDbConnection("Server=db;Username=postgres;");
var redis = OpenRedisConnection("redis").GetDatabase();
var definition = new { vote = "", voter_id = "" };
while (true)
{
string json = redis.ListLeftPopAsync("votes").Result;
if (json != null)
{
var vote = JsonConvert.DeserializeAnonymousType(json, definition);
Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'");
UpdateVote(pgsql, vote.voter_id, vote.vote);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.ToString());
return 1;
}
}
private static NpgsqlConnection OpenDbConnection(string connectionString)
{
var connection = new NpgsqlConnection(connectionString);
while (true)
{
try
{
connection.Open();
break;
}
catch (DbException)
{
Console.Error.WriteLine("Failed to connect to db - retrying");
}
}
var command = connection.CreateCommand();
command.CommandText = @"CREATE TABLE IF NOT EXISTS votes (
id VARCHAR(255) NOT NULL UNIQUE,
vote VARCHAR(255) NOT NULL
)";
command.ExecuteNonQuery();
return connection;
}
private static ConnectionMultiplexer OpenRedisConnection(string hostname)
{
// Use IP address to workaround hhttps://github.com/StackExchange/StackExchange.Redis/issues/410
var ipAddress = GetIp(hostname);
Console.WriteLine($"Found redis at {ipAddress}");
while (true)
{
try
{
return ConnectionMultiplexer.Connect(ipAddress);
}
catch (RedisConnectionException)
{
Console.Error.WriteLine("Failed to connect to redis - retrying");
Thread.Sleep(1000);
}
}
}
private static string GetIp(string hostname)
=> Dns.GetHostEntryAsync(hostname)
.Result
.AddressList
.First(a => a.AddressFamily == AddressFamily.InterNetwork)
.ToString();
private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote)
{
var command = connection.CreateCommand();
try
{
command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)";
command.Parameters.AddWithValue("@id", voterId);
command.Parameters.AddWithValue("@vote", vote);
command.ExecuteNonQuery();
}
catch (DbException)
{
command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id";
command.ExecuteNonQuery();
}
finally
{
command.Dispose();
}
}
}
}