Spark Connector MongoDB — Python API

#python #mongodb #scala #apache-spark #pyspark

Question:

I would like to read data from Mongo using Spark, specifically PySpark. I found the official guide from Mongo: https://docs.mongodb.com/spark-connector/python-api/

I have all the prerequisites:

  • Scala 2.11.8
  • Spark 1.6.2
  • MongoDB 3.0.8 (not on the same machine as Spark)

    $ pyspark --conf "spark.mongodb.input.uri=mongodb://mongo1:27019/xxx.xxx?readPreference=primaryPreferred" --packages org.mongodb.spark:mongo-spark-connector_2.11:1.1.0

and PySpark showed me this:

 Python 3.4.2 (default, Oct  8 2014, 10:45:20) 
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.mongodb.spark#mongo-spark-connector_2.11;1.1.0 in central
    found org.mongodb#mongo-java-driver;3.2.2 in central
:: resolution report :: resolve 280ms :: artifacts dl 6ms
    :: modules in use:
    org.mongodb#mongo-java-driver;3.2.2 from central in [default]
    org.mongodb.spark#mongo-spark-connector_2.11;1.1.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/9ms)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/12 16:35:46 INFO SparkContext: Running Spark version 1.6.2
16/10/12 16:35:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/12 16:35:47 INFO SecurityManager: Changing view acls to: root
16/10/12 16:35:47 INFO SecurityManager: Changing modify acls to: root
16/10/12 16:35:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
16/10/12 16:35:47 INFO Utils: Successfully started service 'sparkDriver' on port 35485.
16/10/12 16:35:48 INFO Slf4jLogger: Slf4jLogger started
16/10/12 16:35:48 INFO Remoting: Starting remoting
16/10/12 16:35:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.28.194:39860]
16/10/12 16:35:48 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 39860.
16/10/12 16:35:48 INFO SparkEnv: Registering MapOutputTracker
16/10/12 16:35:48 INFO SparkEnv: Registering BlockManagerMaster
16/10/12 16:35:48 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1e9185bd-fd1a-4d36-8c7e-9b6430e9f5c6
16/10/12 16:35:48 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/10/12 16:35:48 INFO SparkEnv: Registering OutputCommitCoordinator
16/10/12 16:35:48 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/10/12 16:35:48 INFO SparkUI: Started SparkUI at http://192.168.28.194:4040
16/10/12 16:35:48 INFO HttpFileServer: HTTP File server directory is /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/httpd-d62ed1b8-e4ab-4891-9b61-5f0f5ae7eb6e
16/10/12 16:35:48 INFO HttpServer: Starting HTTP Server
16/10/12 16:35:48 INFO Utils: Successfully started service 'HTTP file server' on port 34716.
16/10/12 16:35:48 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar at http://192.168.28.194:34716/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar with timestamp 1476282948892
16/10/12 16:35:48 INFO SparkContext: Added JAR file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar at http://192.168.28.194:34716/jars/org.mongodb_mongo-java-driver-3.2.2.jar with timestamp 1476282948898
16/10/12 16:35:49 INFO Utils: Copying /root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar to /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/userFiles-549541b8-aaba-4444-b2eb-438baa7e82e8/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar
16/10/12 16:35:49 INFO SparkContext: Added file file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar at file:/root/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-1.1.0.jar with timestamp 1476282949018
16/10/12 16:35:49 INFO Utils: Copying /root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar to /tmp/spark-c1d32422-8241-411f-a751-e01e5f6e2b5c/userFiles-549541b8-aaba-4444-b2eb-438baa7e82e8/org.mongodb_mongo-java-driver-3.2.2.jar
16/10/12 16:35:49 INFO SparkContext: Added file file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar at file:/root/.ivy2/jars/org.mongodb_mongo-java-driver-3.2.2.jar with timestamp 1476282949029
16/10/12 16:35:49 INFO Executor: Starting executor ID driver on host localhost
16/10/12 16:35:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43448.
16/10/12 16:35:49 INFO NettyBlockTransferService: Server created on 43448
16/10/12 16:35:49 INFO BlockManagerMaster: Trying to register BlockManager
16/10/12 16:35:49 INFO BlockManagerMasterEndpoint: Registering block manager localhost:43448 with 511.1 MB RAM, BlockManagerId(driver, localhost, 43448)
16/10/12 16:35:49 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/

Using Python version 3.4.2 (default, Oct  8 2014 10:45:20)
SparkContext available as sc, HiveContext available as sqlContext.
  

then I entered this code:

 df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
  

and got this:

 16/10/12 16:40:51 INFO HiveContext: Initializing execution hive, version 1.2.1
16/10/12 16:40:51 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
16/10/12 16:40:51 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
16/10/12 16:40:51 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/10/12 16:40:51 INFO ObjectStore: ObjectStore, initialize called
16/10/12 16:40:51 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/10/12 16:40:51 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/10/12 16:40:51 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:51 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:53 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/10/12 16:40:53 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:53 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:54 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/10/12 16:40:54 INFO ObjectStore: Initialized ObjectStore
16/10/12 16:40:55 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/10/12 16:40:55 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/10/12 16:40:55 INFO HiveMetaStore: Added admin role in metastore
16/10/12 16:40:55 INFO HiveMetaStore: Added public role in metastore
16/10/12 16:40:55 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/10/12 16:40:55 INFO HiveMetaStore: 0: get_all_databases
16/10/12 16:40:55 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_all_databases   
16/10/12 16:40:55 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/10/12 16:40:55 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
16/10/12 16:40:55 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:55 INFO SessionState: Created local directory: /tmp/8733297b-e0d2-49cf-8557-62c8c4e7cc4a_resources
16/10/12 16:40:55 INFO SessionState: Created HDFS directory: /tmp/hive/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a
16/10/12 16:40:55 INFO SessionState: Created local directory: /tmp/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a
16/10/12 16:40:55 INFO SessionState: Created HDFS directory: /tmp/hive/root/8733297b-e0d2-49cf-8557-62c8c4e7cc4a/_tmp_space.db
16/10/12 16:40:55 INFO HiveContext: default warehouse location is /user/hive/warehouse
16/10/12 16:40:55 INFO HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
16/10/12 16:40:55 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
16/10/12 16:40:55 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
16/10/12 16:40:56 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/10/12 16:40:56 INFO ObjectStore: ObjectStore, initialize called
16/10/12 16:40:56 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
16/10/12 16:40:56 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
16/10/12 16:40:56 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:56 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/10/12 16:40:57 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
16/10/12 16:40:58 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:58 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
16/10/12 16:40:59 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
16/10/12 16:40:59 INFO ObjectStore: Initialized ObjectStore
16/10/12 16:40:59 INFO HiveMetaStore: Added admin role in metastore
16/10/12 16:40:59 INFO HiveMetaStore: Added public role in metastore
16/10/12 16:40:59 INFO HiveMetaStore: No user is added in admin role, since config is empty
16/10/12 16:40:59 INFO HiveMetaStore: 0: get_all_databases
16/10/12 16:40:59 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_all_databases   
16/10/12 16:40:59 INFO HiveMetaStore: 0: get_functions: db=default pat=*
16/10/12 16:40:59 INFO audit: ugi=root  ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
16/10/12 16:40:59 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
16/10/12 16:40:59 INFO SessionState: Created local directory: /tmp/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae_resources
16/10/12 16:40:59 INFO SessionState: Created HDFS directory: /tmp/hive/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae
16/10/12 16:40:59 INFO SessionState: Created local directory: /tmp/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae
16/10/12 16:40:59 INFO SessionState: Created HDFS directory: /tmp/hive/root/cc4f12a5-e5b2-4a22-a240-04e1ca3727ae/_tmp_space.db
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 139, in load
    return self._df(self._jreader.load())
  File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o24.load.
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
    at com.mongodb.spark.config.MongoCompanionConfig$class.getOptionsFromConf(MongoCompanionConfig.scala:209)
    at com.mongodb.spark.config.ReadConfig$.getOptionsFromConf(ReadConfig.scala:39)
    at com.mongodb.spark.config.MongoCompanionConfig$class.apply(MongoCompanionConfig.scala:101)
    at com.mongodb.spark.config.ReadConfig$.apply(ReadConfig.scala:39)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:67)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:36)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
  

I have tried many possible ways of reading data from Mongo with Spark. Any advice?

Answer #1:

This looks like the error I would expect to see when running code compiled against a different Scala version. Have you tried running it with --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 instead?
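
For example, a sketch of the same launch as in the question, with only the artifact suffix changed to _2.10:

 $ pyspark --conf "spark.mongodb.input.uri=mongodb://mongo1:27019/xxx.xxx?readPreference=primaryPreferred" \
     --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0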

By default, Spark 1.6.x is built for Scala 2.10; if you want a Scala 2.11 build, you have to compile it yourself, like this:

 ./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package
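
Once the connector's Scala suffix matches the Scala version of your Spark build, the read from the question should go through. A minimal sketch to verify it (assuming the input URI is still set via --conf as above; printSchema/show are only there to confirm the connection works):

 # sqlContext is already provided by the pyspark shell
 df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
 df.printSchema()   # schema inferred by sampling the MongoDB collection
 df.show(5)         # print a few documents as DataFrame rows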