microsoft / service-fabric-issues

This repo is for the reporting of issues found with Azure Service Fabric.

Background Service operation stops with no error/exception #1012

Closed Thialala closed 6 years ago

Thialala commented 6 years ago

I have an issue where a background operation stops without any error or exception. My SF app has two services, FrontService and BackgroundService:

The FrontService receives the request and forwards it directly to the BackgroundService. The BackgroundService calls another service in the cluster to receive a CSV file stream from Azure Data Lake, splits it into lines, and inserts them into a table in Azure SQL Database.

When I test it locally on my dev cluster, it works fine and inserts all 1,300,000 lines of the file into the table in about 3 minutes. When I publish it to the Azure Service Fabric cluster, it inserts only 150,000 lines and then stops.

I cannot provide logs because I couldn't get either remote debugging or streaming traces working. The cluster runs in a VNET with an NSG and an internal load balancer.

Microsoft.ServiceFabric ==> Version="6.1.472"
Microsoft.ServiceFabric.* ==> Version="3.0.472"

Azure Service Fabric version: 6.1.480.9494

Here is the code of the FrontService:

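using System;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.ServiceFabric.Services.Remoting.Client;

namespace FrontService.Controllers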
{
    [Route("api/trades/feeder")]
    public class FrontServiceController : Controller
    {
        private readonly IDatabaseFeeder _databaseFeeder;

        public FrontServiceController(IConfiguration configuration)
        {
            _databaseFeeder = ServiceProxy.Create<IDatabaseFeeder>(new Uri("fabric:/AppName/BackgroundService"));

            Configuration = configuration;
        }

        public IConfiguration Configuration { get; private set; }

        [HttpGet]
        public IActionResult InsertDlkFileInDatabase(string folder, string filename)
        {
            var connectionString = Configuration.GetConnectionString("ConnectionName");
            try
            {
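                // Fire-and-forget: the Task returned by Feed is deliberately not awaited,
                // so this catch block only sees exceptions thrown synchronously; faults
                // inside the remote Feed call are never observed here.
                _databaseFeeder.Feed(folder, filename, connectionString);
                return Ok("Processing in background");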
            }
            catch (Exception ex)
            {
                return BadRequest(ex.Message);
            }
        }
    }
}

Here is the code of the BackgroundService:

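using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Fabric;
using System.Globalization;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Remoting.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace BackgroundServiceNamespace
{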
    internal sealed class BackgroundService : StatelessService, IDatabaseFeeder
    {
        public BackgroundService(StatelessServiceContext context)
            : base(context)
        {
        }

        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            return new[] { new ServiceInstanceListener(context => this.CreateServiceRemotingListener(context)) };
        }

        public async Task Feed(string folder, string filename, string connectionString)
        {
            ServiceEventSource.Current.ServiceMessage(this.Context, "Received request for file: {0}/{1}", folder, filename);

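            // WARNING: this accepts every server certificate for the whole process,
            // not only for the reverse proxy call below.
            ServicePointManager.ServerCertificateValidationCallback =
                delegate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
                {
                    return true;
                };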

            int batchSize = 50000;

            var reverseProxyPort = await ReverseProxyPortResolver.ReverseProxyPortAsync();
            string serviceUri = "DLKStreamer/DLKStreamerService";
            string requestUrl = $"https://localhost:{reverseProxyPort}/{serviceUri}/api/filestreamer?filename={folder}/{filename}&resultname={filename}";

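            var client = new HttpClient();
            Stream response;
            try
            {
                response = await client.GetStreamAsync(requestUrl);
            }
            catch (Exception ex)
            {
                // Log the full exception (type, inner exceptions, stack trace), not just
                // ex.Message, and stop here: there is no stream to read.
                ServiceEventSource.Current.ServiceMessage(this.Context, "Could not open the file stream: {0}", ex.ToString());
                return;
            }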

            DataTable dt = new DataTable();
            dt.Columns.AddRange(new DataColumn[6] {
                new DataColumn("Date", typeof(DateTime)),
                new DataColumn("Day_of_week", typeof(int)),
                new DataColumn("Half_hour", typeof(string)),
                new DataColumn("Underlying", typeof(string)),
                new DataColumn("MaturitySliding", typeof(string)),
                new DataColumn("Volume", typeof(decimal))
            });

            try
            {
                using (SqlConnection con = new SqlConnection(connectionString))
                {
                    con.Open();

                    using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(con))
                    {
                        sqlBulkCopy.DestinationTableName = "dbo.Trades";

                        sqlBulkCopy.ColumnMappings.Add("Date", "Date");
                        sqlBulkCopy.ColumnMappings.Add("Day_of_week", "Day_of_week");
                        sqlBulkCopy.ColumnMappings.Add("Half_hour", "Half_hour");
                        sqlBulkCopy.ColumnMappings.Add("Underlying", "Underlying");
                        sqlBulkCopy.ColumnMappings.Add("MaturitySliding", "MaturitySliding");
                        sqlBulkCopy.ColumnMappings.Add("Volume", "Volume");

                        using (var reader = new StreamReader(response))
                        {
                            // Skip the CSV header line.
                            await reader.ReadLineAsync();
                            var linesReadCount = 0;
                            string line;

                            // ReadLineAsync returns null at the end of the stream; unlike the
                            // EndOfStream property, it never performs a blocking read.
                            while ((line = await reader.ReadLineAsync()) != null)
                            {
                                linesReadCount++;

                                // Fill the row first and add it last, so a malformed line
                                // cannot leave a half-filled row in the DataTable.
                                var cells = line.Split(',');
                                var row = dt.NewRow();
                                row[0] = DateTime.ParseExact(cells[0], "yyyy-MM-dd", CultureInfo.InvariantCulture);
                                row[1] = int.Parse(cells[1], CultureInfo.InvariantCulture);
                                row[2] = cells[2];
                                row[3] = cells[3];
                                row[4] = cells[4];
                                // Invariant culture, like the date above, so parsing does not
                                // depend on the regional settings of the node.
                                row[5] = decimal.Parse(cells[5], CultureInfo.InvariantCulture);
                                dt.Rows.Add(row);

                                if (linesReadCount % batchSize == 0)
                                {
                                    sqlBulkCopy.WriteToServer(dt);
                                    dt.Rows.Clear();
                                    ServiceEventSource.Current.ServiceMessage(this.Context, "Processed lines : {0}", linesReadCount);
                                }
                            }
                        }

                        sqlBulkCopy.WriteToServer(dt);
                        dt.Rows.Clear();
                        con.Close();

                        ServiceEventSource.Current.ServiceMessage(this.Context, "End of feeder.");
                    }
                }
            }
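            catch (Exception ex)
            {
                // ex.ToString() keeps the exception type, inner exceptions and stack trace,
                // which ex.Message alone drops (e.g. "One or more errors occurred.").
                ServiceEventSource.Current.ServiceMessage(this.Context, "Exception while transferring file: {0}", ex.ToString());
            }
        }
    }
}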
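The CSV handling described above (split each line, convert the six cells, insert into dbo.Trades) can be pulled out into a small helper so it can be tested without a database or a cluster. This is a minimal sketch, assuming the same six-column layout and dot-decimal numbers; `TradeRow`, `TradeLineParser` and `TryParse` are hypothetical names, not part of the original services. It is not offered as the cause of the IOException discussed below, only as a way to make parsing culture-independent (`decimal.Parse` without a culture argument uses the current culture of whichever node runs the service) and to skip malformed lines instead of throwing.

```csharp
using System;
using System.Globalization;

// Hypothetical type: one parsed CSV row matching the dbo.Trades columns.
public sealed class TradeRow
{
    public DateTime Date { get; set; }
    public int DayOfWeekIndex { get; set; }
    public string HalfHour { get; set; }
    public string Underlying { get; set; }
    public string MaturitySliding { get; set; }
    public decimal Volume { get; set; }
}

// Hypothetical helper, not an existing library API.
public static class TradeLineParser
{
    // Parses "yyyy-MM-dd,int,text,text,text,decimal". Returns false instead of
    // throwing when the line is malformed, so the caller can log it and continue.
    public static bool TryParse(string line, out TradeRow row)
    {
        row = null;
        if (line == null) return false;

        var cells = line.Split(',');
        if (cells.Length < 6) return false; // like the original, extra columns are ignored

        if (!DateTime.TryParseExact(cells[0], "yyyy-MM-dd", CultureInfo.InvariantCulture,
                                    DateTimeStyles.None, out var date)) return false;
        if (!int.TryParse(cells[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out var day)) return false;
        // Invariant culture: "1234.5" parses the same way on every node,
        // whatever regional settings the VM has.
        if (!decimal.TryParse(cells[5], NumberStyles.Number, CultureInfo.InvariantCulture, out var volume)) return false;

        row = new TradeRow
        {
            Date = date,
            DayOfWeekIndex = day,
            HalfHour = cells[2],
            Underlying = cells[3],
            MaturitySliding = cells[4],
            Volume = volume
        };
        return true;
    }
}
```

Inside the read loop, a call such as `if (!TradeLineParser.TryParse(line, out var trade)) { /* log and skip */ continue; }` would replace the direct `Split`/`Parse` calls, after which the fields are copied into a new `DataRow`.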

Here is the code of the interface IDatabaseFeeder:

using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Remoting;

public interface IDatabaseFeeder : IService
{
    Task Feed(string folder, string filename, string connectionString);
}
masnider commented 6 years ago

When you say it stops, what exactly do you mean? It looks like there are multiple ways for the Feed function to stop (e.g., hitting an exception). You'll need to figure out which path it's taking that causes it to stop. If you need help getting tracing or debugging working in that environment, let us know what you're running into there. You could also consider using Service Fabric health reports to help track where processing stops and why: https://docs.microsoft.com/en-us/azure/service-fabric/service-fabric-report-health

Thialala commented 6 years ago

@masnider, thank you for your answer. You were right, I'm hitting an exception:

{"ClassName":"System.AggregateException","Message":"One or more errors occurred.","Data":null,"InnerException":{"ClassName":"System.IO.IOException","Message":"Unable to read data from the transport connection: The connection was closed.","Data":null,"InnerException":null,"HelpURL":null,"StackTraceString":"
   at System.Net.ConnectStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.StreamReader.ReadBuffer()
   at System.IO.StreamReader.get_EndOfStream()
   at BackgroundService.<Feed>d__2.MoveNext() in C:\\........\\BackgroundService.cs:line 145

On my local dev cluster, the connection is never closed and I can read the entire stream from the other service running in the cluster. Once deployed, however, I get this error.

Any clues?

Thanks!
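One workaround worth trying, offered only as a hypothesis and not a confirmed fix: the stack trace shows the IOException surfacing while the StreamReader pulls more data from the HTTP connection, and in the original loop nothing reads from that connection while each 50,000-row batch is sent with `WriteToServer`. If some component on the path (reverse proxy, load balancer, or the DLKStreamer service) closes idle or long-running connections, a read would fail exactly like this. Copying the whole response to a local temporary file first keeps the network read short and continuous, and the bulk insert then reads from disk. `StreamBuffering` and `BufferToTempFileAsync` are hypothetical names; this is a minimal sketch, not Service Fabric guidance.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper (not part of the original services): drain a remote
// stream into a temporary file before doing any slow per-line work.
public static class StreamBuffering
{
    // Copies everything from 'source' into a new temp file and returns its path.
    // The caller is responsible for deleting the file when finished.
    public static async Task<string> BufferToTempFileAsync(Stream source)
    {
        string path = Path.GetTempFileName();
        try
        {
            using (var file = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None,
                                             bufferSize: 81920, useAsync: true))
            {
                // Read the source as fast as data arrives; nothing else happens
                // between reads, so the connection is never left waiting on SQL.
                await source.CopyToAsync(file);
            }
            return path;
        }
        catch
        {
            // Don't leave a partial file behind if the download fails.
            File.Delete(path);
            throw;
        }
    }
}
```

Used from `Feed`, this would replace `new StreamReader(response)` with `new StreamReader(path)` on the returned path, deleting the file in a `finally` block afterwards. The trade-off is local disk space: the node needs room for the whole CSV file.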

masnider commented 6 years ago

@Thialala - sorry, I have no idea off the top of my head why that might be happening; it could be lots of things. Something about your use of that reader doesn't work in your real environment. For example, the address of the other service could be incorrect, or firewalls or something else could be blocking the communication. For now we're going to close this issue, as it's probably not related to SF, but if you're still stuck after confirming that connectivity works as you'd expect and that all the addresses are correct, please reopen it.