Apache beam левое соединение между 2 коллекциями ПК

# #python #google-cloud-dataflow #apache-beam

Вопрос:

Я пытаюсь выполнить левое соединение между коллекцией pc и ее дубликатом, поэтому я ищу что-то вроде этого:

 ((colA, colB, colC, colD)) (a,b,e,f) (a,b,g,h) (a,b,i,j) (c,d,k,l) (c,d,m,n)  

Делаем левое соединение на colA и ColB, так что результат будет выглядеть так:

 (e,f, g,h) (e,f, i,j) (g,h, i,j)  (k,l, m,n)  

Я пришел, чтобы решить эту проблему, используя фрейм данных apache beam:

 df = to_dataframe(pcol)  with dataframe.allow_non_parallel_operations():  res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB']) pcoll = to_pcollection(res)  

и это работало нормально, но когда мне нужно обработать сбор больших строк, у меня возникла ошибка нехватки памяти (это было ожидаемо).

Теперь я ищу альтернативу df.merger (), но с pcollection, чтобы не столкнуться с ошибкой памяти

Ответ №1:

Если кто-нибудь интересуется этим вопросом

Я подумал об альтернативной логике. Сначала я сгруппировал свои записи по ключу, вот так:

 ((a,b),(e,f)) ((a,b),(g,h)) ((a,b),(i,j)) ((c,d),(k,l)) ((c,d),(m,n))  

после этого я объединил их, используя GroupByKey
В следующем преобразовании я попытался перебросить все возможные комбинации

 class combineLev(beam.DoFn):  #this act like df.merge  def process(self, element):  (k, v) = element  v_ = list(v)  for i in range(len(v_)):  for j in range(i,len(v_)):  if v_[i][1] != v_[j][1]:  #print(list_[i][1], list_[j][1])  yield (v_[i], v_[j])