@@ -0,0 +1,299 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Ensure the paths to these DLLS are set correctly
#r " Elasticsearch.Net.dll"
#r " Nest.dll"
#r " System.Diagnostics.DiagnosticSource.dll"
#r " System.Buffers.dll"
open Nest
open System
open System.Data .SqlClient
open System.IO
open System.Text .RegularExpressions
open System.Globalization
open System.Threading
// Variables to change
let unzippedGNAFDir = @" D:\AUG19_GNAF_PipeSeparatedValue\G-NAF" // Will require approx 2GB of disk space
let sqlConnectionString = @" Server=SQLEXPRESS;Database=gnaf;Integrated Security=true;" // Will require approx 12GB of disk space
let elasticsearch = new Uri( " http://localhost:9200" ) // Will require approx 8GB of disk space
let elasticsearchIndex = " address"
// Log with timestamp
let logFormat = Printf.TextWriterFormat< int-> int-> int-> string-> unit>( " [%02d :%02d :%02d ] %s " )
let logf format =
let time = DateTime.Now
Printf.ksprintf ( fun s -> printfn logFormat time.Hour time.Minute time.Second s) format
let log message =
let time = DateTime.Now
printfn logFormat time.Hour time.Minute time.Second message
// Address types for Elasticsearch
type AddressComponent =
{ BuildingName: string
Number: string
Street: string
Locality: string
State: string
Postcode: string }
let toDisplay ( address : AddressComponent ) =
seq {
yield address.Number
yield address.Street + " ,"
yield address.Locality
yield address.State
yield address.Postcode
}
|> String.concat " "
type Address =
{ Id: string
Display: string
Location: GeoLocation
Component: AddressComponent }
// SQL operations
let initSql ( sqlConnection : SqlConnection ) =
let getCommand file =
let fixLine ( line : string ) = line.Replace( " CREATE OR REPLACE VIEW ADDRESS_VIEW" , " CREATE VIEW ADDRESS_VIEW" )
let fixLines file = File.ReadAllLines( file) |> Array.map fixLine
String.Join( Environment.NewLine, fixLines file)
let tableScriptsDir = Path.Combine( unzippedGNAFDir, @" Extras\GNAF_TableCreation_Scripts" )
let createTables = Path.Combine( tableScriptsDir, " create_tables_sqlserver.sql" )
let constraints = Path.Combine( tableScriptsDir, " add_fk_constraints.sql" )
let createView = Path.Combine( unzippedGNAFDir, @" Extras\GNAF_View_Scripts\address_view.sql" )
log " Initialising SQL database"
for setupFile in [| createTables; constraints; createView |] do
let commandText = getCommand setupFile
let command = new SqlCommand( commandText, sqlConnection)
if command.ExecuteNonQuery() <> - 1 then failwith ( sprintf " Received failure return value for %s " setupFile)
sqlConnection
let indexSql ( sqlConnection : SqlConnection ) =
let indexFiles ( dir , regex ) =
let bulkSqlInsert command table =
let command = new SqlCommand( command, sqlConnection)
command.CommandTimeout <- 600
let returnValue = command.ExecuteNonQuery()
if returnValue = 0 then failwith ( sprintf " No records inserted into %s " table)
else logf " Inserted %i records into %s " returnValue table
for file in Directory.EnumerateFiles( dir) do
let fileInfo = FileInfo file
let rMatch = Regex.Match( fileInfo.Name, regex)
let table = rMatch.Groups.[ " table" ]. Value
let bulkInsert = sprintf " BULK INSERT %s FROM '%s ' WITH (FIELDTERMINATOR = '|', FIRSTROW = 2)" table fileInfo.FullName
bulkSqlInsert bulkInsert table
log " Indexing Authority Code data"
let dataAuthorityDir = Path.Combine( unzippedGNAFDir, @" G-NAF AUGUST 2019\Authority Code" )
indexFiles ( dataAuthorityDir, " ^Authority_Code_(?<table>.*?)_psv.psv$" )
log " Indexing State data"
let dataStandardDir = Path.Combine( unzippedGNAFDir, @" G-NAF AUGUST 2019\Standard" )
indexFiles ( dataStandardDir, " ^[^_]*_(?<table>.*?)_psv.psv$" )
sqlConnection
// Elasticsearch operations
let createIndex ( elasticClient : ElasticClient ) =
let properties = new Properties< Address>()
properties.Add( PropertyName.op_ Implicit " display" , new SearchAsYouTypeProperty())
properties.Add( PropertyName.op_ Implicit " location" , new GeoPointProperty())
let mapping = new TypeMapping()
mapping.Properties <- properties
let settings = new IndexSettings()
settings.NumberOfReplicas <- Nullable 0
settings.NumberOfShards <- Nullable 1
let createIndexRequest = new CreateIndexRequest( IndexName.op_ Implicit elasticsearchIndex)
createIndexRequest.Settings <- settings
createIndexRequest.Mappings <- mapping
logf " Creating index %s " elasticsearchIndex
let indexResponse = elasticClient.Indices.Create( createIndexRequest)
logf " Created index %O " indexResponse
let bulkIndex ( elasticClient : ElasticClient ) ( sqlConnection : SqlConnection ) =
let timeout = TimeSpan.FromMinutes( 60.0 )
let currentPage = ref 0
let perBulkRequest = Nullable 10000
let backoffTime = " 30s"
let backoffRetries = Nullable 2
let parallelism = Nullable 4
let columnValue column ( reader : SqlDataReader ) =
let ordinal = reader.GetOrdinal( column)
if reader.IsDBNull( ordinal) then String.Empty
else reader.[ ordinal]. ToString()
let columnDecimalValue column ( reader : SqlDataReader ) =
reader.GetOrdinal( column) |> reader.GetDecimal
let address ( reader : SqlDataReader ) =
let addressNumber ( reader : SqlDataReader ) =
let addressPart item ( reader : SqlDataReader ) =
seq {
yield columnValue ( item + " _PREFIX" ) reader
yield columnValue item reader
yield columnValue ( item + " _SUFFIX" ) reader
}
|> Seq.filter ( fun elem -> String.length elem > 0 )
|> String.concat " "
let addressPartWrapped item prefix suffix ( reader : SqlDataReader ) =
let joined = addressPart item reader
if String.length joined > 0 then prefix + joined + suffix
else joined
let lotNumber = addressPartWrapped " LOT_NUMBER" " Lot " " " reader
let flatNumber = addressPartWrapped " FLAT_NUMBER" " " " /" reader
let numberFirst = addressPart " NUMBER_FIRST" reader
let numberLast = addressPartWrapped " NUMBER_LAST" " -" " " reader
[| lotNumber; flatNumber; numberFirst; numberLast |]
|> Seq.filter ( fun elem -> String.length elem > 0 )
|> String.concat " "
let culture = new CultureInfo( " en-AU" , false )
let toTitleCase ( column : string ) =
let (| Prefix | _ |) prefix ( candidate : string ) =
if candidate.StartsWith( prefix) then Some( candidate.Substring( prefix.Length))
else None
let titleCase = culture.TextInfo.ToTitleCase( column.ToLowerInvariant())
match titleCase with
| Prefix " Mc" trailing -> " Mc" + culture.TextInfo.ToTitleCase( trailing)
| _ -> titleCase
let number = addressNumber reader
let street = columnValue " STREET_NAME" reader |> toTitleCase
let streetNameSuffix = columnValue " STREET_SUFFIX_TYPE" reader |> toTitleCase
let streetName =
let streetTypeCode = columnValue " STREET_TYPE_CODE" reader
if String.length streetNameSuffix > 0 then sprintf " %s %s " streetTypeCode streetNameSuffix else streetTypeCode
|> toTitleCase
let locality = columnValue " LOCALITY_NAME" reader |> toTitleCase
let state = columnValue " STATE_ABBREVIATION" reader
let postcode = columnValue " POSTCODE" reader
let buildingName = columnValue " BUILDING_NAME" reader |> toTitleCase
{ BuildingName = buildingName
Number = number
Street = sprintf " %s %s " street streetName
Locality = locality
State = state
Postcode = postcode }
let readCommand = new SqlCommand( " SELECT * FROM ADDRESS_VIEW" , sqlConnection)
readCommand.CommandTimeout <- 600
let reader = readCommand.ExecuteReader()
let results =
seq {
while reader.Read() do
let id = columnValue " ADDRESS_DETAIL_PID" reader
let lat = columnDecimalValue " LATITUDE" reader
let lon = columnDecimalValue " LONGITUDE" reader
let addressParts = address reader
yield {
Id = id
Display = addressParts |> toDisplay
Location = GeoLocation.TryCreate(( double) lat, ( double) lon)
Component = addressParts
}
}
log " Bulk indexing into Elasticsearch"
elasticClient.BulkAll( results, fun ( b :BulkAllDescriptor < Address >) ->
b.Index( IndexName.op_ Implicit elasticsearchIndex)
.BackOffTime( Time.op_ Implicit ( backoffTime))
.BackOffRetries( backoffRetries)
.RefreshOnCompleted()
.MaxDegreeOfParallelism( parallelism)
.Size( perBulkRequest) :> IBulkAllRequest< Address>) .Wait( timeout, fun next ->
let page = Interlocked.Increment( currentPage)
logf " %i addresses indexed" ( page * perBulkRequest.Value)
) |> ignore
log " Bulk indexing complete"
let displayResults ( searchResponse : ISearchResponse < Address >) =
for hit in searchResponse.Hits do
Console.WriteLine hit.Source.Display
Console.WriteLine( " Took: {0}ms" , searchResponse.Took)
let searchAsYouTypeDemo ( elasticClient : ElasticClient ) =
let search text =
let query = new MultiMatchQuery()
query.Query <- text
query.Type <- Nullable.op_ Implicit TextQueryType.BoolPrefix
query.Fields <- Fields.op_ Implicit " display,display._2gram,display._3gram"
let request = new SearchRequest< Address>()
request.Query <- new QueryContainer( query)
let searchAsYouTypeResponse = elasticClient.Search< Address>( request)
displayResults searchAsYouTypeResponse
let readLine () =
Console.Write " \n Enter search (or type 'quit'): "
Console.ReadLine()
let readlines = Seq.initInfinite ( fun _ -> readLine())
let run item = if item = " quit"
then Some( item)
else
search item
None
Seq.pick run readlines |> ignore
elasticClient
let geoSearchDemo ( elasticClient : ElasticClient ) =
let query = new GeoDistanceQuery()
query.Field <- Field.op_ Implicit " location"
query.Distance <- Distance.op_ Implicit " 20km"
query.Location <- new GeoLocation(- 25.3444 , 131.0369 )
let request = new SearchRequest< Address>()
request.Query <- new QueryContainer( query)
let geoSearchResponse = elasticClient.Search< Address>( request)
displayResults geoSearchResponse
// Execute script
let connectionSettings = new ConnectionSettings( elasticsearch)
connectionSettings.DefaultIndex( elasticsearchIndex)
let elasticClient = new ElasticClient( connectionSettings)
elasticClient
|> createIndex
let sqlConnection = new SqlConnection( sqlConnectionString)
sqlConnection.Open()
sqlConnection
|> initSql
|> indexSql
|> bulkIndex elasticClient
sqlConnection.Close()
elasticClient
|> searchAsYouTypeDemo
|> geoSearchDemo