Recreate worker as a .NET Core app

This commit is contained in:
Nate McMaster
2016-06-11 10:21:26 -07:00
parent 6562cee08c
commit e5489683c8
6 changed files with 149 additions and 200 deletions

View File

@ -0,0 +1,115 @@
using System;
using System.Data.Common;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Newtonsoft.Json;
using Npgsql;
using StackExchange.Redis;
namespace Worker
{
public class Program
{
public static int Main(string[] args)
{
try
{
var pgsql = OpenDbConnection("Server=db;Username=postgres;");
var redis = OpenRedisConnection("redis").GetDatabase();
var definition = new { vote = "", voter_id = "" };
while (true)
{
string json = redis.ListLeftPopAsync("votes").Result;
if (json != null)
{
var vote = JsonConvert.DeserializeAnonymousType(json, definition);
Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'");
UpdateVote(pgsql, vote.voter_id, vote.vote);
}
}
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.ToString());
return 1;
}
}
private static NpgsqlConnection OpenDbConnection(string connectionString)
{
var connection = new NpgsqlConnection(connectionString);
while (true)
{
try
{
connection.Open();
break;
}
catch (DbException)
{
Console.Error.WriteLine("Failed to connect to db - retrying");
}
}
var command = connection.CreateCommand();
command.CommandText = @"CREATE TABLE IF NOT EXISTS votes (
id VARCHAR(255) NOT NULL UNIQUE,
vote VARCHAR(255) NOT NULL
)";
command.ExecuteNonQuery();
return connection;
}
private static ConnectionMultiplexer OpenRedisConnection(string hostname)
{
// Use IP address to workaround hhttps://github.com/StackExchange/StackExchange.Redis/issues/410
var ipAddress = GetIp(hostname);
Console.WriteLine($"Found redis at {ipAddress}");
while (true)
{
try
{
return ConnectionMultiplexer.Connect(ipAddress);
}
catch (RedisConnectionException)
{
Console.Error.WriteLine("Failed to connect to redis - retrying");
Thread.Sleep(1000);
}
}
}
private static string GetIp(string hostname)
=> Dns.GetHostEntryAsync(hostname)
.Result
.AddressList
.First(a => a.AddressFamily == AddressFamily.InterNetwork)
.ToString();
private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote)
{
var command = connection.CreateCommand();
try
{
command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)";
command.Parameters.AddWithValue("@id", voterId);
command.Parameters.AddWithValue("@vote", vote);
command.ExecuteNonQuery();
}
catch (DbException)
{
command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id";
command.ExecuteNonQuery();
}
finally
{
command.Dispose();
}
}
}
}

View File

@ -0,0 +1,24 @@
{
"name": "Worker",
"buildOptions": {
"emitEntryPoint": true,
"warningsAsErrors": true
},
"dependencies": {
"StackExchange.Redis": "1.1.604-alpha",
"Npgsql": "3.1.3",
"Newtonsoft.Json": "9.0.1-beta1",
"Microsoft.NETCore.App": {
"type": "platform",
"version": "1.0.0-rc2-3002702"
}
},
"frameworks": {
"netcoreapp1.0": { }
},
"runtimeOptions": {
"configProperties": {
"System.GC.Server": true
}
}
}

View File

@ -1,101 +0,0 @@
package worker;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.sql.*;
import org.json.JSONObject;
class Worker {
public static void main(String[] args) {
try {
Jedis redis = connectToRedis("redis");
Connection dbConn = connectToDB("db");
System.err.println("Watching vote queue");
while (true) {
String voteJSON = redis.blpop(0, "votes").get(1);
JSONObject voteData = new JSONObject(voteJSON);
String voterID = voteData.getString("voter_id");
String vote = voteData.getString("vote");
System.err.printf("Processing vote for '%s' by '%s'\n", vote, voterID);
updateVote(dbConn, voterID, vote);
}
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
static void updateVote(Connection dbConn, String voterID, String vote) throws SQLException {
PreparedStatement insert = dbConn.prepareStatement(
"INSERT INTO votes (id, vote) VALUES (?, ?)");
insert.setString(1, voterID);
insert.setString(2, vote);
try {
insert.executeUpdate();
} catch (SQLException e) {
PreparedStatement update = dbConn.prepareStatement(
"UPDATE votes SET vote = ? WHERE id = ?");
update.setString(1, vote);
update.setString(2, voterID);
update.executeUpdate();
}
}
static Jedis connectToRedis(String host) {
Jedis conn = new Jedis(host);
while (true) {
try {
conn.keys("*");
break;
} catch (JedisConnectionException e) {
System.err.println("Failed to connect to redis - retrying");
sleep(1000);
}
}
System.err.println("Connected to redis");
return conn;
}
static Connection connectToDB(String host) throws SQLException {
Connection conn = null;
try {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://" + host + "/postgres";
while (conn == null) {
try {
conn = DriverManager.getConnection(url, "postgres", "");
} catch (SQLException e) {
System.err.println("Failed to connect to db - retrying");
sleep(1000);
}
}
PreparedStatement st = conn.prepareStatement(
"CREATE TABLE IF NOT EXISTS votes (id VARCHAR(255) NOT NULL UNIQUE, vote VARCHAR(255) NOT NULL)");
st.executeUpdate();
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
}
return conn;
}
static void sleep(long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
System.exit(1);
}
}
}