Получить размер файла потоковой передачи с перехватчика сервера grpc

# #go #grpc #middleware #interceptor #go-grpc-middleware

Вопрос:

У меня есть метод двунаправленного потока Go-сервера, определенный следующим образом в прото-файле:

 syntax = "proto3";
option go_package="pdfcompose/;pdfcompose";
package pdfcompose;

service PdfCompose {
  rpc Send (stream FileForm) returns (stream PdfFile) {}
}

message FileForm {
  bytes Upfile1 = 1;
  bytes Upfile2 = 2;
  bytes Upfile3 = 3;
}

message PdfFile {
  bytes File = 1;
}
 

И мой перехватчик журналов имеет следующий интерфейс:

 func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)  error {
    fmt.Println("Log Interceptor")
    err := handler(srv, ss)
    if err != nil {
        return err
    }
    return nil
}
 

И я использую https://github.com/grpc-ecosystem/go-grpc-middleware в качестве двигателя-перехватчика.
Мне нужно реализовать регистрацию размера потокового файла (в образовательных целях) и попытаться выяснить, где я могу получить какие-либо данные о форме файла и его содержимом.

Мое первое предположение состояло в том, чтобы изучить аргумент grpc.ServerStream (ss), чтобы найти что-то об этом, и похоже, что он содержит много данных, таких как максимальный и минимальный размер сообщений, но отмечает фактическую длину содержимого.

Как я могу получить размер входящего файла с помощью такого рода перехватчиков?

Комментарии:

1. Поскольку контент передается в потоковом режиме, он будет поступать со временем (поэтому информация недоступна во время вызова перехватчика; вам нужно подождать handler ). Вы хотите создать запись журнала по мере PdfFile получения каждой записи (это проще сделать в обработчике, а не в перехватчике) или просто общее количество обработанных байтов (известно только при закрытии потока; думаю, для этого потребуется обернутый поток ).

2. Да, вы правы, именно так ИИ удается реализовать это!

Ответ №1:

Итак, как упоминал @Brits выше, рабочий способ реализовать то, что я хотел, — это написать оболочку для потока.

Вот пример: https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go Я взял следующий код из этого репозитория, и я надеюсь, что правильно понимаю лицензию apache2 и нет никаких проблем с копированием части этого кода:

 // StreamServerInterceptor returns a new streaming server interceptor that validates incoming messages.
//
// The stage at which invalid messages will be rejected with `InvalidArgument` varies based on the
// type of the RPC. For `ServerStream` (1:m) requests, it will happen before reaching any userspace
// handlers. For `ClientStream` (n:1) or `BidiStream` (n:m) RPCs, the messages will be rejected on
// calls to `stream.Recv()`.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        wrapper := amp;recvWrapper{stream}
        return handler(srv, wrapper)
    }
}

type recvWrapper struct {
    grpc.ServerStream
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
    if err := s.ServerStream.RecvMsg(m); err != nil {
        return err
    }

    if err := validate(m); err != nil {
        return err
    }

    return nil
}
 

validate() функция фактически получает запрос как interface{} , и вам нужно использовать утверждение типа, чтобы привести этот запрос к типу, который вам нужен как v := req.(type) .

Например, я набрал запрос FileForm и смог проверить его содержимое:

 func logFileSize(req interface{}) error {
    m := req.(*pdfcompose.FileForm)
    println("SizeOfUpfile1: "   strconv.Itoa(int(binary.Size(m.Upfile1))))

    if m.Upfile2 != nil {
        println("SizeOfUpfile2: "   strconv.Itoa(int(binary.Size(m.Upfile3))))
    }
    if m.Upfile3 != nil {
        println("SizeOfUpfile2: "   strconv.Itoa(int(binary.Size(m.Upfile3))))
    }
    return nil
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
    if err := s.ServerStream.RecvMsg(m); err != nil {
        return err
    }
    //z := m.(pdfcompose.FileForm)
    if err := logFileSize(m); err != nil {
        return err
    }

    return nil
}

func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        wrapper := amp;recvWrapper{stream}
        return handler(srv, wrapper)
    }
}