
Hi again :)

I'm trying to play with the geozip processor and with Elasticsearch / Kibana. My sample contains two columns, "zipcode" and "country", and the geozip processor generates a third column with a geopoint value (that processor is perfect!). I also added another string column which is the concatenation of latitude and longitude, separated by a comma (a valid geopoint string format in ES). Example:

42800;POINT (4.6657 45.5598);4.6657,45.5598;France;
69480;POINT (4.7028 45.9126);4.7028,45.9126;France;
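
As a side note, the lat/lon string column can be derived directly from the WKT output of the processor. A minimal Python sketch (the function name is just for illustration); be aware that WKT stores longitude first, while Elasticsearch's string form of a geo_point expects "lat,lon" order:

```python
import re

def wkt_point_to_es_string(wkt):
    """Convert 'POINT (lon lat)' (WKT, longitude first) to the
    'lat,lon' string form Elasticsearch accepts for geo_point."""
    m = re.match(r"POINT \(([-\d.]+) ([-\d.]+)\)", wkt)
    if not m:
        raise ValueError("not a WKT point: %r" % wkt)
    lon, lat = m.groups()
    return "%s,%s" % (lat, lon)

print(wkt_point_to_es_string("POINT (4.6657 45.5598)"))  # 45.5598,4.6657
```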

I synchronise this data to Elasticsearch. Unfortunately, the automatically inferred schema does not detect the geo_point types:

{
  "test" : {
    "mappings" : {
      "test" : {
        "properties" : {
          "Country" : {
            "type" : "string",
            "fields" : {
              "Country_facet" : {
                "type" : "string",
                "index" : "not_analyzed"
              }
            }
          },
          "PostCodeZip" : {
            "type" : "long",
            "store" : true
          },
          "geo" : {
            "type" : "string",
            "fields" : {
              "geo_facet" : {
                "type" : "string",
                "index" : "not_analyzed"
              }
            }
          },
          "geopoint" : {
            "type" : "string",
            "fields" : {
              "geopoint_facet" : {
                "type" : "string",
                "index" : "not_analyzed"
              }
            }
          }
        }
      }
    }
  }
}

So I tried to delete the ES index and apply my own schema with the correct field types, through a mapping:

curl -XPUT ../test/test/_mapping -d '{
  "test" : {
    "properties" : {
      "Country" : {
        "type" : "string",
        "fields" : {
          "Country_facet" : {
            "type" : "string",
            "index" : "not_analyzed"
          }
        }
      },
      "PostCodeZip" : {
        "type" : "long",
        "store" : true
      },
      "geo" : {
        "type" : "string",
        "fields" : {
          "geo_facet" : {
            "type" : "string",
            "index" : "not_analyzed"
          }
        }
      },
      "geopoint" : {
        "type" : "geo_point"
      }
    }
  }
}'
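
As an aside, a hand-written `_mapping` body is easy to get wrong (unbalanced braces, trailing commas), and ES will reject it with a parse error. A quick way to sanity-check the JSON before sending it, in plain Python (the body below is just a reduced example, not the full mapping):

```python
import json

mapping_body = """
{
  "test" : {
    "properties" : {
      "geopoint" : { "type" : "geo_point" }
    }
  }
}
"""

# json.loads raises ValueError on unbalanced braces or trailing commas
parsed = json.loads(mapping_body)
assert parsed["test"]["properties"]["geopoint"]["type"] == "geo_point"
print("mapping body is well-formed JSON")
```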

This mapping is erased by DSS before uploading the data, and a new schema (with the wrong types) is auto-inferred.

I found the DSS configuration file for this "sync" dataset and edited it to change the type from "string" to "geo_point" (a valid ES type), like this:

File $DSS_FOLDER/projects/<project_name>/datasets/<sync_name>.json

  .....
      {
        "name": "geopoint",
        "type": "geo_point",
        "maxLength": -1
      },
  .....

This generates the following error:

[12:12:09] [ERROR] [dku.flow.jobrunner] running sync_test_NP - Activity unexpectedly failed
java.lang.IllegalArgumentException: in running sync_test_NP: Type not found: geo_point
	at com.dataiku.dip.utils.ErrorContext.iae(ErrorContext.java:82)
	at com.dataiku.dip.datasets.Type.forName(Type.java:97)
	at com.dataiku.dip.coremodel.SchemaColumn.getType(SchemaColumn.java:82)
	at com.dataiku.dip.datasets.elasticsearch.ElasticSearchUtils.getElasticSearchType(ElasticSearchUtils.java:55)
	at com.dataiku.dip.datasets.elasticsearch.ElasticSearchUtils.getMappingDefinition(ElasticSearchUtils.java:125)
	at com.dataiku.dip.datasets.elasticsearch.ElasticSearchOutput$ElasticSearchOutputWriter.init(ElasticSearchOutput.java:147)
	at com.dataiku.dip.dataflow.exec.stream.ToDatasetStreamSplitRunner.init(ToDatasetStreamSplitRunner.java:55)
	at com.dataiku.dip.dataflow.exec.sync.FSToAny.init(FSToAny.java:67)
	at com.dataiku.dip.dataflow.exec.SyncRecipeRunner.init(SyncRecipeRunner.java:110)
	at com.dataiku.dip.dataflow.jobrunner.ExecutionRunnablesBuilder.getRunnables(ExecutionRunnablesBuilder.java:49)
	at com.dataiku.dip.dataflow.jobrunner.ActivityRunner.runActivity(ActivityRunner.java:383)
	at com.dataiku.dip.dataflow.jobrunner.JobRunner.runActivity(JobRunner.java:102)
	at com.dataiku.dip.dataflow.jobrunner.JobRunner.access$700(JobRunner.java:27)
	at com.dataiku.dip.dataflow.jobrunner.JobRunner$ActivityExecutorThread.run(JobRunner.java:263)

 

Any ideas?

1 Answer

Best answer
Hi Romain,

DSS indeed overrides the index mapping when you run the sync recipe. However, you can enter your custom mapping in the DSS interface, and it will be used instead of the default autogenerated mapping.

To do that, go to the Settings of the Elasticsearch dataset, where you'll find a text zone to enter your custom mapping.

That being said, it would be better if DSS could automatically generate a proper geo_point column in ES when you have a valid DSS geopoint; we'll add this to our feature backlog.

Regards,
Hi Clement!

Great, I had missed this feature. It works now with this:
{"properties": {
    "Country": {
        "fields": {"Country_facet": {
            "index": "not_analyzed",
            "type": "string"
        }},
        "type": "string"
    },
    "PostCodeZip": {
        "store": "true",
        "type": "long"
    },
    "geo": {
        "type": "string"
    },
    "geopoint": {
        "type": "geo_point"
    }
}}

Thank you! :)