#python #apache-beam
Вопрос:
У меня есть таблица postgres со схемой
row_id INT not null person_id INT not null case_id INT not null seq_num INT code VARCHAR(10)
Тестовый конвейер для чтения данных с использованием ReadFromJDBC из Apache Beam Python sdk (2.34.0), Python 3.8.x
types = [ ('row_id', int), ('person_id', int), ('case_id', int), ('seq_num', int), ('code', str) ] TableRow = typing.NamedTuple('TableRow', types) with TestPipeline() as p: coders.registry.register_coder(TableRow, coders.RowCoder) p.not_use_test_runner_api = True result = ( p | 'Read from jdbc' gt;gt; ReadFromJdbc( table_name=table_name, query='SELECT * FROM table_name LIMIT 100', driver_class_name='org.postgresql.Driver', jdbc_url=postgres_jdbc_url, username=username, password=password, ))
выдает мне следующую ошибку
ValueError: Failed to decode schema due to an issue with Field proto: name: "code" type { nullable: true logical_type { urn: "beam:logical_type:javasdk:v1" payload: "202SNAPPY00000000010000000100000223325010360U2543550005sr00=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthStringrlt;273'6u341257020001I00tmaxt3514xr008242X0020JdbcL31i270246376361367203_313a020004L0010argumentt0022Ljava/lang/Object;L0014ar 0124334t00.Lorg/t31600/013164/sdk/schemas/S051024$Field01034;L0010base0114Dq00~0003L00nidentifier6r00t35730;xpsr00210121100.01211lt;.Integer223422402443672012078%07$05valuexr002031(hNumber2062542253513224340213020000xp000000nsr006N(01r26224.AutoV01N00_t27404_F21274h9304m364S243227P020010L0025collectionEle!/353230413l9\10t000216"0100L312$;L00nmapKey35S1414map052273524,10metadatat0017)25234util/Map!g(nullablet0023t35!gt;8/Boolean;L00trowt34310t00$2122430001T(typeNamet00-21220000$0125401/20;xr00,nu01t210Y'3434526035252D00370H%266010114sr0036%3330134204.C5|Ds$EmptyMapY624205Z3343473200536202r364,315 r200325234372356020001ZQ230p00p~r00 21223400213140000r0130220000xr001605225!Z20.Enumr340535$pt0005INT32sA35100t0130601t002201051024p~0107\25t0006STRINGt0007VARCHAR000000n" representation { atomic_type: STRING } argument_type { atomic_type: INT32 } argument { atomic_value { int32: 10 } } } } id: 4 encoding_position: 4
Мне трудно понять, что я должен сделать, чтобы это исправить. Похоже, что Apache Beam не знает, как декодировать VARCHAR(10) в строку. Какую правильную схему использовать? Или что мне еще делать?