#python #jenkins #google-cloud-pubsub
#python #Дженкинс #google-cloud-pubsub
Вопрос:
Я создал скрипт Python для публикации сообщений в Google Cloud pubsub, и я хочу обернуть его в общую библиотеку Jenkins.
Вот мой класс python publisher
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import futures
from concurrent.futures import TimeoutError
from typing import Mapping, Text
class Publisher:
def __init__(self, project_id: Text, topic_id: Text):
self._publisher = pubsub_v1.PublisherClient()
self._topic_path = self._publisher.topic_path(project_id, topic_id)
@staticmethod
def callback(future: futures.Future) -> None:
try:
message_id = future.result()
print(f"Successful published the message [{message_id}]")
except TimeoutError:
print(f"An error occurred: The request times out.")
except Exception as exception:
print(f"An exception occurred in publishing the message: {exception}")
def publish(self, message: Text, **kwargs: Mapping[Text, Text]) -> None:
data = message.encode("utf-8")
future = self._publisher.publish(topic=self._topic_path, data=data, **kwargs)
future.add_done_callback(self.callback)
И вот сценарий, который я хочу запустить внутри конвейеров jenkins
import os
import argparse
from publisher import Publisher
if "__main__" == __name__:
parser = argparse.ArgumentParser(description="Publish a message to a topic in google cloud pubsub.")
parser.add_argument("--project_id", action="store", type=str, required=True, help="The google cloud project ID")
parser.add_argument("--topic_id", action="store", type=str, required=True, help="The google cloud pubsub topic ID")
parser.add_argument("--message", action="store", type=str, required=True,
help="The message you want to publish")
parser.add_argument("--table_name", action="store", type=str, required=False, help="The data table name")
parser.add_argument("--date", action="store", type=str, required=False, help="The date")
parser.add_argument("--test_instance", action="store", type=str, required=False,
default=os.environ.get("TEST_INSTANCE", "False"), help="Is test instance? Default: "False"")
args = parser.parse_args()
publisher = Publisher(project_id=args.project_id, topic_id=args.topic_id)
kwargs = {
key: getattr(args, key)
for key in ["table_name", "date", "test_instance"]
if getattr(args, key) is not None
}
publisher.publish(message=args.message, **kwargs)
Может кто-нибудь мне помочь? Я новичок в Дженкинсе
Ответ №1:
Установите свои скрипты на сервер и убедитесь, что вы можете запускать их из командной строки. Возможно, вам потребуется настроить виртуальную среду и возиться с разрешениями.
В jenkins вы просто добавляете шаг сценария оболочки, который вызывает сценарий.
Комментарии:
1. я хочу инкапсулировать скрипт в разделяемую библиотеку Jenkins, чтобы скрипт можно было запускать везде.