#c #.net-core #ipc #mkfifo
#c #.net-ядро #ipc #mkfifo
Вопрос:
У нас есть приложение, написанное на c, которое отправляет события / уведомления в приложение, написанное на c #. Оба приложения выполняются на одном компьютере с Linux.
Приложение C:
Приложением C является Asterisk, и мы модифицировали исходный код (он с открытым исходным кодом), чтобы он мог отправлять события в наше консольное приложение dotnet. В настоящее время мы отправляем события простым добавлением текста в файл. Например, так мы отправляем событие, к которому подключился новый одноранговый узел (ip-телефон):
// place this on chan_sip.c
// Example: 1-LN-48T6-E3C5-OFWT|10.0.0.103:5868|189.217.18.244|10216|Z 3.9.32144 r32121
if(!ast_sockaddr_isnull(amp;peer->addr))
{
// lock
ast_mutex_lock(amp;some_lock);
// write to file
FILE *pFile;
pFile=fopen("/var/log/asterisk/peer-subscriptions.txt", "a");
if(pFile==NULL) { perror("Error opening file."); }
else {
fprintf(pFile,"%s|%s|%s|%s|%sn",
/* 1-LN-48T6-E3C5-OFWT */ peer->name,
/* 10.0.0.103:5868 */ pvt->initviasentby,
/* 189.217.18.244 */ ast_sockaddr_stringify_addr(amp;peer->addr),
/* 10216 */ ast_strdupa(ast_sockaddr_stringify_port(amp;peer->addr)),
/* Z 3.9.32144 */ peer->useragent
// Other:
// peer->fullcontact, // sip:1-LN-48T6-E3C5-OFWT@189.217.18.244:10216;rinstance=8b4135488f735cbf;transport=UDP
// pvt->via // SIP/2.0/UDP 54.81.92.135:20001;branch=z9hG4bK58525e18;rport
);
}
fclose(pFile);
// unlock
ast_mutex_lock(amp;some_lock);
}
Приложение C #
Приложение c # — это консольное приложение, которое открывает этот файл для чтения событий, ничего особенного.
Таким образом, в основном приложение C выполняет запись в текстовый файл, а приложение c # считывает данные из этого текстового файла.
Вопрос
Со временем файл становится большим, и я не хочу сталкиваться с проблемой его усечения и создания другой блокировки, в то время как он усекает и т. Д. … использование mkfifo
, похоже, именно то, что я хочу. Поскольку я относительно новичок в Linux, я хочу убедиться, что понимаю, как это работает, прежде чем использовать его. Я знаю основы C (я не эксперт) и хотел бы использовать более эффективный подход. Вы, ребята, рекомендуете использовать mkfifo, namedpipes или tcp?
Пример 1:
mkfifo работает потрясающе с несколькими строками, но когда я пытаюсь прочитать много строк, он терпит неудачу. Возьмем этот пример:
mkfifo foo.pipe # create a file of type pipe
На первом терминале запись в этот файл
echo "hello world" >> foo.pipe # writes hello world AND blocks until someone READS from it
На отдельном терминале я делаю:
cat foo.pipe # it will output hello world. This will block too until someone WRITES to that file
Пример 2:
mkfifo foo.pipe # create a file of type pipe. If it exists already do not create again
На терминале 1 считывается из этого файла
tail -f foo.pipe # similar to cat foo.pipe but it keeps reading
На терминале 2 запись в этот файл, но много данных
echo ~/.bashrc >> foo.pipe # write the content of file ~/.bashrc to that file
Это не работает, и на консоли отображается только несколько строк этого файла. Как я могу правильно использовать mkfifo для чтения всего текста? Должен ли я использовать другой подход и вместо этого использовать tcp?
Ответ №1:
Я бы использовал соединение с сокетами AF_UNIX.
Комментарии:
1. Спасибо за помощь в отказоустойчивости. Я уже нашел примеры на C. Но как я могу получать эти события из dotnet?
Ответ №2:
Я только что закончил использовать tcp. Я без проблем отправляю 10000 коротких сообщений за 10 секунд.
C-код (клиент)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<arpa/inet.h> //inet_addr
#include<unistd.h>
int send_data(void)
{
int socket_desc;
struct sockaddr_in server;
//Create socket
socket_desc = socket(AF_INET , SOCK_STREAM , 0);
if (socket_desc == -1)
{
printf("Could not create socket n");
return 1;
}
server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_family = AF_INET;
server.sin_port = htons( 11234 );
//Connect to remote server
if (connect(socket_desc , (struct sockaddr *)amp;server , sizeof(server)) < 0)
{
printf("connect error n");
close(socket_desc);
return 2;
}
char *message;
message = "hello world";
if( send(socket_desc , message , strlen(message) , 0) < 0)
{
printf("Send failed n");
close(socket_desc);
return 3;
}
close(socket_desc);
return 0;
}
int main(int argc , char *argv[])
{
// send 1000 messages
for(int i=0; i<1000; i )
{
send_data();
// 10 milliseconds
usleep(10000);
}
return 0;
}
C # код (сервер)
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Ipc_Tcp
{
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 50;
const ushort _port = 11234;
static int _counter = 0;
public static void StartListening()
{
IPEndPoint localEndPoint = new IPEndPoint(System.Net.IPAddress.Loopback, _port);
// Create a TCP/IP socket.
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(localEndPoint);
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
public static int Main(String[] args)
{
StartListening();
return 0;
}
}
Комментарии:
1. Одна из проблем (могут быть и другие) с этим подходом заключается в том, использует ли какое-либо другое приложение ваш жестко запрограммированный порт. У меня нет удобного примера использования сокетов AF_UNIX на C #, но я уверен, что вы можете его найти.
Ответ №3:
Как указано в @resiliware, вероятно, лучше всего использовать сокет unix.
В этом примере показано, как взаимодействовать между C и C # с использованием сокета unix:
Клиент (написанный на C, работающий на ubuntu)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<unistd.h>
int send_data(void)
{
int sock;
int conn;
struct sockaddr saddr = {AF_UNIX, "/tmp/foo.sock"};
socklen_t saddrlen = sizeof(struct sockaddr) 6;
sock = socket(AF_UNIX, SOCK_STREAM, 0);
conn = connect(sock, amp;saddr, saddrlen);
char BUFF[1024];
char *message;
message = "hello world";
if( send(sock , message , strlen(message) , 0) < 0)
{
printf("Send failed n");
close(sock);
return 3;
}
// I am not sure if I should close both or only the socket.
close(conn);
close(sock);
return 0;
}
int main(int argc , char *argv[])
{
// send 5000 messages
for(int i=0; i<4000; i )
{
send_data();
// sleep 1 millisecond
usleep(1000);
}
return 0;
}
Сервер (написанный на C #, работающий на том же компьютере Ubuntu)
using System;
using System.Net.Sockets;
using System.Threading;
class Program
{
// unix Endpoint that we will use
const string path = "/tmp/foo.sock";
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 100;
// Counts the number of messages received
static int _counter = 0;
public static void StartListening()
{
if (System.IO.File.Exists(path))
System.IO.File.Delete(path);
// create unix socket
var listener = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
try
{
// listener.Bind(localEndPoint);
listener.Bind(new UnixDomainSocketEndPoint(path));
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
// keep listening for connections
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
//if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
static void Main(string[] args)
{
StartListening();
}
}
Клиент (если вы хотите, чтобы код клиента был написан на C # вместо C)
using (var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified))
{
socket.Connect(new UnixDomainSocketEndPoint(path));
// send hello world
var dataToSend = System.Text.Encoding.UTF8.GetBytes("Hello-world!");
socket.Send(dataToSend);
}