#apache-spark #pyspark #apache-spark-sql
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Моя схема json выглядит следующим образом
{
"uid": "a7f2e98835c1fb67e9aa9f1fbaae5e98",
"gender": "F",
"click": [
{
"url": "htp://abc.com/1.html?utm_campaign=397"
},
{
"url": "htp://qaz.com/1.html?utm_campaign=397"
}
]
}
у меня есть udf, который очищает URL-адрес.url, например my_udf(«htp:// abc.com /1.html?utm_campaign= 397»), я получаю abc.com
Я хочу получить фрейм данных с очищенным URL:
uid gender urls
a7f2e98835c1fb67e9aa9f1fbaae5e98 F [abc.com,qaz.com]
Мой код:
from pyspark.sql import functions as F
from pyspark.sql.types import *
import re
from urllib.parse import urlparse
from urllib.request import urlretrieve, unquote
clean = F.udf (lambda z:my_udf(z), ArrayType(StringType()))
def my_udf(url):
url = re.sub('(http(s)*://) ', 'http://', url)
parsed_url = urlparse(unquote(url.strip()))
if parsed_url.scheme not in ['http','https']: return None
netloc = re.search("(?:www.)?(.*)", parsed_url.netloc).group(1)
if netloc is not None: return str(netloc.encode('utf8')).strip()
return None
dataFrame = spark.read.json('1.json')
.withColumn("urls", clean(F.col("click.url")))
.select( F.col("uid"), F.col("gender"), F.col("urls") )
show(3)
Но я получаю ошибки:
TypeError: expected string or bytes-like object
Что я делаю не так?
Комментарии:
1. ваше определение udf проблематично — вам не нужно
lambda
. Также можете ли вы показать исходный код дляmy_udf
?2. добавлен код my_udf
3. попробуйте
clean = F.udf (my_udf, ArrayType(StringType()))
Ответ №1:
я сделал это:
dataFrame = spark.read.json('1.json')
.withColumn("urls_exploded", F.explode( F.col("click.url") ))
.withColumn("urls_cleaned", my_udf(F.col("urls_exploded")))
.groupBy(F.col("uid"),F.col("gender") )
.agg(F.collect_set(F.col("urls_cleaned")).alias("urls") )
.select( F.col("uid"), F.col("gender_age"), F.col("urls") )
.show(1,truncate=False)