Asp.NET Core – SignalR’da Streaming

Merhaba,

Önceki yazılarımızda klavyeye aldığımız Asp.NET Core – SignalR yazı serisinde SignalR teknolojisini sürekli tekil mesajların iletimi üzerine ele almıştık. Halbuki SignalR aynı zamanda message streaming’i destekleyen bir yapıya sahiptir. Evet, iki konum arasındaki akış sırasında iletilecek mesaja dair tüm verilerin kullanılabilir olmasını beklemektense, mesajın herbir parçasının/fragment’in kullanılabilir hale gelmesiyle client’a ya da server’a parça parça gönderilmesi mümkündür ve bu davranış client’tan server’a ya da server’dan client’a olacak şekilde streaming edilebilmektedir. Bu içeriğimizde SignalR teknolojisindeki mevzu bahis olan streaming davranışını masaya yatıracak, Client-to-Server Streaming ve Server-to-Client Streaming olmak üzere iki farklı yaklaşımı tam teferruatlı bir şekilde inceliyor olacağız.

Streaming Nedir?

Öncelikle incelemede bulunacağımız streaming davranışının ne olduğunu kısaca izah ederek başlamakta fayda görmekteyim. Streaming; bir mesajı hedef noktaya gönderirken bütünsel olarak göndermek yerine parçalar halinde göndermeyi ifade eden bir kavramdır. Bu mesaj, küçük parçalara ayrılmış büyük bir nesne/veri olabileceği gibi, her biri farklı öğeler olan bir nesne/veri koleksiyonu da olabilir.

Misal olarak bir dosyanın server’dan client’a gönderilmesi streaming ile gerçekleştirilebilir. Dosyanın tamamını bir seferde belleğe yüklemek yerine, parçalanıp gönderilmesi bir streaming’dir ve bu davranış ile hem bellek kullanımı daha etkili hale getirilebilir hem de işlem süresi daha da optimize edilebilir.

Streaming Ne İçin Kullanılır?

Mantıken iki farklı bloğun haberleşme sürecindeki zamansal ve performans açısından doğabilecek maliyetleri daha da törpülemek için diyebiliriz. Düşünsenize… Tarayıcı üzerinden bir video izlemek istediğinizde tüm videonun yüklenmesini mi bekliyorsunuz? Eğer böyle bir durum söz konusu olsaydı muhtemelen internetten bir film izleyebilmek için öncelikle kayda değer süre boyunca beklenmesi gerekecekti. Lakin videonun tamamının yüklenmesini beklemek yerine, video küçük parçalar halinde indirilmekte ve her parça geldikçe izleme gerçekleştirilmektedir. Bu şekilde hem videonun adil bir süre sonraki kısmı umrunuzda olmayacak hem de video izleme deneyimi tamamen kesintisiz olacaktır. Ee haliyle bu tarz bir durumda streaming davranışı olmazsa olmazdır diyebiliriz.

Ya da bir başka örnek vermemiz gerekirse eğer Kapalı Devre Televizyon/Closed-Circuit Television(CCTV) kameralarında video akışı süreklidir. Yani kamera çekim yaparken sadece veri parçalarını alır ve gönderir. İşte bu tarz senaryolarda da streaming tam anlamıyla uygulanabilecek tek senaryodur diyebiliriz.

SignalR’da Streaming Davranışı Niye?

Esasında streaming deyince akla direkt gRPC geliyor olabilir. Evet, bizler gRPC ile başarılı streaming davranışlarını oldukça hızlı ve performanslı bir şekilde gerçekleştirebiliyoruz. Amma velakin SignalR’da da real-time’da streaming davranışını direkt olarak sergilememiz gereken senaryolar olabilir. Tamam, SignalR ile birlikte gRPC’yi kullanarak streaming’i uygulayabiliriz. Ama salt bir şekilde gRPC’ye ihtiyaç duyulmaksızın da bu ihtiyacı karşılayabilmekteyiz.

SignalR İle Client-to-Server Streaming

