Как создать общую библиотеку Jenkins, использующую скрипты Python?

#python #jenkins #google-cloud-pubsub

#python #Дженкинс #google-cloud-pubsub

Вопрос:

Я создал скрипт Python для публикации сообщений в Google Cloud pubsub, и я хочу обернуть его в общую библиотеку Jenkins.

Вот мой класс python publisher

 from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import futures
from concurrent.futures import TimeoutError
from typing import Mapping, Text


class Publisher:

    def __init__(self, project_id: Text, topic_id: Text):
        self._publisher = pubsub_v1.PublisherClient()
        self._topic_path = self._publisher.topic_path(project_id, topic_id)

    @staticmethod
    def callback(future: futures.Future) -> None:
        try:
            message_id = future.result()
            print(f"Successful published the message [{message_id}]")
        except TimeoutError:
            print(f"An error occurred: The request times out.")
        except Exception as exception:
            print(f"An exception occurred in publishing the message: {exception}")

    def publish(self, message: Text, **kwargs: Mapping[Text, Text]) -> None:
        data = message.encode("utf-8")
        future = self._publisher.publish(topic=self._topic_path, data=data, **kwargs)
        future.add_done_callback(self.callback)
 

И вот сценарий, который я хочу запустить внутри конвейеров jenkins

 import os
import argparse
from publisher import Publisher

if "__main__" == __name__:
    parser = argparse.ArgumentParser(description="Publish a message to a topic in google cloud pubsub.")
    parser.add_argument("--project_id", action="store", type=str, required=True, help="The google cloud project ID")
    parser.add_argument("--topic_id", action="store", type=str, required=True, help="The google cloud pubsub topic ID")
    parser.add_argument("--message", action="store", type=str, required=True,
                        help="The message you want to publish")
    parser.add_argument("--table_name", action="store", type=str, required=False, help="The data table name")
    parser.add_argument("--date", action="store", type=str, required=False, help="The date")
    parser.add_argument("--test_instance", action="store", type=str, required=False,
                        default=os.environ.get("TEST_INSTANCE", "False"), help="Is test instance? Default: "False"")
    args = parser.parse_args()

    publisher = Publisher(project_id=args.project_id, topic_id=args.topic_id)
    kwargs = {
        key: getattr(args, key)
        for key in ["table_name", "date", "test_instance"]
        if getattr(args, key) is not None
    }
    publisher.publish(message=args.message, **kwargs)

 

Может кто-нибудь мне помочь? Я новичок в Дженкинсе

Ответ №1:

Установите свои скрипты на сервер и убедитесь, что вы можете запускать их из командной строки. Возможно, вам потребуется настроить виртуальную среду и возиться с разрешениями.

В jenkins вы просто добавляете шаг сценария оболочки, который вызывает сценарий.

Комментарии:

1. я хочу инкапсулировать скрипт в разделяемую библиотеку Jenkins, чтобы скрипт можно было запускать везде.