Created
September 6, 2019 05:08
-
-
Save codebrain/9356a8c6db1c887bbd34706a6e77a66b to your computer and use it in GitHub Desktop.
Revisions
-
Stuart Cam created this gist
Sep 6, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,299 @@ // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Ensure the paths to these DLLS are set correctly #r "Elasticsearch.Net.dll" #r "Nest.dll" #r "System.Diagnostics.DiagnosticSource.dll" #r "System.Buffers.dll" open Nest open System open System.Data.SqlClient open System.IO open System.Text.RegularExpressions open System.Globalization open System.Threading // Variables to change let unzippedGNAFDir = @"D:\AUG19_GNAF_PipeSeparatedValue\G-NAF"// Will require approx 2GB of disk space let sqlConnectionString = @"Server=SQLEXPRESS;Database=gnaf;Integrated Security=true;" // Will require approx 12GB of disk space let elasticsearch = new Uri("http://localhost:9200") // Will require approx 8GB of disk space let elasticsearchIndex = "address" // Log with timestamp let logFormat = Printf.TextWriterFormat<int->int->int->string->unit>("[%02d:%02d:%02d] %s") let logf format = let time = DateTime.Now Printf.ksprintf (fun s -> printfn logFormat time.Hour time.Minute time.Second s) format let log message = let time = DateTime.Now printfn logFormat time.Hour time.Minute time.Second message // Address types for Elasticsearch type AddressComponent = { BuildingName: string Number: string Street: string Locality: string State: string Postcode: string } let toDisplay (address:AddressComponent) = seq { yield address.Number yield address.Street + "," yield address.Locality yield address.State yield address.Postcode } |> String.concat " " type Address = { Id: string Display: string Location: GeoLocation Component: AddressComponent } // SQL operations let initSql(sqlConnection:SqlConnection) = let getCommand file = let fixLine (line:string) = line.Replace("CREATE OR REPLACE VIEW ADDRESS_VIEW", "CREATE VIEW ADDRESS_VIEW") let fixLines file = File.ReadAllLines(file) |> Array.map fixLine String.Join(Environment.NewLine, fixLines file) let tableScriptsDir = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_TableCreation_Scripts") let createTables = Path.Combine(tableScriptsDir, "create_tables_sqlserver.sql") let constraints = Path.Combine(tableScriptsDir, "add_fk_constraints.sql") let createView = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_View_Scripts\address_view.sql") log "Initialising SQL database" for setupFile in [| createTables; constraints; createView |] do let commandText = getCommand setupFile let command = new SqlCommand(commandText, sqlConnection) if command.ExecuteNonQuery() <> -1 then failwith (sprintf "Received failure return value for %s" setupFile) sqlConnection let indexSql(sqlConnection:SqlConnection) = let indexFiles(dir, regex) = let bulkSqlInsert command table = let command = new SqlCommand(command, sqlConnection) command.CommandTimeout <- 600 let returnValue = command.ExecuteNonQuery() if returnValue = 0 then failwith (sprintf "No records inserted into %s" table) else logf "Inserted %i records into %s" returnValue table for file in Directory.EnumerateFiles(dir) do let fileInfo = FileInfo file let rMatch = Regex.Match(fileInfo.Name, regex) let table = rMatch.Groups.["table"].Value let bulkInsert = sprintf "BULK INSERT %s FROM '%s' WITH (FIELDTERMINATOR = '|', FIRSTROW = 2)" table fileInfo.FullName bulkSqlInsert bulkInsert table log "Indexing Authority Code data" let dataAuthorityDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Authority Code") indexFiles (dataAuthorityDir, "^Authority_Code_(?<table>.*?)_psv.psv$") log "Indexing State data" let dataStandardDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Standard") indexFiles (dataStandardDir, "^[^_]*_(?<table>.*?)_psv.psv$") sqlConnection // Elasticsearch operations let createIndex (elasticClient:ElasticClient) = let properties = new Properties<Address>() properties.Add(PropertyName.op_Implicit "display", new SearchAsYouTypeProperty()) properties.Add(PropertyName.op_Implicit "location", new GeoPointProperty()) let mapping = new TypeMapping() mapping.Properties <- properties let settings = new IndexSettings() settings.NumberOfReplicas <- Nullable 0 settings.NumberOfShards <- Nullable 1 let createIndexRequest = new CreateIndexRequest(IndexName.op_Implicit elasticsearchIndex) createIndexRequest.Settings <- settings createIndexRequest.Mappings <- mapping logf "Creating index %s" elasticsearchIndex let indexResponse = elasticClient.Indices.Create(createIndexRequest) logf "Created index %O" indexResponse let bulkIndex (elasticClient:ElasticClient) (sqlConnection:SqlConnection) = let timeout = TimeSpan.FromMinutes(60.0) let currentPage = ref 0 let perBulkRequest = Nullable 10000 let backoffTime = "30s" let backoffRetries = Nullable 2 let parallelism = Nullable 4 let columnValue column (reader:SqlDataReader) = let ordinal = reader.GetOrdinal(column) if reader.IsDBNull(ordinal) then String.Empty else reader.[ordinal].ToString() let columnDecimalValue column (reader:SqlDataReader) = reader.GetOrdinal(column) |> reader.GetDecimal let address (reader:SqlDataReader) = let addressNumber (reader:SqlDataReader) = let addressPart item (reader:SqlDataReader) = seq { yield columnValue (item + "_PREFIX") reader yield columnValue item reader yield columnValue (item + "_SUFFIX") reader } |> Seq.filter (fun elem -> String.length elem > 0) |> String.concat " " let addressPartWrapped item prefix suffix (reader:SqlDataReader) = let joined = addressPart item reader if String.length joined > 0 then prefix + joined + suffix else joined let lotNumber = addressPartWrapped "LOT_NUMBER" "Lot " "" reader let flatNumber = addressPartWrapped "FLAT_NUMBER" "" "/" reader let numberFirst = addressPart "NUMBER_FIRST" reader let numberLast = addressPartWrapped "NUMBER_LAST" "-" "" reader [| lotNumber; flatNumber; numberFirst; numberLast |] |> Seq.filter (fun elem -> String.length elem > 0) |> String.concat "" let culture = new CultureInfo("en-AU", false) let toTitleCase (column:string) = let (|Prefix|_|) prefix (candidate:string) = if candidate.StartsWith(prefix) then Some(candidate.Substring(prefix.Length)) else None let titleCase = culture.TextInfo.ToTitleCase(column.ToLowerInvariant()) match titleCase with | Prefix "Mc" trailing -> "Mc" + culture.TextInfo.ToTitleCase(trailing) | _ -> titleCase let number = addressNumber reader let street = columnValue "STREET_NAME" reader |> toTitleCase let streetNameSuffix = columnValue "STREET_SUFFIX_TYPE" reader |> toTitleCase let streetName = let streetTypeCode = columnValue "STREET_TYPE_CODE" reader if String.length streetNameSuffix > 0 then sprintf "%s %s" streetTypeCode streetNameSuffix else streetTypeCode |> toTitleCase let locality = columnValue "LOCALITY_NAME" reader |> toTitleCase let state = columnValue "STATE_ABBREVIATION" reader let postcode = columnValue "POSTCODE" reader let buildingName = columnValue "BUILDING_NAME" reader |> toTitleCase { BuildingName = buildingName Number = number Street = sprintf "%s %s" street streetName Locality = locality State = state Postcode = postcode } let readCommand = new SqlCommand("SELECT * FROM ADDRESS_VIEW", sqlConnection) readCommand.CommandTimeout <- 600 let reader = readCommand.ExecuteReader() let results = seq { while reader.Read() do let id = columnValue "ADDRESS_DETAIL_PID" reader let lat = columnDecimalValue "LATITUDE" reader let lon = columnDecimalValue "LONGITUDE" reader let addressParts = address reader yield { Id = id Display = addressParts |> toDisplay Location = GeoLocation.TryCreate((double)lat, (double)lon) Component = addressParts } } log "Bulk indexing into Elasticsearch" elasticClient.BulkAll(results, fun (b:BulkAllDescriptor<Address>) -> b.Index(IndexName.op_Implicit elasticsearchIndex) .BackOffTime(Time.op_Implicit (backoffTime)) .BackOffRetries(backoffRetries) .RefreshOnCompleted() .MaxDegreeOfParallelism(parallelism) .Size(perBulkRequest) :> IBulkAllRequest<Address>).Wait(timeout, fun next -> let page = Interlocked.Increment(currentPage) logf "%i addresses indexed" (page * perBulkRequest.Value) ) |> ignore log "Bulk indexing complete" let displayResults (searchResponse:ISearchResponse<Address>) = for hit in searchResponse.Hits do Console.WriteLine hit.Source.Display Console.WriteLine("Took: {0}ms", searchResponse.Took) let searchAsYouTypeDemo (elasticClient:ElasticClient) = let search text = let query = new MultiMatchQuery() query.Query <- text query.Type <- Nullable.op_Implicit TextQueryType.BoolPrefix query.Fields <- Fields.op_Implicit "display,display._2gram,display._3gram" let request = new SearchRequest<Address>() request.Query <- new QueryContainer(query) let searchAsYouTypeResponse = elasticClient.Search<Address>(request) displayResults searchAsYouTypeResponse let readLine () = Console.Write "\nEnter search (or type 'quit'): " Console.ReadLine() let readlines = Seq.initInfinite (fun _ -> readLine()) let run item = if item = "quit" then Some(item) else search item None Seq.pick run readlines |> ignore elasticClient let geoSearchDemo (elasticClient:ElasticClient) = let query = new GeoDistanceQuery() query.Field <- Field.op_Implicit "location" query.Distance <- Distance.op_Implicit "20km" query.Location <- new GeoLocation(-25.3444, 131.0369) let request = new SearchRequest<Address>() request.Query <- new QueryContainer(query) let geoSearchResponse = elasticClient.Search<Address>(request) displayResults geoSearchResponse // Execute script let connectionSettings = new ConnectionSettings(elasticsearch) connectionSettings.DefaultIndex(elasticsearchIndex) let elasticClient = new ElasticClient(connectionSettings) elasticClient |> createIndex let sqlConnection = new SqlConnection(sqlConnectionString) sqlConnection.Open() sqlConnection |> initSql |> indexSql |> bulkIndex elasticClient sqlConnection.Close() elasticClient |> searchAsYouTypeDemo |> geoSearchDemo