# #python #google-cloud-dataflow #apache-beam
Вопрос:
Я пытаюсь выполнить левое соединение между коллекцией pc и ее дубликатом, поэтому я ищу что-то вроде этого:
((colA, colB, colC, colD)) (a,b,e,f) (a,b,g,h) (a,b,i,j) (c,d,k,l) (c,d,m,n)
Делаем левое соединение на colA и ColB, так что результат будет выглядеть так:
(e,f, g,h) (e,f, i,j) (g,h, i,j) (k,l, m,n)
Я пришел, чтобы решить эту проблему, используя фрейм данных apache beam:
df = to_dataframe(pcol) with dataframe.allow_non_parallel_operations(): res = df.merge(right=df, left_on=['colA', 'colB'], right_on=['colA', 'colB']) pcoll = to_pcollection(res)
и это работало нормально, но когда мне нужно обработать сбор больших строк, у меня возникла ошибка нехватки памяти (это было ожидаемо).
Теперь я ищу альтернативу df.merger (), но с pcollection, чтобы не столкнуться с ошибкой памяти
Ответ №1:
Если кто-нибудь интересуется этим вопросом
Я подумал об альтернативной логике. Сначала я сгруппировал свои записи по ключу, вот так:
((a,b),(e,f)) ((a,b),(g,h)) ((a,b),(i,j)) ((c,d),(k,l)) ((c,d),(m,n))
после этого я объединил их, используя GroupByKey
В следующем преобразовании я попытался перебросить все возможные комбинации
class combineLev(beam.DoFn): #this act like df.merge def process(self, element): (k, v) = element v_ = list(v) for i in range(len(v_)): for j in range(i,len(v_)): if v_[i][1] != v_[j][1]: #print(list_[i][1], list_[j][1]) yield (v_[i], v_[j])