Error running Cassandra from Spark in Java: NoClassDefFoundError in org.apache.spark.sql.catalyst

I am using Cassandra 3.0.3 and Spark 1.6.0, and I am trying to get an example running by combining the old documentation at http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java with the new documentation at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md.

Here is my pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>muhrafifm</groupId>
 <artifactId>spark-cass-twitterdw</artifactId>
 <version>1.0</version>
 <packaging>jar</packaging>
 <build>
    <plugins>
      <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.0</version>
          <configuration>
              <source>1.7</source>
              <target>1.7</target>
          </configuration>
      </plugin>
    </plugins>
</build>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>        
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
        <type>jar</type>    
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.6.0-M1</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
     </dependency>
</dependencies>
</project>

The changes I made are basically to the javaFunctions calls. Below is one of the methods after I updated it according to the new documentation. I also added import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    // Prepare the schema
    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    // Prepare the products hierarchy
    List<Product> products = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    JavaRDD<Product> productsRDD = sc.parallelize(products);       
    javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

    JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product product) throws Exception {
            return product.getParents().size() == 2;
        }
    }).flatMap(new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product product) throws Exception {
            Random random = new Random();
            List<Sale> sales = new ArrayList<>(1000);
            for (int i = 0; i < 1000; i++) {
                sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
            }
            return sales;
        }
    });
    javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}

This is the error I get:

16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
    at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
    at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE

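Is there something I am doing wrong? It seems to require a package that I am not even using. Is there a way to fix this, or should I use an earlier version of the spark-cassandra-connector?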

Thanks in advance for any reply.

Solution:

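The code is looking for

org/apache/spark/sql/catalyst/package$ScalaReflectionLock$

This class belongs to Spark's Catalyst module, which the connector's column mapper uses even if your own code never touches Spark SQL. You should therefore include the spark-sql library, which pulls in the right dependencies.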
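As a rough sketch of what that could look like in the pom.xml from the question: the fragment below adds spark-sql next to the existing spark-core and spark-streaming entries. The artifact suffix _2.10 and the version 1.6.0 are assumptions chosen to match the other Spark dependencies in that pom; the answer itself only says to include the spark-sql library and does not name a version.

```xml
<!-- Assumed fix: add spark-sql so that the Catalyst classes
     (org.apache.spark.sql.catalyst.*) end up on the classpath.
     Scala suffix and version are assumptions, chosen to match
     spark-core_2.10 1.6.0 already declared in the pom. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
```

This entry would go inside the existing <dependencies> element. Keeping the Scala suffix and the Spark version identical across all Spark artifacts is the usual way to avoid further class-loading mismatches.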
