Skip to content

Instantly share code, notes, and snippets.

@codebrain
Created September 6, 2019 05:08
Show Gist options
  • Save codebrain/9356a8c6db1c887bbd34706a6e77a66b to your computer and use it in GitHub Desktop.
Save codebrain/9356a8c6db1c887bbd34706a6e77a66b to your computer and use it in GitHub Desktop.

Revisions

  1. Stuart Cam created this gist Sep 6, 2019.
    299 changes: 299 additions & 0 deletions elastic-gnaf.fsx
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,299 @@
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    // http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.

    // Ensure the paths to these DLLS are set correctly
    #r "Elasticsearch.Net.dll"
    #r "Nest.dll"
    #r "System.Diagnostics.DiagnosticSource.dll"
    #r "System.Buffers.dll"

    open Nest
    open System
    open System.Data.SqlClient
    open System.IO
    open System.Text.RegularExpressions
    open System.Globalization
    open System.Threading

    // Variables to change
    let unzippedGNAFDir = @"D:\AUG19_GNAF_PipeSeparatedValue\G-NAF"// Will require approx 2GB of disk space
    let sqlConnectionString = @"Server=SQLEXPRESS;Database=gnaf;Integrated Security=true;" // Will require approx 12GB of disk space
    let elasticsearch = new Uri("http://localhost:9200") // Will require approx 8GB of disk space
    let elasticsearchIndex = "address"

    // Log with timestamp
    let logFormat = Printf.TextWriterFormat<int->int->int->string->unit>("[%02d:%02d:%02d] %s")

    let logf format =
    let time = DateTime.Now
    Printf.ksprintf (fun s -> printfn logFormat time.Hour time.Minute time.Second s) format

    let log message =
    let time = DateTime.Now
    printfn logFormat time.Hour time.Minute time.Second message

    // Address types for Elasticsearch
    type AddressComponent =
    { BuildingName: string
    Number: string
    Street: string
    Locality: string
    State: string
    Postcode: string }

    let toDisplay (address:AddressComponent) =
    seq {
    yield address.Number
    yield address.Street + ","
    yield address.Locality
    yield address.State
    yield address.Postcode
    }
    |> String.concat " "

    type Address =
    { Id: string
    Display: string
    Location: GeoLocation
    Component: AddressComponent }

    // SQL operations
    let initSql(sqlConnection:SqlConnection) =
    let getCommand file =
    let fixLine (line:string) = line.Replace("CREATE OR REPLACE VIEW ADDRESS_VIEW", "CREATE VIEW ADDRESS_VIEW")
    let fixLines file = File.ReadAllLines(file) |> Array.map fixLine
    String.Join(Environment.NewLine, fixLines file)

    let tableScriptsDir = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_TableCreation_Scripts")
    let createTables = Path.Combine(tableScriptsDir, "create_tables_sqlserver.sql")
    let constraints = Path.Combine(tableScriptsDir, "add_fk_constraints.sql")
    let createView = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_View_Scripts\address_view.sql")

    log "Initialising SQL database"
    for setupFile in [| createTables; constraints; createView |] do
    let commandText = getCommand setupFile
    let command = new SqlCommand(commandText, sqlConnection)
    if command.ExecuteNonQuery() <> -1 then failwith (sprintf "Received failure return value for %s" setupFile)
    sqlConnection

    let indexSql(sqlConnection:SqlConnection) =
    let indexFiles(dir, regex) =
    let bulkSqlInsert command table =
    let command = new SqlCommand(command, sqlConnection)
    command.CommandTimeout <- 600
    let returnValue = command.ExecuteNonQuery()
    if returnValue = 0 then failwith (sprintf "No records inserted into %s" table)
    else logf "Inserted %i records into %s" returnValue table
    for file in Directory.EnumerateFiles(dir) do
    let fileInfo = FileInfo file
    let rMatch = Regex.Match(fileInfo.Name, regex)
    let table = rMatch.Groups.["table"].Value
    let bulkInsert = sprintf "BULK INSERT %s FROM '%s' WITH (FIELDTERMINATOR = '|', FIRSTROW = 2)" table fileInfo.FullName
    bulkSqlInsert bulkInsert table

    log "Indexing Authority Code data"
    let dataAuthorityDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Authority Code")
    indexFiles (dataAuthorityDir, "^Authority_Code_(?<table>.*?)_psv.psv$")

    log "Indexing State data"
    let dataStandardDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Standard")
    indexFiles (dataStandardDir, "^[^_]*_(?<table>.*?)_psv.psv$")
    sqlConnection

    // Elasticsearch operations
    let createIndex (elasticClient:ElasticClient) =
    let properties = new Properties<Address>()
    properties.Add(PropertyName.op_Implicit "display", new SearchAsYouTypeProperty())
    properties.Add(PropertyName.op_Implicit "location", new GeoPointProperty())

    let mapping = new TypeMapping()
    mapping.Properties <- properties

    let settings = new IndexSettings()
    settings.NumberOfReplicas <- Nullable 0
    settings.NumberOfShards <- Nullable 1

    let createIndexRequest = new CreateIndexRequest(IndexName.op_Implicit elasticsearchIndex)
    createIndexRequest.Settings <- settings
    createIndexRequest.Mappings <- mapping

    logf "Creating index %s" elasticsearchIndex
    let indexResponse = elasticClient.Indices.Create(createIndexRequest)
    logf "Created index %O" indexResponse

    let bulkIndex (elasticClient:ElasticClient) (sqlConnection:SqlConnection) =
    let timeout = TimeSpan.FromMinutes(60.0)
    let currentPage = ref 0
    let perBulkRequest = Nullable 10000
    let backoffTime = "30s"
    let backoffRetries = Nullable 2
    let parallelism = Nullable 4

    let columnValue column (reader:SqlDataReader) =
    let ordinal = reader.GetOrdinal(column)
    if reader.IsDBNull(ordinal) then String.Empty
    else reader.[ordinal].ToString()

    let columnDecimalValue column (reader:SqlDataReader) =
    reader.GetOrdinal(column) |> reader.GetDecimal

    let address (reader:SqlDataReader) =
    let addressNumber (reader:SqlDataReader) =
    let addressPart item (reader:SqlDataReader) =
    seq {
    yield columnValue (item + "_PREFIX") reader
    yield columnValue item reader
    yield columnValue (item + "_SUFFIX") reader
    }
    |> Seq.filter (fun elem -> String.length elem > 0)
    |> String.concat " "

    let addressPartWrapped item prefix suffix (reader:SqlDataReader) =
    let joined = addressPart item reader
    if String.length joined > 0 then prefix + joined + suffix
    else joined

    let lotNumber = addressPartWrapped "LOT_NUMBER" "Lot " "" reader
    let flatNumber = addressPartWrapped "FLAT_NUMBER" "" "/" reader
    let numberFirst = addressPart "NUMBER_FIRST" reader
    let numberLast = addressPartWrapped "NUMBER_LAST" "-" "" reader

    [| lotNumber; flatNumber; numberFirst; numberLast |]
    |> Seq.filter (fun elem -> String.length elem > 0)
    |> String.concat ""

    let culture = new CultureInfo("en-AU", false)

    let toTitleCase (column:string) =
    let (|Prefix|_|) prefix (candidate:string) =
    if candidate.StartsWith(prefix) then Some(candidate.Substring(prefix.Length))
    else None
    let titleCase = culture.TextInfo.ToTitleCase(column.ToLowerInvariant())
    match titleCase with
    | Prefix "Mc" trailing -> "Mc" + culture.TextInfo.ToTitleCase(trailing)
    | _ -> titleCase

    let number = addressNumber reader
    let street = columnValue "STREET_NAME" reader |> toTitleCase
    let streetNameSuffix = columnValue "STREET_SUFFIX_TYPE" reader |> toTitleCase
    let streetName =
    let streetTypeCode = columnValue "STREET_TYPE_CODE" reader
    if String.length streetNameSuffix > 0 then sprintf "%s %s" streetTypeCode streetNameSuffix else streetTypeCode
    |> toTitleCase
    let locality = columnValue "LOCALITY_NAME" reader |> toTitleCase
    let state = columnValue "STATE_ABBREVIATION" reader
    let postcode = columnValue "POSTCODE" reader
    let buildingName = columnValue "BUILDING_NAME" reader |> toTitleCase

    { BuildingName = buildingName
    Number = number
    Street = sprintf "%s %s" street streetName
    Locality = locality
    State = state
    Postcode = postcode }

    let readCommand = new SqlCommand("SELECT * FROM ADDRESS_VIEW", sqlConnection)
    readCommand.CommandTimeout <- 600
    let reader = readCommand.ExecuteReader()
    let results =
    seq {
    while reader.Read() do
    let id = columnValue "ADDRESS_DETAIL_PID" reader
    let lat = columnDecimalValue "LATITUDE" reader
    let lon = columnDecimalValue "LONGITUDE" reader
    let addressParts = address reader
    yield {
    Id = id
    Display = addressParts |> toDisplay
    Location = GeoLocation.TryCreate((double)lat, (double)lon)
    Component = addressParts
    }
    }

    log "Bulk indexing into Elasticsearch"
    elasticClient.BulkAll(results, fun (b:BulkAllDescriptor<Address>) ->
    b.Index(IndexName.op_Implicit elasticsearchIndex)
    .BackOffTime(Time.op_Implicit (backoffTime))
    .BackOffRetries(backoffRetries)
    .RefreshOnCompleted()
    .MaxDegreeOfParallelism(parallelism)
    .Size(perBulkRequest) :> IBulkAllRequest<Address>).Wait(timeout, fun next ->
    let page = Interlocked.Increment(currentPage)
    logf "%i addresses indexed" (page * perBulkRequest.Value)
    ) |> ignore
    log "Bulk indexing complete"

    let displayResults (searchResponse:ISearchResponse<Address>) =
    for hit in searchResponse.Hits do
    Console.WriteLine hit.Source.Display
    Console.WriteLine("Took: {0}ms", searchResponse.Took)

    let searchAsYouTypeDemo (elasticClient:ElasticClient) =
    let search text =
    let query = new MultiMatchQuery()
    query.Query <- text
    query.Type <- Nullable.op_Implicit TextQueryType.BoolPrefix
    query.Fields <- Fields.op_Implicit "display,display._2gram,display._3gram"

    let request = new SearchRequest<Address>()
    request.Query <- new QueryContainer(query)

    let searchAsYouTypeResponse = elasticClient.Search<Address>(request)
    displayResults searchAsYouTypeResponse

    let readLine () =
    Console.Write "\nEnter search (or type 'quit'): "
    Console.ReadLine()

    let readlines = Seq.initInfinite (fun _ -> readLine())
    let run item = if item = "quit"
    then Some(item)
    else
    search item
    None

    Seq.pick run readlines |> ignore
    elasticClient

    let geoSearchDemo (elasticClient:ElasticClient) =
    let query = new GeoDistanceQuery()
    query.Field <- Field.op_Implicit "location"
    query.Distance <- Distance.op_Implicit "20km"
    query.Location <- new GeoLocation(-25.3444, 131.0369)

    let request = new SearchRequest<Address>()
    request.Query <- new QueryContainer(query)

    let geoSearchResponse = elasticClient.Search<Address>(request)
    displayResults geoSearchResponse

    // Execute script
    let connectionSettings = new ConnectionSettings(elasticsearch)
    connectionSettings.DefaultIndex(elasticsearchIndex)
    let elasticClient = new ElasticClient(connectionSettings)

    elasticClient
    |> createIndex

    let sqlConnection = new SqlConnection(sqlConnectionString)
    sqlConnection.Open()

    sqlConnection
    |> initSql
    |> indexSql
    |> bulkIndex elasticClient

    sqlConnection.Close()

    elasticClient
    |> searchAsYouTypeDemo
    |> geoSearchDemo