Open androllen opened 4 years ago
using NetMQ;
using NetMQ.Sockets;
using Prism.Events;
using Prism.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Management;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using Tx.EventBus;
using Tx.PythonService.Definitions;
using Tx.PythonService.Helpers;
using Tx.PythonService.Interfaces;
namespace Tx.PythonService
{
public partial class CoreShell : ICoreShell
{
// Root of the bundled Python sources: <app base directory>\scripts\TXThrift.
private string _basePyPath => Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "scripts", "TXThrift");
// Working directory used for the Python server process and the developer console.
private string _workPath => Path.Combine(_basePyPath, "TX_Py3");
// Directory that holds python3.exe (see CreateProcess) and is added to PATH.
private string _venvPath => Path.Combine(_basePyPath, "TX_WIN_VENV");
// winsw.exe located in the parent directory of _basePyPath.
private string _winsw => Path.Combine(Path.GetDirectoryName(_basePyPath), "winsw.exe");
// winsw.xml written by CreateProcess, in the same directory as _winsw.
private string _xml => Path.Combine(Path.GetDirectoryName(_basePyPath), "winsw.xml");
// Event aggregator supplied through Start; used to publish ConsoleEvent messages.
private IEventAggregator _eventAggregator;
// Logger supplied through Start.
private ILoggerFacade _logger;
// Endpoint used for RequestSocket in Check; overwritten by Start.
private string _reqhost = "tcp://127.0.0.1:5555";
// Endpoint the SubscriberSocket instances connect to; overwritten by Start.
private string _subhost = "tcp://127.0.0.1:50505";
// Debug flag stored by Start and forwarded to Initialize.
private bool _debug = false;
// Lock object guarding creation of _instance in Current.
private static readonly object _locker = new object();
// Backing field for Current; assigned once inside the lock.
private static CoreShell _instance;
/// <summary>
/// Gets the shared <see cref="CoreShell"/> instance, creating it on first access.
/// </summary>
public static CoreShell Current
{
    get
    {
        // Fast path: the instance already exists, no lock needed.
        if (_instance != null)
        {
            return _instance;
        }

        lock (_locker)
        {
            // Re-check under the lock so only one caller ever constructs the instance.
            return _instance ?? (_instance = new CoreShell());
        }
    }
}
// Sentinel meaning "no process id recorded"; KillProcessAndChildren ignores values <= 0.
private const int NullTaskID = -1;
// Process id handed to KillProcessAndChildren by Dispose and Initialize; NullTaskID when none is tracked.
private int taskPID = NullTaskID;
// Cancelled and disposed by Dispose when it is not null.
private CancellationTokenSource cts = null;
// Reset to false by Dispose.
private bool isRunning;
// Private so that instances can only be obtained through Current.
private CoreShell()
{
}
/// <summary>
/// Tears down every subscription, cancels the shell-level token source and kills the
/// tracked process tree. Safe to call more than once.
/// </summary>
public void Dispose()
{
    // Work on snapshots so that concurrent changes to the dictionary cannot hand us
    // a default (null-keyed) entry, and repeat until nothing is left.
    while (!_s.IsEmpty)
    {
        foreach (var entry in _s.ToArray())
        {
            Unsubscribe(entry.Key.Parameter);

            // Unsubscribe returns early for a null Parameter; in that case the entry is
            // still present and its socket must be released here instead of being dropped.
            if (_s.TryRemove(entry.Key, out SubscriberSocket orphan))
            {
                try
                {
                    entry.Key.Cts?.Cancel(false);
                }
                catch (ObjectDisposedException)
                {
                    // Token source was already disposed; nothing left to cancel.
                }

                try
                {
                    orphan.Dispose();
                }
                catch (Exception ex)
                {
                    Trace.WriteLine(ex.Message);
                }
            }
        }
    }

    // Null the field so a second Dispose does not touch a disposed token source.
    cts?.Cancel(false);
    cts?.Dispose();
    cts = null;

    try
    {
        KillProcessAndChildren(taskPID);
    }
    finally
    {
        // Reset the state even when killing the process tree fails.
        taskPID = NullTaskID;
        isRunning = false;
    }
}
/// <summary>
/// Kills the process with the given id after recursively killing every process whose
/// ParentProcessID (as reported by WMI Win32_Process) equals that id.
/// </summary>
/// <param name="pid">Process id to terminate; values less than or equal to zero are ignored.</param>
private static void KillProcessAndChildren(int pid)
{
    // Cannot close 'system idle process' (pid 0); negative values mean "no process".
    if (pid <= 0)
    {
        return;
    }

    // Collect the child ids first so the WMI objects are released before recursing.
    var childPids = new List<int>();
    using (var searcher = new ManagementObjectSearcher
        ("Select * From Win32_Process Where ParentProcessID=" + pid))
    using (ManagementObjectCollection children = searcher.Get())
    {
        foreach (ManagementObject child in children)
        {
            using (child)
            {
                childPids.Add(Convert.ToInt32(child["ProcessID"]));
            }
        }
    }

    foreach (var childPid in childPids)
    {
        KillProcessAndChildren(childPid);
    }

    try
    {
        using (Process proc = Process.GetProcessById(pid))
        {
            proc.Kill();
        }
    }
    catch (ArgumentException)
    {
        // Process already exited.
    }
    catch (InvalidOperationException)
    {
        // Process exited between the lookup and the Kill call.
    }
    catch (Win32Exception)
    {
        // Access denied
    }
}
}
public partial class CoreShell : ICoreShell
{
// Active subscriptions: each Worker (request data plus its cancellation source) mapped to
// the SubscriberSocket created for it in SubscribeAsync and released in Unsubscribe/Dispose.
private readonly ConcurrentDictionary<Worker, SubscriberSocket> _s = new ConcurrentDictionary<Worker, SubscriberSocket>();
/// <summary>
/// Stores the supplied services and endpoints on the instance and then runs
/// <c>Initialize</c> with the same endpoints and debug flag.
/// </summary>
/// <param name="eventAggregator">Aggregator kept for later event publishing.</param>
/// <param name="logger">Logger kept for later diagnostics.</param>
/// <param name="reqthost">Request endpoint.</param>
/// <param name="subhost">Subscription endpoint.</param>
/// <param name="debug">Debug flag forwarded to <c>Initialize</c>.</param>
public void Start(IEventAggregator eventAggregator, ILoggerFacade logger, string reqthost, string subhost, bool debug)
{
    // Services first, then connection settings.
    _logger = logger;
    _eventAggregator = eventAggregator;

    _debug = debug;
    _subhost = subhost;
    _reqhost = reqthost;

    Initialize(reqthost, subhost, debug);
}
/// <summary>
/// Stops and uninstalls the winsw service, then releases everything held by this instance.
/// </summary>
public void Stop()
{
    try
    {
        StopSw();
    }
    finally
    {
        // Always release sockets and the tracked process, even when winsw.exe could not
        // be launched and StopSw threw.
        Dispose();
    }
}
/// <summary>
/// Opens a visible CMD.exe console in the Python working directory whose PATH has every
/// entry containing "python" removed and the bundled virtual environment appended.
/// </summary>
public void CreateDevCommandLine()
{
    var psi = new ProcessStartInfo("CMD.exe")
    {
        CreateNoWindow = false,
        UseShellExecute = false,
        WorkingDirectory = _workPath,
        ErrorDialog = false
    };

    // Treat a missing PATH as empty so the virtual environment is still added.
    var inherited = psi.EnvironmentVariables["PATH"] ?? string.Empty;
    var entries = inherited
        .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
        // Ordinal comparison: directory names are not linguistic text.
        .Where(p => p.IndexOf("python", StringComparison.OrdinalIgnoreCase) < 0)
        .ToList();
    entries.AddRange(new[] { _venvPath, Path.Combine(_venvPath, "Lib"), _workPath });
    psi.EnvironmentVariables["PATH"] = string.Join(";", entries);

    using (var shell = Process.Start(psi))
    {
        // Only the local handle is released here; the console itself keeps running.
    }
}
/// <summary>
/// Kills every running process whose name contains "python" plus the tracked process tree,
/// then launches the Python server and installs/starts the winsw service.
/// </summary>
/// <param name="reqHost">Request endpoint passed to the server.</param>
/// <param name="subHost">Subscription endpoint passed to the server.</param>
/// <param name="debug">When true the server is started with its debug switch.</param>
private void Initialize(string reqHost, string subHost, bool debug = true)
{
    Process[] running;
    try
    {
        running = Process.GetProcesses();
    }
    catch (Exception ex)
    {
        _logger?.Log(ex.ToString(), Category.Exception, Priority.Low);
        running = new Process[0];
    }

    foreach (var ps in running)
    {
        using (ps)
        {
            // Handle failures per process so one inaccessible or already-exited process
            // does not stop the remaining ones from being cleaned up.
            try
            {
                if (ps.ProcessName.IndexOf("python", StringComparison.OrdinalIgnoreCase) >= 0)
                {
                    KillProcessAndChildren(ps.Id);
                }
            }
            catch (Exception ex)
            {
                _logger?.Log(ex.ToString(), Category.Exception, Priority.Low);
            }
        }
    }

    try
    {
        KillProcessAndChildren(taskPID);
    }
    catch (Exception ex)
    {
        _logger?.Log(ex.ToString(), Category.Exception, Priority.Low);
    }

    CreateProcess(reqHost, subHost, debug);
    StartSw();
    // _logger.Log($"Initial process ID: {taskPID}", Category.Debug, Priority.Medium);
}
/// <summary>
/// Writes the winsw service definition for the Python server and starts the server
/// directly with the same executable, arguments and PATH.
/// </summary>
/// <param name="reqHost">Value passed to the server's <c>-r</c> option.</param>
/// <param name="subHost">Value passed to the server's <c>-s</c> option.</param>
/// <param name="debug">When true, <c>--debug</c> is appended to the arguments.</param>
private void CreateProcess(string reqHost, string subHost, bool debug)
{
    var fileName = Path.Combine(_venvPath, "python3.exe");
    var arguments = $"server.py -r {reqHost} -s {subHost}" + (debug ? " --debug" : string.Empty);
    var psi = new ProcessStartInfo(fileName)
    {
        Arguments = arguments,
        WorkingDirectory = _workPath,
        ErrorDialog = false,
        UseShellExecute = false,
        CreateNoWindow = true
    };

    var path = psi.EnvironmentVariables["PATH"];
    if (path != null)
    {
        var entries = path
            .Split(new[] { ';' })
            .Where(p => p.IndexOf("python", StringComparison.OrdinalIgnoreCase) < 0)
            .ToList();
        // PATH entries must be directories, so add the folder that contains winsw.exe
        // rather than the path of the executable itself.
        entries.AddRange(new[]
        {
            _venvPath,
            Path.Combine(_venvPath, "Lib"),
            _workPath,
            Path.GetDirectoryName(_winsw)
        });
        psi.EnvironmentVariables["PATH"] = string.Join(";", entries);
    }

    try
    {
        WriteServiceConfig(fileName, arguments, psi.EnvironmentVariables["PATH"]);

        using (var server = Process.Start(psi))
        {
            // Remember the id so Dispose/Initialize can terminate this process tree.
            if (server != null)
            {
                taskPID = server.Id;
            }
        }
    }
    catch (Exception ex)
    {
        Trace.WriteLine(ex);
    }
}

/// <summary>
/// Saves the winsw service definition (id, name, description, executable, arguments and
/// a Path environment entry) to the file referenced by <c>_xml</c>.
/// </summary>
private void WriteServiceConfig(string executable, string arguments, string pathValue)
{
    var document = new XmlDocument();
    document.AppendChild(document.CreateXmlDeclaration("1.0", "UTF-8", null));
    XmlElement root = document.CreateElement("service");
    document.AppendChild(root);

    var textElements = new[]
    {
        new KeyValuePair<string, string>("id", "txpy"),
        new KeyValuePair<string, string>("name", "app"),
        new KeyValuePair<string, string>("description", "this is"),
        new KeyValuePair<string, string>("executable", executable),
        new KeyValuePair<string, string>("arguments", arguments)
    };
    foreach (var pair in textElements)
    {
        XmlElement element = document.CreateElement(pair.Key);
        element.InnerText = pair.Value;
        root.AppendChild(element);
    }

    XmlElement env = document.CreateElement("env");
    env.SetAttribute("name", "Path");
    env.SetAttribute("value", pathValue ?? string.Empty);
    root.AppendChild(env);

    document.Save(_xml);
}
/// <summary>
/// Runs <c>winsw.exe install</c> followed by <c>winsw.exe start</c>, waiting for each
/// command to exit before continuing.
/// </summary>
private void StartSw()
{
    foreach (var command in new[] { "install", "start" })
    {
        // 'using' releases the process handle even when Start or WaitForExit throws.
        using (var process = new Process())
        {
            process.StartInfo.FileName = "winsw.exe";
            process.StartInfo.UseShellExecute = false;
            process.StartInfo.Arguments = command;
            process.Start();
            process.WaitForExit();
        }
    }
}
/// <summary>
/// Runs <c>winsw.exe stop</c> followed by <c>winsw.exe uninstall</c>, waiting for each
/// command to exit before continuing.
/// </summary>
private void StopSw()
{
    foreach (var command in new[] { "stop", "uninstall" })
    {
        // 'using' releases the process handle even when Start or WaitForExit throws.
        using (var process = new Process())
        {
            process.StartInfo.FileName = "winsw.exe";
            process.StartInfo.UseShellExecute = false;
            process.StartInfo.Arguments = command;
            process.Start();
            process.WaitForExit();
        }
    }
}
/// <summary>
/// Runs <c>winsw.exe status</c> and reports whether the service is running.
/// </summary>
/// <returns>True when a line of the command's standard output equals the running status text.</returns>
private bool CheckSw()
{
    // NOTE(review): WinSW 2.x prints "Started", "Stopped" or "NonExistent" for the status
    // command; confirm the text against the WinSW version that is actually deployed.
    const string runningStatus = "Started";

    bool output = false;
    using (var process = new Process())
    {
        process.StartInfo.FileName = "winsw.exe";
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.Arguments = "status";
        process.StartInfo.RedirectStandardOutput = true;
        process.OutputDataReceived += new DataReceivedEventHandler((sender, e) =>
        {
            // The previous test required a line to be both non-empty and equal to "",
            // which can never hold, so the method always returned false.
            if (!string.IsNullOrEmpty(e.Data)
                && e.Data.Trim().Equals(runningStatus, StringComparison.OrdinalIgnoreCase))
            {
                output = true;
            }
        });
        process.Start();
        // Asynchronously read the standard output of the spawned process.
        // This raises OutputDataReceived events for each line of output.
        process.BeginOutputReadLine();
        process.WaitForExit();
    }
    return output;
}
/// <summary>
/// Sends a request describing <paramref name="script"/> and its parameters to the request
/// endpoint and interprets the reply as a boolean.
/// </summary>
/// <param name="script">Script whose default value becomes the request's ScriptID.</param>
/// <param name="parameters">Parameters attached to the request.</param>
/// <param name="data">The request that was sent; set to the default value when an exception occurred.</param>
/// <returns>
/// True only when the request was sent, a reply arrived within two seconds and that reply
/// parsed as <c>true</c>; false otherwise, including when an exception occurred.
/// </returns>
public bool Check(PyScript script, Dictionary<string, object> parameters, out PyData data)
{
    data = new PyData
    {
        ID = Guid.NewGuid(),
        ScriptID = script.GetDefaultValue().ToString(),
        Parameters = parameters,
        State = true
    };
    bool ok = false;
    try
    {
        using (var request = new RequestSocket(_reqhost))
        {
            var json = data.SerializeObject();
            Console.WriteLine(json);
            if (request.TrySendFrame(TimeSpan.FromSeconds(2), json)
                && request.TryReceiveFrameString(TimeSpan.FromSeconds(2), out string message))
            {
                // The reply text decides the result; anything unparsable counts as false.
                bool.TryParse(message, out ok);
            }
        }
        _eventAggregator?.GetEvent<ConsoleEvent>().Publish($"请求结果:{ok}");
        return ok;
    }
    catch (Exception ex)
    {
        _eventAggregator?.GetEvent<ConsoleEvent>().Publish($"请求发生异常:{ex}");
        data = default(PyData);
        // Never report success together with a cleared request object.
        return false;
    }
}
/// <summary>
/// Subscribes to the topic <c>data.ToString()</c> on the subscription endpoint and, on a
/// background task, forwards every received frame to <paramref name="action"/> until the
/// subscription's cancellation source is cancelled (see <c>Unsubscribe</c>).
/// </summary>
/// <typeparam name="T">Type that JSON payloads are deserialized into.</typeparam>
/// <param name="data">Request whose ID identifies the subscription.</param>
/// <param name="action">
/// Callback receiving either a deserialized payload (exception null), an
/// <c>EndException</c> for the "end!!!" marker, or a <c>PythonException</c> for payloads
/// that start with "error_".
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="data"/> or <paramref name="action"/> is null.
/// </exception>
public async Task SubscribeAsync<T>(PyData data, Action<T, Exception> action) where T : class
{
    if (data == null)
    {
        throw new ArgumentNullException(nameof(data));
    }
    if (action == null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    await Task.Run(() =>
    {
        // Reuse the socket registered for this request ID, if any. The loop then watches
        // the registered worker's token source, because that is the one Unsubscribe cancels.
        // NOTE(review): two loops reading one socket share it across threads - confirm
        // whether repeated subscriptions for the same ID are actually expected.
        var existing = _s.FirstOrDefault(p => p.Key.Parameter.ID == data.ID);
        SubscriberSocket subSocket = existing.Value;
        CancellationTokenSource loopCts;
        if (subSocket != null)
        {
            loopCts = existing.Key.Cts;
        }
        else
        {
            loopCts = new CancellationTokenSource();
            subSocket = new SubscriberSocket();
            subSocket.Options.ReceiveHighWatermark = 1000;
            subSocket.Connect(_subhost);
            subSocket.Subscribe(data.ToString());
            var worker = new Worker()
            {
                Parameter = data,
                Cts = loopCts
            };
            if (!_s.TryAdd(worker, subSocket))
            {
                subSocket.Dispose();
                loopCts.Dispose();
                return;
            }
        }

        // Wait with a timeout instead of spinning on a non-blocking receive.
        var pollInterval = TimeSpan.FromMilliseconds(100);
        while (!loopCts.IsCancellationRequested)
        {
            string frame;
            try
            {
                if (!subSocket.TryReceiveFrameString(pollInterval, out frame))
                {
                    continue;
                }
            }
            catch (Exception) when (loopCts.IsCancellationRequested)
            {
                // Unsubscribe tore the socket down while we were waiting on it.
                break;
            }

            if (!TrySplitFrame(frame, out string id, out string json))
            {
                continue;
            }

            // 24-hour clock; "hh" would be ambiguous without an AM/PM designator.
            var dt = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            if (json.StartsWith("{", StringComparison.Ordinal) && json.EndsWith("}", StringComparison.Ordinal))
            {
                Trace.WriteLine(json);
                var model = json.DeserializeObject<T>();
                action(model, null);
            }
            else if (json == "end!!!")
            {
                action(default(T), new EndException());
            }
            else if (json.StartsWith("error_", StringComparison.Ordinal))
            {
                // Strip only the leading marker, not later occurrences inside the message.
                action(default(T), new PythonException(json.Substring("error_".Length)));
            }
            _eventAggregator?.GetEvent<ConsoleEvent>().Publish($"{dt} - TASK_ID:{id}");
        }
    });
}

/// <summary>
/// Splits a received frame of the form "id,payload" at the first comma.
/// </summary>
/// <returns>False for blank frames, frames of three characters or fewer, or frames without a comma.</returns>
private static bool TrySplitFrame(string frame, out string id, out string payload)
{
    id = null;
    payload = null;
    if (string.IsNullOrWhiteSpace(frame) || frame.Length <= 3)
    {
        return false;
    }

    var index = frame.IndexOf(',');
    if (index < 0)
    {
        return false;
    }

    // Everything before the comma is the id (the old "index - 1" dropped its last character).
    id = frame.Substring(0, index).Trim();
    payload = frame.Substring(index + 1).Trim();
    return true;
}
/// <summary>
/// Removes every subscription whose request text equals <c>data.ToString()</c>, cancels its
/// token source and releases its socket.
/// </summary>
/// <param name="data">Request identifying the subscription; null is ignored.</param>
public void Unsubscribe(PyData data)
{
    if (data == null)
    {
        return;
    }

    var topic = data.ToString();
    // Null-safe match so an entry without a Parameter cannot break the lookup.
    var workers = _s.Keys.Where(p => p.Parameter?.ToString() == topic).ToList();
    foreach (var worker in workers)
    {
        if (!_s.TryRemove(worker, out SubscriberSocket socket))
        {
            continue;
        }

        // Run every cleanup step independently: a failure in one step (for example an
        // already-disposed token source) must not prevent the socket from being released.
        var steps = new Action[]
        {
            () => worker.Cts.Cancel(false),
            () => worker.Cts.Dispose(),
            () => socket.Unsubscribe(topic),
            () => socket.Disconnect(_subhost),
            () => socket.Close(),
            () => socket.Dispose()
        };
        foreach (var step in steps)
        {
            try
            {
                step();
            }
            catch (Exception ex)
            {
                Trace.WriteLine(ex.Message);
            }
        }
    }
}
}
}
cd bin\Debug\netcoreapp3.1
git clone https://github.com/androllen/AnWorker.git
download WinSW.NET461.exe use 7zip