Bu akış türünde, client’tan server’a bir stream başlatılır ve bu stream bitene ve sonlandırılana kadar mesaj gönderme işlemi devam eder. Server, mesajları aynı çağrı bağlamında geldikleri sırayla edinecek ve mutlak veriyi bütünsel olarak elde edene kadar bu süreç devam edecektir. Burada önemli olan nokta şudur ki, akışı kontrol eden client’tır. Tabi server, istediği an akışı kapatabilecek bir yeteneğe sahiptir ancak normal bir şekilde kapatmanın söz konusu olabilmesi için client tarafından bu eylemin gelmesi gerekmektedir. Client-to-Server Streaming için aşağıdaki gibi Javascript çalışması yürütülebilir.

const subject = new Subject();
this.hubConnection.send("UploadStream", subject);
let count = 0;
const intervalHandle = setInterval(() => {
  count++;
  subject.next(`Merhaba ${count}`);
  if (count === 10) {
    clearInterval(intervalHandle);
    subject.complete();
  }
}, 1000);

Burada yapılan çalışmaya göz atarsak eğer ‘hubconnection’ üzerinden send fonksiyonuna çağrıda bulunuyor ve sunucudaki UploadStream metodunu tetikliyoruz ve böylece bir stream başlatmış oluyoruz. Tabi dikkat ederseniz stream’de bir veri yayınlayabilmek için RxJS kütüphanesinde bulunan Subject nesnesinden istifade ediyoruz. Ne zaman ki bu subject’i complete ederiz, o zaman oluşturulan stream sonlandırılmış olacaktır.

Peki hocam, bu client-to-server stream isteğinin karşılanması için sunucu tarafında nasıl bir çalışma yapmalıyız? diye sorduğunuzu duyar gibiyim… Client’tan başlatılan stream’i karşılayabilmek için aşağıdaki gibi ChannelReader<T> veya IAsyncEnumerable<T> türlerinden parametre alan metotlardan birinin bulunduğu hub sınıfının tanımlanması yeterli olacaktır.

public class ExampleHub : Hub
{
    public async Task UploadStream(IAsyncEnumerable<string> stream)
    {
        await foreach (var item in stream)
        {
            Console.WriteLine(item);
        }
    }
}

veya


public class ExampleHub : Hub
{
    public async Task UploadStream(ChannelReader<string> stream)
    {
        while (await stream.WaitToReadAsync())
        {
            await Console.Out.WriteLineAsync(await stream.ReadAsync());
        }
    }
}

SignalR; hub içerisinde yukarıdaki gibi ilgili parametrelere sahip metotları gördüğü an, bunların otomatik olarak client’tan server’a stream edilebilir hub metotları olduğunu anlayacaktır.

Bu yaklaşıma misal olarak bir de .NET client ile örneklendirmede bulunabiliriz. Şöyle ki;

using Microsoft.AspNetCore.SignalR.Client;
 
var connection = new HubConnectionBuilder()
          .WithUrl("https://localhost:7132/example-hub")
          .Build();
 
await connection.StartAsync();
 
await connection.SendAsync("UploadStream", ClientStreamData());
 
async IAsyncEnumerable<string> ClientStreamData()
{
    for (int i = 0; i < 10; i++)
    {
        await Task.Delay(750);
        yield return $"Merhaba {i}";
    }
}

ya da aşağıdaki gibi ChannelWriter eşliğinde de(channel.Writer) stream başlatılıp, veriler gönderilebilir.

using Microsoft.AspNetCore.SignalR.Client;
using System.Threading.Channels;
 
var connection = new HubConnectionBuilder()
          .WithUrl("https://localhost:7132/example-hub")
          .Build();
 
await connection.StartAsync();
 
var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
for (int i = 0; i < 10; i++)
{
    await Task.Delay(750);
    await channel.Writer.WriteAsync($"Merhaba {i}");
}
channel.Writer.Complete();

Dikkat ederseniz bu yöntemde Complete metodu ile mesaj gönderme işlemi bittiği taktirde stream tamamlanmaktadır.

Evet, görüldüğü üzere SignalR ile client-to-server stream yöntemini tam teferruatlı incelemiş bulunuyoruz. Şimdi sıra server-to-client stream yöntemine gelmiş bulunmaktadır. Gelin bu yönteminde nasıl bir davranışa sahip olduğunu ve nasıl yapılandırıldığını incelemeye başlayalım…

SignalR İle Server-to-Client Streaming

Server streaming’de ise adından da anlaşılacağı üzere client streaming’in tam tersi şekilde çalışma sergilenmektedir. Client, server üzerinde bir server streaming endpoint’ini tetiklemesiyle akışa subscribe olacaktır ve akış sonlanana kadar mesajları elde etmeye başlayacaktır.

Bunun için öncelikle server kısmında streaming metodu oluşturulması gerekecektir. SignalR’da, bir hub içerisinde IAsyncEnumerable<T>ChannelReader<T>Task<IAsyncEnumerable<T>> veya Task<ChannelReader<T>> türlerinde geriye değer döndüren metotlar varsa eğer bunlar otomatik olarak streaming metodu olarak değerlendirilir.

Misal olarak aşağıda IAsyncEnumerable<T> türünden geriye dönüş değeri olan bir server hub stream metodu tanımlanmıştır.

public class ExampleHub : Hub
{
    public async IAsyncEnumerable<string> DownloadStream(int count, int delay, CancellationToken cancellationToken)
    {
        for (int i = 0; i < count; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
 
            yield return $"Merhaba {i}";
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Burada dikkat ederseniz eğer client’ın stream’i iptal edebilmesi için bir CancellationToken parametresi tanımlanabilmektedir. Böylece bu parametre aracılığıyla client; stream sona ermeden önce bağlantıyı kesebilir, sunucudaki işlemi durdurabilir ve tüm kaynakları serbest bırakabilir.

Şimdi aynı işlemi Channel‘lar ile sağlayacak olan hub metodunu oluşturalım;

public class ExampleHub : Hub
{
    public async Task<ChannelReader<string>> DownloadStream(int count, int delay, CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<string>();
        for (int i = 0; i < count; i++)
        {
            await channel.Writer.WriteAsync($"Merhaba {i}.", cancellationToken);
            await Task.Delay(delay, cancellationToken);
        }
        return channel.Reader;
    }
}

Tabi burada Channel‘ların kullanıldığı senaryoda ChannelReader return edilene kadar hub çağrıları engellenecektir.

Hangi çalışma olursa olsun her ikisini de aşağıdaki Javascript çalışmasıyla tetikleyebilir ve server tabanlı streaming’i başlatabiliriz.

this.hubConnection.stream("DownloadStream", count, delay)
  .subscribe({
    next: item => {
      console.log(item);
    },
    complete: () => {
      console.log("Stream completed");
    },
    error: error => {
      console.log(error);
    }
  });

Benzer mantıkla aynı streaming’i aşağıdaki .NET client’ı üzerinden de tüketebiliriz.

using Microsoft.AspNetCore.SignalR.Client;
 
var connection = new HubConnectionBuilder()
          .WithUrl("https://localhost:7132/example-hub")
          .Build();
 
await connection.StartAsync();
 
CancellationTokenSource cancellationTokenSource = new();
var stream = connection.StreamAsync<string>("DownloadStream", 10, 170, cancellationTokenSource.Token);
 
await foreach (var message in stream)
{
    Console.WriteLine(message);
}

Yukarıdaki kod bloğunu incelersek eğer StreamAsync fonksiyonu eşliğinde server streaming başlatılmakta ve bu fonksiyon ile IAsyncEnumerable<T> referansıyla çalışılmaktadır.

Ayrıca .NET client’ı aşağıdaki gibi StreamAsChannelAsync metoduyla da oluşturabilir ve böylece ChannelReader<T> referansıyla da streaming edilen datayı elde edebiliz.

using Microsoft.AspNetCore.SignalR.Client;
 
var connection = new HubConnectionBuilder()
          .WithUrl("https://localhost:7132/example-hub")
          .Build();
 
await connection.StartAsync();
 
CancellationTokenSource cancellationTokenSource = new();
var channel = await connection.StreamAsChannelAsync<string>("DownloadStream", 10, 170, cancellationTokenSource.Token);
 
while (await channel.WaitToReadAsync())
{
    while (channel.TryRead(out var message))
    {
        Console.WriteLine(message);
    }
}

İşte bu kadar basit 🙂

Böylece içeriğimiz boyunca SignalR kapsamında var olan streaming özelliğini incelemiş bulunuyor ve client-to-server ve server-to-client davranışlarının nasıl sağlanacağını Javascript ve .NET client’ları eşliğinde değerlendirmiş bulunuyoruz.

İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…

Hasan Aslan

You may also like...

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir